From f99f3263631181f8b8cfb3f3936de3f01c090177 Mon Sep 17 00:00:00 2001
From: Mark Holt <135143369+mh0lt@users.noreply.github.com>
Date: Wed, 27 Sep 2023 13:45:09 +0100
Subject: [PATCH] Bor fix frozen snapshot load (#8305)

This is a fix for at least one cause of this issue:
https://github.com/ledgerwatch/erigon/issues/8212.

It happens after the end of the snapshot load, because the snapshot
processing introduced a couple of months ago does not validate the
headers at the start of the chain.

I have also fixed the persistence so that the last snapshot is
recorded, so subsequent runs are not forced to reprocess the whole
snapshot range from the start.

The relationship between this and memory usage is that the unprocessed
headers accumulate in a queue of pending headers around 5GB in size. I
have not changed any header parameters, so a prolonged stall in header
processing would likely still produce a similar level of memory growth.
---
 cmd/silkworm_api/snapshot_idx.go |   2 +-
 consensus/bor/bor.go             | 108 ++++++++++++++++++++-----------
 consensus/bor/snapshot.go        |  24 ++++---
 3 files changed, 86 insertions(+), 48 deletions(-)

diff --git a/cmd/silkworm_api/snapshot_idx.go b/cmd/silkworm_api/snapshot_idx.go
index e7fd51672..4265ef194 100644
--- a/cmd/silkworm_api/snapshot_idx.go
+++ b/cmd/silkworm_api/snapshot_idx.go
@@ -88,7 +88,7 @@ func buildIndex(cliCtx *cli.Context, dataDir string, snapshotPaths []string) err
 		return s.Path == snapshotPath
 	})
 	if !found {
-		return fmt.Errorf("Segment %s not found\n", snapshotPath)
+		return fmt.Errorf("segment %s not found", snapshotPath)
 	}
 
 	switch segment.T {
diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go
index 40533496a..3ad04dbce 100644
--- a/consensus/bor/bor.go
+++ b/consensus/bor/bor.go
@@ -258,9 +258,10 @@ type Bor struct {
 	fakeDiff  bool // Skip difficulty verifications
 	spanCache *btree.BTree
 
-	closeOnce sync.Once
-	logger    log.Logger
-	closeCh   chan struct{} // Channel to signal the background processes to exit
+	closeOnce           sync.Once
+	logger              log.Logger
+	closeCh             chan struct{} // Channel to signal the background processes to exit
+	frozenSnapshotsInit sync.Once
 }
 
 type signer struct {
@@ -641,6 +642,66 @@ func (c *Bor) verifyCascadingFields(chain consensus.ChainHeaderReader, header *t
 	return c.verifySeal(chain, header, parents, snap)
 }
 
+func (c *Bor) initFrozenSnapshot(chain consensus.ChainHeaderReader, number uint64, logEvery *time.Ticker) (snap *Snapshot, err error) {
+	c.logger.Info("Initializing frozen snapshots to", "number", number)
+	defer func() {
+		c.logger.Info("Done initializing frozen snapshots to", "number", number, "err", err)
+	}()
+
+	// Special handling of the headers in the snapshot
+	zeroHeader := chain.GetHeaderByNumber(0)
+
+	if zeroHeader != nil {
+		// get checkpoint data
+		hash := zeroHeader.Hash()
+
+		// get validators and current span
+		var validators []*valset.Validator
+
+		validators, err = c.spanner.GetCurrentValidators(1, c.authorizedSigner.Load().signer, c.getSpanForBlock)
+
+		if err != nil {
+			return nil, err
+		}
+
+		// new snapshot
+		snap = newSnapshot(c.config, c.signatures, 0, hash, validators, c.logger)
+
+		if err = snap.store(c.DB); err != nil {
+			return nil, err
+		}
+
+		c.logger.Info("Stored proposer snapshot to disk", "number", 0, "hash", hash)
+
+		initialHeaders := make([]*types.Header, 0, 128)
+
+		for i := uint64(1); i <= number; i++ {
+			header := chain.GetHeaderByNumber(i)
+			initialHeaders = append(initialHeaders, header)
+			if len(initialHeaders) == cap(initialHeaders) {
+				snap, err = snap.apply(initialHeaders, c.logger)
+
+				if err != nil {
+					return nil, err
+				}
+
+				initialHeaders = initialHeaders[:0]
+			}
+			select {
+			case <-logEvery.C:
+				log.Info("Computing validator proposer priorities (forward)", "blockNum", i)
+			default:
+			}
+		}
+
+		if snap, err = snap.apply(initialHeaders, c.logger); err != nil {
+			return nil, err
+		}
+	}
+
+	return snap, nil
+}
+
 // snapshot retrieves the authorization snapshot at a given point in time.
 func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash libcommon.Hash, parents []*types.Header) (*Snapshot, error) {
 	logEvery := time.NewTicker(logInterval)
@@ -710,43 +771,14 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li
 	}
 
 	if snap == nil && chain != nil && number <= chain.FrozenBlocks() {
-		// Special handling of the headers in the snapshot
-		zeroHeader := chain.GetHeaderByNumber(0)
-		if zeroHeader != nil {
-			// get checkpoint data
-			hash := zeroHeader.Hash()
+		var err error
 
-			// get validators and current span
-			validators, err := c.spanner.GetCurrentValidators(1, c.authorizedSigner.Load().signer, c.getSpanForBlock)
-			if err != nil {
-				return nil, err
-			}
+		c.frozenSnapshotsInit.Do(func() {
+			snap, err = c.initFrozenSnapshot(chain, number, logEvery)
+		})
 
-			// new snap shot
-			snap = newSnapshot(c.config, c.signatures, 0, hash, validators, c.logger)
-			if err := snap.store(c.DB); err != nil {
-				return nil, err
-			}
-			c.logger.Info("Stored proposer snapshot to disk", "number", 0, "hash", hash)
-			initialHeaders := make([]*types.Header, 0, 128)
-			for i := uint64(1); i <= number; i++ {
-				header := chain.GetHeaderByNumber(i)
-				initialHeaders = append(initialHeaders, header)
-				if len(initialHeaders) == cap(initialHeaders) {
-					if snap, err = snap.apply(initialHeaders, c.logger); err != nil {
-						return nil, err
-					}
-					initialHeaders = initialHeaders[:0]
-				}
-				select {
-				case <-logEvery.C:
-					log.Info("Computing validator proposer prorities (forward)", "blockNum", i)
-				default:
-				}
-			}
-			if snap, err = snap.apply(initialHeaders, c.logger); err != nil {
-				return nil, err
-			}
+		if err != nil {
+			return nil, err
 		}
 	}
 
diff --git a/consensus/bor/snapshot.go b/consensus/bor/snapshot.go
index 0e8825dd5..9d98e458b 100644
--- a/consensus/bor/snapshot.go
+++ b/consensus/bor/snapshot.go
@@ -145,24 +145,26 @@ func (s *Snapshot) apply(headers []*types.Header, logger log.Logger) (*Snapshot,
 		if number >= sprintLen {
 			delete(snap.Recents, number-sprintLen)
 		}
-
 		// Resolve the authorization key and check against signers
 		signer, err := ecrecover(header, s.sigcache, s.config)
+
 		if err != nil {
 			return nil, err
 		}
+
+		var validSigner bool
+
 		// check if signer is in validator set
-		if !snap.ValidatorSet.HasAddress(signer) {
-			return nil, &UnauthorizedSignerError{number, signer.Bytes()}
-		}
+		if snap.ValidatorSet.HasAddress(signer) {
+			if _, err = snap.GetSignerSuccessionNumber(signer); err != nil {
+				return nil, err
+			}
 
-		if _, err = snap.GetSignerSuccessionNumber(signer); err != nil {
-			return nil, err
-		}
+			// add recents
+			snap.Recents[number] = signer
 
-		// add recents
-		snap.Recents[number] = signer
+			validSigner = true
+		}
 
 		// change validator set and change proposer
 		if number > 0 && (number+1)%sprintLen == 0 {
@@ -177,6 +179,10 @@ func (s *Snapshot) apply(headers []*types.Header, logger log.Logger) (*Snapshot,
 			v.IncrementProposerPriority(1, logger)
 			snap.ValidatorSet = v
 		}
+
+		if number > 64 && !validSigner {
+			return nil, &UnauthorizedSignerError{number, signer.Bytes()}
+		}
 	}
 
 	snap.Number += uint64(len(headers))
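
For reviewers unfamiliar with the concurrency pattern in the bor.go change: the new `frozenSnapshotsInit sync.Once` field guarantees the expensive frozen-snapshot rebuild runs at most once per `Bor` instance, even when `snapshot` is called concurrently, with the closure capturing the error for the caller. Below is a minimal, self-contained sketch of that idiom; `lazyCache` and `buildSnapshot` are illustrative names, not erigon code.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyCache builds an expensive value at most once, even when Get is
// called from many goroutines, mirroring how snapshot() delegates to
// initFrozenSnapshot through frozenSnapshotsInit.Do.
type lazyCache struct {
	initOnce sync.Once
	value    string
	err      error
}

func (c *lazyCache) Get() (string, error) {
	c.initOnce.Do(func() {
		// Runs exactly once; concurrent callers block until it returns,
		// then all of them observe the same value and error.
		c.value, c.err = buildSnapshot()
	})
	return c.value, c.err
}

// buildSnapshot stands in for the expensive rebuild of the proposer
// snapshot from frozen headers.
func buildSnapshot() (string, error) {
	return "snapshot@block-0", nil
}

func main() {
	c := &lazyCache{}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err := c.Get() // only the first caller pays the cost
			fmt.Println(v, err)
		}()
	}
	wg.Wait()
}
```

One property worth keeping in mind during review: a `sync.Once` is spent even if the guarded call fails, so if `initFrozenSnapshot` returns an error, later `snapshot` calls will not retry the rebuild.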
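
The memory fix itself comes from the batching in `initFrozenSnapshot`: `initialHeaders` is flushed through `snap.apply` whenever it fills and its backing array is reused, so at most 128 headers are resident at a time instead of the multi-gigabyte pending queue described above. Here is a sketch of the same flush-on-full idiom, assuming a generic `process` callback; the names are illustrative stand-ins.

```go
package main

import "fmt"

// applyInBatches walks items 1..n, buffering at most batchSize of them
// and flushing whenever the buffer fills, then once more at the end for
// the remainder — the same shape as the header loop in initFrozenSnapshot.
func applyInBatches(n, batchSize int, process func([]int) error) error {
	batch := make([]int, 0, batchSize)

	for i := 1; i <= n; i++ {
		batch = append(batch, i)

		if len(batch) == cap(batch) {
			if err := process(batch); err != nil {
				return err
			}
			batch = batch[:0] // reuse the backing array; nothing accumulates
		}
	}

	// Flush whatever is left over (possibly an empty batch).
	return process(batch)
}

func main() {
	total := 0
	err := applyInBatches(1000, 128, func(batch []int) error {
		total += len(batch)
		return nil
	})
	fmt.Println(total, err) // 1000 <nil>
}
```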
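
The header loop also reports progress through a non-blocking `select` on the shared `logEvery` ticker: the hot path never waits on logging and emits at most one line per tick interval. A sketch of the idiom, with arbitrary timings:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// One shared ticker rate-limits progress logs for the whole loop,
	// as logEvery does in snapshot().
	logEvery := time.NewTicker(250 * time.Millisecond)
	defer logEvery.Stop()

	for i := 1; i <= 500; i++ {
		time.Sleep(2 * time.Millisecond) // stand-in for per-header work

		select {
		case <-logEvery.C:
			fmt.Println("progress", "blockNum", i) // at most once per tick
		default:
			// Non-blocking: every other iteration falls through immediately.
		}
	}
}
```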
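
Finally, the snapshot.go change reshapes the loop body of `Snapshot.apply`: membership in the validator set is recorded in a `validSigner` flag rather than enforced up front, the sprint-boundary validator-set rotation still runs, and the unauthorized-signer error is only raised for `number > 64`. The intent, as I read it, is that headers at the very start of the chain, where the validator set is still being established, no longer abort the rebuild. A compressed sketch of that control flow, with the succession and recents bookkeeping elided and `checkHeader`/`rotate` as hypothetical names:

```go
package main

import "fmt"

const sprintLen = 64

// checkHeader mirrors the reshaped loop body: record membership, still
// rotate the validator set on sprint boundaries, and defer the
// unauthorized-signer error until past the initial window.
func checkHeader(number uint64, signerInSet bool, rotate func()) error {
	var validSigner bool

	if signerInSet {
		// succession check and snap.Recents bookkeeping would go here
		validSigner = true
	}

	if number > 0 && (number+1)%sprintLen == 0 {
		rotate() // adopt the validator set announced in the header
	}

	if number > 64 && !validSigner {
		return fmt.Errorf("unauthorized signer at block %d", number)
	}

	return nil
}

func main() {
	noop := func() {}
	fmt.Println(checkHeader(10, false, noop))  // <nil>: inside the initial window
	fmt.Println(checkHeader(100, false, noop)) // unauthorized signer at block 100
	fmt.Println(checkHeader(100, true, noop))  // <nil>: signer is in the set
}
```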