stagedsync: fix bor heimdall mining flow (#9149)

The mining loop is currently broken for the Polygon chain. This PR fixes it.

High level changes:

- Introduces a new Bor<->Heimdall stage tailored to the needs of the mining flow
- Extracts common logic from the Bor<->Heimdall sync and mining stages into shared functions
- Removes `mine` flag for the Bor<->Heimdall sync stage
- Extends the current `StartMining` function to prefetch span zero, if needed, before the mining loop starts (see the sketch after this list)
- Fixes Bor to read span zero (instead of span 1) from Heimdall when no span has yet been set in the local smart contract that the Spanner uses
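
For orientation, here is a condensed sketch of the span-zero prefetch step. It reuses the PR's `fetchAndWriteHeimdallSpan` helper and the `"not found"` string match that the code's own TODO says should become `errors.Is`; it is not the verbatim committed code, and its error handling is slightly simplified (unexpected read errors are surfaced):

```go
package stagedsync

import (
	"context"
	"strings"

	"github.com/ledgerwatch/log/v3"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/consensus/bor/heimdall"
	"github.com/ledgerwatch/erigon/turbo/services"
)

// Sketch: make sure span zero is stored locally before the mining loop
// starts. Without it the Spanner's contract has no initial span and block
// production cannot progress.
func fetchSpanZeroForMiningSketch(
	ctx context.Context,
	db kv.RwDB,
	blockReader services.FullBlockReader,
	heimdallClient heimdall.IHeimdallClient,
	logger log.Logger,
) error {
	return db.Update(ctx, func(tx kv.RwTx) error {
		if _, err := blockReader.Span(ctx, tx, 0); err == nil {
			return nil // span zero already present, nothing to do
		} else if !strings.Contains(err.Error(), "not found") {
			return err // a real read error rather than a missing span
		}
		// Span zero is missing: fetch it from Heimdall and persist it.
		_, err := fetchAndWriteHeimdallSpan(ctx, 0, tx, heimdallClient, "FetchSpanZeroForMiningIfNeeded", logger)
		return err
	})
}
```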

Tested with the devnet "state-sync" scenario:

![Screenshot 2024-01-05 at 17 41 23](https://github.com/ledgerwatch/erigon/assets/94537774/34ca903a-69b8-416a-900f-a32f2d4417fa)
milen authored on 2024-01-09 11:37:39 +00:00, committed via GitHub (commit 74ec3a9db7, parent 302f1f772d)
8 changed files with 599 additions and 420 deletions


@@ -1271,39 +1271,25 @@ func (c *Bor) checkAndCommitSpan(
) error {
headerNumber := header.Number.Uint64()
span, err := c.spanner.GetCurrentSpan(syscall)
currentSpan, err := c.spanner.GetCurrentSpan(syscall)
if err != nil {
return err
}
if c.needToCommitSpan(span, headerNumber) {
err := c.fetchAndCommitSpan(span.ID+1, state, header, chain, syscall)
return err
// check span is not set initially
if currentSpan.EndBlock == 0 {
return c.fetchAndCommitSpan(currentSpan.ID, state, header, chain, syscall)
}
// if current block is first block of last sprint in current span
sprintLength := c.config.CalculateSprintLength(headerNumber)
if currentSpan.EndBlock > sprintLength && currentSpan.EndBlock-sprintLength+1 == headerNumber {
return c.fetchAndCommitSpan(currentSpan.ID+1, state, header, chain, syscall)
}
return nil
}
func (c *Bor) needToCommitSpan(currentSpan *span.Span, headerNumber uint64) bool {
// if span is nil
if currentSpan == nil {
return false
}
// check span is not set initially
if currentSpan.EndBlock == 0 {
return true
}
sprintLength := c.config.CalculateSprintLength(headerNumber)
// if current block is first block of last sprint in current span
if currentSpan.EndBlock > sprintLength && currentSpan.EndBlock-sprintLength+1 == headerNumber {
return true
}
return false
}
func (c *Bor) fetchAndCommitSpan(
newSpanID uint64,
state *state.IntraBlockState,

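The hunk above inlines the old `needToCommitSpan` helper into `checkAndCommitSpan` and, when the span contract is uninitialised (`EndBlock == 0`), fetches the current span (span zero) instead of `ID+1`. The decision it now encodes distils to a small pure function (an illustrative sketch; `spanIDToCommit` is not a name from the diff):

```go
// spanIDToCommit distils the rewritten checkAndCommitSpan decision: which
// span ID, if any, must be fetched from Heimdall for the given header.
func spanIDToCommit(currentSpanID, currentSpanEndBlock, headerNumber, sprintLength uint64) (uint64, bool) {
	// Span contract not initialised yet: fetch the current span itself,
	// i.e. span zero. The old code wrongly asked Heimdall for ID+1 here.
	if currentSpanEndBlock == 0 {
		return currentSpanID, true
	}
	// First block of the last sprint in the current span: fetch the next
	// span so it is available before the current one runs out.
	if currentSpanEndBlock > sprintLength && currentSpanEndBlock-sprintLength+1 == headerNumber {
		return currentSpanID + 1, true
	}
	return 0, false
}
```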

@@ -32,12 +32,38 @@ import (
"time"
lru "github.com/hashicorp/golang-lru/arc/v2"
"github.com/holiman/uint256"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/chain/networkname"
"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/downloadergrpc"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
rpcsentinel "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/kv/remotedbserver"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon-lib/txpool"
"github.com/ledgerwatch/erigon-lib/txpool/txpooluitl"
types2 "github.com/ledgerwatch/erigon-lib/types"
"github.com/ledgerwatch/erigon-lib/wrap"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
@@ -49,54 +75,9 @@ import (
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/cl/sentinel"
"github.com/ledgerwatch/erigon/cl/sentinel/service"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/p2p/sentry"
"github.com/ledgerwatch/erigon/p2p/sentry/sentry_multi_client"
"github.com/ledgerwatch/erigon/turbo/builder"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_block_downloader"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers"
"github.com/ledgerwatch/erigon/turbo/execution/eth1"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_chain_reader.go"
"github.com/ledgerwatch/erigon/turbo/jsonrpc"
"github.com/ledgerwatch/erigon/turbo/silkworm"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
"github.com/holiman/uint256"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/direct"
downloader "github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon-lib/kv/remotedbserver"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon-lib/txpool"
"github.com/ledgerwatch/erigon-lib/txpool/txpooluitl"
types2 "github.com/ledgerwatch/erigon-lib/types"
"github.com/ledgerwatch/erigon/cmd/caplin/caplin1"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli"
"github.com/ledgerwatch/erigon/common/debug"
rpcsentinel "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/bor"
"github.com/ledgerwatch/erigon/consensus/bor/finality/flags"
@@ -108,6 +89,7 @@ import (
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/core/state/temporal"
"github.com/ledgerwatch/erigon/core/systemcontracts"
"github.com/ledgerwatch/erigon/core/types"
@@ -120,14 +102,27 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/ethstats"
"github.com/ledgerwatch/erigon/node"
"github.com/ledgerwatch/erigon/p2p"
"github.com/ledgerwatch/erigon/p2p/enode"
"github.com/ledgerwatch/erigon/p2p/sentry"
"github.com/ledgerwatch/erigon/p2p/sentry/sentry_multi_client"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/builder"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_block_downloader"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers"
"github.com/ledgerwatch/erigon/turbo/execution/eth1"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_chain_reader.go"
"github.com/ledgerwatch/erigon/turbo/jsonrpc"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/silkworm"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
stages2 "github.com/ledgerwatch/erigon/turbo/stages"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
@@ -1051,6 +1046,11 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
borcfg.Authorize(eb, func(_ libcommon.Address, mimeType string, message []byte) ([]byte, error) {
return crypto.Sign(crypto.Keccak256(message), cfg.SigKey)
})
err := stagedsync.FetchSpanZeroForMiningIfNeeded(ctx, s.chainDB, s.blockReader, borcfg.HeimdallClient, logger)
if err != nil {
return err
}
} else {
// for the bor dev network without heimdall we need the authorizer to be set otherwise there is no
// validator defined in the bor validator set and non mining nodes will reject all blocks


@@ -0,0 +1,313 @@
package stagedsync
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"time"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall/span"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/services"
)
var (
ErrHeaderValidatorsLengthMismatch = errors.New("header validators length mismatch")
ErrHeaderValidatorsBytesMismatch = errors.New("header validators bytes mismatch")
)
// LastSpanID TODO - move to block reader
func LastSpanID(tx kv.RwTx, blockReader services.FullBlockReader) (uint64, bool, error) {
sCursor, err := tx.Cursor(kv.BorSpans)
if err != nil {
return 0, false, err
}
defer sCursor.Close()
k, _, err := sCursor.Last()
if err != nil {
return 0, false, err
}
var lastSpanId uint64
if k != nil {
lastSpanId = binary.BigEndian.Uint64(k)
}
// TODO tidy this out when moving to block reader
type LastFrozen interface {
LastFrozenSpanID() uint64
}
snapshotLastSpanId := blockReader.(LastFrozen).LastFrozenSpanID()
if snapshotLastSpanId > lastSpanId {
return snapshotLastSpanId, true, nil
}
return lastSpanId, k != nil, nil
}
// LastStateSyncEventID TODO - move to block reader
func LastStateSyncEventID(tx kv.RwTx, blockReader services.FullBlockReader) (uint64, error) {
cursor, err := tx.Cursor(kv.BorEvents)
if err != nil {
return 0, err
}
defer cursor.Close()
k, _, err := cursor.Last()
if err != nil {
return 0, err
}
var lastEventId uint64
if k != nil {
lastEventId = binary.BigEndian.Uint64(k)
}
// TODO tidy this out when moving to block reader
type LastFrozen interface {
LastFrozenEventID() uint64
}
snapshotLastEventId := blockReader.(LastFrozen).LastFrozenEventID()
if snapshotLastEventId > lastEventId {
return snapshotLastEventId, nil
}
return lastEventId, nil
}
func FetchSpanZeroForMiningIfNeeded(
ctx context.Context,
db kv.RwDB,
blockReader services.FullBlockReader,
heimdallClient heimdall.IHeimdallClient,
logger log.Logger,
) error {
return db.Update(ctx, func(tx kv.RwTx) error {
_, err := blockReader.Span(ctx, tx, 0)
if err == nil {
return err
}
// TODO refactor to use errors.Is
if !strings.Contains(err.Error(), "not found") {
// span exists, no need to fetch
return nil
}
_, err = fetchAndWriteHeimdallSpan(ctx, 0, tx, heimdallClient, "FetchSpanZeroForMiningIfNeeded", logger)
return err
})
}
func fetchRequiredHeimdallSpansIfNeeded(
ctx context.Context,
toBlockNum uint64,
tx kv.RwTx,
cfg BorHeimdallCfg,
logPrefix string,
logger log.Logger,
) (uint64, error) {
requiredSpanID := span.IDAt(toBlockNum)
if span.BlockInLastSprintOfSpan(toBlockNum, cfg.borConfig) {
requiredSpanID++
}
lastSpanID, exists, err := LastSpanID(tx, cfg.blockReader)
if err != nil {
return 0, err
}
if exists && requiredSpanID <= lastSpanID {
return lastSpanID, nil
}
var from uint64
if lastSpanID > 0 {
from = lastSpanID + 1
} // else fetch from span 0
logger.Info(fmt.Sprintf("[%s] Processing spans...", logPrefix), "from", from, "to", requiredSpanID)
for spanID := from; spanID <= requiredSpanID; spanID++ {
if _, err = fetchAndWriteHeimdallSpan(ctx, spanID, tx, cfg.heimdallClient, logPrefix, logger); err != nil {
return 0, err
}
}
return requiredSpanID, err
}
func fetchAndWriteHeimdallSpan(
ctx context.Context,
spanID uint64,
tx kv.RwTx,
heimdallClient heimdall.IHeimdallClient,
logPrefix string,
logger log.Logger,
) (uint64, error) {
response, err := heimdallClient.Span(ctx, spanID)
if err != nil {
return 0, err
}
spanBytes, err := json.Marshal(response)
if err != nil {
return 0, err
}
var spanIDBytes [8]byte
binary.BigEndian.PutUint64(spanIDBytes[:], spanID)
if err = tx.Put(kv.BorSpans, spanIDBytes[:], spanBytes); err != nil {
return 0, err
}
logger.Debug(fmt.Sprintf("[%s] Wrote span", logPrefix), "id", spanID)
return spanID, nil
}
func fetchRequiredHeimdallStateSyncEventsIfNeeded(
ctx context.Context,
header *types.Header,
tx kv.RwTx,
cfg BorHeimdallCfg,
logPrefix string,
logger log.Logger,
lastStateSyncEventIDGetter func() (uint64, error),
) (uint64, int, time.Duration, error) {
lastStateSyncEventID, err := lastStateSyncEventIDGetter()
if err != nil {
return 0, 0, 0, err
}
headerNum := header.Number.Uint64()
if headerNum%cfg.borConfig.CalculateSprintLength(headerNum) != 0 || headerNum == 0 {
// we fetch events only at beginning of each sprint
return lastStateSyncEventID, 0, 0, nil
}
return fetchAndWriteHeimdallStateSyncEvents(ctx, header, lastStateSyncEventID, tx, cfg, logPrefix, logger)
}
func fetchAndWriteHeimdallStateSyncEvents(
ctx context.Context,
header *types.Header,
lastStateSyncEventID uint64,
tx kv.RwTx,
cfg BorHeimdallCfg,
logPrefix string,
logger log.Logger,
) (uint64, int, time.Duration, error) {
fetchStart := time.Now()
config := cfg.borConfig
blockReader := cfg.blockReader
heimdallClient := cfg.heimdallClient
chainID := cfg.chainConfig.ChainID.String()
stateReceiverABI := cfg.stateReceiverABI
// Find out the latest eventId
var (
from uint64
to time.Time
)
blockNum := header.Number.Uint64()
if config.IsIndore(blockNum) {
stateSyncDelay := config.CalculateStateSyncDelay(blockNum)
to = time.Unix(int64(header.Time-stateSyncDelay), 0)
} else {
pHeader, err := blockReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprintLength(blockNum))
if err != nil {
return lastStateSyncEventID, 0, time.Since(fetchStart), err
}
to = time.Unix(int64(pHeader.Time), 0)
}
from = lastStateSyncEventID + 1
logger.Debug(
fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix),
"fromID", from,
"to", to.Format(time.RFC3339),
)
eventRecords, err := heimdallClient.StateSyncEvents(ctx, from, to.Unix())
if err != nil {
return lastStateSyncEventID, 0, time.Since(fetchStart), err
}
if config.OverrideStateSyncRecords != nil {
if val, ok := config.OverrideStateSyncRecords[strconv.FormatUint(blockNum, 10)]; ok {
eventRecords = eventRecords[0:val]
}
}
if len(eventRecords) > 0 {
var key, val [8]byte
binary.BigEndian.PutUint64(key[:], blockNum)
binary.BigEndian.PutUint64(val[:], lastStateSyncEventID+1)
}
const method = "commitState"
wroteIndex := false
for i, eventRecord := range eventRecords {
if eventRecord.ID <= lastStateSyncEventID {
continue
}
if lastStateSyncEventID+1 != eventRecord.ID || eventRecord.ChainID != chainID || !eventRecord.Time.Before(to) {
return lastStateSyncEventID, i, time.Since(fetchStart), fmt.Errorf(fmt.Sprintf(
"invalid event record received %s, %s, %s, %s",
fmt.Sprintf("blockNum=%d", blockNum),
fmt.Sprintf("eventId=%d (exp %d)", eventRecord.ID, lastStateSyncEventID+1),
fmt.Sprintf("chainId=%s (exp %s)", eventRecord.ChainID, chainID),
fmt.Sprintf("time=%s (exp to %s)", eventRecord.Time, to),
))
}
eventRecordWithoutTime := eventRecord.BuildEventRecord()
recordBytes, err := rlp.EncodeToBytes(eventRecordWithoutTime)
if err != nil {
return lastStateSyncEventID, i, time.Since(fetchStart), err
}
data, err := stateReceiverABI.Pack(method, big.NewInt(eventRecord.Time.Unix()), recordBytes)
if err != nil {
logger.Error(fmt.Sprintf("[%s] Unable to pack tx for commitState", logPrefix), "err", err)
return lastStateSyncEventID, i, time.Since(fetchStart), err
}
var eventIdBuf [8]byte
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEvents, eventIdBuf[:], data); err != nil {
return lastStateSyncEventID, i, time.Since(fetchStart), err
}
if !wroteIndex {
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], blockNum)
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEventNums, blockNumBuf[:], eventIdBuf[:]); err != nil {
return lastStateSyncEventID, i, time.Since(fetchStart), err
}
wroteIndex = true
}
lastStateSyncEventID++
}
return lastStateSyncEventID, len(eventRecords), time.Since(fetchStart), nil
}
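
A note on the span arithmetic the helpers above rely on (`span.IDAt`, `span.BlockInLastSprintOfSpan`): on Polygon, span zero covers blocks 0 to 255 and every subsequent span covers 6,400 blocks. Here is a sketch of that arithmetic, with local names rather than the `span` package source:

```go
const (
	spanLength    = 6400 // blocks per span after span zero
	zerothSpanEnd = 255  // last block of span zero
)

// spanIDAt returns the span containing blockNum: span 0 covers 0..255,
// span 1 covers 256..6655, span 2 covers 6656..13055, and so on.
func spanIDAt(blockNum uint64) uint64 {
	if blockNum > zerothSpanEnd {
		return 1 + (blockNum-zerothSpanEnd-1)/spanLength
	}
	return 0
}

// spanEndBlockNum returns the last block of the given span.
func spanEndBlockNum(spanID uint64) uint64 {
	if spanID > 0 {
		return spanID*spanLength + zerothSpanEnd
	}
	return zerothSpanEnd
}

// blockInLastSprintOfSpan reports whether blockNum falls in the final
// sprint of its span, in which case the next span must also be fetched.
func blockInLastSprintOfSpan(blockNum, sprintLength uint64) bool {
	endBlock := spanEndBlockNum(spanIDAt(blockNum))
	return endBlock-sprintLength+1 <= blockNum && blockNum <= endBlock
}
```

With that layout, the `requiredSpanID` computation in `fetchRequiredHeimdallSpansIfNeeded` reads naturally: take the span of the target block, and bump it by one when the block already sits in that span's last sprint, so mining can cross the span boundary.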


@@ -3,12 +3,13 @@ package stagedsync
import (
"context"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/wrap"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/log/v3"
)
func DefaultStages(ctx context.Context,
@@ -67,7 +68,7 @@ func DefaultStages(ctx context.Context,
if badBlockUnwind {
return nil
}
return BorHeimdallForward(s, u, ctx, txc.Tx, borHeimdallCfg, false, logger)
return BorHeimdallForward(s, u, ctx, txc.Tx, borHeimdallCfg, logger)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error {
return BorHeimdallUnwind(u, ctx, s, txc.Tx, borHeimdallCfg)


@@ -5,11 +5,8 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math/big"
"sort"
"strconv"
"time"
lru "github.com/hashicorp/golang-lru/arc/v2"
@@ -34,7 +31,6 @@ import (
"github.com/ledgerwatch/erigon/dataflow"
"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
@@ -47,11 +43,6 @@ const (
extraSeal = 65 // Fixed number of extra-data suffix bytes reserved for signer seal
)
var (
ErrHeaderValidatorsLengthMismatch = errors.New("header validators length mismatch")
ErrHeaderValidatorsBytesMismatch = errors.New("header validators bytes mismatch")
)
type BorHeimdallCfg struct {
db kv.RwDB
snapDb kv.RwDB // Database to store and retrieve snapshot checkpoints
@@ -109,7 +100,6 @@ func BorHeimdallForward(
ctx context.Context,
tx kv.RwTx,
cfg BorHeimdallCfg,
mine bool,
logger log.Logger,
) (err error) {
processStart := time.Now()
@@ -130,114 +120,42 @@ func BorHeimdallForward(
defer tx.Rollback()
}
var header *types.Header
var headNumber uint64
headNumber, err = stages.GetStageProgress(tx, stages.Headers)
headNumber, err := stages.GetStageProgress(tx, stages.Headers)
if err != nil {
return err
}
service := whitelist.GetWhitelistingService()
if generics.BorMilestoneRewind.Load() != nil && *generics.BorMilestoneRewind.Load() != 0 {
unwindPoint := *generics.BorMilestoneRewind.Load()
var reset uint64 = 0
generics.BorMilestoneRewind.Store(&reset)
if service != nil && unwindPoint < headNumber {
header, err = cfg.blockReader.HeaderByNumber(ctx, tx, headNumber)
logger.Debug("[BorHeimdall] Verification failed for header", "hash", header.Hash(), "height", headNumber, "err", err)
cfg.penalize(ctx, []headerdownload.PenaltyItem{
{Penalty: headerdownload.BadBlockPenalty, PeerID: cfg.hd.SourcePeerId(header.Hash())}})
whitelistService := whitelist.GetWhitelistingService()
if unwindPointPtr := generics.BorMilestoneRewind.Load(); unwindPointPtr != nil && *unwindPointPtr != 0 {
unwindPoint := *unwindPointPtr
if whitelistService != nil && unwindPoint < headNumber {
header, err := cfg.blockReader.HeaderByNumber(ctx, tx, headNumber)
if err != nil {
return err
}
hash := header.Hash()
logger.Debug(
fmt.Sprintf("[%s] Verification failed for header due to milestone rewind", s.LogPrefix()),
"hash", hash,
"height", headNumber,
)
cfg.penalize(ctx, []headerdownload.PenaltyItem{{
Penalty: headerdownload.BadBlockPenalty,
PeerID: cfg.hd.SourcePeerId(hash),
}})
dataflow.HeaderDownloadStates.AddChange(headNumber, dataflow.HeaderInvalidated)
s.state.UnwindTo(unwindPoint, ForkReset(header.Hash()))
s.state.UnwindTo(unwindPoint, ForkReset(hash))
var reset uint64 = 0
generics.BorMilestoneRewind.Store(&reset)
return fmt.Errorf("verification failed for header %d: %x", headNumber, header.Hash())
}
}
if mine {
minedHeader := cfg.miningState.MiningBlock.Header
if minedHeadNumber := minedHeader.Number.Uint64(); minedHeadNumber > headNumber {
// Whitelist service is called to check if the bor chain is
// on the canonical chain according to milestones
if service != nil {
if !service.IsValidChain(minedHeadNumber, []*types.Header{minedHeader}) {
logger.Debug("[BorHeimdall] Verification failed for mined header", "hash", minedHeader.Hash(), "height", minedHeadNumber, "err", err)
dataflow.HeaderDownloadStates.AddChange(minedHeadNumber, dataflow.HeaderInvalidated)
s.state.UnwindTo(minedHeadNumber-1, ForkReset(minedHeader.Hash()))
return fmt.Errorf("mining on a wrong fork %d:%x", minedHeadNumber, minedHeader.Hash())
}
}
} else {
return fmt.Errorf("attempting to mine %d, which is behind current head: %d", minedHeadNumber, headNumber)
}
}
if err != nil {
return fmt.Errorf("getting headers progress: %w", err)
}
if s.BlockNumber == headNumber {
return nil
}
// Find out the latest event Id
cursor, err := tx.Cursor(kv.BorEvents)
if err != nil {
return err
}
defer cursor.Close()
k, _, err := cursor.Last()
if err != nil {
return err
}
var lastEventId uint64
if k != nil {
lastEventId = binary.BigEndian.Uint64(k)
}
type LastFrozen interface {
LastFrozenEventID() uint64
LastFrozenSpanID() uint64
}
snapshotLastEventId := cfg.blockReader.(LastFrozen).LastFrozenEventID()
if snapshotLastEventId > lastEventId {
lastEventId = snapshotLastEventId
}
sCursor, err := tx.Cursor(kv.BorSpans)
if err != nil {
return err
}
defer sCursor.Close()
k, _, err = sCursor.Last()
if err != nil {
return err
}
var lastSpanId uint64
if k != nil {
lastSpanId = binary.BigEndian.Uint64(k)
}
snapshotLastSpanId := cfg.blockReader.(LastFrozen).LastFrozenSpanID()
if snapshotLastSpanId > lastSpanId {
lastSpanId = snapshotLastSpanId
}
var nextSpanId uint64
if lastSpanId > 0 {
nextSpanId = lastSpanId + 1
}
var endSpanID uint64
if span.IDAt(headNumber) > 0 {
endSpanID = span.IDAt(headNumber + 1)
}
if span.BlockInLastSprintOfSpan(headNumber, cfg.borConfig) {
endSpanID++
}
lastBlockNum := s.BlockNumber
if cfg.blockReader.FrozenBorBlocks() > lastBlockNum {
lastBlockNum = cfg.blockReader.FrozenBorBlocks()
@@ -256,97 +174,95 @@ func BorHeimdallForward(
var fetchTime time.Duration
var eventRecords int
lastSpanID, err := fetchRequiredHeimdallSpansIfNeeded(ctx, headNumber, tx, cfg, s.LogPrefix(), logger)
if err != nil {
return err
}
lastStateSyncEventID, err := LastStateSyncEventID(tx, cfg.blockReader)
if err != nil {
return err
}
logTimer := time.NewTicker(logInterval)
defer logTimer.Stop()
if endSpanID >= nextSpanId {
logger.Info("["+s.LogPrefix()+"] Processing spans...", "from", nextSpanId, "to", endSpanID)
}
for spanID := nextSpanId; spanID <= endSpanID; spanID++ {
if lastSpanId, err = fetchAndWriteSpans(ctx, spanID, tx, cfg.heimdallClient, s.LogPrefix(), logger); err != nil {
return err
}
}
if !mine {
logger.Info("["+s.LogPrefix()+"] Processing sync events...", "from", lastBlockNum+1, "to", headNumber)
}
logger.Info("["+s.LogPrefix()+"] Processing sync events...", "from", lastBlockNum+1, "to", headNumber)
for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
select {
default:
case <-logTimer.C:
logger.Info("["+s.LogPrefix()+"] StateSync Progress", "progress", blockNum, "lastSpanId", lastSpanId, "lastEventId", lastEventId, "total records", eventRecords, "fetch time", fetchTime, "process time", time.Since(processStart))
logger.Info("["+s.LogPrefix()+"] StateSync Progress", "progress", blockNum, "lastSpanID", lastSpanID, "lastStateSyncEventID", lastStateSyncEventID, "total records", eventRecords, "fetch time", fetchTime, "process time", time.Since(processStart))
}
if !mine {
header, err = cfg.blockReader.HeaderByNumber(ctx, tx, blockNum)
if err != nil {
return err
}
if header == nil {
return fmt.Errorf("header not found: %d", blockNum)
}
header, err := cfg.blockReader.HeaderByNumber(ctx, tx, blockNum)
if err != nil {
return err
}
if header == nil {
return fmt.Errorf("header not found: %d", blockNum)
}
// Whitelist service is called to check if the bor chain is
// on the canonical chain according to milestones
if service != nil {
if !service.IsValidChain(blockNum, []*types.Header{header}) {
logger.Debug("["+s.LogPrefix()+"] Verification failed for header", "height", blockNum, "hash", header.Hash())
cfg.penalize(ctx, []headerdownload.PenaltyItem{
{Penalty: headerdownload.BadBlockPenalty, PeerID: cfg.hd.SourcePeerId(header.Hash())}})
dataflow.HeaderDownloadStates.AddChange(blockNum, dataflow.HeaderInvalidated)
s.state.UnwindTo(blockNum-1, ForkReset(header.Hash()))
return fmt.Errorf("verification failed for header %d: %x", blockNum, header.Hash())
// The whitelist service is called to check if the bor chain is
// on the canonical chain according to milestones
if whitelistService != nil && !whitelistService.IsValidChain(blockNum, []*types.Header{header}) {
logger.Debug("["+s.LogPrefix()+"] Verification failed for header", "height", blockNum, "hash", header.Hash())
cfg.penalize(ctx, []headerdownload.PenaltyItem{{
Penalty: headerdownload.BadBlockPenalty,
PeerID: cfg.hd.SourcePeerId(header.Hash()),
}})
dataflow.HeaderDownloadStates.AddChange(blockNum, dataflow.HeaderInvalidated)
s.state.UnwindTo(blockNum-1, ForkReset(header.Hash()))
return fmt.Errorf("verification failed for header %d: %x", blockNum, header.Hash())
}
if blockNum > cfg.blockReader.BorSnapshots().SegmentsMin() {
// SegmentsMin is only set if running as an uploader process (check SnapshotsCfg.snapshotUploader and
// UploadLocationFlag) when we remove snapshots based on FrozenBlockLimit and number of uploaded snapshots
// avoid calling this if block for blockNums <= SegmentsMin to avoid reinsertion of snapshots
snap := loadSnapshot(blockNum, header.Hash(), cfg.borConfig, recents, signatures, cfg.snapDb, logger)
if snap == nil {
snap, err = initValidatorSets(ctx, tx, cfg.blockReader, cfg.borConfig,
cfg.heimdallClient, chain, blockNum, recents, signatures, cfg.snapDb, logger, s.LogPrefix())
if err != nil {
return fmt.Errorf("can't initialise validator sets: %w", err)
}
}
sprintLength := cfg.borConfig.CalculateSprintLength(blockNum)
spanID := span.IDAt(blockNum)
if (spanID > 0) && ((blockNum+1)%sprintLength == 0) {
if err = checkHeaderExtraData(u, ctx, chain, blockNum, header, cfg.borConfig); err != nil {
return err
}
if err = persistValidatorSets(ctx, snap, u, tx, cfg.blockReader, cfg.borConfig, chain, blockNum, header.Hash(), recents, signatures, cfg.snapDb, logger, s.LogPrefix()); err != nil {
return fmt.Errorf("can't persist validator sets: %w", err)
}
}
if blockNum > 0 && blockNum%cfg.borConfig.CalculateSprintLength(blockNum) == 0 {
var callTime time.Duration
var records int
if lastEventId, records, callTime, err = fetchAndWriteBorEvents(ctx, cfg.blockReader, cfg.borConfig, header, lastEventId, cfg.chainConfig.ChainID.String(), tx, cfg.heimdallClient, cfg.stateReceiverABI, s.LogPrefix(), logger); err != nil {
return err
}
eventRecords += records
fetchTime += callTime
if err := checkBorHeaderExtraDataIfRequired(chain, header, cfg.borConfig); err != nil {
return err
}
var snap *bor.Snapshot
if header != nil {
if blockNum > cfg.blockReader.BorSnapshots().SegmentsMin() {
// SegmentsMin is only set if running as an uploader process (check SnapshotsCfg.snapshotUploader and
// UploadLocationFlag) when we remove snapshots based on FrozenBlockLimit and number of uploaded snapshots
// avoid calling this if block for blockNums <= SegmentsMin to avoid reinsertion of snapshots
snap = loadSnapshot(blockNum, header.Hash(), cfg.borConfig, recents, signatures, cfg.snapDb, logger)
if snap == nil {
snap, err = initValidatorSets(ctx, tx, cfg.blockReader, cfg.borConfig,
cfg.heimdallClient, chain, blockNum, recents, signatures, cfg.snapDb, logger, s.LogPrefix())
if err != nil {
return fmt.Errorf("can't initialise validator sets: %w", err)
}
}
if err = persistValidatorSets(ctx, snap, u, tx, cfg.blockReader, cfg.borConfig, chain, blockNum, header.Hash(), recents, signatures, cfg.snapDb, logger, s.LogPrefix()); err != nil {
return fmt.Errorf("can't persist validator sets: %w", err)
}
}
var callTime time.Duration
var records int
lastStateSyncEventID, records, callTime, err = fetchRequiredHeimdallStateSyncEventsIfNeeded(
ctx,
header,
tx,
cfg,
s.LogPrefix(),
logger,
func() (uint64, error) {
return lastStateSyncEventID, nil
},
)
if err != nil {
return err
}
eventRecords += records
fetchTime += callTime
if cfg.loopBreakCheck != nil && cfg.loopBreakCheck(int(blockNum-lastBlockNum)) {
break
}
}
if err = s.Update(tx, headNumber); err != nil {
@@ -359,181 +275,11 @@ func BorHeimdallForward(
}
}
logger.Info("["+s.LogPrefix()+"] Sync events processed", "progress", blockNum-1, "lastSpanId", lastSpanId, "lastEventId", lastEventId, "total records", eventRecords, "fetch time", fetchTime, "process time", time.Since(processStart))
logger.Info("["+s.LogPrefix()+"] Sync events processed", "progress", blockNum-1, "lastSpanID", lastSpanID, "lastStateSyncEventID", lastStateSyncEventID, "total records", eventRecords, "fetch time", fetchTime, "process time", time.Since(processStart))
return
}
func checkHeaderExtraData(
u Unwinder,
ctx context.Context,
chain consensus.ChainHeaderReader,
blockNum uint64,
header *types.Header,
config *borcfg.BorConfig,
) error {
spanID := span.IDAt(blockNum + 1)
spanBytes := chain.BorSpan(spanID)
var sp span.HeimdallSpan
if err := json.Unmarshal(spanBytes, &sp); err != nil {
return err
}
producerSet := make([]*valset.Validator, len(sp.SelectedProducers))
for i := range sp.SelectedProducers {
producerSet[i] = &sp.SelectedProducers[i]
}
sort.Sort(valset.ValidatorsByAddress(producerSet))
headerVals, err := valset.ParseValidators(bor.GetValidatorBytes(header, config))
if err != nil {
return err
}
if len(producerSet) != len(headerVals) {
return ErrHeaderValidatorsLengthMismatch
}
for i, val := range producerSet {
if !bytes.Equal(val.HeaderBytes(), headerVals[i].HeaderBytes()) {
return ErrHeaderValidatorsBytesMismatch
}
}
return nil
}
func fetchAndWriteBorEvents(
ctx context.Context,
blockReader services.FullBlockReader,
config *borcfg.BorConfig,
header *types.Header,
lastEventId uint64,
chainID string,
tx kv.RwTx,
heimdallClient heimdall.IHeimdallClient,
stateReceiverABI abi.ABI,
logPrefix string,
logger log.Logger,
) (uint64, int, time.Duration, error) {
fetchStart := time.Now()
// Find out the latest eventId
var (
from uint64
to time.Time
)
if header == nil {
return 0, 0, 0, fmt.Errorf("can't fetch events for nil header")
}
blockNum := header.Number.Uint64()
if config.IsIndore(blockNum) {
stateSyncDelay := config.CalculateStateSyncDelay(blockNum)
to = time.Unix(int64(header.Time-stateSyncDelay), 0)
} else {
pHeader, err := blockReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprintLength(blockNum))
if err != nil {
return lastEventId, 0, time.Since(fetchStart), err
}
to = time.Unix(int64(pHeader.Time), 0)
}
from = lastEventId + 1
logger.Debug(
fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix),
"fromID", from,
"to", to.Format(time.RFC3339),
)
eventRecords, err := heimdallClient.StateSyncEvents(ctx, from, to.Unix())
if err != nil {
return lastEventId, 0, time.Since(fetchStart), err
}
if config.OverrideStateSyncRecords != nil {
if val, ok := config.OverrideStateSyncRecords[strconv.FormatUint(blockNum, 10)]; ok {
eventRecords = eventRecords[0:val]
}
}
if len(eventRecords) > 0 {
var key, val [8]byte
binary.BigEndian.PutUint64(key[:], blockNum)
binary.BigEndian.PutUint64(val[:], lastEventId+1)
}
const method = "commitState"
wroteIndex := false
for i, eventRecord := range eventRecords {
if eventRecord.ID <= lastEventId {
continue
}
if lastEventId+1 != eventRecord.ID || eventRecord.ChainID != chainID || !eventRecord.Time.Before(to) {
return lastEventId, i, time.Since(fetchStart), fmt.Errorf("invalid event record received blockNum=%d, eventId=%d (exp %d), chainId=%s (exp %s), time=%s (exp to %s)", blockNum, eventRecord.ID, lastEventId+1, eventRecord.ChainID, chainID, eventRecord.Time, to)
}
eventRecordWithoutTime := eventRecord.BuildEventRecord()
recordBytes, err := rlp.EncodeToBytes(eventRecordWithoutTime)
if err != nil {
return lastEventId, i, time.Since(fetchStart), err
}
data, err := stateReceiverABI.Pack(method, big.NewInt(eventRecord.Time.Unix()), recordBytes)
if err != nil {
logger.Error(fmt.Sprintf("[%s] Unable to pack tx for commitState", logPrefix), "err", err)
return lastEventId, i, time.Since(fetchStart), err
}
var eventIdBuf [8]byte
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEvents, eventIdBuf[:], data); err != nil {
return lastEventId, i, time.Since(fetchStart), err
}
if !wroteIndex {
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], blockNum)
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEventNums, blockNumBuf[:], eventIdBuf[:]); err != nil {
return lastEventId, i, time.Since(fetchStart), err
}
wroteIndex = true
}
lastEventId++
}
return lastEventId, len(eventRecords), time.Since(fetchStart), nil
}
func fetchAndWriteSpans(
ctx context.Context,
spanId uint64,
tx kv.RwTx,
heimdallClient heimdall.IHeimdallClient,
logPrefix string,
logger log.Logger,
) (uint64, error) {
response, err := heimdallClient.Span(ctx, spanId)
if err != nil {
return 0, err
}
spanBytes, err := json.Marshal(response)
if err != nil {
return 0, err
}
var spanIDBytes [8]byte
binary.BigEndian.PutUint64(spanIDBytes[:], spanId)
if err = tx.Put(kv.BorSpans, spanIDBytes[:], spanBytes); err != nil {
return 0, err
}
logger.Debug(fmt.Sprintf("[%s] Wrote span", logPrefix), "id", spanId)
return spanId, nil
}
func loadSnapshot(blockNum uint64, hash libcommon.Hash, config *borcfg.BorConfig, recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
snapDb kv.RwDB,
@@ -705,7 +451,7 @@ func initValidatorSets(
zeroSpanBytes, err := blockReader.Span(ctx, tx, 0)
if err != nil {
if _, err := fetchAndWriteSpans(ctx, 0, tx, heimdallClient, logPrefix, logger); err != nil {
if _, err := fetchAndWriteHeimdallSpan(ctx, 0, tx, heimdallClient, logPrefix, logger); err != nil {
return nil, err
}
@@ -777,6 +523,50 @@ func initValidatorSets(
return snap, nil
}
func checkBorHeaderExtraDataIfRequired(chr consensus.ChainHeaderReader, header *types.Header, cfg *borcfg.BorConfig) error {
blockNum := header.Number.Uint64()
sprintLength := cfg.CalculateSprintLength(blockNum)
if (blockNum+1)%sprintLength != 0 {
// not last block of a sprint in a span, so no check needed (we only check last block of a sprint)
return nil
}
return checkBorHeaderExtraData(chr, header, cfg)
}
func checkBorHeaderExtraData(chr consensus.ChainHeaderReader, header *types.Header, cfg *borcfg.BorConfig) error {
spanID := span.IDAt(header.Number.Uint64() + 1)
spanBytes := chr.BorSpan(spanID)
var sp span.HeimdallSpan
if err := json.Unmarshal(spanBytes, &sp); err != nil {
return err
}
producerSet := make([]*valset.Validator, len(sp.SelectedProducers))
for i := range sp.SelectedProducers {
producerSet[i] = &sp.SelectedProducers[i]
}
sort.Sort(valset.ValidatorsByAddress(producerSet))
headerVals, err := valset.ParseValidators(bor.GetValidatorBytes(header, cfg))
if err != nil {
return err
}
if len(producerSet) != len(headerVals) {
return ErrHeaderValidatorsLengthMismatch
}
for i, val := range producerSet {
if !bytes.Equal(val.HeaderBytes(), headerVals[i].HeaderBytes()) {
return ErrHeaderValidatorsBytesMismatch
}
}
return nil
}
func BorHeimdallUnwind(u *UnwindState, ctx context.Context, s *StageState, tx kv.RwTx, cfg BorHeimdallCfg) (err error) {
if cfg.borConfig == nil {
return


@@ -0,0 +1,89 @@
package stagedsync
import (
"context"
"fmt"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/dataflow"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
)
func MiningBorHeimdallForward(
ctx context.Context,
cfg BorHeimdallCfg,
stageStage *StageState,
unwinder Unwinder,
tx kv.RwTx,
logger log.Logger,
) error {
if cfg.borConfig == nil || cfg.heimdallClient == nil {
return nil
}
logPrefix := stageStage.LogPrefix()
headerStageProgress, err := stages.GetStageProgress(tx, stages.Headers)
if err != nil {
return err
}
header := cfg.miningState.MiningBlock.Header
headerNum := header.Number.Uint64()
if headerNum <= headerStageProgress {
return fmt.Errorf("attempting to mine %d, which is behind current head: %d", headerNum, headerStageProgress)
}
// Whitelist service is called to check if the bor chain is on the canonical chain according to milestones
whitelistService := whitelist.GetWhitelistingService()
if whitelistService != nil && !whitelistService.IsValidChain(headerNum, []*types.Header{header}) {
hash := header.Hash()
logger.Debug(
fmt.Sprintf("[%s] Verification failed for mined header", logPrefix),
"hash", hash,
"height", headerNum,
"err", err,
)
dataflow.HeaderDownloadStates.AddChange(headerNum, dataflow.HeaderInvalidated)
unwinder.UnwindTo(headerNum-1, ForkReset(hash))
return fmt.Errorf("mining on a wrong fork %d:%x", headerNum, hash)
}
lastSpanID, err := fetchRequiredHeimdallSpansIfNeeded(ctx, headerNum, tx, cfg, logPrefix, logger)
if err != nil {
return err
}
lastStateSyncEventID, records, fetchTime, err := fetchRequiredHeimdallStateSyncEventsIfNeeded(
ctx,
header,
tx,
cfg,
logPrefix,
logger,
func() (uint64, error) {
return LastStateSyncEventID(tx, cfg.blockReader)
},
)
if err != nil {
return err
}
if err = stageStage.Update(tx, headerNum); err != nil {
return err
}
logger.Info(
"[%s] Finished processing", logPrefix,
"progress", headerNum,
"lastSpanID", lastSpanID,
"lastStateSyncEventID", lastStateSyncEventID,
"stateSyncEventTotalRecords", records,
"stateSyncEventFetchTime", fetchTime,
)
return nil
}


@@ -3,12 +3,13 @@ package stagedsync
import (
"context"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/wrap"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/log/v3"
)
type ChainEventNotifier interface {
@@ -40,13 +41,13 @@ func MiningStages(
Prune: func(firstCycle bool, u *PruneState, tx kv.RwTx, logger log.Logger) error { return nil },
},
{
ID: stages.BorHeimdall,
ID: stages.MiningBorHeimdall,
Description: "Download Bor-specific data from Heimdall",
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error {
if badBlockUnwind {
return nil
}
return BorHeimdallForward(s, u, ctx, txc.Tx, borHeimdallCfg, true, logger)
return MiningBorHeimdallForward(ctx, borHeimdallCfg, s, u, txc.Tx, logger)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error {
return BorHeimdallUnwind(u, ctx, s, txc.Tx, borHeimdallCfg)
@@ -59,8 +60,6 @@ func MiningStages(
ID: stages.MiningExecution,
Description: "Mining: execute new block from tx pool",
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error {
//fmt.Println("SpawnMiningExecStage")
//defer fmt.Println("SpawnMiningExecStage", "DONE")
return SpawnMiningExecStage(s, txc.Tx, execCfg, ctx.Done(), logger)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error {


@@ -49,6 +49,7 @@ var (
Finish SyncStage = "Finish" // Nominal stage after all other stages
MiningCreateBlock SyncStage = "MiningCreateBlock"
MiningBorHeimdall SyncStage = "MiningBorHeimdall"
MiningExecution SyncStage = "MiningExecution"
MiningFinish SyncStage = "MiningFinish"
// Beacon chain stages