dvovk/updsync (#9134)

refactored data structure for sync statistics
This commit is contained in:
Dmytro 2024-01-08 10:43:04 +01:00 committed by GitHub
parent b4fd278533
commit ff92b701c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 147 additions and 55 deletions

View File

@ -796,7 +796,7 @@ func (d *DownloadSnapshots) Run(ctx *Context) error {
freezeblocks.NewBlockReader(
freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, snapshotVersion, log.Root()),
freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, snapshotVersion, log.Root())),
params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer))
params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer), []string{})
}
type RetrieveHistoricalState struct {

View File

@ -16,11 +16,11 @@ type DiagnosticClient struct {
metricsMux *http.ServeMux
node *node.ErigonNode
snapshotDownload diaglib.SnapshotDownloadStatistics
syncStats diaglib.SyncStatistics
}
func NewDiagnosticClient(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) *DiagnosticClient {
return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, snapshotDownload: diaglib.SnapshotDownloadStatistics{}}
return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, syncStats: diaglib.SyncStatistics{}}
}
func (d *DiagnosticClient) Setup() {
@ -28,6 +28,8 @@ func (d *DiagnosticClient) Setup() {
d.runSegmentDownloadingListener()
d.runSegmentIndexingListener()
d.runSegmentIndexingFinishedListener()
d.runCurrentSyncStageListener()
d.runSyncStagesListListener()
}
func (d *DiagnosticClient) runSnapshotListener() {
@ -44,19 +46,18 @@ func (d *DiagnosticClient) runSnapshotListener() {
cancel()
return
case info := <-ch:
d.snapshotDownload.Downloaded = info.Downloaded
d.snapshotDownload.Total = info.Total
d.snapshotDownload.TotalTime = info.TotalTime
d.snapshotDownload.DownloadRate = info.DownloadRate
d.snapshotDownload.UploadRate = info.UploadRate
d.snapshotDownload.Peers = info.Peers
d.snapshotDownload.Files = info.Files
d.snapshotDownload.Connections = info.Connections
d.snapshotDownload.Alloc = info.Alloc
d.snapshotDownload.Sys = info.Sys
d.snapshotDownload.DownloadFinished = info.DownloadFinished
d.snapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady
d.snapshotDownload.LogPrefix = info.LogPrefix
d.syncStats.SnapshotDownload.Downloaded = info.Downloaded
d.syncStats.SnapshotDownload.Total = info.Total
d.syncStats.SnapshotDownload.TotalTime = info.TotalTime
d.syncStats.SnapshotDownload.DownloadRate = info.DownloadRate
d.syncStats.SnapshotDownload.UploadRate = info.UploadRate
d.syncStats.SnapshotDownload.Peers = info.Peers
d.syncStats.SnapshotDownload.Files = info.Files
d.syncStats.SnapshotDownload.Connections = info.Connections
d.syncStats.SnapshotDownload.Alloc = info.Alloc
d.syncStats.SnapshotDownload.Sys = info.Sys
d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished
d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady
if info.DownloadFinished {
return
@ -67,8 +68,8 @@ func (d *DiagnosticClient) runSnapshotListener() {
}()
}
func (d *DiagnosticClient) SnapshotDownload() diaglib.SnapshotDownloadStatistics {
return d.snapshotDownload
func (d *DiagnosticClient) SyncStatistics() diaglib.SyncStatistics {
return d.syncStats
}
func (d *DiagnosticClient) runSegmentDownloadingListener() {
@ -85,11 +86,11 @@ func (d *DiagnosticClient) runSegmentDownloadingListener() {
cancel()
return
case info := <-ch:
if d.snapshotDownload.SegmentsDownloading == nil {
d.snapshotDownload.SegmentsDownloading = map[string]diaglib.SegmentDownloadStatistics{}
if d.syncStats.SnapshotDownload.SegmentsDownloading == nil {
d.syncStats.SnapshotDownload.SegmentsDownloading = map[string]diaglib.SegmentDownloadStatistics{}
}
d.snapshotDownload.SegmentsDownloading[info.Name] = info
d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info
}
}
}()
@ -130,15 +131,15 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener() {
return
case info := <-ch:
found := false
for i := range d.snapshotDownload.SegmentIndexing.Segments {
if d.snapshotDownload.SegmentIndexing.Segments[i].SegmentName == info.SegmentName {
for i := range d.syncStats.SnapshotIndexing.Segments {
if d.syncStats.SnapshotIndexing.Segments[i].SegmentName == info.SegmentName {
found = true
d.snapshotDownload.SegmentIndexing.Segments[i].Percent = 100
d.syncStats.SnapshotIndexing.Segments[i].Percent = 100
}
}
if !found {
d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{
d.syncStats.SnapshotIndexing.Segments = append(d.syncStats.SnapshotIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{
SegmentName: info.SegmentName,
Percent: 100,
Alloc: 0,
@ -151,26 +152,70 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener() {
}
func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd diaglib.SnapshotIndexingStatistics) {
if d.snapshotDownload.SegmentIndexing.Segments == nil {
d.snapshotDownload.SegmentIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{}
if d.syncStats.SnapshotIndexing.Segments == nil {
d.syncStats.SnapshotIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{}
}
for i := range upd.Segments {
found := false
for j := range d.snapshotDownload.SegmentIndexing.Segments {
if d.snapshotDownload.SegmentIndexing.Segments[j].SegmentName == upd.Segments[i].SegmentName {
d.snapshotDownload.SegmentIndexing.Segments[j].Percent = upd.Segments[i].Percent
d.snapshotDownload.SegmentIndexing.Segments[j].Alloc = upd.Segments[i].Alloc
d.snapshotDownload.SegmentIndexing.Segments[j].Sys = upd.Segments[i].Sys
for j := range d.syncStats.SnapshotIndexing.Segments {
if d.syncStats.SnapshotIndexing.Segments[j].SegmentName == upd.Segments[i].SegmentName {
d.syncStats.SnapshotIndexing.Segments[j].Percent = upd.Segments[i].Percent
d.syncStats.SnapshotIndexing.Segments[j].Alloc = upd.Segments[i].Alloc
d.syncStats.SnapshotIndexing.Segments[j].Sys = upd.Segments[i].Sys
found = true
break
}
}
if !found {
d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, upd.Segments[i])
d.syncStats.SnapshotIndexing.Segments = append(d.syncStats.SnapshotIndexing.Segments, upd.Segments[i])
}
}
d.snapshotDownload.SegmentIndexing.TimeElapsed = upd.TimeElapsed
d.syncStats.SnapshotIndexing.TimeElapsed = upd.TimeElapsed
}
func (d *DiagnosticClient) runSyncStagesListListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.SyncStagesList](context.Background(), 1)
defer cancel()
rootCtx, _ := common.RootContext()
diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SyncStagesList{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.syncStats.SyncStages.StagesList = info.Stages
return
}
}
}()
}
func (d *DiagnosticClient) runCurrentSyncStageListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.CurrentSyncStage](context.Background(), 1)
defer cancel()
rootCtx, _ := common.RootContext()
diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.CurrentSyncStage{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.syncStats.SyncStages.CurrentStage = info.Stage
if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) {
return
}
}
}
}()
}

View File

@ -14,5 +14,5 @@ func SetupStagesAccess(metricsMux *http.ServeMux, diag *DiagnosticClient) {
}
func writeStages(w http.ResponseWriter, diag *DiagnosticClient) {
json.NewEncoder(w).Encode(diag.SnapshotDownload())
json.NewEncoder(w).Encode(diag.SyncStatistics())
}

View File

@ -29,6 +29,12 @@ type PeerStatistics struct {
TypeBytesOut map[string]uint64
}
type SyncStatistics struct {
SyncStages SyncStages `json:"syncStages"`
SnapshotDownload SnapshotDownloadStatistics `json:"snapshotDownload"`
SnapshotIndexing SnapshotIndexingStatistics `json:"snapshotIndexing"`
}
type SnapshotDownloadStatistics struct {
Downloaded uint64 `json:"downloaded"`
Total uint64 `json:"total"`
@ -42,9 +48,7 @@ type SnapshotDownloadStatistics struct {
Sys uint64 `json:"sys"`
DownloadFinished bool `json:"downloadFinished"`
SegmentsDownloading map[string]SegmentDownloadStatistics `json:"segmentsDownloading"`
SegmentIndexing SnapshotIndexingStatistics `json:"segmentsIndexing"`
TorrentMetadataReady int32 `json:"torrentMetadataReady"`
LogPrefix string `json:"logPrefix"`
}
type SegmentDownloadStatistics struct {
@ -73,6 +77,19 @@ type SnapshotSegmentIndexingFinishedUpdate struct {
SegmentName string `json:"segmentName"`
}
type SyncStagesList struct {
Stages []string `json:"stages"`
}
type CurrentSyncStage struct {
Stage uint `json:"stage"`
}
type SyncStages struct {
StagesList []string `json:"stagesList"`
CurrentStage uint `json:"currentStage"`
}
func (ti SnapshotDownloadStatistics) Type() Type {
return TypeOf(ti)
}
@ -88,3 +105,11 @@ func (ti SnapshotIndexingStatistics) Type() Type {
func (ti SnapshotSegmentIndexingFinishedUpdate) Type() Type {
return TypeOf(ti)
}
func (ti SyncStagesList) Type() Type {
return TypeOf(ti)
}
func (ti CurrentSyncStage) Type() Type {
return TypeOf(ti)
}

View File

@ -231,7 +231,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
cfg.notifier.Events.OnNewSnapshot()
}
} else {
if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.historyV3, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader); err != nil {
if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.historyV3, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
return err
}
}

View File

@ -9,6 +9,7 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
@ -28,6 +29,7 @@ type Sync struct {
timings []Timing
logPrefixes []string
logger log.Logger
stagesIdsList []string
}
type Timing struct {
@ -86,6 +88,11 @@ func (s *Sync) NextStage() {
return
}
s.currentStage++
isDiagEnabled := diagnostics.TypeOf(diagnostics.CurrentSyncStage{}).Enabled()
if isDiagEnabled {
diagnostics.Send(diagnostics.CurrentSyncStage{Stage: s.currentStage})
}
}
// IsBefore returns true if stage1 goes before stage2 in staged sync
@ -144,10 +151,22 @@ func (s *Sync) LogPrefix() string {
return s.logPrefixes[s.currentStage]
}
func (s *Sync) StagesIdsList() []string {
if s == nil {
return []string{}
}
return s.stagesIdsList
}
func (s *Sync) SetCurrentStage(id stages.SyncStage) error {
for i, stage := range s.stages {
if stage.ID == id {
s.currentStage = uint(i)
isDiagEnabled := diagnostics.TypeOf(diagnostics.CurrentSyncStage{}).Enabled()
if isDiagEnabled {
diagnostics.Send(diagnostics.CurrentSyncStage{Stage: s.currentStage})
}
return nil
}
}
@ -173,9 +192,12 @@ func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, prune
}
}
}
logPrefixes := make([]string, len(stagesList))
stagesIdsList := make([]string, len(stagesList))
for i := range stagesList {
logPrefixes[i] = fmt.Sprintf("%d/%d %s", i+1, len(stagesList), stagesList[i].ID)
stagesIdsList[i] = string(stagesList[i].ID)
}
return &Sync{
@ -186,6 +208,7 @@ func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, prune
pruningOrder: pruneStages,
logPrefixes: logPrefixes,
logger: logger,
stagesIdsList: stagesIdsList,
}
}

