erigon-pulse/diagnostics/diagnostic.go
Dmytro ff92b701c3
dvovk/updsync (#9134)
refactored data structure for sync statistics
2024-01-08 10:43:04 +01:00

222 lines
6.1 KiB
Go

package diagnostics
import (
"context"
"net/http"
"github.com/ledgerwatch/erigon-lib/common"
diaglib "github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon/turbo/node"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
)
// DiagnosticClient collects sync statistics published through the diaglib
// provider channels and keeps the latest values in memory.
type DiagnosticClient struct {
	// ctx is the CLI context handed to NewDiagnosticClient; it is only stored
	// by the code visible in this file.
	ctx *cli.Context
	// metricsMux is the HTTP mux handed to NewDiagnosticClient; it is only
	// stored by the code visible in this file.
	metricsMux *http.ServeMux
	// node is the Erigon node handed to NewDiagnosticClient; it is only stored
	// by the code visible in this file.
	node *node.ErigonNode
	// syncStats holds the most recent statistics. It is written by the
	// listener goroutines started from Setup and returned by SyncStatistics.
	syncStats diaglib.SyncStatistics
}
// NewDiagnosticClient returns a DiagnosticClient bound to the given CLI
// context, metrics mux and node. The sync statistics start out as the zero
// value; no listeners are started until Setup is called.
func NewDiagnosticClient(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) *DiagnosticClient {
	client := new(DiagnosticClient)
	client.ctx = ctx
	client.metricsMux = metricsMux
	client.node = node
	// client.syncStats is intentionally left as the zero value.
	return client
}
// Setup starts every statistics listener. Each listener spawns its own
// goroutine, so Setup itself returns immediately.
func (d *DiagnosticClient) Setup() {
	// The slice order is the order in which the listeners are started.
	starters := []func(){
		d.runSnapshotListener,
		d.runSegmentDownloadingListener,
		d.runSegmentIndexingListener,
		d.runSegmentIndexingFinishedListener,
		d.runCurrentSyncStageListener,
		d.runSyncStagesListListener,
	}
	for _, start := range starters {
		start()
	}
}
// runSnapshotListener starts a goroutine that receives
// diaglib.SnapshotDownloadStatistics updates and copies their scalar fields
// into d.syncStats.SnapshotDownload. The goroutine stops when the root context
// is done or once an update reports DownloadFinished.
func (d *DiagnosticClient) runSnapshotListener() {
	go func() {
		ctx, ch, cancel := diaglib.Context[diaglib.SnapshotDownloadStatistics](context.Background(), 1)
		defer cancel()

		// Keep the cancel func returned with the root context and release it
		// on exit; this goroutine can return early (download finished), and
		// dropping the cancel func would leave that context un-cancelled.
		rootCtx, rootCancel := common.RootContext()
		defer rootCancel()

		diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SnapshotDownloadStatistics{}), log.Root())
		for {
			select {
			case <-rootCtx.Done():
				// The deferred cancel calls perform the cleanup.
				return
			case info := <-ch:
				// Copy field by field so that fields not listed here
				// (e.g. SegmentsDownloading, maintained by another listener)
				// are left untouched.
				d.syncStats.SnapshotDownload.Downloaded = info.Downloaded
				d.syncStats.SnapshotDownload.Total = info.Total
				d.syncStats.SnapshotDownload.TotalTime = info.TotalTime
				d.syncStats.SnapshotDownload.DownloadRate = info.DownloadRate
				d.syncStats.SnapshotDownload.UploadRate = info.UploadRate
				d.syncStats.SnapshotDownload.Peers = info.Peers
				d.syncStats.SnapshotDownload.Files = info.Files
				d.syncStats.SnapshotDownload.Connections = info.Connections
				d.syncStats.SnapshotDownload.Alloc = info.Alloc
				d.syncStats.SnapshotDownload.Sys = info.Sys
				d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished
				d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady

				if info.DownloadFinished {
					return
				}
			}
		}
	}()
}
// SyncStatistics returns the statistics collected so far.
//
// The struct is returned by value, so slices and maps inside it still refer
// to the storage the listener goroutines write to; no lock is taken here.
func (d *DiagnosticClient) SyncStatistics() diaglib.SyncStatistics {
	snapshot := d.syncStats
	return snapshot
}
// runSegmentDownloadingListener starts a goroutine that receives
// diaglib.SegmentDownloadStatistics updates and stores each one in
// d.syncStats.SnapshotDownload.SegmentsDownloading, keyed by segment name.
// The goroutine stops when the root context is done.
func (d *DiagnosticClient) runSegmentDownloadingListener() {
	go func() {
		ctx, ch, cancel := diaglib.Context[diaglib.SegmentDownloadStatistics](context.Background(), 1)
		defer cancel()

		// Keep and release the cancel func that comes with the root context
		// instead of discarding it.
		rootCtx, rootCancel := common.RootContext()
		defer rootCancel()

		diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SegmentDownloadStatistics{}), log.Root())
		for {
			select {
			case <-rootCtx.Done():
				// The deferred cancel calls perform the cleanup.
				return
			case info := <-ch:
				// The map is created lazily: writing to a nil map panics.
				if d.syncStats.SnapshotDownload.SegmentsDownloading == nil {
					d.syncStats.SnapshotDownload.SegmentsDownloading = map[string]diaglib.SegmentDownloadStatistics{}
				}
				d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info
			}
		}
	}()
}
// runSegmentIndexingListener starts a goroutine that receives
// diaglib.SnapshotIndexingStatistics updates and merges each one into
// d.syncStats via addOrUpdateSegmentIndexingState. The goroutine stops when
// the root context is done.
func (d *DiagnosticClient) runSegmentIndexingListener() {
	go func() {
		ctx, ch, cancel := diaglib.Context[diaglib.SnapshotIndexingStatistics](context.Background(), 1)
		defer cancel()

		// Keep and release the cancel func that comes with the root context
		// instead of discarding it.
		rootCtx, rootCancel := common.RootContext()
		defer rootCancel()

		diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SnapshotIndexingStatistics{}), log.Root())
		for {
			select {
			case <-rootCtx.Done():
				// The deferred cancel calls perform the cleanup.
				return
			case info := <-ch:
				d.addOrUpdateSegmentIndexingState(info)
			}
		}
	}()
}
// runSegmentIndexingFinishedListener starts a goroutine that receives
// diaglib.SnapshotSegmentIndexingFinishedUpdate messages and marks the named
// segment as 100% indexed in d.syncStats.SnapshotIndexing.Segments, adding an
// entry for it if none exists yet. The goroutine stops when the root context
// is done.
func (d *DiagnosticClient) runSegmentIndexingFinishedListener() {
	go func() {
		ctx, ch, cancel := diaglib.Context[diaglib.SnapshotSegmentIndexingFinishedUpdate](context.Background(), 1)
		defer cancel()

		// Keep and release the cancel func that comes with the root context
		// instead of discarding it.
		rootCtx, rootCancel := common.RootContext()
		defer rootCancel()

		diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SnapshotSegmentIndexingFinishedUpdate{}), log.Root())
		for {
			select {
			case <-rootCtx.Done():
				// The deferred cancel calls perform the cleanup.
				return
			case info := <-ch:
				found := false
				// Every entry with a matching name is updated; the scan does
				// not stop at the first match.
				for i := range d.syncStats.SnapshotIndexing.Segments {
					if d.syncStats.SnapshotIndexing.Segments[i].SegmentName == info.SegmentName {
						found = true
						d.syncStats.SnapshotIndexing.Segments[i].Percent = 100
					}
				}

				// A finished notification for a segment that was never
				// reported before still gets recorded, with zeroed memory
				// figures.
				if !found {
					d.syncStats.SnapshotIndexing.Segments = append(d.syncStats.SnapshotIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{
						SegmentName: info.SegmentName,
						Percent:     100,
						Alloc:       0,
						Sys:         0,
					})
				}
			}
		}
	}()
}
// addOrUpdateSegmentIndexingState merges upd into the stored indexing
// statistics. For each segment in upd, the first stored segment with the same
// name has its Percent, Alloc and Sys overwritten; segments with no stored
// counterpart are appended. TimeElapsed is always replaced by upd's value.
func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd diaglib.SnapshotIndexingStatistics) {
	indexing := &d.syncStats.SnapshotIndexing

	// Make sure the slice is non-nil (an empty slice rather than nil) even
	// when upd carries no segments.
	if indexing.Segments == nil {
		indexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{}
	}

	for _, incoming := range upd.Segments {
		// Locate the first stored segment with the same name, if any.
		pos := -1
		for j := range indexing.Segments {
			if indexing.Segments[j].SegmentName == incoming.SegmentName {
				pos = j
				break
			}
		}

		if pos < 0 {
			indexing.Segments = append(indexing.Segments, incoming)
			continue
		}

		known := &indexing.Segments[pos]
		known.Percent = incoming.Percent
		known.Alloc = incoming.Alloc
		known.Sys = incoming.Sys
	}

	indexing.TimeElapsed = upd.TimeElapsed
}
// runSyncStagesListListener starts a goroutine that waits for a single
// diaglib.SyncStagesList message, stores its Stages in
// d.syncStats.SyncStages.StagesList and then exits. It also exits if the root
// context is done before any message arrives.
func (d *DiagnosticClient) runSyncStagesListListener() {
	go func() {
		ctx, ch, cancel := diaglib.Context[diaglib.SyncStagesList](context.Background(), 1)
		defer cancel()

		// Keep the cancel func returned with the root context and release it
		// on exit; this goroutine returns after the first message, and
		// dropping the cancel func would leave that context un-cancelled.
		rootCtx, rootCancel := common.RootContext()
		defer rootCancel()

		diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SyncStagesList{}), log.Root())

		// Exactly one of the two events is handled; both end the goroutine,
		// and the deferred cancel calls perform the cleanup.
		select {
		case <-rootCtx.Done():
		case info := <-ch:
			d.syncStats.SyncStages.StagesList = info.Stages
		}
	}()
}
// runCurrentSyncStageListener starts a goroutine that receives
// diaglib.CurrentSyncStage updates and stores the reported stage in
// d.syncStats.SyncStages.CurrentStage. The goroutine stops when the root
// context is done, or once the stored stage index is not below the length of
// the known stages list.
func (d *DiagnosticClient) runCurrentSyncStageListener() {
	go func() {
		ctx, ch, cancel := diaglib.Context[diaglib.CurrentSyncStage](context.Background(), 1)
		defer cancel()

		// Keep the cancel func returned with the root context and release it
		// on exit; this goroutine can return early, and dropping the cancel
		// func would leave that context un-cancelled.
		rootCtx, rootCancel := common.RootContext()
		defer rootCancel()

		diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.CurrentSyncStage{}), log.Root())
		for {
			select {
			case <-rootCtx.Done():
				// The deferred cancel calls perform the cleanup.
				return
			case info := <-ch:
				d.syncStats.SyncStages.CurrentStage = info.Stage

				// NOTE(review): StagesList is filled in by a different
				// listener goroutine; if it is still empty when the first
				// update arrives, this condition holds immediately and the
				// listener stops. Confirm that ordering is guaranteed.
				if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) {
					return
				}
			}
		}
	}()
}