From 85ade6b49a9af67116bb17a2a029b0fb9eb7c4b7 Mon Sep 17 00:00:00 2001
From: Mark Holt <135143369+mh0lt@users.noreply.github.com>
Date: Fri, 1 Dec 2023 17:52:50 +0000
Subject: [PATCH] Fix outstanding known header==nil errors + reduce bor heimdall logging (#8878)

This PR fixes a number of instances in the bor heimdall stage where nil
headers were either ignored or inadvertently processed.

It also demotes milestone-related logging messages for missing blocks
from warn to debug, since missing blocks simply mean the process is not
yet at the head of the chain, and reduces the periodic logging interval
from 20 secs to 30 secs to cut the log output on long runs.

In addition, persistValidatorSets has been refactored to perform
validator set initiation in a separate function. This is intended to
clarify the operation of persistValidatorSets, which still performs two
actions: persisting the snapshot and then using it to check the header
against the validator set synthesized in the snapshot.
---
 consensus/bor/finality/whitelist.go         |  10 +-
 consensus/bor/finality/whitelist_helpers.go |  14 +-
 eth/stagedsync/stage_bor_heimdall.go        | 221 +++++++++++++-------
 eth/stagedsync/stage_execute.go             |   2 +-
 4 files changed, 162 insertions(+), 85 deletions(-)

diff --git a/consensus/bor/finality/whitelist.go b/consensus/bor/finality/whitelist.go
index cb21f15d2..76abfcc0d 100644
--- a/consensus/bor/finality/whitelist.go
+++ b/consensus/bor/finality/whitelist.go
@@ -120,7 +120,9 @@ func retryHeimdallHandler(fn heimdallHandler, config *config, tickerDuration tim
 	cancel()
 
 	if err != nil {
-		config.logger.Warn(fmt.Sprintf("[bor] unable to start the %s service - first run", fnName), "err", err)
+		if !errors.Is(err, errMissingBlocks) {
+			config.logger.Warn(fmt.Sprintf("[bor] unable to start the %s service - first run", fnName), "err", err)
+		}
 	}
 
 	ticker := time.NewTicker(tickerDuration)
@@ -142,7 +144,11 @@ func retryHeimdallHandler(fn heimdallHandler, config *config, tickerDuration tim
 			cancel()
 
 			if err != nil {
-				config.logger.Warn(fmt.Sprintf("[bor] unable to handle %s", fnName), "err", err)
+				if errors.Is(err, errMissingBlocks) {
+					config.logger.Debug(fmt.Sprintf("[bor] unable to handle %s", fnName), "err", err)
+				} else {
+					config.logger.Warn(fmt.Sprintf("[bor] unable to handle %s", fnName), "err", err)
+				}
 			}
 		case <-config.closeCh:
 			return
diff --git a/consensus/bor/finality/whitelist_helpers.go b/consensus/bor/finality/whitelist_helpers.go
index 42e1ecc90..54dbff496 100644
--- a/consensus/bor/finality/whitelist_helpers.go
+++ b/consensus/bor/finality/whitelist_helpers.go
@@ -37,17 +37,25 @@ func fetchWhitelistCheckpoint(ctx context.Context, heimdallClient heimdall.IHeim
 		return blockNum, blockHash, errCheckpoint
 	}
 
-	config.logger.Info("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
-
 	// Verify if the checkpoint fetched can be added to the local whitelist entry or not
 	// If verified, it returns the hash of the end block of the checkpoint. If not,
 	// it will return appropriate error.
 	hash, err := verifier.verify(ctx, config, checkpoint.StartBlock.Uint64(), checkpoint.EndBlock.Uint64(), checkpoint.RootHash.String()[2:], true)
+
 	if err != nil {
-		config.logger.Warn("[bor.heimdall] Failed to whitelist checkpoint", "err", err)
+		if errors.Is(err, errMissingBlocks) {
+			config.logger.Debug("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
+			config.logger.Debug("[bor.heimdall] Failed to whitelist checkpoint", "err", err)
+		} else {
+			config.logger.Info("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
+			config.logger.Warn("[bor.heimdall] Failed to whitelist checkpoint", "err", err)
+		}
+
 		return blockNum, blockHash, err
 	}
 
+	config.logger.Info("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
+
 	blockNum = checkpoint.EndBlock.Uint64()
 	blockHash = common.HexToHash(hash)
 
diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go
index d990fc952..fcce29163 100644
--- a/eth/stagedsync/stage_bor_heimdall.go
+++ b/eth/stagedsync/stage_bor_heimdall.go
@@ -233,7 +233,7 @@ func BorHeimdallForward(
 	var eventRecords int
 	var lastSpanId uint64
 
-	logTimer := time.NewTicker(30 * time.Second)
+	logTimer := time.NewTicker(logInterval)
 	defer logTimer.Stop()
 
 	if endSpanID >= nextSpanId {
@@ -288,14 +288,30 @@ func BorHeimdallForward(
 			fetchTime += callTime
 		}
 
-		if err = PersistValidatorSets(u, ctx, tx, cfg.blockReader, cfg.chainConfig.Bor, chain, blockNum, header.Hash(), recents, signatures, cfg.snapDb, logger, s.LogPrefix()); err != nil {
-			return fmt.Errorf("persistValidatorSets: %w", err)
-		}
-		if !mine && header != nil {
-			sprintLength := cfg.chainConfig.Bor.CalculateSprint(blockNum)
-			if blockNum > zerothSpanEnd && ((blockNum+1)%sprintLength == 0) {
-				if err = checkHeaderExtraData(u, ctx, chain, blockNum, header, cfg.chainConfig.Bor); err != nil {
-					return err
+		var snap *bor.Snapshot
+
+		if header != nil {
+			snap = loadSnapshot(blockNum, header.Hash(), cfg.chainConfig.Bor, recents, signatures, cfg.snapDb, logger)
+
+			if snap == nil && blockNum <= chain.FrozenBlocks() {
+				snap, err = initValidatorSets(ctx, snap, tx, cfg.blockReader, cfg.chainConfig.Bor,
+					chain, blockNum, recents, signatures, cfg.snapDb, logger, s.LogPrefix())
+
+				if err != nil {
+					return fmt.Errorf("can't initialise validator sets: %w", err)
+				}
+			}
+
+			if err = persistValidatorSets(ctx, snap, u, tx, cfg.blockReader, cfg.chainConfig.Bor, chain, blockNum, header.Hash(), recents, signatures, cfg.snapDb, logger, s.LogPrefix()); err != nil {
+				return fmt.Errorf("can't persist validator sets: %w", err)
+			}
+
+			if !mine {
+				sprintLength := cfg.chainConfig.Bor.CalculateSprint(blockNum)
+				if blockNum > zerothSpanEnd && ((blockNum+1)%sprintLength == 0) {
+					if err = checkHeaderExtraData(u, ctx, chain, blockNum, header, cfg.chainConfig.Bor); err != nil {
+						return err
+					}
 				}
 			}
 		}
@@ -378,6 +394,10 @@ func fetchAndWriteBorEvents(
 		to time.Time
 	)
 
+	if header == nil {
+		return 0, 0, 0, fmt.Errorf("can't fetch events for nil header")
+	}
+
 	blockNum := header.Number.Uint64()
 
 	if config.IsIndore(blockNum) {
@@ -485,10 +505,29 @@ func fetchAndWriteSpans(
 	return spanId, nil
 }
 
-// Not used currently
-func PersistValidatorSets(
-	u Unwinder,
+func loadSnapshot(blockNum uint64, hash libcommon.Hash, config *chain.BorConfig,
+	recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
+	signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
+	snapDb kv.RwDB,
+	logger log.Logger) *bor.Snapshot {
+
+	if s, ok := recents.Get(hash); ok {
+		return s
+	}
+
+	if blockNum%snapshotPersistInterval == 0 {
+		if s, err := bor.LoadSnapshot(config, signatures, snapDb, hash); err == nil {
+			logger.Trace("Loaded snapshot from disk", "number", blockNum, "hash", hash)
+			return s
+		}
+	}
+
+	return nil
+}
+
+func persistValidatorSets(
 	ctx context.Context,
+	snap *bor.Snapshot,
+	u Unwinder,
 	tx kv.Tx,
 	blockReader services.FullBlockReader,
 	config *chain.BorConfig,
@@ -504,7 +543,6 @@ func PersistValidatorSets(
 	logEvery := time.NewTicker(logInterval)
 	defer logEvery.Stop()
 	// Search for a snapshot in memory or on disk for checkpoints
-	var snap *bor.Snapshot
 	headers := make([]*types.Header, 0, 16)
 	var parent *types.Header
 
@@ -565,72 +603,6 @@ func PersistValidatorSets(
 		default:
 		}
 	}
-	if snap == nil && chain != nil && blockNum <= chain.FrozenBlocks() {
-		// Special handling of the headers in the snapshot
-		zeroHeader := chain.GetHeaderByNumber(0)
-		if zeroHeader != nil {
-			// get checkpoint data
-			hash := zeroHeader.Hash()
-
-			// get validators and current span
-			zeroSpanBytes, err := blockReader.Span(ctx, tx, 0)
-			if err != nil {
-				return err
-			}
-			var zeroSpan span.HeimdallSpan
-			if err = json.Unmarshal(zeroSpanBytes, &zeroSpan); err != nil {
-				return err
-			}
-
-			// new snap shot
-			snap = bor.NewSnapshot(config, signatures, 0, hash, zeroSpan.ValidatorSet.Validators, logger)
-			if err := snap.Store(snapDb); err != nil {
-				return fmt.Errorf("snap.Store (0): %w", err)
-			}
-			logger.Info(fmt.Sprintf("[%s] Stored proposer snapshot to disk", logPrefix), "number", 0, "hash", hash)
-			g := errgroup.Group{}
-			g.SetLimit(estimate.AlmostAllCPUs())
-			defer g.Wait()
-
-			batchSize := 128 // must be < inmemorySignatures
-			initialHeaders := make([]*types.Header, 0, batchSize)
-			parentHeader := zeroHeader
-			for i := uint64(1); i <= blockNum; i++ {
-				header := chain.GetHeaderByNumber(i)
-				{
-					// `snap.apply` bottleneck - is recover of signer.
-					// to speedup: recover signer in background goroutines and save in `sigcache`
-					// `batchSize` < `inmemorySignatures`: means all current batch will fit in cache - and `snap.apply` will find it there.
-					g.Go(func() error {
-						if header == nil {
-							return nil
-						}
-						_, _ = bor.Ecrecover(header, signatures, config)
-						return nil
-					})
-				}
-				if header == nil {
-					log.Debug(fmt.Sprintf("[%s] PersistValidatorSets nil header", logPrefix), "blockNum", i)
-				}
-				initialHeaders = append(initialHeaders, header)
-				if len(initialHeaders) == cap(initialHeaders) {
-					if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
-						return fmt.Errorf("snap.Apply (inside loop): %w", err)
-					}
-					parentHeader = initialHeaders[len(initialHeaders)-1]
-					initialHeaders = initialHeaders[:0]
-				}
-				select {
-				case <-logEvery.C:
-					logger.Info(fmt.Sprintf("[%s] Computing validator proposer prorities (forward)", logPrefix), "blockNum", i)
-				default:
-				}
-			}
-			if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
-				return fmt.Errorf("snap.Apply (outside loop): %w", err)
-			}
-		}
-	}
 
 	// check if snapshot is nil
 	if snap == nil {
@@ -674,6 +646,97 @@ func PersistValidatorSets(
 	return nil
 }
 
+func initValidatorSets(
+	ctx context.Context,
+	snap *bor.Snapshot,
+	tx kv.Tx,
+	blockReader services.FullBlockReader,
+	config *chain.BorConfig,
+	chain consensus.ChainHeaderReader,
+	blockNum uint64,
+	recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
+	signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
+	snapDb kv.RwDB,
+	logger log.Logger,
+	logPrefix string) (*bor.Snapshot, error) {
+
+	logEvery := time.NewTicker(logInterval)
+	defer logEvery.Stop()
+
+	if snap == nil {
+		// Special handling of the headers in the snapshot
+		zeroHeader := chain.GetHeaderByNumber(0)
+		if zeroHeader != nil {
+			// get checkpoint data
+			hash := zeroHeader.Hash()
+
+			if zeroSnap := loadSnapshot(0, hash, config, recents, signatures, snapDb, logger); zeroSnap != nil {
+				return nil, nil
+			}
+
+			// get validators and current span
+			zeroSpanBytes, err := blockReader.Span(ctx, tx, 0)
+			if err != nil {
+				return nil, err
+			}
+			var zeroSpan span.HeimdallSpan
+			if err = json.Unmarshal(zeroSpanBytes, &zeroSpan); err != nil {
+				return nil, err
+			}
+
+			// new snap shot
+			snap = bor.NewSnapshot(config, signatures, 0, hash, zeroSpan.ValidatorSet.Validators, logger)
+			if err := snap.Store(snapDb); err != nil {
+				return nil, fmt.Errorf("snap.Store (0): %w", err)
+			}
+			logger.Info(fmt.Sprintf("[%s] Stored proposer snapshot to disk", logPrefix), "number", 0, "hash", hash)
+			g := errgroup.Group{}
+			g.SetLimit(estimate.AlmostAllCPUs())
+			defer g.Wait()
+
+			batchSize := 128 // must be < inmemorySignatures
+			initialHeaders := make([]*types.Header, 0, batchSize)
+			parentHeader := zeroHeader
+			for i := uint64(1); i <= blockNum; i++ {
+				header := chain.GetHeaderByNumber(i)
+				{
+					// `snap.apply` bottleneck - is recover of signer.
+					// to speedup: recover signer in background goroutines and save in `sigcache`
+					// `batchSize` < `inmemorySignatures`: means all current batch will fit in cache - and `snap.apply` will find it there.
+					g.Go(func() error {
+						if header == nil {
+							return nil
+						}
+						_, _ = bor.Ecrecover(header, signatures, config)
+						return nil
+					})
+				}
+				if header == nil {
+					return nil, fmt.Errorf("missing header persisting validator sets: (inside loop at %d)", i)
+				}
+				initialHeaders = append(initialHeaders, header)
+				if len(initialHeaders) == cap(initialHeaders) {
+					if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
+						return nil, fmt.Errorf("snap.Apply (inside loop): %w", err)
+					}
+					parentHeader = initialHeaders[len(initialHeaders)-1]
+					initialHeaders = initialHeaders[:0]
+				}
+				select {
+				case <-logEvery.C:
+					logger.Info(fmt.Sprintf("[%s] Computing validator proposer prorities (forward)", logPrefix), "blockNum", i)
+				default:
+				}
+			}
+			if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
+				return nil, fmt.Errorf("snap.Apply (outside loop): %w", err)
+			}
+		}
+	}
+
+	return snap, nil
+}
+
 func BorHeimdallUnwind(u *UnwindState, ctx context.Context, s *StageState, tx kv.RwTx, cfg BorHeimdallCfg) (err error) {
 	if cfg.chainConfig.Bor == nil {
 		return
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index f66222d33..8a512eba3 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -49,7 +49,7 @@ import (
 )
 
 const (
-	logInterval = 20 * time.Second
+	logInterval = 30 * time.Second
 
 	// stateStreamLimit - don't accumulate state changes if jump is bigger than this amount of blocks
 	stateStreamLimit uint64 = 1_000