Fix creation of block snapshots (#5579)

* Print snapshot prune

* More print

* Print

* Print

* Print

* Move snapshots stage forward

* Cleanup

* Fix tests

* Print

* Too much logging

* Remove print

* Log, check

* Revert

* No panic, print

* Fix tx numbering

* Harder condition to start retiring blocks

* Disable Pow verification after TTD is reached

* Fix POW verifying

* Print

* Prints

* Fix?

* cleanup

* Add migrations and hack

* More diagnostics

* More print

* Reset sequence only once

* Fix migration

* Remove print reset

* Fix lint

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
ledgerwatch 2022-10-04 11:14:18 +01:00 committed by GitHub
parent 39383d936d
commit fd52a788b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 172 additions and 51 deletions

View File

@ -39,12 +39,14 @@ import (
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/ethdb/cbor"
"github.com/ledgerwatch/erigon/internal/debug"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
)
@ -491,8 +493,53 @@ func extractHeaders(chaindata string, block uint64, blockTotalOrOffset int64) er
return nil
}
func extractBodies(chaindata string, block uint64) error {
db := mdbx.MustOpen(chaindata)
func extractBodies(datadir string) error {
snaps := snapshotsync.NewRoSnapshots(ethconfig.Snapshot{
Enabled: true,
KeepBlocks: true,
Produce: false,
}, filepath.Join(datadir, "snapshots"))
snaps.ReopenFolder()
snaps.Bodies.View(func(sns []*snapshotsync.BodySegment) error {
for _, sn := range sns {
var firstBlockNum, firstBaseTxNum, firstAmount uint64
var lastBlockNum, lastBaseTxNum, lastAmount uint64
var prevBlockNum, prevBaseTxNum, prevAmount uint64
first := true
sn.Iterate(func(blockNum uint64, baseTxNum uint64, txAmount uint64) error {
if first {
firstBlockNum = blockNum
firstBaseTxNum = baseTxNum
firstAmount = txAmount
first = false
} else {
if blockNum != prevBlockNum+1 {
fmt.Printf("Discount block Num: %d => %d\n", prevBlockNum, blockNum)
}
if baseTxNum != prevBaseTxNum+prevAmount {
fmt.Printf("Wrong baseTxNum: %d+%d => %d\n", prevBaseTxNum, prevAmount, baseTxNum)
}
}
prevBlockNum = blockNum
lastBlockNum = blockNum
prevBaseTxNum = baseTxNum
lastBaseTxNum = baseTxNum
prevAmount = txAmount
lastAmount = txAmount
return nil
})
fmt.Printf("Seg: [%d, %d, %d] => [%d, %d, %d]\n", firstBlockNum, firstBaseTxNum, firstAmount, lastBlockNum, lastBaseTxNum, lastAmount)
}
return nil
})
if _, err := snaps.ViewTxs(snaps.BlocksAvailable(), func(sn *snapshotsync.TxnSegment) error {
lastTxnID := sn.IdxTxnHash.BaseDataID() + uint64(sn.Seg.Count())
fmt.Printf("txTxnID = %d\n", lastTxnID)
return nil
}); err != nil {
return err
}
db := mdbx.MustOpen(filepath.Join(datadir, "chaindata"))
defer db.Close()
tx, err := db.BeginRo(context.Background())
if err != nil {
@ -504,10 +551,9 @@ func extractBodies(chaindata string, block uint64) error {
return err
}
defer c.Close()
blockEncoded := dbutils.EncodeBlockNumber(block)
i := 0
var txId uint64
for k, _, err := c.Seek(blockEncoded); k != nil; k, _, err = c.Next() {
for k, _, err := c.First(); k != nil; k, _, err = c.Next() {
if err != nil {
return err
}
@ -524,17 +570,15 @@ func extractBodies(chaindata string, block uint64) error {
continue
}
i++
if txId == 0 {
txId = baseTxId
} else {
if txId > 0 {
if txId != baseTxId {
fmt.Printf("Mismatch txId for block %d, txId = %d, baseTxId = %d\n", blockNumber, txId, baseTxId)
}
}
txId += uint64(txAmount) + 2
//if i == 1 {
// break
//}
txId = baseTxId + uint64(txAmount) + 2
if i == 50 {
break
}
}
return nil
}
@ -1324,7 +1368,7 @@ func main() {
err = hackdb.TextInfo(*chaindata, &strings.Builder{})
case "extractBodies":
err = extractBodies(*chaindata, uint64(*block))
err = extractBodies(*chaindata)
case "repairCurrent":
repairCurrent()

View File

@ -81,7 +81,8 @@ func (s *Serenity) VerifyHeader(chain consensus.ChainHeaderReader, header *types
return err
}
if !reached {
return s.eth1Engine.VerifyHeader(chain, header, seal)
// Not verifying seals if the TTD is passed
return s.eth1Engine.VerifyHeader(chain, header, !chain.Config().TerminalTotalDifficultyPassed)
}
// Short circuit if the parent is not known
parent := chain.GetHeader(header.ParentHash, header.Number.Uint64()-1)

View File

@ -1315,6 +1315,7 @@ func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
if blockFrom < 1 { //protect genesis
blockFrom = 1
}
sequenceTo := map[string]uint64{}
for k, _, err := c.Last(); k != nil; k, _, err = c.Prev() {
if err != nil {
return err
@ -1346,9 +1347,7 @@ func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
}); err != nil {
return err
}
if err := ResetSequence(tx, bucket, b.BaseTxId); err != nil {
return err
}
sequenceTo[bucket] = b.BaseTxId
}
// Copying k because otherwise the same memory will be reused
// for the next key and Delete below will end up deleting 1 more record than required
@ -1368,6 +1367,11 @@ func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
default:
}
}
for bucket, sequence := range sequenceTo {
if err := ResetSequence(tx, bucket, sequence); err != nil {
return err
}
}
return nil
}

View File

@ -74,7 +74,7 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex
return err
}
} else {
if err := promoteHashedStateIncrementally(logPrefix, s.BlockNumber, to, tx, cfg, ctx.Done()); err != nil {
if err := promoteHashedStateIncrementally(logPrefix, s.BlockNumber, to, tx, cfg, ctx.Done(), quiet); err != nil {
return err
}
}
@ -494,8 +494,8 @@ func getCodeUnwindExtractFunc(db kv.Tx, changeSetBucket string) etl.ExtractFunc
}
}
func (p *Promoter) PromoteOnHistoryV3(logPrefix string, agg *state.Aggregator22, from, to uint64, storage, codes bool) error {
if to > from+16 {
func (p *Promoter) PromoteOnHistoryV3(logPrefix string, agg *state.Aggregator22, from, to uint64, storage, codes bool, quiet bool) error {
if !quiet && to > from+16 {
log.Info(fmt.Sprintf("[%s] Incremental promotion", logPrefix), "from", from, "to", to, "codes", codes, "storage", storage)
}
@ -616,14 +616,14 @@ func (p *Promoter) PromoteOnHistoryV3(logPrefix string, agg *state.Aggregator22,
}
return nil
}
func (p *Promoter) Promote(logPrefix string, from, to uint64, storage, codes bool) error {
func (p *Promoter) Promote(logPrefix string, from, to uint64, storage, codes bool, quiet bool) error {
var changeSetBucket string
if storage {
changeSetBucket = kv.StorageChangeSet
} else {
changeSetBucket = kv.AccountChangeSet
}
if to > from+16 {
if !quiet && to > from+16 {
log.Info(fmt.Sprintf("[%s] Incremental promotion", logPrefix), "from", from, "to", to, "codes", codes, "csbucket", changeSetBucket)
}
@ -822,29 +822,29 @@ func (p *Promoter) Unwind(logPrefix string, s *StageState, u *UnwindState, stora
)
}
func promoteHashedStateIncrementally(logPrefix string, from, to uint64, tx kv.RwTx, cfg HashStateCfg, quit <-chan struct{}) error {
func promoteHashedStateIncrementally(logPrefix string, from, to uint64, tx kv.RwTx, cfg HashStateCfg, quit <-chan struct{}, quiet bool) error {
prom := NewPromoter(tx, cfg.dirs, quit)
if cfg.historyV3 {
cfg.agg.SetTx(tx)
if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, false, true); err != nil {
if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, false, true, quiet); err != nil {
return err
}
if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, false, false); err != nil {
if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, false, false, quiet); err != nil {
return err
}
if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, true, false); err != nil {
if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, true, false, quiet); err != nil {
return err
}
return nil
}
if err := prom.Promote(logPrefix, from, to, false, true); err != nil {
if err := prom.Promote(logPrefix, from, to, false, true, quiet); err != nil {
return err
}
if err := prom.Promote(logPrefix, from, to, false, false); err != nil {
if err := prom.Promote(logPrefix, from, to, false, false, quiet); err != nil {
return err
}
if err := prom.Promote(logPrefix, from, to, true, false); err != nil {
if err := prom.Promote(logPrefix, from, to, true, false, quiet); err != nil {
return err
}
return nil

View File

@ -48,7 +48,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
generateBlocks(t, 51, 50, hashedWriterGen(tx1), changeCodeWithIncarnations)
generateBlocks(t, 51, 50, plainWriterGen(tx2), changeCodeWithIncarnations)
err = promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, cfg, nil)
err = promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, cfg, nil /* quit */, false /* quiet */)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@ -66,7 +66,7 @@ func TestPromoteHashedStateIncrementalMixed(t *testing.T) {
generateBlocks(t, 1, 50, hashedWriterGen(tx2), changeCodeWithIncarnations)
generateBlocks(t, 51, 50, plainWriterGen(tx2), changeCodeWithIncarnations)
err := promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), nil)
err := promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), nil /* quit */, false /* quiet */)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@ -119,7 +119,7 @@ func TestPromoteIncrementallyShutdown(t *testing.T) {
}
db, tx := memdb.NewTestTx(t)
generateBlocks(t, 1, 10, plainWriterGen(tx), changeCodeWithIncarnations)
if err := promoteHashedStateIncrementally("logPrefix", 1, 10, tx, StageHashStateCfg(db, dirs, historyV3, nil), ctx.Done()); !errors.Is(err, tc.errExp) {
if err := promoteHashedStateIncrementally("logPrefix", 1, 10, tx, StageHashStateCfg(db, dirs, historyV3, nil), ctx.Done(), false /* quiet */); !errors.Is(err, tc.errExp) {
t.Errorf("error does not match expected error while shutdown promoteHashedStateIncrementally, got: %v, expected: %v", err, tc.errExp)
}
})

