StagedSync: break dependency to CurrentHeader. Always run non-initial cycle in 1 RwTx (#6348)

cc: @AlexeyAkhunov
This commit is contained in:
Alex Sharov 2022-12-18 07:59:31 +07:00 committed by GitHub
parent 0a3bfef484
commit d45bddc5ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 33 additions and 56 deletions

View File

@ -52,7 +52,7 @@ func TestEthSubscribe(t *testing.T) {
initialCycle := true initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64() highestSeenHeader := chain.TopBlock.NumberU64()
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -59,8 +59,7 @@ func TestSendRawTransaction(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
initialCycle := true initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64() if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }

View File

@ -203,7 +203,6 @@ func missingBlocks(chainDB kv.RwDB, blocks []*types.Block) []*types.Block {
func InsertChain(ethereum *eth.Ethereum, chain *core.ChainPack) error { func InsertChain(ethereum *eth.Ethereum, chain *core.ChainPack) error {
sentryControlServer := ethereum.SentryControlServer() sentryControlServer := ethereum.SentryControlServer()
initialCycle := false initialCycle := false
highestSeenHeader := chain.TopBlock.NumberU64()
for _, b := range chain.Blocks { for _, b := range chain.Blocks {
sentryControlServer.Hd.AddMinedHeader(b.Header()) sentryControlServer.Hd.AddMinedHeader(b.Header())
@ -212,7 +211,7 @@ func InsertChain(ethereum *eth.Ethereum, chain *core.ChainPack) error {
sentryControlServer.Hd.MarkAllVerified() sentryControlServer.Hd.MarkAllVerified()
_, err := stages.StageLoopStep(ethereum.SentryCtx(), ethereum.ChainConfig(), ethereum.ChainDB(), ethereum.StagedSync(), highestSeenHeader, ethereum.Notifications(), initialCycle, sentryControlServer.UpdateHead, nil) _, err := stages.StageLoopStep(ethereum.SentryCtx(), ethereum.ChainConfig(), ethereum.ChainDB(), ethereum.StagedSync(), ethereum.Notifications(), initialCycle, sentryControlServer.UpdateHead, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -550,11 +550,10 @@ func (ms *MockSentry) insertPoWBlocks(chain *core.ChainPack) error {
ms.ReceiveWg.Wait() // Wait for all messages to be processed before we proceed ms.ReceiveWg.Wait() // Wait for all messages to be processed before we proceed
initialCycle := false initialCycle := false
highestSeenHeader := chain.Blocks[n-1].NumberU64()
if ms.TxPool != nil { if ms.TxPool != nil {
ms.ReceiveWg.Add(1) ms.ReceiveWg.Add(1)
} }
if _, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil); err != nil { if _, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, ms.Notifications, initialCycle, ms.UpdateHead, nil); err != nil {
return err return err
} }
if ms.TxPool != nil { if ms.TxPool != nil {
@ -574,8 +573,7 @@ func (ms *MockSentry) insertPoSBlocks(chain *core.ChainPack) error {
} }
initialCycle := false initialCycle := false
highestSeenHeader := chain.TopBlock.NumberU64() headBlockHash, err := StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, ms.Notifications, initialCycle, ms.UpdateHead, nil)
headBlockHash, err := StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil)
if err != nil { if err != nil {
return err return err
} }
@ -588,7 +586,7 @@ func (ms *MockSentry) insertPoSBlocks(chain *core.ChainPack) error {
FinalizedBlockHash: chain.TopBlock.Hash(), FinalizedBlockHash: chain.TopBlock.Hash(),
} }
ms.SendForkChoiceRequest(&fc) ms.SendForkChoiceRequest(&fc)
headBlockHash, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil) headBlockHash, err = StageLoopStep(ms.Ctx, ms.ChainConfig, ms.DB, ms.Sync, ms.Notifications, initialCycle, ms.UpdateHead, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -56,8 +56,7 @@ func TestHeaderStep(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceed m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceed
initialCycle := true initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64() if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -95,8 +94,7 @@ func TestMineBlockWith1Tx(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
initialCycle := true initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64() if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -164,8 +162,7 @@ func TestReorg(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
initialCycle := true initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64() if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -217,9 +214,8 @@ func TestReorg(t *testing.T) {
} }
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
highestSeenHeader = short.TopBlock.NumberU64()
initialCycle = false initialCycle = false
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -262,8 +258,7 @@ func TestReorg(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
// This is unwind step // This is unwind step
highestSeenHeader = long1.TopBlock.NumberU64() if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -299,9 +294,8 @@ func TestReorg(t *testing.T) {
} }
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
highestSeenHeader = short2.TopBlock.NumberU64()
initialCycle = false initialCycle = false
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -396,9 +390,8 @@ func TestAnchorReplace(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
highestSeenHeader := long.TopBlock.NumberU64()
initialCycle := true initialCycle := true
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -501,9 +494,8 @@ func TestAnchorReplace2(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
highestSeenHeader := long.TopBlock.NumberU64()
initialCycle := true initialCycle := true
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -520,7 +512,7 @@ func TestForkchoiceToGenesis(t *testing.T) {
m.SendForkChoiceRequest(&forkChoiceMessage) m.SendForkChoiceRequest(&forkChoiceMessage)
initialCycle := false initialCycle := false
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -542,7 +534,7 @@ func TestBogusForkchoice(t *testing.T) {
m.SendForkChoiceRequest(&forkChoiceMessage) m.SendForkChoiceRequest(&forkChoiceMessage)
initialCycle := false initialCycle := false
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -557,7 +549,7 @@ func TestBogusForkchoice(t *testing.T) {
} }
m.SendForkChoiceRequest(&forkChoiceMessage) m.SendForkChoiceRequest(&forkChoiceMessage)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -577,7 +569,7 @@ func TestPoSDownloader(t *testing.T) {
m.SendPayloadRequest(chain.TopBlock) m.SendPayloadRequest(chain.TopBlock)
initialCycle := false initialCycle := false
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -597,12 +589,12 @@ func TestPoSDownloader(t *testing.T) {
m.ReceiveWg.Wait() m.ReceiveWg.Wait()
// First cycle: save the downloaded header // First cycle: save the downloaded header
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
// Second cycle: process the previous beacon request // Second cycle: process the previous beacon request
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -613,7 +605,7 @@ func TestPoSDownloader(t *testing.T) {
FinalizedBlockHash: chain.TopBlock.Hash(), FinalizedBlockHash: chain.TopBlock.Hash(),
} }
m.SendForkChoiceRequest(&forkChoiceMessage) m.SendForkChoiceRequest(&forkChoiceMessage)
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
assert.Equal(t, chain.TopBlock.Hash(), headBlockHash) assert.Equal(t, chain.TopBlock.Hash(), headBlockHash)
@ -645,7 +637,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) {
m.SendPayloadRequest(payloadMessage) m.SendPayloadRequest(payloadMessage)
initialCycle := false initialCycle := false
headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) headBlockHash, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -664,7 +656,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) {
} }
m.ReceiveWg.Wait() m.ReceiveWg.Wait()
headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) headBlockHash, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)
@ -675,7 +667,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) {
FinalizedBlockHash: invalidTip.Hash(), FinalizedBlockHash: invalidTip.Hash(),
} }
m.SendForkChoiceRequest(&forkChoiceMessage) m.SendForkChoiceRequest(&forkChoiceMessage)
_, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, 0, m.Notifications, initialCycle, m.UpdateHead, nil) _, err = stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, m.Notifications, initialCycle, m.UpdateHead, nil)
require.NoError(t, err) require.NoError(t, err)
bad, lastValidHash := m.HeaderDownload().IsBadHeaderPoS(invalidTip.Hash()) bad, lastValidHash := m.HeaderDownload().IsBadHeaderPoS(invalidTip.Hash())

View File

@ -90,8 +90,7 @@ func StageLoop(
} }
// Estimate the current top height seen from the peer // Estimate the current top height seen from the peer
height := hd.TopSeenHeight() headBlockHash, err := StageLoopStep(ctx, chainConfig, db, sync, notifications, initialCycle, updateHead, nil)
headBlockHash, err := StageLoopStep(ctx, chainConfig, db, sync, height, notifications, initialCycle, updateHead, nil)
SendPayloadStatus(hd, headBlockHash, err) SendPayloadStatus(hd, headBlockHash, err)
@ -124,29 +123,20 @@ func StageLoop(
} }
} }
func StageLoopStep( func StageLoopStep(ctx context.Context,
ctx context.Context, chainConfig *params.ChainConfig, db kv.RwDB, sync *stagedsync.Sync,
chainConfig *params.ChainConfig, notifications *shards.Notifications, initialCycle bool,
db kv.RwDB, updateHead func(ctx context.Context, headHeight uint64,
sync *stagedsync.Sync, headTime uint64, hash common.Hash, td *uint256.Int,
highestSeenHeader uint64, ), snapshotMigratorFinal func(tx kv.Tx) error) (headBlockHash common.Hash, err error) {
notifications *shards.Notifications,
initialCycle bool,
updateHead func(ctx context.Context, headHeight, headTime uint64, hash common.Hash, td *uint256.Int),
snapshotMigratorFinal func(tx kv.Tx) error,
) (headBlockHash common.Hash, err error) {
defer func() { defer func() {
if rec := recover(); rec != nil { if rec := recover(); rec != nil {
err = fmt.Errorf("%+v, trace: %s", rec, dbg.Stack()) err = fmt.Errorf("%+v, trace: %s", rec, dbg.Stack())
} }
}() // avoid crash because Erigon's core does many things }() // avoid crash because Erigon's core does many things
var origin, finishProgressBefore uint64 var finishProgressBefore uint64
if err := db.View(ctx, func(tx kv.Tx) error { if err := db.View(ctx, func(tx kv.Tx) error {
origin, err = stages.GetStageProgress(tx, stages.Headers)
if err != nil {
return err
}
finishProgressBefore, err = stages.GetStageProgress(tx, stages.Finish) finishProgressBefore, err = stages.GetStageProgress(tx, stages.Finish)
if err != nil { if err != nil {
return err return err
@ -155,8 +145,7 @@ func StageLoopStep(
}); err != nil { }); err != nil {
return headBlockHash, err return headBlockHash, err
} }
canRunCycleInOneTransaction := !initialCycle
canRunCycleInOneTransaction := !initialCycle && highestSeenHeader < origin+32_000 && highestSeenHeader < finishProgressBefore+32_000
var tx kv.RwTx // on this variable will run sync cycle. var tx kv.RwTx // on this variable will run sync cycle.
if canRunCycleInOneTransaction { if canRunCycleInOneTransaction {