dvovk/snapidx (#9049)

Dmytro 2023-12-22 12:25:55 +01:00 committed by GitHub
parent a3a61701e2
commit a36071e7ff
4 changed files with 110 additions and 16 deletions

View File

@@ -27,6 +27,7 @@ func (d *DiagnosticClient) Setup() {
 	d.runSnapshotListener()
 	d.runSegmentDownloadingListener()
 	d.runSegmentIndexingListener()
+	d.runSegmentIndexingFinishedListener()
 }
 
 func (d *DiagnosticClient) runSnapshotListener() {
@@ -107,8 +108,68 @@ func (d *DiagnosticClient) runSegmentIndexingListener() {
 				cancel()
 				return
 			case info := <-ch:
-				d.snapshotDownload.SegmentIndexing = info
+				d.addOrUpdateSegmentIndexingState(info)
 			}
 		}
 	}()
 }
+
+func (d *DiagnosticClient) runSegmentIndexingFinishedListener() {
+	go func() {
+		ctx, ch, cancel := diaglib.Context[diaglib.SnapshotSegmentIndexingFinishedUpdate](context.Background(), 1)
+		defer cancel()
+
+		rootCtx, _ := common.RootContext()
+
+		diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SnapshotSegmentIndexingFinishedUpdate{}), log.Root())
+		for {
+			select {
+			case <-rootCtx.Done():
+				cancel()
+				return
+			case info := <-ch:
+				found := false
+				for i := range d.snapshotDownload.SegmentIndexing.Segments {
+					if d.snapshotDownload.SegmentIndexing.Segments[i].SegmentName == info.SegmentName {
+						found = true
+						d.snapshotDownload.SegmentIndexing.Segments[i].Percent = 100
+					}
+				}
+
+				if !found {
+					d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{
+						SegmentName: info.SegmentName,
+						Percent:     100,
+						Alloc:       0,
+						Sys:         0,
+					})
+				}
+			}
+		}
+	}()
+}
+
+func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd diaglib.SnapshotIndexingStatistics) {
+	if d.snapshotDownload.SegmentIndexing.Segments == nil {
+		d.snapshotDownload.SegmentIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{}
+	}
+
+	for i := range upd.Segments {
+		found := false
+		for j := range d.snapshotDownload.SegmentIndexing.Segments {
+			if d.snapshotDownload.SegmentIndexing.Segments[j].SegmentName == upd.Segments[i].SegmentName {
+				d.snapshotDownload.SegmentIndexing.Segments[j].Percent = upd.Segments[i].Percent
+				d.snapshotDownload.SegmentIndexing.Segments[j].Alloc = upd.Segments[i].Alloc
+				d.snapshotDownload.SegmentIndexing.Segments[j].Sys = upd.Segments[i].Sys
+				found = true
+				break
+			}
+		}
+
+		if !found {
+			d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, upd.Segments[i])
+		}
+	}
+
+	d.snapshotDownload.SegmentIndexing.TimeElapsed = upd.TimeElapsed
+}

View File

@@ -57,8 +57,19 @@ type SegmentDownloadStatistics struct {
 }
 
 type SnapshotIndexingStatistics struct {
-	Segments    map[string]int `json:"segments"`
-	TimeElapsed float64        `json:"timeElapsed"`
+	Segments    []SnapshotSegmentIndexingStatistics `json:"segments"`
+	TimeElapsed float64                             `json:"timeElapsed"`
+}
+
+type SnapshotSegmentIndexingStatistics struct {
+	SegmentName string `json:"segmentName"`
+	Percent     int    `json:"percent"`
+	Alloc       uint64 `json:"alloc"`
+	Sys         uint64 `json:"sys"`
+}
+
+type SnapshotSegmentIndexingFinishedUpdate struct {
+	SegmentName string `json:"segmentName"`
 }
 
 func (ti SnapshotDownloadStatistics) Type() Type {
@@ -72,3 +83,7 @@ func (ti SegmentDownloadStatistics) Type() Type {
 func (ti SnapshotIndexingStatistics) Type() Type {
 	return TypeOf(ti)
 }
+
+func (ti SnapshotSegmentIndexingFinishedUpdate) Type() Type {
+	return TypeOf(ti)
+}
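
For reference, this hunk changes the indexing diagnostics payload from a map of segment name to percent into a list of per-segment records that also carry memory stats. Below is a minimal standalone sketch (not part of this commit) showing the resulting JSON shape, using local copies of the structs above; the segment name and numbers are illustrative placeholders.

// Illustrative only: local copies of the diagnostics structs introduced above,
// used to show the wire shape of the reworked "segments" field.
package main

import (
	"encoding/json"
	"fmt"
)

type SnapshotSegmentIndexingStatistics struct {
	SegmentName string `json:"segmentName"`
	Percent     int    `json:"percent"`
	Alloc       uint64 `json:"alloc"`
	Sys         uint64 `json:"sys"`
}

type SnapshotIndexingStatistics struct {
	Segments    []SnapshotSegmentIndexingStatistics `json:"segments"`
	TimeElapsed float64                             `json:"timeElapsed"`
}

func main() {
	// Hypothetical segment name and values, chosen only for the example.
	stats := SnapshotIndexingStatistics{
		Segments: []SnapshotSegmentIndexingStatistics{
			{SegmentName: "headers.seg", Percent: 42, Alloc: 1 << 20, Sys: 2 << 20},
		},
		TimeElapsed: 12.5,
	}
	out, _ := json.Marshal(stats)
	fmt.Println(string(out))
	// {"segments":[{"segmentName":"headers.seg","percent":42,"alloc":1048576,"sys":2097152}],"timeElapsed":12.5}
}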

View File

@@ -180,6 +180,7 @@ type FileInfo struct {
 func (f FileInfo) TorrentFileExists() bool { return dir.FileExist(f.Path + ".torrent") }
 func (f FileInfo) Seedable() bool { return f.To-f.From == Erigon2MergeLimit }
 func (f FileInfo) NeedTorrentFile() bool { return f.Seedable() && !f.TorrentFileExists() }
+func (f FileInfo) Name() string { return filepath.Base(f.Path) }
 
 func IdxFiles(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".idx") }
 func Segments(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".seg") }

View File

@@ -7,7 +7,6 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"github.com/ledgerwatch/erigon/consensus/bor"
 	"os"
 	"path"
 	"path/filepath"
@@ -18,6 +17,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/ledgerwatch/erigon/consensus/bor"
+
 	"github.com/holiman/uint256"
 	"github.com/ledgerwatch/erigon-lib/chain"
 	"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
@@ -929,12 +930,7 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs
 		case <-logEvery.C:
 			var m runtime.MemStats
 			dbg.ReadMemStats(&m)
-			diagnostics.Send(diagnostics.SnapshotIndexingStatistics{
-				Segments:    ps.DiagnossticsData(),
-				TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(),
-			})
+			sendDiagnostics(startIndexingTime, ps.DiagnossticsData(), m.Alloc, m.Sys)
 			logger.Info(fmt.Sprintf("[%s] Indexing", logPrefix), "progress", ps.String(), "total-indexing-time", time.Since(startIndexingTime).Round(time.Second).String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
 		case <-finish:
 			return
@@ -957,6 +953,7 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs
 		g.Go(func() error {
 			p := &background.Progress{}
 			ps.Add(p)
+			defer notifySegmentIndexingFinished(sn.Name())
 			defer ps.Delete(p)
 			return buildIdx(gCtx, sn, chainConfig, tmpDir, p, log.LvlInfo, logger)
 		})
@@ -974,7 +971,6 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs
 	case <-ctx.Done():
 		return ctx.Err()
 	}
-	}
 }
 
 func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs, chainConfig *chain.Config, workers int, logger log.Logger) error {
@@ -1001,6 +997,7 @@ func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs, chainConfig *chain.Config, workers int, logger log.Logger) error {
 		g.Go(func() error {
 			p := &background.Progress{}
 			ps.Add(p)
+			defer notifySegmentIndexingFinished(sn.Name())
 			defer ps.Delete(p)
 			return buildIdx(gCtx, sn, chainConfig, tmpDir, p, log.LvlInfo, logger)
 		})
@@ -1023,16 +1020,36 @@ func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs, chainConfig *chain.Config, workers int, logger log.Logger) error {
 		case <-logEvery.C:
 			var m runtime.MemStats
 			dbg.ReadMemStats(&m)
-			dd := ps.DiagnossticsData()
-			diagnostics.Send(diagnostics.SnapshotIndexingStatistics{
-				Segments:    dd,
-				TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(),
-			})
+			sendDiagnostics(startIndexingTime, ps.DiagnossticsData(), m.Alloc, m.Sys)
 			logger.Info(fmt.Sprintf("[%s] Indexing", logPrefix), "progress", ps.String(), "total-indexing-time", time.Since(startIndexingTime).Round(time.Second).String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
 		}
 	}
 }
+
+func notifySegmentIndexingFinished(name string) {
+	diagnostics.Send(
+		diagnostics.SnapshotSegmentIndexingFinishedUpdate{
+			SegmentName: name,
+		},
+	)
+}
+
+func sendDiagnostics(startIndexingTime time.Time, indexPercent map[string]int, alloc uint64, sys uint64) {
+	segmentsStats := make([]diagnostics.SnapshotSegmentIndexingStatistics, 0, len(indexPercent))
+	for k, v := range indexPercent {
+		segmentsStats = append(segmentsStats, diagnostics.SnapshotSegmentIndexingStatistics{
+			SegmentName: k,
+			Percent:     v,
+			Alloc:       alloc,
+			Sys:         sys,
+		})
+	}
+	diagnostics.Send(diagnostics.SnapshotIndexingStatistics{
+		Segments:    segmentsStats,
+		TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(),
+	})
+}
 
 func noGaps(in []snaptype.FileInfo) (out []snaptype.FileInfo, missingSnapshots []Range) {
 	var prevTo uint64
 	for _, f := range in {