View File

@ -85,6 +85,21 @@ func SpawnStageSnapshots(
if err := DownloadAndIndexSnapshotsIfNeed(s, ctx, tx, cfg, initialCycle); err != nil {
return err
}
var minProgress uint64
for _, stage := range []stages.SyncStage{stages.Headers, stages.Bodies, stages.Senders, stages.TxLookup} {
progress, err := stages.GetStageProgress(tx, stage)
if err != nil {
return err
}
if minProgress == 0 || progress < minProgress {
minProgress = progress
}
}
if minProgress > s.BlockNumber {
if err = s.Update(tx, minProgress); err != nil {
return err
}
}
if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
@ -223,7 +238,7 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, tmpd
// ResetSequence - allow set arbitrary value to sequence (for example to decrement it to exact value)
ok, err := sn.ViewTxs(blocksAvailable, func(sn *snapshotsync.TxnSegment) error {
lastTxnID := sn.IdxTxnHash.BaseDataID() + uint64(sn.Seg.Count())
if err := rawdb.ResetSequence(tx, kv.EthTx, lastTxnID+1); err != nil {
if err := rawdb.ResetSequence(tx, kv.EthTx, lastTxnID); err != nil {
return err
}
return nil
@ -488,14 +503,13 @@ func retireBlocksInSingleBackgroundThread(s *PruneState, blockRetire *snapshotsy
return nil
}
ok, err := blockRetire.BackgroundResult.GetAndReset()
if !ok {
return nil
}
if err != nil {
return fmt.Errorf("[%s] %w", s.LogPrefix(), err)
}
if err := rawdb.WriteSnapshots(tx, blockRetire.Snapshots().Files()); err != nil {
return err
if ok {
if err := rawdb.WriteSnapshots(tx, blockRetire.Snapshots().Files()); err != nil {
return err
}
}
blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, log.LvlInfo)

View File

@ -34,7 +34,7 @@ var migrations = map[kv.Label][]Migration{
kv.ChainDB: {
dbSchemaVersion5,
txsBeginEnd,
resetBlocks,
resetBlocks4,
},
kv.TxPoolDB: {},
kv.SentryDB: {},

View File

@ -2,18 +2,22 @@ package migrations
import (
"context"
"encoding/binary"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/rawdbreset"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
"github.com/ledgerwatch/log/v3"
)
var resetBlocks = Migration{
Name: "reset_blocks_3",
var resetBlocks4 = Migration{
Name: "reset_blocks_4",
Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
tx, err := db.BeginRw(context.Background())
if err != nil {
@ -32,26 +36,80 @@ var resetBlocks = Migration{
}
return tx.Commit()
}
genesisBlock := rawdb.ReadHeaderByNumber(tx, 0)
if genesisBlock == nil {
// Detect whether the correction is required
snaps := snapshotsync.NewRoSnapshots(ethconfig.Snapshot{
Enabled: true,
KeepBlocks: true,
Produce: false,
}, dirs.Snap)
snaps.ReopenFolder()
var lastFound bool
var lastBlockNum, lastBaseTxNum, lastAmount uint64
if err := snaps.Bodies.View(func(sns []*snapshotsync.BodySegment) error {
// Take the last snapshot
if len(sns) == 0 {
return nil
}
sn := sns[len(sns)-1]
sn.Iterate(func(blockNum uint64, baseTxNum uint64, txAmount uint64) error {
lastBlockNum = blockNum
lastBaseTxNum = baseTxNum
lastAmount = txAmount
lastFound = true
return nil
})
return nil
}); err != nil {
return err
}
if !lastFound {
if err := BeforeCommit(tx, nil, true); err != nil {
return err
}
return tx.Commit()
}
chainConfig, err := rawdb.ReadChainConfig(tx, genesisBlock.Hash())
c, err := tx.Cursor(kv.BlockBody)
if err != nil {
return err
}
defer c.Close()
var fixNeeded bool
for k, _, err := c.First(); k != nil; k, _, err = c.Next() {
if err != nil {
return err
}
blockNumber := binary.BigEndian.Uint64(k[:8])
if blockNumber != lastBlockNum+1 {
continue
}
blockHash := common.BytesToHash(k[8:])
var hash common.Hash
if hash, err = rawdb.ReadCanonicalHash(tx, blockNumber); err != nil {
return err
}
// ReadBody is not returning baseTxId which is written into the DB record, but that value + 1
_, baseTxId, _ := rawdb.ReadBody(tx, blockHash, blockNumber)
if hash != blockHash {
continue
}
if lastBaseTxNum+lastAmount+1 != baseTxId {
log.Info("Fix required, last block in seg files", "height", lastBlockNum, "baseTxNum", lastBaseTxNum, "txAmount", lastAmount, "first txId in DB", baseTxId, "expected", lastBaseTxNum+lastAmount+1)
fixNeeded = true
}
}
if !fixNeeded {
log.Info("Fix is not required")
if err := BeforeCommit(tx, nil, true); err != nil {
return err
}
return tx.Commit()
}
headersProgress, _ := stages.GetStageProgress(tx, stages.Headers)
if headersProgress > 0 {
log.Warn("NOTE: this migration will remove recent blocks (and senders) to fix several recent bugs. Your node will re-download last ~400K blocks, should not take very long")
}
if err := snap.RemoveNonPreverifiedFiles(chainConfig.ChainName, dirs.Snap); err != nil {
return err
}
if err := rawdbreset.ResetBlocks(tx, nil, nil); err != nil {
return err
}

View File

@ -1149,7 +1149,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
if len(rangesToMerge) == 0 {
return nil
}
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true)
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */)
if err != nil {
return err
}
@ -1618,7 +1618,7 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT
}
defer d.Close()
if uint64(d.Count()) != expectedCount {
panic(fmt.Errorf("expect: %d, got %d", expectedCount, d.Count()))
return fmt.Errorf("TransactionsIdx: at=%d-%d, pre index building, expect: %d, got %d", blockFrom, blockTo, expectedCount, d.Count())
}
p.Name.Store(segFileName)
p.Total.Store(uint64(d.Count() * 2))
@ -1718,7 +1718,7 @@ RETRY:
}
if i != expectedCount {
panic(fmt.Errorf("expect: %d, got %d", expectedCount, i))
return fmt.Errorf("TransactionsIdx: at=%d-%d, post index building, expect: %d, got %d", blockFrom, blockTo, expectedCount, i)
}
if err := txnHashIdx.Build(); err != nil {

View File

@ -280,7 +280,7 @@ func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, h
return err
}
}
// Once we unwond we can start constructing the chain (assumption: len(headersChain) == len(bodiesChain))
// Once we unwound we can start constructing the chain (assumption: len(headersChain) == len(bodiesChain))
for i := range headersChain {
currentHeader := headersChain[i]
currentBody := bodiesChain[i]