Normalize backfill logs/errors (#13642)

* Normalize backfill logs
* improve flag desc
* review

commit 7a9608ea20 (parent f795e09ecf)
@@ -6,6 +6,7 @@ go_library(
         "batch.go",
         "batcher.go",
         "blobs.go",
+        "log.go",
         "metrics.go",
         "pool.go",
         "service.go",
@@ -12,7 +12,7 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
 	eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
-	log "github.com/sirupsen/logrus"
+	"github.com/sirupsen/logrus"
 )

 // ErrChainBroken indicates a backfill batch can't be imported to the db because it is not known to be the ancestor
@@ -73,9 +73,9 @@ type batch struct {
 	bs *blobSync
 }

-func (b batch) logFields() log.Fields {
+func (b batch) logFields() logrus.Fields {
 	return map[string]interface{}{
-		"batch_id":  b.id(),
+		"batchId":   b.id(),
 		"state":     b.state.String(),
 		"scheduled": b.scheduled.String(),
 		"seq":       b.seq,
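With batch.go now importing logrus under its own package name, logFields can return logrus.Fields directly. The map literal needs no change because logrus.Fields is declared as a map type. A minimal, self-contained sketch of why this compiles, using hypothetical field values rather than the real batch type:

package main

import "github.com/sirupsen/logrus"

// logrus.Fields is `type Fields map[string]interface{}`, so a plain
// map literal satisfies the logrus.Fields return type unchanged.
func exampleFields() logrus.Fields {
	return map[string]interface{}{
		"batchId":   42,           // hypothetical values for illustration
		"state":     "importable",
		"scheduled": "1s",
		"seq":       7,
	}
}

func main() {
	logrus.WithFields(exampleFields()).Info("Backfill batch imported")
}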
@@ -139,7 +139,7 @@ func (b batch) withResults(results verifiedROBlocks, bs *blobSync) batch {

 func (b batch) postBlobSync() batch {
 	if b.blobsNeeded() > 0 {
-		log.WithFields(b.logFields()).WithField("blobs_missing", b.blobsNeeded()).Error("batch still missing blobs after downloading from peer")
+		log.WithFields(b.logFields()).WithField("blobsMissing", b.blobsNeeded()).Error("Batch still missing blobs after downloading from peer")
 		b.bs = nil
 		b.results = []blocks.ROBlock{}
 		return b.withState(batchErrRetryable)
@@ -153,14 +153,14 @@ func (b batch) withState(s batchState) batch {
 		switch b.state {
 		case batchErrRetryable:
 			b.retries += 1
-			log.WithFields(b.logFields()).Info("sequencing batch for retry")
+			log.WithFields(b.logFields()).Info("Sequencing batch for retry")
 		case batchInit, batchNil:
 			b.firstScheduled = b.scheduled
 		}
 	}
 	if s == batchImportComplete {
 		backfillBatchTimeRoundtrip.Observe(float64(time.Since(b.firstScheduled).Milliseconds()))
-		log.WithFields(b.logFields()).Debug("Backfill batch imported.")
+		log.WithFields(b.logFields()).Debug("Backfill batch imported")
 	}
 	b.state = s
 	b.seq += 1
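Note that withState has a value receiver and returns the modified copy; callers such as s.batchSeq.update(ib.withState(batchErrRetryable)) further down rely on that. A stripped-down sketch of the pattern, with stand-in types rather than the real batch:

package main

import "fmt"

type batchState int

const (
	batchNil batchState = iota
	batchErrRetryable
	batchImportComplete
)

type batch struct {
	state batchState
	seq   int
}

// Value receiver: the method mutates and returns a copy, so the
// caller's batch is untouched unless it reassigns the result.
func (b batch) withState(s batchState) batch {
	b.state = s
	b.seq += 1
	return b
}

func main() {
	b := batch{}
	retried := b.withState(batchErrRetryable)
	fmt.Println(b.seq, retried.seq) // 0 1
}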
beacon-chain/sync/backfill/log.go (new file, 5 lines)
@@ -0,0 +1,5 @@
+package backfill
+
+import "github.com/sirupsen/logrus"
+
+var log = logrus.WithField("prefix", "backfill")
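This new log.go replaces the per-file `log "github.com/sirupsen/logrus"` import alias (removed from batch.go and from the pool, service, and worker files below) with a single package-level *logrus.Entry that tags every line with the backfill prefix. A minimal sketch of the same pattern in a self-contained package:

package main

import "github.com/sirupsen/logrus"

// In the real package this declaration lives in log.go; every other
// file in the package then uses `log` without its own logrus import.
var log = logrus.WithField("prefix", "backfill")

func main() {
	// Same call shape as the backfill code: structured fields first,
	// then a capitalized, period-free message.
	log.WithField("backfillSlot", 12345).Info("Backfill is complete")
}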
@@ -14,7 +14,6 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
-	log "github.com/sirupsen/logrus"
 )

 type batchWorkerPool interface {
@@ -17,7 +17,6 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/proto/dbval"
 	"github.com/prysmaticlabs/prysm/v5/runtime"
 	"github.com/prysmaticlabs/prysm/v5/time/slots"
-	log "github.com/sirupsen/logrus"
 )

 type Service struct {
@@ -149,12 +148,12 @@ func (s *Service) initVerifier(ctx context.Context) (*verifier, sync.ContextByte
 	}
 	keys, err := cps.PublicKeys()
 	if err != nil {
-		return nil, nil, errors.Wrap(err, "Unable to retrieve public keys for all validators in the origin state")
+		return nil, nil, errors.Wrap(err, "unable to retrieve public keys for all validators in the origin state")
 	}
 	vr := cps.GenesisValidatorsRoot()
 	ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr))
 	if err != nil {
-		return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr)
+		return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root %#x", vr)
 	}
 	v, err := newBackfillVerifier(vr, keys)
 	return v, ctxMap, err
@@ -164,10 +163,10 @@ func (s *Service) updateComplete() bool {
 	b, err := s.pool.complete()
 	if err != nil {
 		if errors.Is(err, errEndSequence) {
-			log.WithField("backfill_slot", b.begin).Info("Backfill is complete.")
+			log.WithField("backfillSlot", b.begin).Info("Backfill is complete")
 			return true
 		}
-		log.WithError(err).Error("Backfill service received unhandled error from worker pool.")
+		log.WithError(err).Error("Backfill service received unhandled error from worker pool")
 		return true
 	}
 	s.batchSeq.update(b)
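errEndSequence here is a sentinel error tested with errors.Is, which matches even when the pool returns it wrapped with extra context. A small sketch of that pattern; the wrapping function is a hypothetical stand-in for pool.complete():

package main

import (
	"errors"
	"fmt"
)

// Sentinel value the caller can test for with errors.Is.
var errEndSequence = errors.New("end sequence")

// Hypothetical stand-in for pool.complete() returning a wrapped sentinel.
func complete() error {
	return fmt.Errorf("final batch handled: %w", errEndSequence)
}

func main() {
	if err := complete(); errors.Is(err, errEndSequence) {
		fmt.Println("Backfill is complete") // matches through the wrap
	}
}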
@@ -187,11 +186,11 @@ func (s *Service) importBatches(ctx context.Context) {
 	for i := range importable {
 		ib := importable[i]
 		if len(ib.results) == 0 {
-			log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer.")
+			log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer")
 		}
 		_, err := s.batchImporter(ctx, current, ib, s.store)
 		if err != nil {
-			log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import.")
+			log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import")
 			s.downscore(ib)
 			s.batchSeq.update(ib.withState(batchErrRetryable))
 			// If a batch fails, the subsequent batches are no longer considered importable.
@@ -204,8 +203,8 @@ func (s *Service) importBatches(ctx context.Context) {

 	nt := s.batchSeq.numTodo()
 	log.WithField("imported", imported).WithField("importable", len(importable)).
-		WithField("batches_remaining", nt).
-		Info("Backfill batches processed.")
+		WithField("batchesRemaining", nt).
+		Info("Backfill batches processed")

 	backfillRemainingBatches.Set(float64(nt))
 }
@@ -220,7 +219,7 @@ func (s *Service) scheduleTodos() {
 	// and then we'll have the parent_root expected by 90 to ensure it matches the root for 89,
 	// at which point we know we can process [80..90).
 	if errors.Is(err, errMaxBatches) {
-		log.Debug("Backfill batches waiting for descendent batch to complete.")
+		log.Debug("Backfill batches waiting for descendent batch to complete")
 		return
 	}
 }
@@ -232,17 +231,17 @@ func (s *Service) scheduleTodos() {

 // Start begins the runloop of backfill.Service in the current goroutine.
 func (s *Service) Start() {
 	if !s.enabled {
-		log.Info("Backfill service not enabled.")
+		log.Info("Backfill service not enabled")
 		return
 	}
 	ctx, cancel := context.WithCancel(s.ctx)
 	defer func() {
-		log.Info("Backfill service is shutting down.")
+		log.Info("Backfill service is shutting down")
 		cancel()
 	}()
 	clock, err := s.cw.WaitForClock(ctx)
 	if err != nil {
-		log.WithError(err).Error("Backfill service failed to start while waiting for genesis data.")
+		log.WithError(err).Error("Backfill service failed to start while waiting for genesis data")
 		return
 	}
 	s.clock = clock
@@ -250,39 +249,39 @@ func (s *Service) Start() {
 	s.newBlobVerifier = newBlobVerifierFromInitializer(v)

 	if err != nil {
-		log.WithError(err).Error("Could not initialize blob verifier in backfill service.")
+		log.WithError(err).Error("Could not initialize blob verifier in backfill service")
 		return
 	}

 	if s.store.isGenesisSync() {
-		log.Info("Backfill short-circuit; node synced from genesis.")
+		log.Info("Backfill short-circuit; node synced from genesis")
 		return
 	}
 	status := s.store.status()
 	// Exit early if there aren't going to be any batches to backfill.
 	if primitives.Slot(status.LowSlot) <= s.ms(s.clock.CurrentSlot()) {
-		log.WithField("minimum_required_slot", s.ms(s.clock.CurrentSlot())).
-			WithField("backfill_lowest_slot", status.LowSlot).
-			Info("Exiting backfill service; minimum block retention slot > lowest backfilled block.")
+		log.WithField("minimumRequiredSlot", s.ms(s.clock.CurrentSlot())).
+			WithField("backfillLowestSlot", status.LowSlot).
+			Info("Exiting backfill service; minimum block retention slot > lowest backfilled block")
 		return
 	}
 	s.verifier, s.ctxMap, err = s.initVerifier(ctx)
 	if err != nil {
-		log.WithError(err).Error("Unable to initialize backfill verifier.")
+		log.WithError(err).Error("Unable to initialize backfill verifier")
 		return
 	}

 	if s.initSyncWaiter != nil {
-		log.Info("Backfill service waiting for initial-sync to reach head before starting.")
+		log.Info("Backfill service waiting for initial-sync to reach head before starting")
 		if err := s.initSyncWaiter(); err != nil {
-			log.WithError(err).Error("Error waiting for init-sync to complete.")
+			log.WithError(err).Error("Error waiting for init-sync to complete")
 			return
 		}
 	}
 	s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore)
 	s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
 	if err = s.initBatches(); err != nil {
-		log.WithError(err).Error("Non-recoverable error in backfill service.")
+		log.WithError(err).Error("Non-recoverable error in backfill service")
 		return
 	}
@@ -296,7 +295,7 @@ func (s *Service) Start() {
 		s.importBatches(ctx)
 		batchesWaiting.Set(float64(s.batchSeq.countWithState(batchImportable)))
 		if err := s.batchSeq.moveMinimum(s.ms(s.clock.CurrentSlot())); err != nil {
-			log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot.")
+			log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot")
 		}
 		s.scheduleTodos()
 	}
@@ -15,7 +15,7 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/proto/dbval"
 )

-var errBatchDisconnected = errors.New("Highest block root in backfill batch doesn't match next parent_root")
+var errBatchDisconnected = errors.New("highest block root in backfill batch doesn't match next parent_root")

 // NewUpdater correctly initializes a StatusUpdater value with the required database value.
 func NewUpdater(ctx context.Context, store BeaconDB) (*Store, error) {
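Lowercasing the error string follows the standard Go convention (the one linters such as staticcheck's ST1005 enforce): error strings get wrapped into longer messages, so they should not start with a capital letter or end with punctuation. A quick illustration using github.com/pkg/errors, which this codebase already uses; the wrap message is hypothetical:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

var errBatchDisconnected = errors.New("highest block root in backfill batch doesn't match next parent_root")

func main() {
	// Wrapped errors read as one sentence, which is why the inner
	// string starts lowercase and carries no trailing period.
	err := errors.Wrap(errBatchDisconnected, "unable to import batch")
	fmt.Println(err)
	// unable to import batch: highest block root in backfill batch doesn't match next parent_root
}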
@@ -10,7 +10,6 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
-	log "github.com/sirupsen/logrus"
 )

 type workerId int
@@ -31,14 +30,14 @@ func (w *p2pWorker) run(ctx context.Context) {
 	for {
 		select {
 		case b := <-w.todo:
-			log.WithFields(b.logFields()).WithField("backfill_worker", w.id).Debug("Backfill worker received batch.")
+			log.WithFields(b.logFields()).WithField("backfillWorker", w.id).Debug("Backfill worker received batch")
 			if b.state == batchBlobSync {
 				w.done <- w.handleBlobs(ctx, b)
 			} else {
 				w.done <- w.handleBlocks(ctx, b)
 			}
 		case <-ctx.Done():
-			log.WithField("backfill_worker", w.id).Info("Backfill worker exiting after context canceled.")
+			log.WithField("backfillWorker", w.id).Info("Backfill worker exiting after context canceled")
 			return
 		}
 	}
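The run loop above is the usual context-aware worker pattern: block on either a work channel or ctx.Done(), and exit cleanly on cancellation. A self-contained sketch with simplified types standing in for batch and the two handlers:

package main

import (
	"context"
	"fmt"
)

// Simplified worker: ints stand in for batches, doubling stands in
// for handleBlocks/handleBlobs.
func run(ctx context.Context, id int, todo <-chan int, done chan<- int) {
	for {
		select {
		case b := <-todo:
			done <- b * 2
		case <-ctx.Done():
			fmt.Printf("worker %d exiting after context canceled\n", id)
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	todo, done := make(chan int), make(chan int)
	go run(ctx, 1, todo, done)

	todo <- 21
	fmt.Println("result:", <-done) // result: 42
	cancel()                       // unblocks the worker's ctx.Done() case
}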
@@ -73,7 +72,7 @@ func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch {
 		bdl += vb[i].SizeSSZ()
 	}
 	backfillBlocksApproximateBytes.Add(float64(bdl))
-	log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("backfill batch block bytes downloaded")
+	log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("Backfill batch block bytes downloaded")
 	bs, err := newBlobSync(cs, vb, &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.nbv, store: w.bfs})
 	if err != nil {
 		return b.withRetryableError(err)
@@ -97,7 +96,7 @@ func (w *p2pWorker) handleBlobs(ctx context.Context, b batch) batch {
 		// All blobs are the same size, so we can compute 1 and use it for all in the batch.
 		sz := blobs[0].SizeSSZ() * len(blobs)
 		backfillBlobsApproximateBytes.Add(float64(sz))
-		log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("backfill batch blob bytes downloaded")
+		log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("Backfill batch blob bytes downloaded")
 	}
 	return b.postBlobSync()
 }
@@ -12,7 +12,7 @@ var (
 	// This flag will be removed onced backfill is enabled by default.
 	EnableExperimentalBackfill = &cli.BoolFlag{
 		Name: "enable-experimental-backfill",
-		Usage: "Backfill is still experimental at this time." +
+		Usage: "Backfill is still experimental at this time. " +
 			"It will only be enabled if this flag is specified and the node was started using checkpoint sync.",
 	}
 	// BackfillBatchSize allows users to tune block backfill request sizes to maximize network utilization
@@ -21,7 +21,7 @@ var (
 		Name: backfillBatchSizeName,
 		Usage: "Number of blocks per backfill batch. " +
 			"A larger number will request more blocks at once from peers, but also consume more system memory to " +
-			"hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName,
+			"hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName + ".",
 		Value: 64,
 	}
 	// BackfillWorkerCount allows users to tune the number of concurrent backfill batches to download, to maximize
@@ -30,9 +30,9 @@ var (
 		Name: backfillWorkerCountName,
 		Usage: "Number of concurrent backfill batch requests. " +
 			"A larger number will better utilize network resources, up to a system-dependent limit, but will also " +
-			"consume more system memory to hold batches in memory during processing. Multiply by backfill-batch-size and " +
+			"consume more system memory to hold batches in memory during processing. Multiply by " + backfillBatchSizeName + " and " +
 			"average block size (~2MB before deneb) to find the right number for your system. " +
-			"This has a multiplicatice effect with " + backfillBatchSizeName,
+			"This has a multiplicative effect with " + backfillBatchSizeName + ".",
 		Value: 2,
 	}
 )
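These flag definitions use the github.com/urfave/cli/v2 style (the `&cli.BoolFlag{...}` pointer form). A minimal sketch of how flags like these might be registered and read; the app wiring, the name constant, and the Int type for the batch-size flag are assumptions for illustration, not the real prysm wiring:

package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

// Hypothetical stand-in for the unexported name constant in the real package.
const backfillBatchSizeName = "backfill-batch-size"

func main() {
	app := &cli.App{
		Flags: []cli.Flag{
			&cli.BoolFlag{Name: "enable-experimental-backfill"},
			&cli.IntFlag{Name: backfillBatchSizeName, Value: 64}, // assumed Int for the sketch
		},
		Action: func(c *cli.Context) error {
			if c.Bool("enable-experimental-backfill") {
				fmt.Println("backfill batch size:", c.Int(backfillBatchSizeName))
			}
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}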