From d45bddc5ade08aa727ab153b2e3f0a39f05b0e85 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sun, 18 Dec 2022 07:59:31 +0700 Subject: [PATCH] StagedSync: break dependency to CurrentHeader. Always run non-initial cycle in 1 RwTx (#6348) cc: @AlexeyAkhunov --- cmd/rpcdaemon/commands/eth_subscribe_test.go | 2 +- .../commands/send_transaction_test.go | 3 +- turbo/app/import.go | 3 +- turbo/stages/mock_sentry.go | 8 ++-- turbo/stages/sentry_mock_test.go | 44 ++++++++----------- turbo/stages/stageloop.go | 29 ++++-------- 6 files changed, 33 insertions(+), 56 deletions(-) diff --git a/cmd/rpcdaemon/commands/eth_subscribe_test.go b/cmd/rpcdaemon/commands/eth_subscribe_test.go index 614be7eae..4dc01d5ab 100644 --- a/cmd/rpcdaemon/commands/eth_subscribe_test.go +++ b/cmd/rpcdaemon/commands/eth_subscribe_test.go @@ -52,7 +52,7 @@ func TestEthSubscribe(t *testing.T) { initialCycle := true highestSeenHeader := chain.TopBlock.NumberU64() - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } diff --git a/cmd/rpcdaemon/commands/send_transaction_test.go b/cmd/rpcdaemon/commands/send_transaction_test.go index 264bb0311..fda31e908 100644 --- a/cmd/rpcdaemon/commands/send_transaction_test.go +++ b/cmd/rpcdaemon/commands/send_transaction_test.go @@ -59,8 +59,7 @@ func TestSendRawTransaction(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed initialCycle := true - highestSeenHeader := chain.TopBlock.NumberU64() - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } diff --git a/turbo/app/import.go b/turbo/app/import.go index 58ba1c81b..f10358f55 100644 --- a/turbo/app/import.go +++ b/turbo/app/import.go @@ -203,7 +203,6 @@ func missingBlocks(chainDB kv.RwDB, blocks []*types.Block) []*types.Block { func InsertChain(ethereum *eth.Ethereum, chain *core.ChainPack) error { sentryControlServer := ethereum.SentryControlServer() initialCycle := false - highestSeenHeader := chain.TopBlock.NumberU64() for _, b := range chain.Blocks { sentryControlServer.Hd.AddMinedHeader(b.Header()) @@ -212,7 +211,7 @@ func InsertChain(ethereum *eth.Ethereum, chain *core.ChainPack) error { sentryControlServer.Hd.MarkAllVerified() - _, err := stages.StageLoopStep(ethereum.SentryCtx(), ethereum.ChainConfig(), ethereum.ChainDB(), ethereum.StagedSync(), highestSeenHeader, ethereum.Notifications(), initialCycle, sentryControlServer.UpdateHead, nil) + _, err := stages.StageLoopStep(ethereum.SentryCtx(), ethereum.ChainConfig(), ethereum.ChainDB(), ethereum.StagedSync(), ethereum.Notifications(), initialCycle, sentryControlServer.UpdateHead, nil) if err != nil { return err } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 59f72e13e..0a77b78e9 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -550,11 +550,10 @@ func (ms *MockSentry) insertPoWBlocks(chain *core.ChainPack) error { ms.ReceiveWg.Wait() // Wait for all messages to be processed before we proceed initialCycle := false - highestSeenHeader := chain.Blocks[n-1].NumberU64() if ms.TxPool != nil { ms.ReceiveWg.Add(1) } - if _, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil); err != nil { + if _, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, ms.Notifications, initialCycle, ms.UpdateHead, nil); err != nil { return err } if ms.TxPool != nil { @@ -574,8 +573,7 @@ func (ms *MockSentry) insertPoSBlocks(chain *core.ChainPack) error { } initialCycle := false - highestSeenHeader := chain.TopBlock.NumberU64() - headBlockHash, err := StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil) + headBlockHash, err := StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, ms.Notifications, initialCycle, ms.UpdateHead, nil) if err != nil { return err } @@ -588,7 +586,7 @@ func (ms *MockSentry) insertPoSBlocks(chain *core.ChainPack) error { FinalizedBlockHash: chain.TopBlock.Hash(), } ms.SendForkChoiceRequest(&fc) - headBlockHash, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil) + headBlockHash, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, ms.Notifications, initialCycle, ms.UpdateHead, nil) if err != nil { return err } diff --git a/turbo/stages/sentry_mock_test.go b/turbo/stages/sentry_mock_test.go index 1090da0cf..fa65542bb 100644 --- a/turbo/stages/sentry_mock_test.go +++ b/turbo/stages/sentry_mock_test.go @@ -56,8 +56,7 @@ func TestHeaderStep(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceed initialCycle := true - highestSeenHeader := chain.TopBlock.NumberU64() - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } @@ -95,8 +94,7 @@ func TestMineBlockWith1Tx(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed initialCycle := true - highestSeenHeader := chain.TopBlock.NumberU64() - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } @@ -164,8 +162,7 @@ func TestReorg(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed initialCycle := true - highestSeenHeader := chain.TopBlock.NumberU64() - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } @@ -217,9 +214,8 @@ func TestReorg(t *testing.T) { } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - highestSeenHeader = short.TopBlock.NumberU64() initialCycle = false - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } @@ -262,8 +258,7 @@ func TestReorg(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed // This is unwind step - highestSeenHeader = long1.TopBlock.NumberU64() - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } @@ -299,9 +294,8 @@ func TestReorg(t *testing.T) { } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - highestSeenHeader = short2.TopBlock.NumberU64() initialCycle = false - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } @@ -396,9 +390,8 @@ func TestAnchorReplace(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - highestSeenHeader := long.TopBlock.NumberU64() initialCycle := true - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } @@ -501,9 +494,8 @@ func TestAnchorReplace2(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - highestSeenHeader := long.TopBlock.NumberU64() initialCycle := true - if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { + if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } @@ -520,7 +512,7 @@ func TestForkchoiceToGenesis(t *testing.T) { m.SendForkChoiceRequest(&forkChoiceMessage) initialCycle := false - headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -542,7 +534,7 @@ func TestBogusForkchoice(t *testing.T) { m.SendForkChoiceRequest(&forkChoiceMessage) initialCycle := false - headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -557,7 +549,7 @@ func TestBogusForkchoice(t *testing.T) { } m.SendForkChoiceRequest(&forkChoiceMessage) - headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -577,7 +569,7 @@ func TestPoSDownloader(t *testing.T) { m.SendPayloadRequest(chain.TopBlock) initialCycle := false - headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -597,12 +589,12 @@ func TestPoSDownloader(t *testing.T) { m.ReceiveWg.Wait() // First cycle: save the downloaded header - headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) // Second cycle: process the previous beacon request - headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -613,7 +605,7 @@ func TestPoSDownloader(t *testing.T) { FinalizedBlockHash: chain.TopBlock.Hash(), } m.SendForkChoiceRequest(&forkChoiceMessage) - headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) assert.Equal(t, chain.TopBlock.Hash(), headBlockHash) @@ -645,7 +637,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) { m.SendPayloadRequest(payloadMessage) initialCycle := false - headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -664,7 +656,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) { } m.ReceiveWg.Wait() - headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -675,7 +667,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) { FinalizedBlockHash: invalidTip.Hash(), } m.SendForkChoiceRequest(&forkChoiceMessage) - _, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) + _, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil) require.NoError(t, err) bad, lastValidHash := m.HeaderDownload().IsBadHeaderPoS(invalidTip.Hash()) diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 1f445b834..9eaa8071b 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -90,8 +90,7 @@ func StageLoop( } // Estimate the current top height seen from the peer - height := hd.TopSeenHeight() - headBlockHash, err := StageLoopStep(ctx, chainConfig, db, sync, height, notifications, initialCycle, updateHead, nil) + headBlockHash, err := StageLoopStep(ctx, chainConfig, db, sync, notifications, initialCycle, updateHead, nil) SendPayloadStatus(hd, headBlockHash, err) @@ -124,29 +123,20 @@ func StageLoop( } } -func StageLoopStep( - ctx context.Context, - chainConfig *params.ChainConfig, - db kv.RwDB, - sync *stagedsync.Sync, - highestSeenHeader uint64, - notifications *shards.Notifications, - initialCycle bool, - updateHead func(ctx context.Context, headHeight, headTime uint64, hash common.Hash, td *uint256.Int), - snapshotMigratorFinal func(tx kv.Tx) error, -) (headBlockHash common.Hash, err error) { +func StageLoopStep(ctx context.Context, + chainConfig *params.ChainConfig, db kv.RwDB, sync *stagedsync.Sync, + notifications *shards.Notifications, initialCycle bool, + updateHead func(ctx context.Context, headHeight uint64, + headTime uint64, hash common.Hash, td *uint256.Int, + ), snapshotMigratorFinal func(tx kv.Tx) error) (headBlockHash common.Hash, err error) { defer func() { if rec := recover(); rec != nil { err = fmt.Errorf("%+v, trace: %s", rec, dbg.Stack()) } }() // avoid crash because Erigon's core does many things - var origin, finishProgressBefore uint64 + var finishProgressBefore uint64 if err := db.View(ctx, func(tx kv.Tx) error { - origin, err = stages.GetStageProgress(tx, stages.Headers) - if err != nil { - return err - } finishProgressBefore, err = stages.GetStageProgress(tx, stages.Finish) if err != nil { return err @@ -155,8 +145,7 @@ func StageLoopStep( }); err != nil { return headBlockHash, err } - - canRunCycleInOneTransaction := !initialCycle && highestSeenHeader < origin+32_000 && highestSeenHeader < finishProgressBefore+32_000 + canRunCycleInOneTransaction := !initialCycle var tx kv.RwTx // on this variable will run sync cycle. if canRunCycleInOneTransaction {