View File

@ -67,7 +67,7 @@ func RequestSnapshotsDownload(ctx context.Context, downloadRequest []services.Do
// WaitForDownloader - wait for Downloader service to download all expected snapshots
// for MVP we sync with Downloader only once, in future will send new snapshots also
func WaitForDownloader(ctx context.Context, logPrefix string, histV3 bool, caplin CaplinMode, agg *state.AggregatorV3, tx kv.RwTx, blockReader services.FullBlockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient) error {
func WaitForDownloader(ctx context.Context, logPrefix string, histV3 bool, caplin CaplinMode, agg *state.AggregatorV3, tx kv.RwTx, blockReader services.FullBlockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error {
snapshots := blockReader.Snapshots()
borSnapshots := blockReader.BorSnapshots()
if blockReader.FreezingCfg().NoDownloader {
@ -157,12 +157,12 @@ Loop:
Alloc: m.Alloc,
Sys: m.Sys,
DownloadFinished: stats.Completed,
LogPrefix: logPrefix,
})
log.Info(fmt.Sprintf("[%s] download finished", logPrefix), "time", time.Since(downloadStartTime).String())
break Loop
} else {
diagnostics.Send(diagnostics.SyncStagesList{Stages: stagesIdsList})
diagnostics.Send(diagnostics.SnapshotDownloadStatistics{
Downloaded: stats.BytesCompleted,
Total: stats.BytesTotal,
@ -176,7 +176,6 @@ Loop:
Sys: m.Sys,
DownloadFinished: stats.Completed,
TorrentMetadataReady: stats.MetadataReady,
LogPrefix: logPrefix,
})
if stats.MetadataReady < stats.FilesTotal {