Move validator set snapshot computation to bor_heimdall stage (#8646)

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
This commit is contained in:
ledgerwatch 2023-11-06 08:24:33 +00:00 committed by GitHub
parent 86e7abecc4
commit 1185587b20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 364 additions and 176 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
@ -15,7 +16,6 @@ import (
"sync/atomic"
"time"
"github.com/google/btree"
lru "github.com/hashicorp/golang-lru/arc/v2"
"github.com/ledgerwatch/log/v3"
"github.com/xsleonard/go-merkle"
@ -264,8 +264,7 @@ type Bor struct {
// scope event.SubscriptionScope
// The fields below are for testing only
fakeDiff bool // Skip difficulty verifications
spanCache *btree.BTree
fakeDiff bool // Skip difficulty verifications
closeOnce sync.Once
logger log.Logger
@ -401,7 +400,6 @@ func New(
spanner: spanner,
GenesisContractsClient: genesisContracts,
HeimdallClient: heimdallClient,
spanCache: btree.New(32),
execCtx: context.Background(),
logger: logger,
closeCh: make(chan struct{}),
@ -467,7 +465,6 @@ func NewRo(chainConfig *chain.Config, db kv.RoDB, blockReader services.FullBlock
logger: logger,
Recents: recents,
Signatures: signatures,
spanCache: btree.New(32),
execCtx: context.Background(),
closeCh: make(chan struct{}),
}
@ -644,67 +641,7 @@ func (c *Bor) verifyCascadingFields(chain consensus.ChainHeaderReader, header *t
if parent.Time+c.config.CalculatePeriod(number) > header.Time {
return ErrInvalidTimestamp
}
sprintLength := c.config.CalculateSprint(number)
// Verify the validator list match the local contract
//
// Note: Here we fetch the data from span instead of contract
// as done in bor client. The contract (validator set) returns
// a fixed span for 0th span i.e. 0 - 255 blocks. Hence, the
// contract data and span data won't match for that. Skip validating
// for 0th span. TODO: Remove `number > zerothSpanEnd` check
// once we start fetching validator data from contract.
if number > zerothSpanEnd && isSprintStart(number+1, sprintLength) {
producerSet, err := c.spanner.GetCurrentProducers(number+1, c.authorizedSigner.Load().signer, c.getSpanForBlock)
if err != nil {
return err
}
sort.Sort(valset.ValidatorsByAddress(producerSet))
headerVals, err := valset.ParseValidators(header.Extra[extraVanity : len(header.Extra)-extraSeal])
if err != nil {
return err
}
if len(producerSet) != len(headerVals) {
return ErrInvalidSpanValidators
}
for i, val := range producerSet {
if !bytes.Equal(val.HeaderBytes(), headerVals[i].HeaderBytes()) {
return ErrInvalidSpanValidators
}
}
}
snap, err := c.snapshot(chain, number-1, header.ParentHash, parents)
if err != nil {
return err
}
// verify the validator list in the last sprint block
if isSprintStart(number, sprintLength) {
// Retrieve the snapshot needed to verify this header and cache it
parentValidatorBytes := parent.Extra[extraVanity : len(parent.Extra)-extraSeal]
validatorsBytes := make([]byte, len(snap.ValidatorSet.Validators)*validatorHeaderBytesLength)
currentValidators := snap.ValidatorSet.Copy().Validators
// sort validator by address
sort.Sort(valset.ValidatorsByAddress(currentValidators))
for i, validator := range currentValidators {
copy(validatorsBytes[i*validatorHeaderBytesLength:], validator.HeaderBytes())
}
// len(header.Extra) >= extraVanity+extraSeal has already been validated in ValidateHeaderExtraField, so this won't result in a panic
if !bytes.Equal(parentValidatorBytes, validatorsBytes) {
return &MismatchingValidatorsError{number - 1, validatorsBytes, parentValidatorBytes}
}
}
// All basic checks passed, verify the seal and return
return c.verifySeal(chain, header, parents, snap)
return nil
}
func (c *Bor) initFrozenSnapshot(chain consensus.ChainHeaderReader, number uint64, logEvery *time.Ticker) (snap *Snapshot, err error) {
@ -723,7 +660,7 @@ func (c *Bor) initFrozenSnapshot(chain consensus.ChainHeaderReader, number uint6
// get validators and current span
var validators []*valset.Validator
validators, err = c.spanner.GetCurrentValidators(1, c.authorizedSigner.Load().signer, c.getSpanForBlock)
validators, err = c.spanner.GetCurrentValidators(0, c.authorizedSigner.Load().signer, chain)
if err != nil {
return nil, err
@ -759,7 +696,7 @@ func (c *Bor) initFrozenSnapshot(chain consensus.ChainHeaderReader, number uint6
}
initialHeaders = append(initialHeaders, header)
if len(initialHeaders) == cap(initialHeaders) {
snap, err = snap.Apply(initialHeaders, c.logger)
snap, err = snap.Apply(nil, initialHeaders, c.logger)
if err != nil {
return nil, err
@ -774,7 +711,7 @@ func (c *Bor) initFrozenSnapshot(chain consensus.ChainHeaderReader, number uint6
}
}
if snap, err = snap.Apply(initialHeaders, c.logger); err != nil {
if snap, err = snap.Apply(nil, initialHeaders, c.logger); err != nil {
return nil, err
}
}
@ -852,7 +789,6 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li
if snap == nil && chain != nil && number <= chain.FrozenBlocks() {
var err error
c.frozenSnapshotsInit.Do(func() {
snap, err = c.initFrozenSnapshot(chain, number, logEvery)
})
@ -873,7 +809,7 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li
}
var err error
if snap, err = snap.Apply(headers, c.logger); err != nil {
if snap, err = snap.Apply(nil, headers, c.logger); err != nil {
return nil, err
}
@ -993,7 +929,11 @@ func (c *Bor) Prepare(chain consensus.ChainHeaderReader, header *types.Header, s
// where it fetches producers internally. As we fetch data from span
// in Erigon, use directly the `GetCurrentProducers` function.
if isSprintStart(number+1, c.config.CalculateSprint(number)) {
newValidators, err := c.spanner.GetCurrentProducers(number+1, c.authorizedSigner.Load().signer, c.getSpanForBlock)
var spanID uint64
if number+1 > zerothSpanEnd {
spanID = 1 + (number+1-zerothSpanEnd-1)/spanLength
}
newValidators, err := c.spanner.GetCurrentProducers(spanID, c.authorizedSigner.Load().signer, chain)
if err != nil {
return errUnknownValidators
}
@ -1054,13 +994,13 @@ func (c *Bor) Finalize(config *chain.Config, header *types.Header, state *state.
if isSprintStart(headerNumber, c.config.CalculateSprint(headerNumber)) {
cx := statefull.ChainContext{Chain: chain, Bor: c}
// check and commit span
if err := c.checkAndCommitSpan(state, header, cx, syscall); err != nil {
c.logger.Error("Error while committing span", "err", err)
return nil, types.Receipts{}, err
}
if c.blockReader != nil {
// check and commit span
if err := c.checkAndCommitSpan(state, header, cx, syscall); err != nil {
c.logger.Error("Error while committing span", "err", err)
return nil, types.Receipts{}, err
}
// commit states
if err := c.CommitStates(state, header, cx, syscall); err != nil {
c.logger.Error("Error while committing states", "err", err)
@ -1119,16 +1059,14 @@ func (c *Bor) FinalizeAndAssemble(chainConfig *chain.Config, header *types.Heade
if isSprintStart(headerNumber, c.config.CalculateSprint(headerNumber)) {
cx := statefull.ChainContext{Chain: chain, Bor: c}
// check and commit span
err := c.checkAndCommitSpan(state, header, cx, syscall)
if err != nil {
c.logger.Error("Error while committing span", "err", err)
return nil, nil, types.Receipts{}, err
}
if c.HeimdallClient != nil {
if c.blockReader != nil {
// check and commit span
if err := c.checkAndCommitSpan(state, header, cx, syscall); err != nil {
c.logger.Error("Error while committing span", "err", err)
return nil, nil, types.Receipts{}, err
}
// commit states
if err = c.CommitStates(state, header, cx, syscall); err != nil {
if err := c.CommitStates(state, header, cx, syscall); err != nil {
c.logger.Error("Error while committing states", "err", err)
return nil, nil, types.Receipts{}, err
}
@ -1421,46 +1359,6 @@ func (c *Bor) needToCommitSpan(currentSpan *span.Span, headerNumber uint64) bool
return false
}
func (c *Bor) getSpanForBlock(blockNum uint64) (*span.HeimdallSpan, error) {
c.logger.Debug("Getting span", "for block", blockNum)
var borSpan *span.HeimdallSpan
c.spanCache.AscendGreaterOrEqual(&span.HeimdallSpan{Span: span.Span{EndBlock: blockNum}}, func(item btree.Item) bool {
borSpan = item.(*span.HeimdallSpan)
return false
})
if borSpan != nil && borSpan.StartBlock <= blockNum && borSpan.EndBlock >= blockNum {
return borSpan, nil
}
// Span with given block block number is not loaded
// As span has fixed set of blocks (except 0th span), we can
// formulate it and get the exact ID we'd need to fetch.
var spanID uint64
if blockNum > zerothSpanEnd {
spanID = 1 + (blockNum-zerothSpanEnd-1)/spanLength
}
if c.HeimdallClient == nil {
return nil, fmt.Errorf("span with given block number is not loaded: %d", spanID)
}
c.logger.Debug("Span with given block number is not loaded", "fetching span", spanID)
response, err := c.HeimdallClient.Span(c.execCtx, spanID)
if err != nil {
return nil, err
}
borSpan = response
c.spanCache.ReplaceOrInsert(borSpan)
for c.spanCache.Len() > 128 {
c.spanCache.DeleteMin()
}
return borSpan, nil
}
func (c *Bor) fetchAndCommitSpan(
newSpanID uint64,
state *state.IntraBlockState,
@ -1479,12 +1377,10 @@ func (c *Bor) fetchAndCommitSpan(
heimdallSpan = *s
} else {
response, err := c.HeimdallClient.Span(c.execCtx, newSpanID)
if err != nil {
spanJson := chain.Chain.BorSpan(newSpanID)
if err := json.Unmarshal(spanJson, &heimdallSpan); err != nil {
return err
}
heimdallSpan = *response
}
// check if chain id matches with heimdall span
@ -1594,10 +1490,6 @@ func (c *Bor) SetHeimdallClient(h heimdall.IHeimdallClient) {
c.HeimdallClient = h
}
func (c *Bor) GetCurrentValidators(blockNumber uint64, signer libcommon.Address, getSpanForBlock func(blockNum uint64) (*span.HeimdallSpan, error)) ([]*valset.Validator, error) {
return c.spanner.GetCurrentValidators(blockNumber, signer, getSpanForBlock)
}
//
// Private methods
//

View File

@ -8,6 +8,7 @@ import (
"testing"
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
@ -181,7 +182,8 @@ func (r headerReader) BorSpan(spanId uint64) []byte {
type spanner struct {
*span.ChainSpanner
currentSpan span.Span
validatorAddress common.Address
currentSpan span.Span
}
func (c spanner) GetCurrentSpan(_ consensus.SystemCall) (*span.Span, error) {
@ -193,6 +195,16 @@ func (c *spanner) CommitSpan(heimdallSpan span.HeimdallSpan, syscall consensus.S
return nil
}
func (c *spanner) GetCurrentValidators(spanId uint64, signer libcommon.Address, chain consensus.ChainHeaderReader) ([]*valset.Validator, error) {
return []*valset.Validator{
{
ID: 1,
Address: c.validatorAddress,
VotingPower: 1000,
ProposerPriority: 1,
}}, nil
}
type validator struct {
*mock.MockSentry
heimdall *test_heimdall
@ -254,19 +266,18 @@ func (v validator) verifyBlocks(blocks []*types.Block) error {
func newValidator(t *testing.T, heimdall *test_heimdall, blocks map[uint64]*types.Block) validator {
logger := log.Root()
validatorKey, _ := crypto.GenerateKey()
validatorAddress := crypto.PubkeyToAddress(validatorKey.PublicKey)
bor := bor.New(
heimdall.chainConfig,
memdb.New(""),
nil, /* blockReader */
&spanner{span.NewChainSpanner(contract.ValidatorSet(), heimdall.chainConfig, false, logger), span.Span{}},
&spanner{span.NewChainSpanner(contract.ValidatorSet(), heimdall.chainConfig, false, logger), validatorAddress, span.Span{}},
heimdall,
test_genesisContract{},
logger,
)
validatorKey, _ := crypto.GenerateKey()
validatorAddress := crypto.PubkeyToAddress(validatorKey.PublicKey)
/*fmt.Printf("Private: 0x%s\nPublic: 0x%s\nAddress: %s\n",
hex.EncodeToString(crypto.FromECDSA(validatorKey)),
hex.EncodeToString(crypto.MarshalPubkey(&validatorKey.PublicKey)),

View File

@ -2,6 +2,7 @@ package span
import (
"encoding/hex"
"encoding/json"
"math/big"
"github.com/ledgerwatch/erigon-lib/chain"
@ -67,28 +68,30 @@ func (c *ChainSpanner) GetCurrentSpan(syscall consensus.SystemCall) (*Span, erro
return &span, nil
}
func (c *ChainSpanner) GetCurrentValidators(blockNumber uint64, signer libcommon.Address, getSpanForBlock func(blockNum uint64) (*HeimdallSpan, error)) ([]*valset.Validator, error) {
func (c *ChainSpanner) GetCurrentValidators(spanId uint64, signer libcommon.Address, chain consensus.ChainHeaderReader) ([]*valset.Validator, error) {
// Use hardcoded bor devnet valset if chain-name = bor-devnet
if NetworkNameVals[c.chainConfig.ChainName] != nil && c.withoutHeimdall {
return NetworkNameVals[c.chainConfig.ChainName], nil
}
span, err := getSpanForBlock(blockNumber)
if err != nil {
spanBytes := chain.BorSpan(spanId)
var span HeimdallSpan
if err := json.Unmarshal(spanBytes, &span); err != nil {
return nil, err
}
return span.ValidatorSet.Validators, nil
}
func (c *ChainSpanner) GetCurrentProducers(blockNumber uint64, signer libcommon.Address, getSpanForBlock func(blockNum uint64) (*HeimdallSpan, error)) ([]*valset.Validator, error) {
func (c *ChainSpanner) GetCurrentProducers(spanId uint64, signer libcommon.Address, chain consensus.ChainHeaderReader) ([]*valset.Validator, error) {
// Use hardcoded bor devnet valset if chain-name = bor-devnet
if NetworkNameVals[c.chainConfig.ChainName] != nil && c.withoutHeimdall {
return NetworkNameVals[c.chainConfig.ChainName], nil
}
span, err := getSpanForBlock(blockNumber)
if err != nil {
spanBytes := chain.BorSpan(spanId)
var span HeimdallSpan
if err := json.Unmarshal(spanBytes, &span); err != nil {
return nil, err
}

View File

@ -118,7 +118,7 @@ func (s *Snapshot) copy() *Snapshot {
return cpy
}
func (s *Snapshot) Apply(headers []*types.Header, logger log.Logger) (*Snapshot, error) {
func (s *Snapshot) Apply(parent *types.Header, headers []*types.Header, logger log.Logger) (*Snapshot, error) {
// Allow passing in no headers for cleaner code
if len(headers) == 0 {
return s, nil
@ -153,23 +153,29 @@ func (s *Snapshot) Apply(headers []*types.Header, logger log.Logger) (*Snapshot,
}
var validSigner bool
var succession int
// check if signer is in validator set
if snap.ValidatorSet.HasAddress(signer) {
if _, err = snap.GetSignerSuccessionNumber(signer); err != nil {
return nil, err
}
if !snap.ValidatorSet.HasAddress(signer) {
return snap, &UnauthorizedSignerError{number, signer.Bytes()}
}
if succession, err = snap.GetSignerSuccessionNumber(signer); err != nil {
return snap, err
}
// add recents
snap.Recents[number] = signer
// add recents
snap.Recents[number] = signer
validSigner = true
validSigner = true
if parent != nil && header.Time < parent.Time+CalcProducerDelay(number, succession, s.config) {
return snap, &BlockTooSoonError{number, succession}
}
// change validator set and change proposer
if number > 0 && (number+1)%sprintLen == 0 {
if err := ValidateHeaderExtraField(header.Extra); err != nil {
return nil, err
return snap, err
}
validatorBytes := header.Extra[extraVanity : len(header.Extra)-extraSeal]
@ -181,13 +187,13 @@ func (s *Snapshot) Apply(headers []*types.Header, logger log.Logger) (*Snapshot,
}
if number > 64 && !validSigner {
return nil, &UnauthorizedSignerError{number, signer.Bytes()}
return snap, &UnauthorizedSignerError{number, signer.Bytes()}
}
parent = header
snap.Number = number
snap.Hash = header.Hash()
}
snap.Number += uint64(len(headers))
snap.Hash = headers[len(headers)-1].Hash()
return snap, nil
}

View File

@ -10,7 +10,7 @@ import (
//go:generate mockgen -destination=./span_mock.go -package=bor . Spanner
type Spanner interface {
GetCurrentSpan(syscall consensus.SystemCall) (*span.Span, error)
GetCurrentValidators(blockNumber uint64, signer libcommon.Address, getSpanForBlock func(blockNum uint64) (*span.HeimdallSpan, error)) ([]*valset.Validator, error)
GetCurrentProducers(blockNumber uint64, signer libcommon.Address, getSpanForBlock func(blockNum uint64) (*span.HeimdallSpan, error)) ([]*valset.Validator, error)
GetCurrentValidators(spanId uint64, signer libcommon.Address, chain consensus.ChainHeaderReader) ([]*valset.Validator, error)
GetCurrentProducers(spanId uint64, signer libcommon.Address, chain consensus.ChainHeaderReader) ([]*valset.Validator, error)
CommitSpan(heimdallSpan span.HeimdallSpan, syscall consensus.SystemCall) error
}

View File

@ -1,36 +1,49 @@
package stagedsync
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"math/big"
"sort"
"strconv"
"time"
lru "github.com/hashicorp/golang-lru/arc/v2"
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/accounts/abi"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/bor"
"github.com/ledgerwatch/erigon/consensus/bor/contract"
"github.com/ledgerwatch/erigon/consensus/bor/finality/generics"
"github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall/span"
"github.com/ledgerwatch/erigon/consensus/bor/valset"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/dataflow"
"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/errgroup"
)
const (
spanLength = 6400 // Number of blocks in a span
zerothSpanEnd = 255 // End block of 0th span
spanLength = 6400 // Number of blocks in a span
zerothSpanEnd = 255 // End block of 0th span
inmemorySnapshots = 128 // Number of recent vote snapshots to keep in memory
inmemorySignatures = 4096 // Number of recent block signatures to keep in memory
snapshotPersistInterval = 1024 // Number of blocks after which to persist the vote snapshot to the database
extraVanity = 32 // Fixed number of extra-data prefix bytes reserved for signer vanity
extraSeal = 65 // Fixed number of extra-data suffix bytes reserved for signer seal
)
type BorHeimdallCfg struct {
@ -171,21 +184,49 @@ func BorHeimdallForward(
if k != nil {
lastEventId = binary.BigEndian.Uint64(k)
}
type LastFrozenEvent interface {
type LastFrozen interface {
LastFrozenEventID() uint64
LastFrozenSpanID() uint64
}
snapshotLastEventId := cfg.blockReader.(LastFrozenEvent).LastFrozenEventID()
snapshotLastEventId := cfg.blockReader.(LastFrozen).LastFrozenEventID()
if snapshotLastEventId > lastEventId {
lastEventId = snapshotLastEventId
}
sCursor, err := tx.Cursor(kv.BorSpans)
if err != nil {
return err
}
defer sCursor.Close()
k, _, err = sCursor.Last()
if err != nil {
return err
}
var nextSpanId uint64
if k != nil {
nextSpanId = binary.BigEndian.Uint64(k) + 1
}
snapshotLastSpanId := cfg.blockReader.(LastFrozen).LastFrozenSpanID()
if snapshotLastSpanId+1 > nextSpanId {
nextSpanId = snapshotLastSpanId + 1
}
var endSpanID uint64
if headNumber > zerothSpanEnd {
endSpanID = 2 + (headNumber-zerothSpanEnd)/spanLength
}
lastBlockNum := s.BlockNumber
if cfg.blockReader.FrozenBorBlocks() > lastBlockNum {
lastBlockNum = cfg.blockReader.FrozenBorBlocks()
}
if !mine {
logger.Info("["+s.LogPrefix()+"] Processng sync events...", "from", lastBlockNum+1)
recents, err := lru.NewARC[libcommon.Hash, *bor.Snapshot](inmemorySnapshots)
if err != nil {
return err
}
signatures, err := lru.NewARC[libcommon.Hash, libcommon.Address](inmemorySignatures)
if err != nil {
return err
}
chain := NewChainReaderImpl(&cfg.chainConfig, tx, cfg.blockReader, logger)
var blockNum uint64
var fetchTime time.Duration
@ -195,6 +236,17 @@ func BorHeimdallForward(
logTimer := time.NewTicker(30 * time.Second)
defer logTimer.Stop()
if endSpanID >= nextSpanId {
logger.Info("["+s.LogPrefix()+"] Processing spans...", "from", nextSpanId, "to", endSpanID)
}
for spanID := nextSpanId; spanID <= endSpanID; spanID++ {
if lastSpanId, err = fetchAndWriteSpans(ctx, spanID, tx, cfg.heimdallClient, s.LogPrefix(), logger); err != nil {
return err
}
}
if !mine {
logger.Info("["+s.LogPrefix()+"] Processing sync events...", "from", lastBlockNum+1, "to", headNumber)
}
for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
select {
default:
@ -233,9 +285,15 @@ func BorHeimdallForward(
fetchTime += callTime
}
if blockNum == 1 || (blockNum > zerothSpanEnd && ((blockNum-zerothSpanEnd-1)%spanLength) == 0) {
if lastSpanId, err = fetchAndWriteSpans(ctx, blockNum, tx, cfg.heimdallClient, s.LogPrefix(), logger); err != nil {
return err
if err = PersistValidatorSets(u, ctx, tx, cfg.blockReader, cfg.chainConfig.Bor, chain, blockNum, header.Hash(), recents, signatures, cfg.snapDb, logger); err != nil {
return fmt.Errorf("persistValidatorSets: %w", err)
}
if !mine && header != nil {
sprintLength := cfg.chainConfig.Bor.CalculateSprint(blockNum)
if blockNum > zerothSpanEnd && ((blockNum+1)%sprintLength == 0) {
if err = checkHeaderExtraData(u, ctx, chain, blockNum, header); err != nil {
return err
}
}
}
}
@ -255,6 +313,46 @@ func BorHeimdallForward(
return
}
func checkHeaderExtraData(
u Unwinder,
ctx context.Context,
chain consensus.ChainHeaderReader,
blockNum uint64,
header *types.Header,
) error {
var spanID uint64
if blockNum+1 > zerothSpanEnd {
spanID = 1 + (blockNum+1-zerothSpanEnd-1)/spanLength
}
spanBytes := chain.BorSpan(spanID)
var sp span.HeimdallSpan
if err := json.Unmarshal(spanBytes, &sp); err != nil {
return err
}
producerSet := make([]*valset.Validator, len(sp.SelectedProducers))
for i := range sp.SelectedProducers {
producerSet[i] = &sp.SelectedProducers[i]
}
sort.Sort(valset.ValidatorsByAddress(producerSet))
headerVals, err := valset.ParseValidators(header.Extra[extraVanity : len(header.Extra)-extraSeal])
if err != nil {
return err
}
if len(producerSet) != len(headerVals) {
return bor.ErrInvalidSpanValidators
}
for i, val := range producerSet {
if !bytes.Equal(val.HeaderBytes(), headerVals[i].HeaderBytes()) {
return bor.ErrInvalidSpanValidators
}
}
return nil
}
func fetchAndWriteBorEvents(
ctx context.Context,
blockReader services.FullBlockReader,
@ -360,17 +458,12 @@ func fetchAndWriteBorEvents(
func fetchAndWriteSpans(
ctx context.Context,
blockNum uint64,
spanId uint64,
tx kv.RwTx,
heimdallClient heimdall.IHeimdallClient,
logPrefix string,
logger log.Logger,
) (uint64, error) {
var spanId uint64
if blockNum > zerothSpanEnd {
spanId = 1 + (blockNum-zerothSpanEnd-1)/spanLength
}
logger.Debug(fmt.Sprintf("[%s] Fetching span", logPrefix), "id", spanId)
response, err := heimdallClient.Span(ctx, spanId)
if err != nil {
return 0, err
@ -384,9 +477,192 @@ func fetchAndWriteSpans(
if err = tx.Put(kv.BorSpans, spanIDBytes[:], spanBytes); err != nil {
return 0, err
}
logger.Debug(fmt.Sprintf("[%s] Wrote span", logPrefix), "id", spanId)
return spanId, nil
}
// Not used currently
func PersistValidatorSets(
u Unwinder,
ctx context.Context,
tx kv.Tx,
blockReader services.FullBlockReader,
config *chain.BorConfig,
chain consensus.ChainHeaderReader,
blockNum uint64,
hash libcommon.Hash,
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
snapDb kv.RwDB,
logger log.Logger) error {
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
// Search for a snapshot in memory or on disk for checkpoints
var snap *bor.Snapshot
headers := make([]*types.Header, 0, 16)
var parent *types.Header
if s, ok := recents.Get(hash); ok {
snap = s
}
//nolint:govet
for snap == nil {
// If an on-disk snapshot can be found, use that
if blockNum%snapshotPersistInterval == 0 {
if s, err := bor.LoadSnapshot(config, signatures, snapDb, hash); err == nil {
logger.Trace("Loaded snapshot from disk", "number", blockNum, "hash", hash)
snap = s
break
}
}
// No snapshot for this header, gather the header and move backward
var header *types.Header
// No explicit parents (or no more left), reach out to the database
if parent != nil {
header = parent
} else if chain != nil {
header = chain.GetHeader(hash, blockNum)
//logger.Info(fmt.Sprintf("header %d %x => %+v\n", header.Number.Uint64(), header.Hash(), header))
}
if header == nil {
return consensus.ErrUnknownAncestor
}
if blockNum == 0 {
break
}
headers = append(headers, header)
blockNum, hash = blockNum-1, header.ParentHash
if chain != nil {
parent = chain.GetHeader(hash, blockNum)
}
// If an in-memory snapshot was found, use that
if s, ok := recents.Get(hash); ok {
snap = s
break
}
if chain != nil && blockNum < chain.FrozenBlocks() {
break
}
select {
case <-logEvery.C:
logger.Info("Gathering headers for validator proposer prorities (backwards)", "blockNum", blockNum)
default:
}
}
if snap == nil && chain != nil && blockNum <= chain.FrozenBlocks() {
// Special handling of the headers in the snapshot
zeroHeader := chain.GetHeaderByNumber(0)
if zeroHeader != nil {
// get checkpoint data
hash := zeroHeader.Hash()
// get validators and current span
zeroSpanBytes, err := blockReader.Span(ctx, tx, 0)
if err != nil {
return err
}
var zeroSpan span.HeimdallSpan
if err = json.Unmarshal(zeroSpanBytes, &zeroSpan); err != nil {
return err
}
// new snap shot
snap = bor.NewSnapshot(config, signatures, 0, hash, zeroSpan.ValidatorSet.Validators, logger)
if err := snap.Store(snapDb); err != nil {
return fmt.Errorf("snap.Store (0): %w", err)
}
logger.Info("Stored proposer snapshot to disk", "number", 0, "hash", hash)
g := errgroup.Group{}
g.SetLimit(estimate.AlmostAllCPUs())
defer g.Wait()
batchSize := 128 // must be < inmemorySignatures
initialHeaders := make([]*types.Header, 0, batchSize)
parentHeader := zeroHeader
for i := uint64(1); i <= blockNum; i++ {
header := chain.GetHeaderByNumber(i)
{
// `snap.apply` bottleneck - is recover of signer.
// to speedup: recover signer in background goroutines and save in `sigcache`
// `batchSize` < `inmemorySignatures`: means all current batch will fit in cache - and `snap.apply` will find it there.
g.Go(func() error {
_, _ = bor.Ecrecover(header, signatures, config)
return nil
})
}
initialHeaders = append(initialHeaders, header)
if len(initialHeaders) == cap(initialHeaders) {
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
return fmt.Errorf("snap.Apply (inside loop): %w", err)
}
parentHeader = initialHeaders[len(initialHeaders)-1]
initialHeaders = initialHeaders[:0]
}
select {
case <-logEvery.C:
logger.Info("Computing validator proposer prorities (forward)", "blockNum", i)
default:
}
}
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
return fmt.Errorf("snap.Apply (outside loop): %w", err)
}
}
}
// check if snapshot is nil
if snap == nil {
return fmt.Errorf("unknown error while retrieving snapshot at block number %v", blockNum)
}
// Previous snapshot found, apply any pending headers on top of it
for i := 0; i < len(headers)/2; i++ {
headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
}
if len(headers) > 0 {
var err error
if snap, err = snap.Apply(parent, headers, logger); err != nil {
if snap != nil {
var badHash common.Hash
for _, header := range headers {
if header.Number.Uint64() == snap.Number+1 {
badHash = header.Hash()
break
}
}
u.UnwindTo(snap.Number, BadBlock(badHash, err))
} else {
return fmt.Errorf("snap.Apply %d, headers %d-%d: %w", blockNum, headers[0].Number.Uint64(), headers[len(headers)-1].Number.Uint64(), err)
}
}
}
recents.Add(snap.Hash, snap)
// If we've generated a new persistent snapshot, save to disk
if snap.Number%snapshotPersistInterval == 0 && len(headers) > 0 {
if err := snap.Store(snapDb); err != nil {
return fmt.Errorf("snap.Store: %w", err)
}
logger.Info("Stored proposer snapshot to disk", "number", snap.Number, "hash", snap.Hash)
}
return nil
}
func BorHeimdallUnwind(u *UnwindState, ctx context.Context, s *StageState, tx kv.RwTx, cfg BorHeimdallCfg) (err error) {
if cfg.chainConfig.Bor == nil {
return