mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-03 01:27:38 +00:00
Fix outstanding known header==nil errors + reduce bor heimdall logging (#8878)
This PR fixes a number of instances in the bor heimdall stage where nil headers are either ignored or inadvertently processed. It also demotes milestone-related logging messages to debug level for missing blocks, since these occur when the process is not at the head of the chain, and reduces periodic logging to every 30 seconds rather than every 20 to cut the log output on long runs. In addition, persistValidatorSets is refactored so that validator set initialisation is performed in a separate function. This is intended to clarify the operation of persistValidatorSets, which is still performing two actions: persisting the snapshot and then using it to check the header against the synthesized validator set in the snapshot.
This commit is contained in:
parent
ad48ecdcbb
commit
85ade6b49a
@ -120,8 +120,10 @@ func retryHeimdallHandler(fn heimdallHandler, config *config, tickerDuration tim
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
if !errors.Is(err, errMissingBlocks) {
|
||||
config.logger.Warn(fmt.Sprintf("[bor] unable to start the %s service - first run", fnName), "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(tickerDuration)
|
||||
defer ticker.Stop()
|
||||
@ -142,8 +144,12 @@ func retryHeimdallHandler(fn heimdallHandler, config *config, tickerDuration tim
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, errMissingBlocks) {
|
||||
config.logger.Debug(fmt.Sprintf("[bor] unable to handle %s", fnName), "err", err)
|
||||
} else {
|
||||
config.logger.Warn(fmt.Sprintf("[bor] unable to handle %s", fnName), "err", err)
|
||||
}
|
||||
}
|
||||
case <-config.closeCh:
|
||||
return
|
||||
}
|
||||
|
@ -37,17 +37,25 @@ func fetchWhitelistCheckpoint(ctx context.Context, heimdallClient heimdall.IHeim
|
||||
return blockNum, blockHash, errCheckpoint
|
||||
}
|
||||
|
||||
config.logger.Info("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
|
||||
|
||||
// Verify if the checkpoint fetched can be added to the local whitelist entry or not
|
||||
// If verified, it returns the hash of the end block of the checkpoint. If not,
|
||||
// it will return appropriate error.
|
||||
hash, err := verifier.verify(ctx, config, checkpoint.StartBlock.Uint64(), checkpoint.EndBlock.Uint64(), checkpoint.RootHash.String()[2:], true)
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, errMissingBlocks) {
|
||||
config.logger.Debug("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
|
||||
config.logger.Debug("[bor.heimdall] Failed to whitelist checkpoint", "err", err)
|
||||
} else {
|
||||
config.logger.Info("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
|
||||
config.logger.Warn("[bor.heimdall] Failed to whitelist checkpoint", "err", err)
|
||||
}
|
||||
|
||||
return blockNum, blockHash, err
|
||||
}
|
||||
|
||||
config.logger.Info("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
|
||||
|
||||
blockNum = checkpoint.EndBlock.Uint64()
|
||||
blockHash = common.HexToHash(hash)
|
||||
|
||||
|
@ -233,7 +233,7 @@ func BorHeimdallForward(
|
||||
var eventRecords int
|
||||
var lastSpanId uint64
|
||||
|
||||
logTimer := time.NewTicker(30 * time.Second)
|
||||
logTimer := time.NewTicker(logInterval)
|
||||
defer logTimer.Stop()
|
||||
|
||||
if endSpanID >= nextSpanId {
|
||||
@ -288,10 +288,25 @@ func BorHeimdallForward(
|
||||
fetchTime += callTime
|
||||
}
|
||||
|
||||
if err = PersistValidatorSets(u, ctx, tx, cfg.blockReader, cfg.chainConfig.Bor, chain, blockNum, header.Hash(), recents, signatures, cfg.snapDb, logger, s.LogPrefix()); err != nil {
|
||||
return fmt.Errorf("persistValidatorSets: %w", err)
|
||||
var snap *bor.Snapshot
|
||||
|
||||
if header != nil {
|
||||
snap = loadSnapshot(blockNum, header.Hash(), cfg.chainConfig.Bor, recents, signatures, cfg.snapDb, logger)
|
||||
|
||||
if snap == nil && blockNum <= chain.FrozenBlocks() {
|
||||
snap, err = initValidatorSets(ctx, snap, tx, cfg.blockReader, cfg.chainConfig.Bor,
|
||||
chain, blockNum, recents, signatures, cfg.snapDb, logger, s.LogPrefix())
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't initialise validator sets: %w", err)
|
||||
}
|
||||
if !mine && header != nil {
|
||||
}
|
||||
|
||||
if err = persistValidatorSets(ctx, snap, u, tx, cfg.blockReader, cfg.chainConfig.Bor, chain, blockNum, header.Hash(), recents, signatures, cfg.snapDb, logger, s.LogPrefix()); err != nil {
|
||||
return fmt.Errorf("can't persist validator sets: %w", err)
|
||||
}
|
||||
|
||||
if !mine {
|
||||
sprintLength := cfg.chainConfig.Bor.CalculateSprint(blockNum)
|
||||
if blockNum > zerothSpanEnd && ((blockNum+1)%sprintLength == 0) {
|
||||
if err = checkHeaderExtraData(u, ctx, chain, blockNum, header, cfg.chainConfig.Bor); err != nil {
|
||||
@ -300,6 +315,7 @@ func BorHeimdallForward(
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err = s.Update(tx, headNumber); err != nil {
|
||||
return err
|
||||
@ -378,6 +394,10 @@ func fetchAndWriteBorEvents(
|
||||
to time.Time
|
||||
)
|
||||
|
||||
if header == nil {
|
||||
return 0, 0, 0, fmt.Errorf("can't fetch events for nil header")
|
||||
}
|
||||
|
||||
blockNum := header.Number.Uint64()
|
||||
|
||||
if config.IsIndore(blockNum) {
|
||||
@ -485,10 +505,29 @@ func fetchAndWriteSpans(
|
||||
return spanId, nil
|
||||
}
|
||||
|
||||
// Not used currently
|
||||
func PersistValidatorSets(
|
||||
u Unwinder,
|
||||
func loadSnapshot(blockNum uint64, hash libcommon.Hash, config *chain.BorConfig, recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
|
||||
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
|
||||
snapDb kv.RwDB,
|
||||
logger log.Logger) *bor.Snapshot {
|
||||
|
||||
if s, ok := recents.Get(hash); ok {
|
||||
return s
|
||||
}
|
||||
|
||||
if blockNum%snapshotPersistInterval == 0 {
|
||||
if s, err := bor.LoadSnapshot(config, signatures, snapDb, hash); err == nil {
|
||||
logger.Trace("Loaded snapshot from disk", "number", blockNum, "hash", hash)
|
||||
return s
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func persistValidatorSets(
|
||||
ctx context.Context,
|
||||
snap *bor.Snapshot,
|
||||
u Unwinder,
|
||||
tx kv.Tx,
|
||||
blockReader services.FullBlockReader,
|
||||
config *chain.BorConfig,
|
||||
@ -504,7 +543,6 @@ func PersistValidatorSets(
|
||||
logEvery := time.NewTicker(logInterval)
|
||||
defer logEvery.Stop()
|
||||
// Search for a snapshot in memory or on disk for checkpoints
|
||||
var snap *bor.Snapshot
|
||||
|
||||
headers := make([]*types.Header, 0, 16)
|
||||
var parent *types.Header
|
||||
@ -565,72 +603,6 @@ func PersistValidatorSets(
|
||||
default:
|
||||
}
|
||||
}
|
||||
if snap == nil && chain != nil && blockNum <= chain.FrozenBlocks() {
|
||||
// Special handling of the headers in the snapshot
|
||||
zeroHeader := chain.GetHeaderByNumber(0)
|
||||
if zeroHeader != nil {
|
||||
// get checkpoint data
|
||||
hash := zeroHeader.Hash()
|
||||
|
||||
// get validators and current span
|
||||
zeroSpanBytes, err := blockReader.Span(ctx, tx, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var zeroSpan span.HeimdallSpan
|
||||
if err = json.Unmarshal(zeroSpanBytes, &zeroSpan); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// new snap shot
|
||||
snap = bor.NewSnapshot(config, signatures, 0, hash, zeroSpan.ValidatorSet.Validators, logger)
|
||||
if err := snap.Store(snapDb); err != nil {
|
||||
return fmt.Errorf("snap.Store (0): %w", err)
|
||||
}
|
||||
logger.Info(fmt.Sprintf("[%s] Stored proposer snapshot to disk", logPrefix), "number", 0, "hash", hash)
|
||||
g := errgroup.Group{}
|
||||
g.SetLimit(estimate.AlmostAllCPUs())
|
||||
defer g.Wait()
|
||||
|
||||
batchSize := 128 // must be < inmemorySignatures
|
||||
initialHeaders := make([]*types.Header, 0, batchSize)
|
||||
parentHeader := zeroHeader
|
||||
for i := uint64(1); i <= blockNum; i++ {
|
||||
header := chain.GetHeaderByNumber(i)
|
||||
{
|
||||
// `snap.apply` bottleneck - is recover of signer.
|
||||
// to speedup: recover signer in background goroutines and save in `sigcache`
|
||||
// `batchSize` < `inmemorySignatures`: means all current batch will fit in cache - and `snap.apply` will find it there.
|
||||
g.Go(func() error {
|
||||
if header == nil {
|
||||
return nil
|
||||
}
|
||||
_, _ = bor.Ecrecover(header, signatures, config)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if header == nil {
|
||||
log.Debug(fmt.Sprintf("[%s] PersistValidatorSets nil header", logPrefix), "blockNum", i)
|
||||
}
|
||||
initialHeaders = append(initialHeaders, header)
|
||||
if len(initialHeaders) == cap(initialHeaders) {
|
||||
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
|
||||
return fmt.Errorf("snap.Apply (inside loop): %w", err)
|
||||
}
|
||||
parentHeader = initialHeaders[len(initialHeaders)-1]
|
||||
initialHeaders = initialHeaders[:0]
|
||||
}
|
||||
select {
|
||||
case <-logEvery.C:
|
||||
logger.Info(fmt.Sprintf("[%s] Computing validator proposer prorities (forward)", logPrefix), "blockNum", i)
|
||||
default:
|
||||
}
|
||||
}
|
||||
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
|
||||
return fmt.Errorf("snap.Apply (outside loop): %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check if snapshot is nil
|
||||
if snap == nil {
|
||||
@ -674,6 +646,97 @@ func PersistValidatorSets(
|
||||
return nil
|
||||
}
|
||||
|
||||
func initValidatorSets(
|
||||
ctx context.Context,
|
||||
snap *bor.Snapshot,
|
||||
tx kv.Tx,
|
||||
blockReader services.FullBlockReader,
|
||||
config *chain.BorConfig,
|
||||
chain consensus.ChainHeaderReader,
|
||||
blockNum uint64,
|
||||
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
|
||||
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
|
||||
snapDb kv.RwDB,
|
||||
logger log.Logger,
|
||||
logPrefix string) (*bor.Snapshot, error) {
|
||||
|
||||
logEvery := time.NewTicker(logInterval)
|
||||
defer logEvery.Stop()
|
||||
|
||||
if snap == nil {
|
||||
// Special handling of the headers in the snapshot
|
||||
zeroHeader := chain.GetHeaderByNumber(0)
|
||||
if zeroHeader != nil {
|
||||
// get checkpoint data
|
||||
hash := zeroHeader.Hash()
|
||||
|
||||
if zeroSnap := loadSnapshot(0, hash, config, recents, signatures, snapDb, logger); zeroSnap != nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// get validators and current span
|
||||
zeroSpanBytes, err := blockReader.Span(ctx, tx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var zeroSpan span.HeimdallSpan
|
||||
if err = json.Unmarshal(zeroSpanBytes, &zeroSpan); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// new snap shot
|
||||
snap = bor.NewSnapshot(config, signatures, 0, hash, zeroSpan.ValidatorSet.Validators, logger)
|
||||
if err := snap.Store(snapDb); err != nil {
|
||||
return nil, fmt.Errorf("snap.Store (0): %w", err)
|
||||
}
|
||||
logger.Info(fmt.Sprintf("[%s] Stored proposer snapshot to disk", logPrefix), "number", 0, "hash", hash)
|
||||
g := errgroup.Group{}
|
||||
g.SetLimit(estimate.AlmostAllCPUs())
|
||||
defer g.Wait()
|
||||
|
||||
batchSize := 128 // must be < inmemorySignatures
|
||||
initialHeaders := make([]*types.Header, 0, batchSize)
|
||||
parentHeader := zeroHeader
|
||||
for i := uint64(1); i <= blockNum; i++ {
|
||||
header := chain.GetHeaderByNumber(i)
|
||||
{
|
||||
// `snap.apply` bottleneck - is recover of signer.
|
||||
// to speedup: recover signer in background goroutines and save in `sigcache`
|
||||
// `batchSize` < `inmemorySignatures`: means all current batch will fit in cache - and `snap.apply` will find it there.
|
||||
g.Go(func() error {
|
||||
if header == nil {
|
||||
return nil
|
||||
}
|
||||
_, _ = bor.Ecrecover(header, signatures, config)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if header == nil {
|
||||
return nil, fmt.Errorf("missing header persisting validator sets: (inside loop at %d)", i)
|
||||
}
|
||||
initialHeaders = append(initialHeaders, header)
|
||||
if len(initialHeaders) == cap(initialHeaders) {
|
||||
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
|
||||
return nil, fmt.Errorf("snap.Apply (inside loop): %w", err)
|
||||
}
|
||||
parentHeader = initialHeaders[len(initialHeaders)-1]
|
||||
initialHeaders = initialHeaders[:0]
|
||||
}
|
||||
select {
|
||||
case <-logEvery.C:
|
||||
logger.Info(fmt.Sprintf("[%s] Computing validator proposer prorities (forward)", logPrefix), "blockNum", i)
|
||||
default:
|
||||
}
|
||||
}
|
||||
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
|
||||
return nil, fmt.Errorf("snap.Apply (outside loop): %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
func BorHeimdallUnwind(u *UnwindState, ctx context.Context, s *StageState, tx kv.RwTx, cfg BorHeimdallCfg) (err error) {
|
||||
if cfg.chainConfig.Bor == nil {
|
||||
return
|
||||
|
@ -49,7 +49,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
logInterval = 20 * time.Second
|
||||
logInterval = 30 * time.Second
|
||||
|
||||
// stateStreamLimit - don't accumulate state changes if jump is bigger than this amount of blocks
|
||||
stateStreamLimit uint64 = 1_000
|
||||
|
Loading…
Reference in New Issue
Block a user