Fixing hive SideChain reorg test (#5620)

the root cause is that when `inMemoryExecution` lambda gets created in
the `eth/backend.go`, it captures the reference of
`backend.notifications`, and so the execution of side-forks actually
adds notifications to there, and it all gets sent out to tx pool (and
RPC daemon) at the end of the stage loop (regardless of whether there
was forkchoice update or not)

so we can create a separate notification, but then somehow flush it to
the "main" nofitications when the in-memory exec state is flushed

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
ledgerwatch 2022-10-05 05:42:38 +01:00 committed by GitHub
parent ff8fcf8070
commit 94f4ea805d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 119 additions and 93 deletions

View File

@ -36,13 +36,13 @@ import (
"github.com/ledgerwatch/erigon/eth/integrity"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/migrations"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
"github.com/ledgerwatch/erigon/p2p"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
stages2 "github.com/ledgerwatch/erigon/turbo/stages"
@ -1199,7 +1199,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
vmConfig := &vm.Config{}
events := privateapi.NewEvents()
events := shards.NewEvents()
genesis := core.DefaultGenesisBlockByChainName(chain)
chainConfig, genesisBlock, genesisErr := core.CommitGenesisBlock(db, genesis)
@ -1248,7 +1248,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
panic(err)
}
sync, err := stages2.NewStagedSync(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, &stagedsync.Notifications{}, nil, allSn, agg, nil)
sync, err := stages2.NewStagedSync(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, &shards.Notifications{}, nil, allSn, agg, nil)
if err != nil {
panic(err)
}

View File

@ -51,7 +51,7 @@ func TestEthSubscribe(t *testing.T) {
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}

View File

@ -60,7 +60,7 @@ func TestSendRawTransaction(t *testing.T) {
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}

View File

@ -131,7 +131,7 @@ type Ethereum struct {
downloaderClient proto_downloader.DownloaderClient
notifications *stagedsync.Notifications
notifications *shards.Notifications
unsubscribeEthstat func()
waitForStageLoopStop chan struct{}
@ -257,9 +257,9 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
genesisHash: genesis.Hash(),
waitForStageLoopStop: make(chan struct{}),
waitForMiningStop: make(chan struct{}),
notifications: &stagedsync.Notifications{
Events: privateapi.NewEvents(),
Accumulator: shards.NewAccumulator(chainConfig),
notifications: &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
},
}
blockReader, allSnapshots, agg, err := backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader)
@ -325,8 +325,10 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
}()
}
inMemoryExecution := func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody) error {
stateSync, err := stages2.NewInMemoryExecution(backend.sentryCtx, backend.chainDB, config, backend.sentriesClient, dirs, backend.notifications, allSnapshots, backend.agg)
inMemoryExecution := func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody,
notifications *shards.Notifications) error {
// Needs its own notifications to not update RPC daemon and txpool about pending blocks
stateSync, err := stages2.NewInMemoryExecution(backend.sentryCtx, backend.chainDB, config, backend.sentriesClient, dirs, notifications, allSnapshots, backend.agg)
if err != nil {
return err
}
@ -890,7 +892,7 @@ func (s *Ethereum) Start() error {
s.sentriesClient.StartStreamLoops(s.sentryCtx)
time.Sleep(10 * time.Millisecond) // just to reduce logs order confusion
go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.notifications, s.sentriesClient.UpdateHead, s.waitForStageLoopStop, s.config.Sync.LoopThrottle)
go stages2.StageLoop(s.sentryCtx, s.chainConfig, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.notifications, s.sentriesClient.UpdateHead, s.waitForStageLoopStop, s.config.Sync.LoopThrottle)
return nil
}
@ -942,11 +944,15 @@ func (s *Ethereum) ChainDB() kv.RwDB {
return s.chainDB
}
func (s *Ethereum) ChainConfig() *params.ChainConfig {
return s.chainConfig
}
func (s *Ethereum) StagedSync() *stagedsync.Sync {
return s.stagedSync
}
func (s *Ethereum) Notifications() *stagedsync.Notifications {
func (s *Ethereum) Notifications() *shards.Notifications {
return s.notifications
}

View File

@ -24,6 +24,7 @@ import (
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/stages/bodydownload"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
@ -49,7 +50,7 @@ type HeadersCfg struct {
snapshots *snapshotsync.RoSnapshots
blockReader services.FullBlockReader
forkValidator *engineapi.ForkValidator
notifications *Notifications
notifications *shards.Notifications
}
func StageHeadersCfg(
@ -65,7 +66,7 @@ func StageHeadersCfg(
snapshots *snapshotsync.RoSnapshots,
blockReader services.FullBlockReader,
tmpdir string,
notifications *Notifications,
notifications *shards.Notifications,
forkValidator *engineapi.ForkValidator) HeadersCfg {
return HeadersCfg{
db: db,
@ -331,7 +332,7 @@ func startHandlingForkChoice(
if headerHash == cfg.forkValidator.ExtendingForkHeadHash() {
log.Info(fmt.Sprintf("[%s] Fork choice update: flushing in-memory state (built by previous newPayload)", s.LogPrefix()))
if err := cfg.forkValidator.FlushExtendingFork(tx); err != nil {
if err := cfg.forkValidator.FlushExtendingFork(tx, cfg.notifications.Accumulator); err != nil {
return nil, err
}
cfg.hd.BeaconRequestList.Remove(requestId)

View File

@ -7,8 +7,6 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/turbo/shards"
)
type ChainEventNotifier interface {
@ -18,12 +16,6 @@ type ChainEventNotifier interface {
HasLogSubsriptions() bool
}
type Notifications struct {
Events *privateapi.Events
Accumulator *shards.Accumulator
StateChangesConsumer shards.StateChangeConsumer
}
func MiningStages(
ctx context.Context,
createBlockCfg MiningCreateBlockCfg,

View File

@ -13,6 +13,7 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/stretchr/testify/require"
)
@ -92,7 +93,7 @@ func TestMockDownloadRequest(t *testing.T) {
makeTestDb(ctx, db)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd.SetPOSSync(true)
events := NewEvents()
events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false)
var err error
@ -151,7 +152,7 @@ func TestMockValidExecution(t *testing.T) {
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd.SetPOSSync(true)
events := NewEvents()
events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false)
var err error
@ -187,7 +188,7 @@ func TestMockInvalidExecution(t *testing.T) {
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd.SetPOSSync(true)
events := NewEvents()
events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false)
var err error
@ -222,7 +223,7 @@ func TestNoTTD(t *testing.T) {
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
events := NewEvents()
events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{}, nil, hd, false)
var err error

View File

@ -28,6 +28,7 @@ import (
"github.com/ledgerwatch/erigon/turbo/builder"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
@ -50,7 +51,7 @@ type EthBackendServer struct {
ctx context.Context
eth EthBackend
events *Events
events *shards.Events
db kv.RoDB
blockReader services.BlockAndTxnReader
config *params.ChainConfig
@ -73,7 +74,7 @@ type EthBackend interface {
Peers(ctx context.Context) (*remote.PeersReply, error)
}
func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events *Events, blockReader services.BlockAndTxnReader,
func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events *shards.Events, blockReader services.BlockAndTxnReader,
config *params.ChainConfig, builderFunc builder.BlockBuilderFunc, hd *headerdownload.HeaderDownload, proposing bool,
) *EthBackendServer {
s := &EthBackendServer{ctx: ctx, eth: eth, events: events, db: db, blockReader: blockReader, config: config,
@ -469,18 +470,6 @@ func (s *EthBackendServer) getQuickPayloadStatusIfPossible(blockHash common.Hash
log.Debug(fmt.Sprintf("[%s] Downloading some other PoS stuff", prefix), "hash", blockHash)
return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}
headHash := rawdb.ReadHeadBlockHash(tx)
if err != nil {
return nil, err
}
// We add the extra restriction blockHash != headHash for the FCU case of canonicalHash == blockHash
// because otherwise (when FCU points to the head) we want go to stage headers
// so that it calls writeForkChoiceHashes.
if blockHash != headHash && canonicalHash == blockHash {
return &engineapi.PayloadStatus{Status: remote.EngineStatus_VALID, LatestValidHash: blockHash}, nil
}
}
// If another payload is already commissioned then we just reply with syncing

View File

@ -9,6 +9,7 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/turbo/shards"
)
type LogsFilterAggregator struct {
@ -16,7 +17,7 @@ type LogsFilterAggregator struct {
logsFilters map[uint64]*LogsFilter // Filter for each subscriber, keyed by filterID
logsFilterLock sync.Mutex
nextFilterId uint64
events *Events
events *shards.Events
}
// LogsFilter is used for both representing log filter for a specific subscriber (RPC daemon usually)
@ -32,7 +33,7 @@ type LogsFilter struct {
sender remote.ETHBACKEND_SubscribeLogsServer // nil for aggregate subscriber, for appropriate stream server otherwise
}
func NewLogsFilterAggregator(events *Events) *LogsFilterAggregator {
func NewLogsFilterAggregator(events *shards.Events) *LogsFilterAggregator {
return &LogsFilterAggregator{
aggLogsFilter: LogsFilter{
addrs: make(map[common.Address]int),

View File

@ -10,6 +10,7 @@ import (
"google.golang.org/grpc"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/turbo/shards"
)
var (
@ -65,7 +66,7 @@ func createLog() *remote.SubscribeLogsReply {
}
func TestLogsFilter_EmptyFilter_DoesNotDistributeAnything(t *testing.T) {
events := NewEvents()
events := shards.NewEvents()
agg := NewLogsFilterAggregator(events)
srv := &testServer{
@ -103,7 +104,7 @@ func TestLogsFilter_EmptyFilter_DoesNotDistributeAnything(t *testing.T) {
}
func TestLogsFilter_AllAddressesAndTopicsFilter_DistributesLogRegardless(t *testing.T) {
events := NewEvents()
events := shards.NewEvents()
agg := NewLogsFilterAggregator(events)
srv := &testServer{
@ -155,7 +156,7 @@ func TestLogsFilter_AllAddressesAndTopicsFilter_DistributesLogRegardless(t *test
}
func TestLogsFilter_TopicFilter_OnlyAllowsThatTopicThrough(t *testing.T) {
events := NewEvents()
events := shards.NewEvents()
agg := NewLogsFilterAggregator(events)
srv := &testServer{
@ -200,7 +201,7 @@ func TestLogsFilter_TopicFilter_OnlyAllowsThatTopicThrough(t *testing.T) {
}
func TestLogsFilter_AddressFilter_OnlyAllowsThatAddressThrough(t *testing.T) {
events := NewEvents()
events := shards.NewEvents()
agg := NewLogsFilterAggregator(events)
srv := &testServer{

View File

@ -212,7 +212,7 @@ func InsertChain(ethereum *eth.Ethereum, chain *core.ChainPack) error {
sentryControlServer.Hd.MarkAllVerified()
_, err := stages.StageLoopStep(ethereum.SentryCtx(), ethereum.ChainDB(), ethereum.StagedSync(), highestSeenHeader, ethereum.Notifications(), initialCycle, sentryControlServer.UpdateHead, nil)
_, err := stages.StageLoopStep(ethereum.SentryCtx(), ethereum.ChainConfig(), ethereum.ChainDB(), ethereum.StagedSync(), highestSeenHeader, ethereum.Notifications(), initialCycle, sentryControlServer.UpdateHead, nil)
if err != nil {
return err
}

View File

@ -35,7 +35,7 @@ import (
// the maximum point from the current head, past which side forks are not validated anymore.
const maxForkDepth = 32 // 32 slots is the duration of an epoch thus there cannot be side forks in PoS deeper than 32 blocks from head.
type validatePayloadFunc func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error
type validatePayloadFunc func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody, *shards.Notifications) error
// Fork segment is a side fork segment and repressent a full side fork block.
type forkSegment struct {
@ -50,6 +50,8 @@ type ForkValidator struct {
sideForksBlock map[common.Hash]forkSegment
// current memory batch containing chain head that extend canonical fork.
extendingFork *memdb.MemoryMutation
// notifications accumulated for the extending fork
extendingForkNotifications *shards.Notifications
// hash of chain head that extend canonical fork.
extendingForkHeadHash common.Hash
// this is the function we use to perform payload validation.
@ -108,27 +110,33 @@ func (fv *ForkValidator) notifyTxPool(to uint64, accumulator *shards.Accumulator
func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) {
fv.lock.Lock()
defer fv.lock.Unlock()
if fv.currentHeight == currentHeight {
return
}
fv.currentHeight = currentHeight
// If the head changed,e previous assumptions on head are incorrect now.
if fv.extendingFork != nil {
fv.extendingFork.Rollback()
}
fv.extendingFork = nil
fv.extendingForkNotifications = nil
fv.extendingForkHeadHash = common.Hash{}
}
// FlushExtendingFork flush the current extending fork if fcu chooses its head hash as the its forkchoice.
func (fv *ForkValidator) FlushExtendingFork(tx kv.RwTx) error {
func (fv *ForkValidator) FlushExtendingFork(tx kv.RwTx, accumulator *shards.Accumulator) error {
fv.lock.Lock()
defer fv.lock.Unlock()
// Flush changes to db.
if err := fv.extendingFork.Flush(tx); err != nil {
return err
}
fv.extendingForkNotifications.Accumulator.CopyAndReset(accumulator)
// Clean extending fork data
fv.extendingFork.Rollback()
fv.extendingForkHeadHash = common.Hash{}
fv.extendingFork = nil
fv.extendingForkNotifications = nil
return nil
}
@ -156,12 +164,16 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
// If the new block extends the canonical chain we update extendingFork.
if fv.extendingFork == nil {
fv.extendingFork = memdb.NewMemoryBatch(tx)
fv.extendingForkNotifications = &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
}
} else {
fv.extendingFork.UpdateTxn(tx)
}
// Update fork head hash.
fv.extendingForkHeadHash = header.Hash()
return fv.validateAndStorePayload(fv.extendingFork, header, body, 0, nil, nil)
return fv.validateAndStorePayload(fv.extendingFork, header, body, 0, nil, nil, fv.extendingForkNotifications)
}
// if the block is not in range of maxForkDepth from head then we do not validate it.
@ -213,7 +225,11 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
}
batch := memdb.NewMemoryBatch(tx)
defer batch.Rollback()
return fv.validateAndStorePayload(batch, header, body, unwindPoint, headersChain, bodiesChain)
notifications := &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
}
return fv.validateAndStorePayload(batch, header, body, unwindPoint, headersChain, bodiesChain, notifications)
}
// Clear wipes out current extending fork data, this method is called after fcu is called,
@ -254,8 +270,9 @@ func (fv *ForkValidator) ClearWithUnwind(tx kv.RwTx, accumulator *shards.Accumul
}
// validateAndStorePayload validate and store a payload fork chain if such chain results valid.
func (fv *ForkValidator) validateAndStorePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody) (status remote.EngineStatus, latestValidHash common.Hash, validationError error, criticalError error) {
validationError = fv.validatePayload(tx, header, body, unwindPoint, headersChain, bodiesChain)
func (fv *ForkValidator) validateAndStorePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody,
notifications *shards.Notifications) (status remote.EngineStatus, latestValidHash common.Hash, validationError error, criticalError error) {
validationError = fv.validatePayload(tx, header, body, unwindPoint, headersChain, bodiesChain, notifications)
latestValidHash = header.Hash()
if validationError != nil {
latestValidHash = header.ParentHash

View File

@ -1,4 +1,4 @@
package privateapi
package shards
import (
"sync"
@ -137,3 +137,9 @@ func (e *Events) OnLogs(logs []*remote.SubscribeLogsReply) {
common.PrioritizedSend(ch, logs)
}
}
type Notifications struct {
Events *Events
Accumulator *Accumulator
StateChangesConsumer StateChangeConsumer
}

View File

@ -7,21 +7,19 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/params"
)
// Accumulator collects state changes in a form that can then be delivered to the RPC daemon
type Accumulator struct {
viewID uint64 // mdbx's txID
chainConfig *params.ChainConfig
changes []*remote.StateChange
latestChange *remote.StateChange
accountChangeIndex map[common.Address]int // For the latest changes, allows finding account change by account's address
storageChangeIndex map[common.Address]map[common.Hash]int
}
func NewAccumulator(chainConfig *params.ChainConfig) *Accumulator {
return &Accumulator{chainConfig: chainConfig}
func NewAccumulator() *Accumulator {
return &Accumulator{}
}
type StateChangeConsumer interface {
@ -35,7 +33,6 @@ func (a *Accumulator) Reset(viewID uint64) {
a.storageChangeIndex = nil
a.viewID = viewID
}
func (a *Accumulator) ChainConfig() *params.ChainConfig { return a.chainConfig }
func (a *Accumulator) SendAndReset(ctx context.Context, c StateChangeConsumer, pendingBaseFee uint64, blockGasLimit uint64) {
if a == nil || c == nil || len(a.changes) == 0 {
return
@ -161,3 +158,14 @@ func (a *Accumulator) ChangeStorage(address common.Address, incarnation uint64,
storageChange.Location = gointerfaces.ConvertHashToH256(location)
storageChange.Data = data
}
func (a *Accumulator) CopyAndReset(target *Accumulator) {
target.changes = a.changes
a.changes = nil
target.latestChange = a.latestChange
a.latestChange = nil
target.accountChangeIndex = a.accountChangeIndex
a.accountChangeIndex = nil
target.storageChangeIndex = a.storageChangeIndex
a.storageChangeIndex = nil
}

View File

@ -41,7 +41,6 @@ import (
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
"github.com/ledgerwatch/erigon/params"
@ -80,7 +79,7 @@ type MockSentry struct {
ReceiveWg sync.WaitGroup
Address common.Address
Notifications *stagedsync.Notifications
Notifications *shards.Notifications
// TxPool
TxPoolFetch *txpool.Fetch
@ -219,9 +218,9 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
gspec: gspec,
ChainConfig: gspec.Config,
Key: key,
Notifications: &stagedsync.Notifications{
Events: privateapi.NewEvents(),
Accumulator: shards.NewAccumulator(gspec.Config),
Notifications: &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
StateChangesConsumer: erigonGrpcServeer,
},
UpdateHead: func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int) {
@ -560,7 +559,7 @@ func (ms *MockSentry) insertPoWBlocks(chain *core.ChainPack) error {
if ms.TxPool != nil {
ms.ReceiveWg.Add(1)
}
if _, err = StageLoopStep(ms.Ctx, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil); err != nil {
if _, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil); err != nil {
return err
}
if ms.TxPool != nil {
@ -581,7 +580,7 @@ func (ms *MockSentry) insertPoSBlocks(chain *core.ChainPack) error {
initialCycle := false
highestSeenHeader := chain.TopBlock.NumberU64()
headBlockHash, err := StageLoopStep(ms.Ctx, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil)
headBlockHash, err := StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil)
if err != nil {
return err
}
@ -594,7 +593,7 @@ func (ms *MockSentry) insertPoSBlocks(chain *core.ChainPack) error {
FinalizedBlockHash: chain.TopBlock.Hash(),
}
ms.SendForkChoiceRequest(&fc)
headBlockHash, err = StageLoopStep(ms.Ctx, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil)
headBlockHash, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil)
if err != nil {
return err
}

View File

@ -57,7 +57,7 @@ func TestHeaderStep(t *testing.T) {
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
@ -96,7 +96,7 @@ func TestMineBlockWith1Tx(t *testing.T) {
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
@ -165,7 +165,7 @@ func TestReorg(t *testing.T) {
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
@ -219,7 +219,7 @@ func TestReorg(t *testing.T) {
highestSeenHeader = short.TopBlock.NumberU64()
initialCycle = false
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
@ -263,7 +263,7 @@ func TestReorg(t *testing.T) {
// This is unwind step
highestSeenHeader = long1.TopBlock.NumberU64()
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
@ -301,7 +301,7 @@ func TestReorg(t *testing.T) {
highestSeenHeader = short2.TopBlock.NumberU64()
initialCycle = false
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
@ -398,7 +398,7 @@ func TestAnchorReplace(t *testing.T) {
highestSeenHeader := long.TopBlock.NumberU64()
initialCycle := true
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
@ -503,7 +503,7 @@ func TestAnchorReplace2(t *testing.T) {
highestSeenHeader := long.TopBlock.NumberU64()
initialCycle := true
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
@ -520,7 +520,7 @@ func TestForkchoiceToGenesis(t *testing.T) {
m.SendForkChoiceRequest(&forkChoiceMessage)
initialCycle := false
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -542,7 +542,7 @@ func TestBogusForkchoice(t *testing.T) {
m.SendForkChoiceRequest(&forkChoiceMessage)
initialCycle := false
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -557,7 +557,7 @@ func TestBogusForkchoice(t *testing.T) {
}
m.SendForkChoiceRequest(&forkChoiceMessage)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -577,7 +577,7 @@ func TestPoSDownloader(t *testing.T) {
m.SendPayloadRequest(chain.TopBlock)
initialCycle := false
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -597,12 +597,12 @@ func TestPoSDownloader(t *testing.T) {
m.ReceiveWg.Wait()
// First cycle: save the downloaded header
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
// Second cycle: process the previous beacon request
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -613,7 +613,7 @@ func TestPoSDownloader(t *testing.T) {
FinalizedBlockHash: chain.TopBlock.Hash(),
}
m.SendForkChoiceRequest(&forkChoiceMessage)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
assert.Equal(t, chain.TopBlock.Hash(), headBlockHash)
@ -645,7 +645,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) {
m.SendPayloadRequest(payloadMessage)
initialCycle := false
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -664,7 +664,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) {
}
m.ReceiveWg.Wait()
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -675,7 +675,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) {
FinalizedBlockHash: invalidTip.Hash(),
}
m.SendForkChoiceRequest(&forkChoiceMessage)
_, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
_, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err)
bad, lastValidHash := m.HeaderDownload().IsBadHeaderPoS(invalidTip.Hash())

View File

@ -26,8 +26,10 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
"github.com/ledgerwatch/erigon/p2p"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/log/v3"
@ -64,10 +66,11 @@ func SendPayloadStatus(hd *headerdownload.HeaderDownload, headBlockHash common.H
// StageLoop runs the continuous loop of staged sync
func StageLoop(
ctx context.Context,
chainConfig *params.ChainConfig,
db kv.RwDB,
sync *stagedsync.Sync,
hd *headerdownload.HeaderDownload,
notifications *stagedsync.Notifications,
notifications *shards.Notifications,
updateHead func(ctx context.Context, head uint64, hash common.Hash, td *uint256.Int),
waitForDone chan struct{},
loopMinTime time.Duration,
@ -87,7 +90,7 @@ func StageLoop(
// Estimate the current top height seen from the peer
height := hd.TopSeenHeight()
headBlockHash, err := StageLoopStep(ctx, db, sync, height, notifications, initialCycle, updateHead, nil)
headBlockHash, err := StageLoopStep(ctx, chainConfig, db, sync, height, notifications, initialCycle, updateHead, nil)
SendPayloadStatus(hd, headBlockHash, err)
@ -122,10 +125,11 @@ func StageLoop(
func StageLoopStep(
ctx context.Context,
chainConfig *params.ChainConfig,
db kv.RwDB,
sync *stagedsync.Sync,
highestSeenHeader uint64,
notifications *stagedsync.Notifications,
notifications *shards.Notifications,
initialCycle bool,
updateHead func(ctx context.Context, head uint64, hash common.Hash, td *uint256.Int),
snapshotMigratorFinal func(tx kv.Tx) error,
@ -226,10 +230,11 @@ func StageLoopStep(
if notifications.Accumulator != nil {
header := rawdb.ReadCurrentHeader(rotx)
if header != nil {
pendingBaseFee := misc.CalcBaseFee(notifications.Accumulator.ChainConfig(), header)
pendingBaseFee := misc.CalcBaseFee(chainConfig, header)
if header.Number.Uint64() == 0 {
notifications.Accumulator.StartChange(0, header.Hash(), nil, false)
}
notifications.Accumulator.SendAndReset(ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64(), header.GasLimit)
if err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, head, sync.PrevUnwindPoint(), notifications.Events, rotx); err != nil {
@ -372,7 +377,7 @@ func NewStagedSync(ctx context.Context,
p2pCfg p2p.Config,
cfg *ethconfig.Config,
controlServer *sentry.MultiClient,
notifications *stagedsync.Notifications,
notifications *shards.Notifications,
snapDownloader proto_downloader.DownloaderClient,
snapshots *snapshotsync.RoSnapshots,
agg *state.Aggregator22,
@ -450,7 +455,7 @@ func NewStagedSync(ctx context.Context,
), nil
}
func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config, controlServer *sentry.MultiClient, dirs datadir.Dirs, notifications *stagedsync.Notifications, snapshots *snapshotsync.RoSnapshots, agg *state.Aggregator22) (*stagedsync.Sync, error) {
func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config, controlServer *sentry.MultiClient, dirs datadir.Dirs, notifications *shards.Notifications, snapshots *snapshotsync.RoSnapshots, agg *state.Aggregator22) (*stagedsync.Sync, error) {
var blockReader services.FullBlockReader
if cfg.Snapshot.Enabled {
blockReader = snapshotsync.NewBlockReaderWithSnapshots(snapshots)