From ff92b701c36595ca5daa010dd90620628893d2d7 Mon Sep 17 00:00:00 2001 From: Dmytro Date: Mon, 8 Jan 2024 10:43:04 +0100 Subject: [PATCH] dvovk/updsync (#9134) refactored data structure for sync statistics --- cmd/capcli/cli.go | 2 +- diagnostics/diagnostic.go | 111 ++++++++++++++++++++--------- diagnostics/snapshot_sync.go | 2 +- erigon-lib/diagnostics/entities.go | 29 +++++++- eth/stagedsync/stage_snapshots.go | 2 +- eth/stagedsync/sync.go | 51 +++++++++---- turbo/snapshotsync/snapshotsync.go | 5 +- 7 files changed, 147 insertions(+), 55 deletions(-) diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index 4a5be83f9..f012add35 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -796,7 +796,7 @@ func (d *DownloadSnapshots) Run(ctx *Context) error { freezeblocks.NewBlockReader( freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, snapshotVersion, log.Root()), freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, snapshotVersion, log.Root())), - params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer)) + params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer), []string{}) } type RetrieveHistoricalState struct { diff --git a/diagnostics/diagnostic.go b/diagnostics/diagnostic.go index 2f3ef2af4..dcd332d99 100644 --- a/diagnostics/diagnostic.go +++ b/diagnostics/diagnostic.go @@ -16,11 +16,11 @@ type DiagnosticClient struct { metricsMux *http.ServeMux node *node.ErigonNode - snapshotDownload diaglib.SnapshotDownloadStatistics + syncStats diaglib.SyncStatistics } func NewDiagnosticClient(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) *DiagnosticClient { - return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, snapshotDownload: diaglib.SnapshotDownloadStatistics{}} + return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, syncStats: diaglib.SyncStatistics{}} } func (d *DiagnosticClient) Setup() { @@ -28,6 +28,8 @@ func (d *DiagnosticClient) Setup() { d.runSegmentDownloadingListener() d.runSegmentIndexingListener() d.runSegmentIndexingFinishedListener() + d.runCurrentSyncStageListener() + d.runSyncStagesListListener() } func (d *DiagnosticClient) runSnapshotListener() { @@ -44,19 +46,18 @@ func (d *DiagnosticClient) runSnapshotListener() { cancel() return case info := <-ch: - d.snapshotDownload.Downloaded = info.Downloaded - d.snapshotDownload.Total = info.Total - d.snapshotDownload.TotalTime = info.TotalTime - d.snapshotDownload.DownloadRate = info.DownloadRate - d.snapshotDownload.UploadRate = info.UploadRate - d.snapshotDownload.Peers = info.Peers - d.snapshotDownload.Files = info.Files - d.snapshotDownload.Connections = info.Connections - d.snapshotDownload.Alloc = info.Alloc - d.snapshotDownload.Sys = info.Sys - d.snapshotDownload.DownloadFinished = info.DownloadFinished - d.snapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady - d.snapshotDownload.LogPrefix = info.LogPrefix + d.syncStats.SnapshotDownload.Downloaded = info.Downloaded + d.syncStats.SnapshotDownload.Total = info.Total + d.syncStats.SnapshotDownload.TotalTime = info.TotalTime + d.syncStats.SnapshotDownload.DownloadRate = info.DownloadRate + d.syncStats.SnapshotDownload.UploadRate = info.UploadRate + d.syncStats.SnapshotDownload.Peers = info.Peers + d.syncStats.SnapshotDownload.Files = info.Files + d.syncStats.SnapshotDownload.Connections = info.Connections + d.syncStats.SnapshotDownload.Alloc = info.Alloc + d.syncStats.SnapshotDownload.Sys = info.Sys + d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished + d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady if info.DownloadFinished { return @@ -67,8 +68,8 @@ func (d *DiagnosticClient) runSnapshotListener() { }() } -func (d *DiagnosticClient) SnapshotDownload() diaglib.SnapshotDownloadStatistics { - return d.snapshotDownload +func (d *DiagnosticClient) SyncStatistics() diaglib.SyncStatistics { + return d.syncStats } func (d *DiagnosticClient) runSegmentDownloadingListener() { @@ -85,11 +86,11 @@ func (d *DiagnosticClient) runSegmentDownloadingListener() { cancel() return case info := <-ch: - if d.snapshotDownload.SegmentsDownloading == nil { - d.snapshotDownload.SegmentsDownloading = map[string]diaglib.SegmentDownloadStatistics{} + if d.syncStats.SnapshotDownload.SegmentsDownloading == nil { + d.syncStats.SnapshotDownload.SegmentsDownloading = map[string]diaglib.SegmentDownloadStatistics{} } - d.snapshotDownload.SegmentsDownloading[info.Name] = info + d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info } } }() @@ -130,15 +131,15 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener() { return case info := <-ch: found := false - for i := range d.snapshotDownload.SegmentIndexing.Segments { - if d.snapshotDownload.SegmentIndexing.Segments[i].SegmentName == info.SegmentName { + for i := range d.syncStats.SnapshotIndexing.Segments { + if d.syncStats.SnapshotIndexing.Segments[i].SegmentName == info.SegmentName { found = true - d.snapshotDownload.SegmentIndexing.Segments[i].Percent = 100 + d.syncStats.SnapshotIndexing.Segments[i].Percent = 100 } } if !found { - d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{ + d.syncStats.SnapshotIndexing.Segments = append(d.syncStats.SnapshotIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{ SegmentName: info.SegmentName, Percent: 100, Alloc: 0, @@ -151,26 +152,70 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener() { } func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd diaglib.SnapshotIndexingStatistics) { - if d.snapshotDownload.SegmentIndexing.Segments == nil { - d.snapshotDownload.SegmentIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{} + if d.syncStats.SnapshotIndexing.Segments == nil { + d.syncStats.SnapshotIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{} } for i := range upd.Segments { found := false - for j := range d.snapshotDownload.SegmentIndexing.Segments { - if d.snapshotDownload.SegmentIndexing.Segments[j].SegmentName == upd.Segments[i].SegmentName { - d.snapshotDownload.SegmentIndexing.Segments[j].Percent = upd.Segments[i].Percent - d.snapshotDownload.SegmentIndexing.Segments[j].Alloc = upd.Segments[i].Alloc - d.snapshotDownload.SegmentIndexing.Segments[j].Sys = upd.Segments[i].Sys + for j := range d.syncStats.SnapshotIndexing.Segments { + if d.syncStats.SnapshotIndexing.Segments[j].SegmentName == upd.Segments[i].SegmentName { + d.syncStats.SnapshotIndexing.Segments[j].Percent = upd.Segments[i].Percent + d.syncStats.SnapshotIndexing.Segments[j].Alloc = upd.Segments[i].Alloc + d.syncStats.SnapshotIndexing.Segments[j].Sys = upd.Segments[i].Sys found = true break } } if !found { - d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, upd.Segments[i]) + d.syncStats.SnapshotIndexing.Segments = append(d.syncStats.SnapshotIndexing.Segments, upd.Segments[i]) } } - d.snapshotDownload.SegmentIndexing.TimeElapsed = upd.TimeElapsed + d.syncStats.SnapshotIndexing.TimeElapsed = upd.TimeElapsed +} + +func (d *DiagnosticClient) runSyncStagesListListener() { + go func() { + ctx, ch, cancel := diaglib.Context[diaglib.SyncStagesList](context.Background(), 1) + defer cancel() + + rootCtx, _ := common.RootContext() + + diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SyncStagesList{}), log.Root()) + for { + select { + case <-rootCtx.Done(): + cancel() + return + case info := <-ch: + d.syncStats.SyncStages.StagesList = info.Stages + return + } + } + }() +} + +func (d *DiagnosticClient) runCurrentSyncStageListener() { + go func() { + ctx, ch, cancel := diaglib.Context[diaglib.CurrentSyncStage](context.Background(), 1) + defer cancel() + + rootCtx, _ := common.RootContext() + + diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.CurrentSyncStage{}), log.Root()) + for { + select { + case <-rootCtx.Done(): + cancel() + return + case info := <-ch: + d.syncStats.SyncStages.CurrentStage = info.Stage + if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) { + return + } + } + } + }() } diff --git a/diagnostics/snapshot_sync.go b/diagnostics/snapshot_sync.go index 66bb2a8a3..6e99b8ba4 100644 --- a/diagnostics/snapshot_sync.go +++ b/diagnostics/snapshot_sync.go @@ -14,5 +14,5 @@ func SetupStagesAccess(metricsMux *http.ServeMux, diag *DiagnosticClient) { } func writeStages(w http.ResponseWriter, diag *DiagnosticClient) { - json.NewEncoder(w).Encode(diag.SnapshotDownload()) + json.NewEncoder(w).Encode(diag.SyncStatistics()) } diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index f91ab9cea..fe8656249 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -29,6 +29,12 @@ type PeerStatistics struct { TypeBytesOut map[string]uint64 } +type SyncStatistics struct { + SyncStages SyncStages `json:"syncStages"` + SnapshotDownload SnapshotDownloadStatistics `json:"snapshotDownload"` + SnapshotIndexing SnapshotIndexingStatistics `json:"snapshotIndexing"` +} + type SnapshotDownloadStatistics struct { Downloaded uint64 `json:"downloaded"` Total uint64 `json:"total"` @@ -42,9 +48,7 @@ type SnapshotDownloadStatistics struct { Sys uint64 `json:"sys"` DownloadFinished bool `json:"downloadFinished"` SegmentsDownloading map[string]SegmentDownloadStatistics `json:"segmentsDownloading"` - SegmentIndexing SnapshotIndexingStatistics `json:"segmentsIndexing"` TorrentMetadataReady int32 `json:"torrentMetadataReady"` - LogPrefix string `json:"logPrefix"` } type SegmentDownloadStatistics struct { @@ -73,6 +77,19 @@ type SnapshotSegmentIndexingFinishedUpdate struct { SegmentName string `json:"segmentName"` } +type SyncStagesList struct { + Stages []string `json:"stages"` +} + +type CurrentSyncStage struct { + Stage uint `json:"stage"` +} + +type SyncStages struct { + StagesList []string `json:"stagesList"` + CurrentStage uint `json:"currentStage"` +} + func (ti SnapshotDownloadStatistics) Type() Type { return TypeOf(ti) } @@ -88,3 +105,11 @@ func (ti SnapshotIndexingStatistics) Type() Type { func (ti SnapshotSegmentIndexingFinishedUpdate) Type() Type { return TypeOf(ti) } + +func (ti SyncStagesList) Type() Type { + return TypeOf(ti) +} + +func (ti CurrentSyncStage) Type() Type { + return TypeOf(ti) +} diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index 7ef851ff0..4b9cbc370 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -231,7 +231,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R cfg.notifier.Events.OnNewSnapshot() } } else { - if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.historyV3, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader); err != nil { + if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.historyV3, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil { return err } } diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go index cf1cd273f..df75def7c 100644 --- a/eth/stagedsync/sync.go +++ b/eth/stagedsync/sync.go @@ -9,6 +9,7 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/dbg" + "github.com/ledgerwatch/erigon-lib/diagnostics" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" @@ -21,13 +22,14 @@ type Sync struct { unwindReason UnwindReason posTransition *uint64 - stages []*Stage - unwindOrder []*Stage - pruningOrder []*Stage - currentStage uint - timings []Timing - logPrefixes []string - logger log.Logger + stages []*Stage + unwindOrder []*Stage + pruningOrder []*Stage + currentStage uint + timings []Timing + logPrefixes []string + logger log.Logger + stagesIdsList []string } type Timing struct { @@ -86,6 +88,11 @@ func (s *Sync) NextStage() { return } s.currentStage++ + + isDiagEnabled := diagnostics.TypeOf(diagnostics.CurrentSyncStage{}).Enabled() + if isDiagEnabled { + diagnostics.Send(diagnostics.CurrentSyncStage{Stage: s.currentStage}) + } } // IsBefore returns true if stage1 goes before stage2 in staged sync @@ -144,10 +151,22 @@ func (s *Sync) LogPrefix() string { return s.logPrefixes[s.currentStage] } +func (s *Sync) StagesIdsList() []string { + if s == nil { + return []string{} + } + return s.stagesIdsList +} + func (s *Sync) SetCurrentStage(id stages.SyncStage) error { for i, stage := range s.stages { if stage.ID == id { s.currentStage = uint(i) + isDiagEnabled := diagnostics.TypeOf(diagnostics.CurrentSyncStage{}).Enabled() + if isDiagEnabled { + diagnostics.Send(diagnostics.CurrentSyncStage{Stage: s.currentStage}) + } + return nil } } @@ -173,19 +192,23 @@ func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, prune } } } + logPrefixes := make([]string, len(stagesList)) + stagesIdsList := make([]string, len(stagesList)) for i := range stagesList { logPrefixes[i] = fmt.Sprintf("%d/%d %s", i+1, len(stagesList), stagesList[i].ID) + stagesIdsList[i] = string(stagesList[i].ID) } return &Sync{ - cfg: cfg, - stages: stagesList, - currentStage: 0, - unwindOrder: unwindStages, - pruningOrder: pruneStages, - logPrefixes: logPrefixes, - logger: logger, + cfg: cfg, + stages: stagesList, + currentStage: 0, + unwindOrder: unwindStages, + pruningOrder: pruneStages, + logPrefixes: logPrefixes, + logger: logger, + stagesIdsList: stagesIdsList, } } diff --git a/turbo/snapshotsync/snapshotsync.go b/turbo/snapshotsync/snapshotsync.go index a33b5e2ef..7197fc683 100644 --- a/turbo/snapshotsync/snapshotsync.go +++ b/turbo/snapshotsync/snapshotsync.go @@ -67,7 +67,7 @@ func RequestSnapshotsDownload(ctx context.Context, downloadRequest []services.Do // WaitForDownloader - wait for Downloader service to download all expected snapshots // for MVP we sync with Downloader only once, in future will send new snapshots also -func WaitForDownloader(ctx context.Context, logPrefix string, histV3 bool, caplin CaplinMode, agg *state.AggregatorV3, tx kv.RwTx, blockReader services.FullBlockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient) error { +func WaitForDownloader(ctx context.Context, logPrefix string, histV3 bool, caplin CaplinMode, agg *state.AggregatorV3, tx kv.RwTx, blockReader services.FullBlockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error { snapshots := blockReader.Snapshots() borSnapshots := blockReader.BorSnapshots() if blockReader.FreezingCfg().NoDownloader { @@ -157,12 +157,12 @@ Loop: Alloc: m.Alloc, Sys: m.Sys, DownloadFinished: stats.Completed, - LogPrefix: logPrefix, }) log.Info(fmt.Sprintf("[%s] download finished", logPrefix), "time", time.Since(downloadStartTime).String()) break Loop } else { + diagnostics.Send(diagnostics.SyncStagesList{Stages: stagesIdsList}) diagnostics.Send(diagnostics.SnapshotDownloadStatistics{ Downloaded: stats.BytesCompleted, Total: stats.BytesTotal, @@ -176,7 +176,6 @@ Loop: Sys: m.Sys, DownloadFinished: stats.Completed, TorrentMetadataReady: stats.MetadataReady, - LogPrefix: logPrefix, }) if stats.MetadataReady < stats.FilesTotal {