From fd52a788b7f4aab57ca879a7738b9b6733fa62c7 Mon Sep 17 00:00:00 2001
From: ledgerwatch
Date: Tue, 4 Oct 2022 11:14:18 +0100
Subject: [PATCH] Fix creation of block snapshots (#5579)

* Print snapshot prune
* More print
* Print
* Print
* Print
* Move snapshots stage forward
* Cleanup
* Fix tests
* Print
* Too much logging
* Remove print
* Log, check
* Revert
* No panic, print
* Fix tx numbering
* Harder condition to start retiring blocks
* Disable Pow verification after TTD is reached
* Fix POW verifying
* Print
* Prints
* Fix?
* cleanup
* Add migrations and hack
* More diagnostics
* More print
* Reset sequence only once
* Fix migration
* Remove print reset
* Fix lint

Co-authored-by: Alexey Sharp
Co-authored-by: Alex Sharp
---
 cmd/hack/hack.go                       | 68 +++++++++++++++++++----
 consensus/serenity/serenity.go         |  3 +-
 core/rawdb/accessors_chain.go          | 10 +++-
 eth/stagedsync/stage_hashstate.go      | 24 ++++----
 eth/stagedsync/stage_hashstate_test.go |  6 +-
 eth/stagedsync/stage_snapshots.go      | 26 +++++++--
 migrations/migrations.go               |  2 +-
 migrations/reset_blocks.go             | 76 +++++++++++++++++++++++---
 turbo/snapshotsync/block_snapshots.go  |  6 +-
 turbo/stages/stageloop.go              |  2 +-
 10 files changed, 172 insertions(+), 51 deletions(-)

diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go
index a01d7c983..41f355dd5 100644
--- a/cmd/hack/hack.go
+++ b/cmd/hack/hack.go
@@ -39,12 +39,14 @@ import (
 	"github.com/ledgerwatch/erigon/core/state"
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/crypto"
+	"github.com/ledgerwatch/erigon/eth/ethconfig"
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/ethdb"
 	"github.com/ledgerwatch/erigon/ethdb/cbor"
 	"github.com/ledgerwatch/erigon/internal/debug"
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/rlp"
+	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
 	"github.com/ledgerwatch/log/v3"
 )
 
@@ -491,8 +493,53 @@ func extractHeaders(chaindata string, block uint64, blockTotalOrOffset int64) er
 	return nil
 }
 
-func extractBodies(chaindata string, block uint64) error {
-	db := mdbx.MustOpen(chaindata)
+func extractBodies(datadir string) error {
+	snaps := snapshotsync.NewRoSnapshots(ethconfig.Snapshot{
+		Enabled: true,
+		KeepBlocks: true,
+		Produce: false,
+	}, filepath.Join(datadir, "snapshots"))
+	snaps.ReopenFolder()
+	snaps.Bodies.View(func(sns []*snapshotsync.BodySegment) error {
+		for _, sn := range sns {
+			var firstBlockNum, firstBaseTxNum, firstAmount uint64
+			var lastBlockNum, lastBaseTxNum, lastAmount uint64
+			var prevBlockNum, prevBaseTxNum, prevAmount uint64
+			first := true
+			sn.Iterate(func(blockNum uint64, baseTxNum uint64, txAmount uint64) error {
+				if first {
+					firstBlockNum = blockNum
+					firstBaseTxNum = baseTxNum
+					firstAmount = txAmount
+					first = false
+				} else {
+					if blockNum != prevBlockNum+1 {
+						fmt.Printf("Discount block Num: %d => %d\n", prevBlockNum, blockNum)
+					}
+					if baseTxNum != prevBaseTxNum+prevAmount {
+						fmt.Printf("Wrong baseTxNum: %d+%d => %d\n", prevBaseTxNum, prevAmount, baseTxNum)
+					}
+				}
+				prevBlockNum = blockNum
+				lastBlockNum = blockNum
+				prevBaseTxNum = baseTxNum
+				lastBaseTxNum = baseTxNum
+				prevAmount = txAmount
+				lastAmount = txAmount
+				return nil
+			})
+			fmt.Printf("Seg: [%d, %d, %d] => [%d, %d, %d]\n", firstBlockNum, firstBaseTxNum, firstAmount, lastBlockNum, lastBaseTxNum, lastAmount)
+		}
+		return nil
+	})
+	if _, err := snaps.ViewTxs(snaps.BlocksAvailable(), func(sn *snapshotsync.TxnSegment) error {
+		lastTxnID := sn.IdxTxnHash.BaseDataID() + uint64(sn.Seg.Count())
+		fmt.Printf("txTxnID = %d\n", lastTxnID)
+		return nil
+	}); err != nil {
+		return err
+	}
+	db := mdbx.MustOpen(filepath.Join(datadir, "chaindata"))
 	defer db.Close()
 	tx, err := db.BeginRo(context.Background())
 	if err != nil {
@@ -504,10 +551,9 @@
 		return err
 	}
 	defer c.Close()
-	blockEncoded := dbutils.EncodeBlockNumber(block)
 	i := 0
 	var txId uint64
-	for k, _, err := c.Seek(blockEncoded); k != nil; k, _, err = c.Next() {
+	for k, _, err := c.First(); k != nil; k, _, err = c.Next() {
 		if err != nil {
 			return err
 		}
@@ -524,17 +570,15 @@
 			continue
 		}
 		i++
-		if txId == 0 {
-			txId = baseTxId
-		} else {
+		if txId > 0 {
 			if txId != baseTxId {
 				fmt.Printf("Mismatch txId for block %d, txId = %d, baseTxId = %d\n", blockNumber, txId, baseTxId)
 			}
 		}
-		txId += uint64(txAmount) + 2
-		//if i == 1 {
-		//	break
-		//}
+		txId = baseTxId + uint64(txAmount) + 2
+		if i == 50 {
+			break
+		}
 	}
 	return nil
 }
@@ -1324,7 +1368,7 @@ func main() {
 		err = hackdb.TextInfo(*chaindata, &strings.Builder{})
 
 	case "extractBodies":
-		err = extractBodies(*chaindata, uint64(*block))
+		err = extractBodies(*chaindata)
 
 	case "repairCurrent":
 		repairCurrent()
diff --git a/consensus/serenity/serenity.go b/consensus/serenity/serenity.go
index c1192ea63..2eae340e0 100644
--- a/consensus/serenity/serenity.go
+++ b/consensus/serenity/serenity.go
@@ -81,7 +81,8 @@ func (s *Serenity) VerifyHeader(chain consensus.ChainHeaderReader, header *types
 		return err
 	}
 	if !reached {
-		return s.eth1Engine.VerifyHeader(chain, header, seal)
+		// Not verifying seals if the TTD is passed
+		return s.eth1Engine.VerifyHeader(chain, header, !chain.Config().TerminalTotalDifficultyPassed)
 	}
 	// Short circuit if the parent is not known
 	parent := chain.GetHeader(header.ParentHash, header.Number.Uint64()-1)
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index e6a860c5f..c02ea9ab9 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -1315,6 +1315,7 @@ func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
 	if blockFrom < 1 { //protect genesis
 		blockFrom = 1
 	}
+	sequenceTo := map[string]uint64{}
 	for k, _, err := c.Last(); k != nil; k, _, err = c.Prev() {
 		if err != nil {
 			return err
 		}
@@ -1346,9 +1347,7 @@ func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
 			}); err != nil {
 				return err
 			}
-			if err := ResetSequence(tx, bucket, b.BaseTxId); err != nil {
-				return err
-			}
+			sequenceTo[bucket] = b.BaseTxId
 		}
 		// Copying k because otherwise the same memory will be reused
 		// for the next key and Delete below will end up deleting 1 more record than required
@@ -1368,6 +1367,11 @@ func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
 		default:
 		}
 	}
+	for bucket, sequence := range sequenceTo {
+		if err := ResetSequence(tx, bucket, sequence); err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go
index 625b3cccf..32ea79fc5 100644
--- a/eth/stagedsync/stage_hashstate.go
+++ b/eth/stagedsync/stage_hashstate.go
@@ -74,7 +74,7 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex
 			return err
 		}
 	} else {
-		if err := promoteHashedStateIncrementally(logPrefix, s.BlockNumber, to, tx, cfg, ctx.Done()); err != nil {
+		if err := promoteHashedStateIncrementally(logPrefix, s.BlockNumber, to, tx, cfg, ctx.Done(), quiet); err != nil {
 			return err
 		}
 	}
@@ -494,8 +494,8 @@ func getCodeUnwindExtractFunc(db kv.Tx, changeSetBucket string) etl.ExtractFunc
 	}
 }
 
-func (p *Promoter) PromoteOnHistoryV3(logPrefix string, agg *state.Aggregator22, from, to uint64, storage, codes bool) error {
-	if to > from+16 {
+func (p *Promoter) PromoteOnHistoryV3(logPrefix string, agg *state.Aggregator22, from, to uint64, storage, codes bool, quiet bool) error {
+	if !quiet && to > from+16 {
 		log.Info(fmt.Sprintf("[%s] Incremental promotion", logPrefix), "from", from, "to", to, "codes", codes, "storage", storage)
 	}
 
@@ -616,14 +616,14 @@ func (p *Promoter) PromoteOnHistoryV3(logPrefix string, agg *state.Aggregator22,
 	}
 	return nil
 }
-func (p *Promoter) Promote(logPrefix string, from, to uint64, storage, codes bool) error {
+func (p *Promoter) Promote(logPrefix string, from, to uint64, storage, codes bool, quiet bool) error {
 	var changeSetBucket string
 	if storage {
 		changeSetBucket = kv.StorageChangeSet
 	} else {
 		changeSetBucket = kv.AccountChangeSet
 	}
-	if to > from+16 {
+	if !quiet && to > from+16 {
 		log.Info(fmt.Sprintf("[%s] Incremental promotion", logPrefix), "from", from, "to", to, "codes", codes, "csbucket", changeSetBucket)
 	}
 
@@ -822,29 +822,29 @@ func (p *Promoter) Unwind(logPrefix string, s *StageState, u *UnwindState, stora
 	)
 }
 
-func promoteHashedStateIncrementally(logPrefix string, from, to uint64, tx kv.RwTx, cfg HashStateCfg, quit <-chan struct{}) error {
+func promoteHashedStateIncrementally(logPrefix string, from, to uint64, tx kv.RwTx, cfg HashStateCfg, quit <-chan struct{}, quiet bool) error {
 	prom := NewPromoter(tx, cfg.dirs, quit)
 	if cfg.historyV3 {
 		cfg.agg.SetTx(tx)
-		if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, false, true); err != nil {
+		if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, false, true, quiet); err != nil {
 			return err
 		}
-		if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, false, false); err != nil {
+		if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, false, false, quiet); err != nil {
 			return err
 		}
-		if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, true, false); err != nil {
+		if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, true, false, quiet); err != nil {
 			return err
 		}
 		return nil
 	}
-	if err := prom.Promote(logPrefix, from, to, false, true); err != nil {
+	if err := prom.Promote(logPrefix, from, to, false, true, quiet); err != nil {
 		return err
 	}
-	if err := prom.Promote(logPrefix, from, to, false, false); err != nil {
+	if err := prom.Promote(logPrefix, from, to, false, false, quiet); err != nil {
 		return err
 	}
-	if err := prom.Promote(logPrefix, from, to, true, false); err != nil {
+	if err := prom.Promote(logPrefix, from, to, true, false, quiet); err != nil {
 		return err
 	}
 	return nil
diff --git a/eth/stagedsync/stage_hashstate_test.go b/eth/stagedsync/stage_hashstate_test.go
index 7d1121407..89b5a4210 100644
--- a/eth/stagedsync/stage_hashstate_test.go
+++ b/eth/stagedsync/stage_hashstate_test.go
@@ -48,7 +48,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
 	generateBlocks(t, 51, 50, hashedWriterGen(tx1), changeCodeWithIncarnations)
 	generateBlocks(t, 51, 50, plainWriterGen(tx2), changeCodeWithIncarnations)
 
-	err = promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, cfg, nil)
+	err = promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, cfg, nil /* quit */, false /* quiet */)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
@@ -66,7 +66,7 @@ func TestPromoteHashedStateIncrementalMixed(t *testing.T) {
 	generateBlocks(t, 1, 50, hashedWriterGen(tx2), changeCodeWithIncarnations)
 	generateBlocks(t, 51, 50, plainWriterGen(tx2), changeCodeWithIncarnations)
 
-	err := promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), nil)
+	err := promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), nil /* quit */, false /* quiet */)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
@@ -119,7 +119,7 @@ func TestPromoteIncrementallyShutdown(t *testing.T) {
 			}
 			db, tx := memdb.NewTestTx(t)
 			generateBlocks(t, 1, 10, plainWriterGen(tx), changeCodeWithIncarnations)
-			if err := promoteHashedStateIncrementally("logPrefix", 1, 10, tx, StageHashStateCfg(db, dirs, historyV3, nil), ctx.Done()); !errors.Is(err, tc.errExp) {
+			if err := promoteHashedStateIncrementally("logPrefix", 1, 10, tx, StageHashStateCfg(db, dirs, historyV3, nil), ctx.Done(), false /* quiet */); !errors.Is(err, tc.errExp) {
 				t.Errorf("error does not match expected error while shutdown promoteHashedStateIncrementally, got: %v, expected: %v", err, tc.errExp)
 			}
 		})
diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go
index afa2a1100..ac61120ae 100644
--- a/eth/stagedsync/stage_snapshots.go
+++ b/eth/stagedsync/stage_snapshots.go
@@ -85,6 +85,21 @@ func SpawnStageSnapshots(
 	if err := DownloadAndIndexSnapshotsIfNeed(s, ctx, tx, cfg, initialCycle); err != nil {
 		return err
 	}
+	var minProgress uint64
+	for _, stage := range []stages.SyncStage{stages.Headers, stages.Bodies, stages.Senders, stages.TxLookup} {
+		progress, err := stages.GetStageProgress(tx, stage)
+		if err != nil {
+			return err
+		}
+		if minProgress == 0 || progress < minProgress {
+			minProgress = progress
+		}
+	}
+	if minProgress > s.BlockNumber {
+		if err = s.Update(tx, minProgress); err != nil {
+			return err
+		}
+	}
 	if !useExternalTx {
 		if err := tx.Commit(); err != nil {
 			return err
 		}
@@ -223,7 +238,7 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, tmpd
 	// ResetSequence - allow set arbitrary value to sequence (for example to decrement it to exact value)
 	ok, err := sn.ViewTxs(blocksAvailable, func(sn *snapshotsync.TxnSegment) error {
 		lastTxnID := sn.IdxTxnHash.BaseDataID() + uint64(sn.Seg.Count())
-		if err := rawdb.ResetSequence(tx, kv.EthTx, lastTxnID+1); err != nil {
+		if err := rawdb.ResetSequence(tx, kv.EthTx, lastTxnID); err != nil {
 			return err
 		}
 		return nil
@@ -488,14 +503,13 @@ func retireBlocksInSingleBackgroundThread(s *PruneState, blockRetire *snapshotsy
 		return nil
 	}
 	ok, err := blockRetire.BackgroundResult.GetAndReset()
-	if !ok {
-		return nil
-	}
 	if err != nil {
 		return fmt.Errorf("[%s] %w", s.LogPrefix(), err)
 	}
-	if err := rawdb.WriteSnapshots(tx, blockRetire.Snapshots().Files()); err != nil {
-		return err
+	if ok {
+		if err := rawdb.WriteSnapshots(tx, blockRetire.Snapshots().Files()); err != nil {
+			return err
+		}
 	}
 
 	blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, log.LvlInfo)
diff --git a/migrations/migrations.go b/migrations/migrations.go
index e5158f427..b5994461a 100644
--- a/migrations/migrations.go
+++ b/migrations/migrations.go
@@ -34,7 +34,7 @@ var migrations = map[kv.Label][]Migration{
 	kv.ChainDB: {
 		dbSchemaVersion5,
 		txsBeginEnd,
-		resetBlocks,
+		resetBlocks4,
 	},
 	kv.TxPoolDB: {},
 	kv.SentryDB: {},
diff --git a/migrations/reset_blocks.go b/migrations/reset_blocks.go
index 86c643d50..cfc1ce6ad 100644
--- a/migrations/reset_blocks.go
+++ b/migrations/reset_blocks.go
@@ -2,18 +2,22 @@ package migrations
 
 import (
 	"context"
+	"encoding/binary"
 
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/core/rawdb"
 	"github.com/ledgerwatch/erigon/core/rawdb/rawdbreset"
+	"github.com/ledgerwatch/erigon/eth/ethconfig"
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
+	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
 	"github.com/ledgerwatch/log/v3"
 )
 
-var resetBlocks = Migration{
-	Name: "reset_blocks_3",
+var resetBlocks4 = Migration{
+	Name: "reset_blocks_4",
 	Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
 		tx, err := db.BeginRw(context.Background())
 		if err != nil {
@@ -32,26 +36,80 @@ var resetBlocks = Migration{
 			}
 			return tx.Commit()
 		}
-		genesisBlock := rawdb.ReadHeaderByNumber(tx, 0)
-		if genesisBlock == nil {
+		// Detect whether the correction is required
+		snaps := snapshotsync.NewRoSnapshots(ethconfig.Snapshot{
+			Enabled: true,
+			KeepBlocks: true,
+			Produce: false,
+		}, dirs.Snap)
+		snaps.ReopenFolder()
+		var lastFound bool
+		var lastBlockNum, lastBaseTxNum, lastAmount uint64
+		if err := snaps.Bodies.View(func(sns []*snapshotsync.BodySegment) error {
+			// Take the last snapshot
+			if len(sns) == 0 {
+				return nil
+			}
+			sn := sns[len(sns)-1]
+			sn.Iterate(func(blockNum uint64, baseTxNum uint64, txAmount uint64) error {
+				lastBlockNum = blockNum
+				lastBaseTxNum = baseTxNum
+				lastAmount = txAmount
+				lastFound = true
+				return nil
+			})
+			return nil
+		}); err != nil {
+			return err
+		}
+		if !lastFound {
 			if err := BeforeCommit(tx, nil, true); err != nil {
 				return err
 			}
 			return tx.Commit()
 		}
-		chainConfig, err := rawdb.ReadChainConfig(tx, genesisBlock.Hash())
+		c, err := tx.Cursor(kv.BlockBody)
 		if err != nil {
 			return err
 		}
+		defer c.Close()
+		var fixNeeded bool
+		for k, _, err := c.First(); k != nil; k, _, err = c.Next() {
+			if err != nil {
+				return err
+			}
+			blockNumber := binary.BigEndian.Uint64(k[:8])
+			if blockNumber != lastBlockNum+1 {
+				continue
+			}
+			blockHash := common.BytesToHash(k[8:])
+			var hash common.Hash
+			if hash, err = rawdb.ReadCanonicalHash(tx, blockNumber); err != nil {
+				return err
+			}
+			// ReadBody is not returning baseTxId which is written into the DB record, but that value + 1
+			_, baseTxId, _ := rawdb.ReadBody(tx, blockHash, blockNumber)
+			if hash != blockHash {
+				continue
+			}
+			if lastBaseTxNum+lastAmount+1 != baseTxId {
+				log.Info("Fix required, last block in seg files", "height", lastBlockNum, "baseTxNum", lastBaseTxNum, "txAmount", lastAmount, "first txId in DB", baseTxId, "expected", lastBaseTxNum+lastAmount+1)
+				fixNeeded = true
+			}
+		}
+		if !fixNeeded {
+			log.Info("Fix is not required")
+			if err := BeforeCommit(tx, nil, true); err != nil {
+				return err
+			}
+			return tx.Commit()
+		}
+
 		headersProgress, _ := stages.GetStageProgress(tx, stages.Headers)
 		if headersProgress > 0 {
 			log.Warn("NOTE: this migration will remove recent blocks (and senders) to fix several recent bugs. Your node will re-download last ~400K blocks, should not take very long")
 		}
-		if err := snap.RemoveNonPreverifiedFiles(chainConfig.ChainName, dirs.Snap); err != nil {
-			return err
-		}
-
 		if err := rawdbreset.ResetBlocks(tx, nil, nil); err != nil {
 			return err
 		}
diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go
index 60d8c9d11..103a92960 100644
--- a/turbo/snapshotsync/block_snapshots.go
+++ b/turbo/snapshotsync/block_snapshots.go
@@ -1149,7 +1149,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
 	if len(rangesToMerge) == 0 {
 		return nil
 	}
-	err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true)
+	err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */)
 	if err != nil {
 		return err
 	}
@@ -1618,7 +1618,7 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT
 	}
 	defer d.Close()
 	if uint64(d.Count()) != expectedCount {
-		panic(fmt.Errorf("expect: %d, got %d", expectedCount, d.Count()))
+		return fmt.Errorf("TransactionsIdx: at=%d-%d, pre index building, expect: %d, got %d", blockFrom, blockTo, expectedCount, d.Count())
 	}
 	p.Name.Store(segFileName)
 	p.Total.Store(uint64(d.Count() * 2))
@@ -1718,7 +1718,7 @@ RETRY:
 	}
 
 	if i != expectedCount {
-		panic(fmt.Errorf("expect: %d, got %d", expectedCount, i))
+		return fmt.Errorf("TransactionsIdx: at=%d-%d, post index building, expect: %d, got %d", blockFrom, blockTo, expectedCount, i)
 	}
 
 	if err := txnHashIdx.Build(); err != nil {
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index c501d9d21..bf8549422 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -280,7 +280,7 @@ func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, h
 			return err
 		}
 	}
-	// Once we unwond we can start constructing the chain (assumption: len(headersChain) == len(bodiesChain))
+	// Once we unwound we can start constructing the chain (assumption: len(headersChain) == len(bodiesChain))
 	for i := range headersChain {
 		currentHeader := headersChain[i]
 		currentBody := bodiesChain[i]
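
For reference, the numbering invariant that extractBodies checks segment by segment above (and that the resetBlocks4 migration re-checks against the first block body in the DB) can be sketched outside of Erigon as follows. This is a minimal, hypothetical illustration rather than part of the patch; bodyEntry and checkSegment are made-up names standing in for the (blockNum, baseTxNum, txAmount) triples yielded by BodySegment.Iterate:

package main

import "fmt"

// bodyEntry stands in for one (blockNum, baseTxNum, txAmount) triple read from
// a bodies snapshot segment.
type bodyEntry struct {
	BlockNum  uint64
	BaseTxNum uint64
	TxAmount  uint64
}

// checkSegment reports the same two anomalies the patch prints while iterating:
// a gap in block numbers, or a baseTxNum that does not equal the previous
// block's baseTxNum + txAmount.
func checkSegment(entries []bodyEntry) []string {
	var problems []string
	for i := 1; i < len(entries); i++ {
		prev, cur := entries[i-1], entries[i]
		if cur.BlockNum != prev.BlockNum+1 {
			problems = append(problems, fmt.Sprintf("block gap: %d => %d", prev.BlockNum, cur.BlockNum))
		}
		if cur.BaseTxNum != prev.BaseTxNum+prev.TxAmount {
			problems = append(problems, fmt.Sprintf("wrong baseTxNum: %d+%d => %d", prev.BaseTxNum, prev.TxAmount, cur.BaseTxNum))
		}
	}
	return problems
}

func main() {
	entries := []bodyEntry{
		{BlockNum: 1, BaseTxNum: 10, TxAmount: 3},
		{BlockNum: 2, BaseTxNum: 13, TxAmount: 2}, // consistent: 10+3 == 13
		{BlockNum: 3, BaseTxNum: 16, TxAmount: 2}, // inconsistent: expected 13+2 == 15
	}
	for _, p := range checkSegment(entries) {
		fmt.Println(p)
	}
}

The same relationship is what lets FillDBFromSnapshots reset the kv.EthTx sequence to the last segment's BaseDataID() + Count(), rather than that value plus one as before this patch.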