diff --git a/beacon-chain/sync/backfill/BUILD.bazel b/beacon-chain/sync/backfill/BUILD.bazel index a151acf9b..90e7dbbd4 100644 --- a/beacon-chain/sync/backfill/BUILD.bazel +++ b/beacon-chain/sync/backfill/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "batch.go", "batcher.go", "blobs.go", + "log.go", "metrics.go", "pool.go", "service.go", diff --git a/beacon-chain/sync/backfill/batch.go b/beacon-chain/sync/backfill/batch.go index 1f5199e86..0691fffaf 100644 --- a/beacon-chain/sync/backfill/batch.go +++ b/beacon-chain/sync/backfill/batch.go @@ -12,7 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) // ErrChainBroken indicates a backfill batch can't be imported to the db because it is not known to be the ancestor @@ -73,9 +73,9 @@ type batch struct { bs *blobSync } -func (b batch) logFields() log.Fields { +func (b batch) logFields() logrus.Fields { return map[string]interface{}{ - "batch_id": b.id(), + "batchId": b.id(), "state": b.state.String(), "scheduled": b.scheduled.String(), "seq": b.seq, @@ -139,7 +139,7 @@ func (b batch) withResults(results verifiedROBlocks, bs *blobSync) batch { func (b batch) postBlobSync() batch { if b.blobsNeeded() > 0 { - log.WithFields(b.logFields()).WithField("blobs_missing", b.blobsNeeded()).Error("batch still missing blobs after downloading from peer") + log.WithFields(b.logFields()).WithField("blobsMissing", b.blobsNeeded()).Error("Batch still missing blobs after downloading from peer") b.bs = nil b.results = []blocks.ROBlock{} return b.withState(batchErrRetryable) @@ -153,14 +153,14 @@ func (b batch) withState(s batchState) batch { switch b.state { case batchErrRetryable: b.retries += 1 - log.WithFields(b.logFields()).Info("sequencing batch for retry") + log.WithFields(b.logFields()).Info("Sequencing batch for retry") case batchInit, batchNil: b.firstScheduled = b.scheduled } } if s == batchImportComplete { backfillBatchTimeRoundtrip.Observe(float64(time.Since(b.firstScheduled).Milliseconds())) - log.WithFields(b.logFields()).Debug("Backfill batch imported.") + log.WithFields(b.logFields()).Debug("Backfill batch imported") } b.state = s b.seq += 1 diff --git a/beacon-chain/sync/backfill/log.go b/beacon-chain/sync/backfill/log.go new file mode 100644 index 000000000..9d88c5f24 --- /dev/null +++ b/beacon-chain/sync/backfill/log.go @@ -0,0 +1,5 @@ +package backfill + +import "github.com/sirupsen/logrus" + +var log = logrus.WithField("prefix", "backfill") diff --git a/beacon-chain/sync/backfill/pool.go b/beacon-chain/sync/backfill/pool.go index b42309387..70f90414c 100644 --- a/beacon-chain/sync/backfill/pool.go +++ b/beacon-chain/sync/backfill/pool.go @@ -14,7 +14,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync" "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" - log "github.com/sirupsen/logrus" ) type batchWorkerPool interface { diff --git a/beacon-chain/sync/backfill/service.go b/beacon-chain/sync/backfill/service.go index fbcd77182..ec4119719 100644 --- a/beacon-chain/sync/backfill/service.go +++ b/beacon-chain/sync/backfill/service.go @@ -17,7 +17,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/proto/dbval" "github.com/prysmaticlabs/prysm/v5/runtime" "github.com/prysmaticlabs/prysm/v5/time/slots" - log "github.com/sirupsen/logrus" ) type Service struct { @@ -149,12 +148,12 @@ func (s *Service) initVerifier(ctx context.Context) (*verifier, sync.ContextByte } keys, err := cps.PublicKeys() if err != nil { - return nil, nil, errors.Wrap(err, "Unable to retrieve public keys for all validators in the origin state") + return nil, nil, errors.Wrap(err, "unable to retrieve public keys for all validators in the origin state") } vr := cps.GenesisValidatorsRoot() ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr)) if err != nil { - return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr) + return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root %#x", vr) } v, err := newBackfillVerifier(vr, keys) return v, ctxMap, err @@ -164,10 +163,10 @@ func (s *Service) updateComplete() bool { b, err := s.pool.complete() if err != nil { if errors.Is(err, errEndSequence) { - log.WithField("backfill_slot", b.begin).Info("Backfill is complete.") + log.WithField("backfillSlot", b.begin).Info("Backfill is complete") return true } - log.WithError(err).Error("Backfill service received unhandled error from worker pool.") + log.WithError(err).Error("Backfill service received unhandled error from worker pool") return true } s.batchSeq.update(b) @@ -187,11 +186,11 @@ func (s *Service) importBatches(ctx context.Context) { for i := range importable { ib := importable[i] if len(ib.results) == 0 { - log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer.") + log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer") } _, err := s.batchImporter(ctx, current, ib, s.store) if err != nil { - log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import.") + log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import") s.downscore(ib) s.batchSeq.update(ib.withState(batchErrRetryable)) // If a batch fails, the subsequent batches are no longer considered importable. @@ -204,8 +203,8 @@ func (s *Service) importBatches(ctx context.Context) { nt := s.batchSeq.numTodo() log.WithField("imported", imported).WithField("importable", len(importable)). - WithField("batches_remaining", nt). - Info("Backfill batches processed.") + WithField("batchesRemaining", nt). + Info("Backfill batches processed") backfillRemainingBatches.Set(float64(nt)) } @@ -220,7 +219,7 @@ func (s *Service) scheduleTodos() { // and then we'll have the parent_root expected by 90 to ensure it matches the root for 89, // at which point we know we can process [80..90). if errors.Is(err, errMaxBatches) { - log.Debug("Backfill batches waiting for descendent batch to complete.") + log.Debug("Backfill batches waiting for descendent batch to complete") return } } @@ -232,17 +231,17 @@ func (s *Service) scheduleTodos() { // Start begins the runloop of backfill.Service in the current goroutine. func (s *Service) Start() { if !s.enabled { - log.Info("Backfill service not enabled.") + log.Info("Backfill service not enabled") return } ctx, cancel := context.WithCancel(s.ctx) defer func() { - log.Info("Backfill service is shutting down.") + log.Info("Backfill service is shutting down") cancel() }() clock, err := s.cw.WaitForClock(ctx) if err != nil { - log.WithError(err).Error("Backfill service failed to start while waiting for genesis data.") + log.WithError(err).Error("Backfill service failed to start while waiting for genesis data") return } s.clock = clock @@ -250,39 +249,39 @@ func (s *Service) Start() { s.newBlobVerifier = newBlobVerifierFromInitializer(v) if err != nil { - log.WithError(err).Error("Could not initialize blob verifier in backfill service.") + log.WithError(err).Error("Could not initialize blob verifier in backfill service") return } if s.store.isGenesisSync() { - log.Info("Backfill short-circuit; node synced from genesis.") + log.Info("Backfill short-circuit; node synced from genesis") return } status := s.store.status() // Exit early if there aren't going to be any batches to backfill. if primitives.Slot(status.LowSlot) <= s.ms(s.clock.CurrentSlot()) { - log.WithField("minimum_required_slot", s.ms(s.clock.CurrentSlot())). - WithField("backfill_lowest_slot", status.LowSlot). - Info("Exiting backfill service; minimum block retention slot > lowest backfilled block.") + log.WithField("minimumRequiredSlot", s.ms(s.clock.CurrentSlot())). + WithField("backfillLowestSlot", status.LowSlot). + Info("Exiting backfill service; minimum block retention slot > lowest backfilled block") return } s.verifier, s.ctxMap, err = s.initVerifier(ctx) if err != nil { - log.WithError(err).Error("Unable to initialize backfill verifier.") + log.WithError(err).Error("Unable to initialize backfill verifier") return } if s.initSyncWaiter != nil { - log.Info("Backfill service waiting for initial-sync to reach head before starting.") + log.Info("Backfill service waiting for initial-sync to reach head before starting") if err := s.initSyncWaiter(); err != nil { - log.WithError(err).Error("Error waiting for init-sync to complete.") + log.WithError(err).Error("Error waiting for init-sync to complete") return } } s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore) s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize)) if err = s.initBatches(); err != nil { - log.WithError(err).Error("Non-recoverable error in backfill service.") + log.WithError(err).Error("Non-recoverable error in backfill service") return } @@ -296,7 +295,7 @@ func (s *Service) Start() { s.importBatches(ctx) batchesWaiting.Set(float64(s.batchSeq.countWithState(batchImportable))) if err := s.batchSeq.moveMinimum(s.ms(s.clock.CurrentSlot())); err != nil { - log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot.") + log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot") } s.scheduleTodos() } diff --git a/beacon-chain/sync/backfill/status.go b/beacon-chain/sync/backfill/status.go index 073b0f310..99de1a06b 100644 --- a/beacon-chain/sync/backfill/status.go +++ b/beacon-chain/sync/backfill/status.go @@ -15,7 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/proto/dbval" ) -var errBatchDisconnected = errors.New("Highest block root in backfill batch doesn't match next parent_root") +var errBatchDisconnected = errors.New("highest block root in backfill batch doesn't match next parent_root") // NewUpdater correctly initializes a StatusUpdater value with the required database value. func NewUpdater(ctx context.Context, store BeaconDB) (*Store, error) { diff --git a/beacon-chain/sync/backfill/worker.go b/beacon-chain/sync/backfill/worker.go index f56bded84..405afb429 100644 --- a/beacon-chain/sync/backfill/worker.go +++ b/beacon-chain/sync/backfill/worker.go @@ -10,7 +10,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync" "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" - log "github.com/sirupsen/logrus" ) type workerId int @@ -31,14 +30,14 @@ func (w *p2pWorker) run(ctx context.Context) { for { select { case b := <-w.todo: - log.WithFields(b.logFields()).WithField("backfill_worker", w.id).Debug("Backfill worker received batch.") + log.WithFields(b.logFields()).WithField("backfillWorker", w.id).Debug("Backfill worker received batch") if b.state == batchBlobSync { w.done <- w.handleBlobs(ctx, b) } else { w.done <- w.handleBlocks(ctx, b) } case <-ctx.Done(): - log.WithField("backfill_worker", w.id).Info("Backfill worker exiting after context canceled.") + log.WithField("backfillWorker", w.id).Info("Backfill worker exiting after context canceled") return } } @@ -73,7 +72,7 @@ func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch { bdl += vb[i].SizeSSZ() } backfillBlocksApproximateBytes.Add(float64(bdl)) - log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("backfill batch block bytes downloaded") + log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("Backfill batch block bytes downloaded") bs, err := newBlobSync(cs, vb, &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.nbv, store: w.bfs}) if err != nil { return b.withRetryableError(err) @@ -97,7 +96,7 @@ func (w *p2pWorker) handleBlobs(ctx context.Context, b batch) batch { // All blobs are the same size, so we can compute 1 and use it for all in the batch. sz := blobs[0].SizeSSZ() * len(blobs) backfillBlobsApproximateBytes.Add(float64(sz)) - log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("backfill batch blob bytes downloaded") + log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("Backfill batch blob bytes downloaded") } return b.postBlobSync() } diff --git a/cmd/beacon-chain/sync/backfill/flags/flags.go b/cmd/beacon-chain/sync/backfill/flags/flags.go index 0d7b9d291..37037c6ee 100644 --- a/cmd/beacon-chain/sync/backfill/flags/flags.go +++ b/cmd/beacon-chain/sync/backfill/flags/flags.go @@ -12,7 +12,7 @@ var ( // This flag will be removed onced backfill is enabled by default. EnableExperimentalBackfill = &cli.BoolFlag{ Name: "enable-experimental-backfill", - Usage: "Backfill is still experimental at this time." + + Usage: "Backfill is still experimental at this time. " + "It will only be enabled if this flag is specified and the node was started using checkpoint sync.", } // BackfillBatchSize allows users to tune block backfill request sizes to maximize network utilization @@ -21,7 +21,7 @@ var ( Name: backfillBatchSizeName, Usage: "Number of blocks per backfill batch. " + "A larger number will request more blocks at once from peers, but also consume more system memory to " + - "hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName, + "hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName + ".", Value: 64, } // BackfillWorkerCount allows users to tune the number of concurrent backfill batches to download, to maximize @@ -30,9 +30,9 @@ var ( Name: backfillWorkerCountName, Usage: "Number of concurrent backfill batch requests. " + "A larger number will better utilize network resources, up to a system-dependent limit, but will also " + - "consume more system memory to hold batches in memory during processing. Multiply by backfill-batch-size and " + + "consume more system memory to hold batches in memory during processing. Multiply by " + backfillBatchSizeName + " and " + "average block size (~2MB before deneb) to find the right number for your system. " + - "This has a multiplicatice effect with " + backfillBatchSizeName, + "This has a multiplicative effect with " + backfillBatchSizeName + ".", Value: 2, } )