added collecting info about snapshot indexing, renamed downloading prop (#8987)

This commit is contained in:
Dmytro 2023-12-15 01:23:26 +01:00 committed by GitHub
parent 1a6b83b82c
commit e82147caf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 6 deletions

View File

@ -25,7 +25,8 @@ func NewDiagnosticClient(ctx *cli.Context, metricsMux *http.ServeMux, node *node
func (d *DiagnosticClient) Setup() {
d.runSnapshotListener()
d.runTorrentListener()
d.runSegmentDownloadingListener()
d.runSegmentIndexingListener()
}
func (d *DiagnosticClient) runSnapshotListener() {
@ -68,7 +69,7 @@ func (d *DiagnosticClient) SnapshotDownload() diaglib.SnapshotDownloadStatistics
return d.snapshotDownload
}
func (d *DiagnosticClient) runTorrentListener() {
func (d *DiagnosticClient) runSegmentDownloadingListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.SegmentDownloadStatistics](context.Background(), 1)
defer cancel()
@ -82,11 +83,31 @@ func (d *DiagnosticClient) runTorrentListener() {
cancel()
return
case info := <-ch:
if d.snapshotDownload.Segments == nil {
d.snapshotDownload.Segments = map[string]diaglib.SegmentDownloadStatistics{}
if d.snapshotDownload.SegmentsDownloading == nil {
d.snapshotDownload.SegmentsDownloading = map[string]diaglib.SegmentDownloadStatistics{}
}
d.snapshotDownload.Segments[info.Name] = info
d.snapshotDownload.SegmentsDownloading[info.Name] = info
}
}
}()
}
func (d *DiagnosticClient) runSegmentIndexingListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.SnapshotIndexingStatistics](context.Background(), 1)
defer cancel()
rootCtx, _ := common.RootContext()
diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SnapshotIndexingStatistics{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.snapshotDownload.SegmentIndexing = info
}
}
}()

View File

@ -96,3 +96,21 @@ func (s *ProgressSet) String() string {
})
return sb.String()
}
func (s *ProgressSet) DiagnossticsData() map[string]int {
s.lock.RLock()
defer s.lock.RUnlock()
var arr = make(map[string]int, s.list.Len())
s.list.Scan(func(_ int, p *Progress) bool {
if p == nil {
return true
}
namePtr := p.Name.Load()
if namePtr == nil {
return true
}
arr[*namePtr] = p.percent()
return true
})
return arr
}

View File

@ -41,7 +41,8 @@ type SnapshotDownloadStatistics struct {
Alloc uint64 `json:"alloc"`
Sys uint64 `json:"sys"`
DownloadFinished bool `json:"downloadFinished"`
Segments map[string]SegmentDownloadStatistics `json:"segments"`
SegmentsDownloading map[string]SegmentDownloadStatistics `json:"segmentsDownloading"`
SegmentIndexing SnapshotIndexingStatistics `json:"segmentsIndexing"`
TorrentMetadataReady int32 `json:"torrentMetadataReady"`
}
@ -55,6 +56,11 @@ type SegmentDownloadStatistics struct {
PeersRate uint64 `json:"peersRate"`
}
type SnapshotIndexingStatistics struct {
Segments map[string]int `json:"segments"`
TimeElapsed float64 `json:"timeElapsed"`
}
func (ti SnapshotDownloadStatistics) Type() Type {
return TypeOf(ti)
}
@ -62,3 +68,7 @@ func (ti SnapshotDownloadStatistics) Type() Type {
func (ti SegmentDownloadStatistics) Type() Type {
return TypeOf(ti)
}
func (ti SnapshotIndexingStatistics) Type() Type {
return TypeOf(ti)
}

View File

@ -28,6 +28,7 @@ import (
dir2 "github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
@ -927,6 +928,12 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs
case <-logEvery.C:
var m runtime.MemStats
dbg.ReadMemStats(&m)
diagnostics.Send(diagnostics.SnapshotIndexingStatistics{
Segments: ps.DiagnossticsData(),
TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(),
})
logger.Info(fmt.Sprintf("[%s] Indexing", logPrefix), "progress", ps.String(), "total-indexing-time", time.Since(startIndexingTime).Round(time.Second).String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
case <-finish:
return
@ -1015,6 +1022,11 @@ func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.D
case <-logEvery.C:
var m runtime.MemStats
dbg.ReadMemStats(&m)
dd := ps.DiagnossticsData()
diagnostics.Send(diagnostics.SnapshotIndexingStatistics{
Segments: dd,
TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(),
})
logger.Info(fmt.Sprintf("[%s] Indexing", logPrefix), "progress", ps.String(), "total-indexing-time", time.Since(startIndexingTime).Round(time.Second).String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
}
}