This commit is contained in:
alex.sharov 2022-12-01 13:47:35 +07:00
parent d3d65f6caa
commit 5d9e74d39c
4 changed files with 173 additions and 252 deletions

View File

@ -632,11 +632,7 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
}
if reset {
err = reset2.ResetSenders(tx)
if err != nil {
return err
}
return tx.Commit()
return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx) })
}
s := stage(sync, tx, nil, stages.Senders)
@ -687,10 +683,7 @@ func stageExec(db kv.RwDB, ctx context.Context) error {
_, agg := allSnapshots(db)
if reset {
if err := db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetExec(tx, chain) }); err != nil {
return err
}
return nil
return reset2.ResetExec(ctx, db, chain)
}
if txtrace {
@ -754,18 +747,15 @@ func stageTrie(db kv.RwDB, ctx context.Context) error {
must(sync.SetCurrentStage(stages.IntermediateHashes))
_, agg := allSnapshots(db)
if reset {
return reset2.ResetIH(ctx, db)
}
tx, err := db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
if reset {
if err := stagedsync.ResetIH(tx); err != nil {
return err
}
return tx.Commit()
}
execStage := stage(sync, tx, nil, stages.Execution)
s := stage(sync, tx, nil, stages.IntermediateHashes)
@ -808,20 +798,16 @@ func stageHashState(db kv.RwDB, ctx context.Context) error {
must(sync.SetCurrentStage(stages.HashState))
_, agg := allSnapshots(db)
if reset {
return reset2.ResetHashState(ctx, db)
}
tx, err := db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
if reset {
err = stagedsync.ResetHashState(tx)
if err != nil {
return err
}
return tx.Commit()
}
s := stage(sync, tx, nil, stages.HashState)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
@ -864,20 +850,15 @@ func stageLogIndex(db kv.RwDB, ctx context.Context) error {
}
_, _, sync, _, _ := newSync(ctx, db, nil)
must(sync.SetCurrentStage(stages.LogIndex))
if reset {
return reset2.ResetLogIndex(ctx, db)
}
tx, err := db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
if reset {
err = reset2.ResetLogIndex(tx)
if err != nil {
return err
}
return tx.Commit()
}
execAt := progress(tx, stages.Execution)
s := stage(sync, tx, nil, stages.LogIndex)
if pruneTo > 0 {
@ -921,19 +902,16 @@ func stageCallTraces(db kv.RwDB, ctx context.Context) error {
}
_, _, sync, _, _ := newSync(ctx, db, nil)
must(sync.SetCurrentStage(stages.CallTraces))
if reset {
return reset2.ResetCallTraces(ctx, db)
}
tx, err := db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
if reset {
err = reset2.ResetCallTraces(tx)
if err != nil {
return err
}
return tx.Commit()
}
var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))
@ -985,19 +963,15 @@ func stageHistory(db kv.RwDB, ctx context.Context) error {
_, _, sync, _, _ := newSync(ctx, db, nil)
must(sync.SetCurrentStage(stages.AccountHistoryIndex))
if reset {
return reset2.ResetHistory(ctx, db)
}
tx, err := db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
if reset {
err = reset2.ResetHistory(tx)
if err != nil {
return err
}
return tx.Commit()
}
execStage := progress(tx, stages.Execution)
stageStorage := stage(sync, tx, nil, stages.StorageHistoryIndex)
stageAcc := stage(sync, tx, nil, stages.AccountHistoryIndex)
@ -1058,19 +1032,15 @@ func stageTxLookup(db kv.RwDB, ctx context.Context) error {
must(sync.SetCurrentStage(stages.TxLookup))
sn, _ := allSnapshots(db)
if reset {
return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetTxLookup(tx) })
}
tx, err := db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
if reset {
err = reset2.ResetTxLookup(tx)
if err != nil {
return err
}
return tx.Commit()
}
s := stage(sync, tx, nil, stages.TxLookup)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)

View File

@ -17,51 +17,35 @@ import (
func ResetState(db kv.RwDB, ctx context.Context, chain string) error {
// don't reset senders here
if err := db.Update(ctx, stagedsync.ResetHashState); err != nil {
if err := ResetHashState(ctx, db); err != nil {
return err
}
if err := db.Update(ctx, stagedsync.ResetIH); err != nil {
if err := ResetIH(ctx, db); err != nil {
return err
}
if err := db.Update(ctx, ResetHistory); err != nil {
if err := ResetHistory(ctx, db); err != nil {
return err
}
if err := db.Update(ctx, ResetLogIndex); err != nil {
if err := ResetLogIndex(ctx, db); err != nil {
return err
}
if err := db.Update(ctx, ResetCallTraces); err != nil {
if err := ResetCallTraces(ctx, db); err != nil {
return err
}
if err := db.Update(ctx, ResetTxLookup); err != nil {
return err
}
if err := db.Update(ctx, ResetFinish); err != nil {
if err := ResetFinish(ctx, db); err != nil {
return err
}
if err := db.Update(ctx, func(tx kv.RwTx) error { return ResetExec(tx, chain) }); err != nil {
if err := ResetExec(ctx, db, chain); err != nil {
return err
}
return nil
}
func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, br services.HeaderAndCanonicalReader, tmpdir string) error {
go func() { //inverted read-ahead - to warmup data
_ = db.View(context.Background(), func(tx kv.Tx) error {
c, err := tx.Cursor(kv.EthTx)
if err != nil {
return err
}
defer c.Close()
for k, _, err := c.Last(); k != nil; k, _, err = c.Prev() {
if err != nil {
return err
}
}
return nil
})
}()
// keep Genesis
if err := rawdb.TruncateBlocks(context.Background(), tx, 1); err != nil {
return err
@ -95,13 +79,12 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, br
if err := tx.ForEach(kv.BlockBody, dbutils.EncodeBlockNumber(2), func(k, _ []byte) error { return tx.Delete(kv.BlockBody, k) }); err != nil {
return err
}
if err := tx.ClearBucket(kv.NonCanonicalTxs); err != nil {
return err
}
if err := tx.ClearBucket(kv.EthTx); err != nil {
return err
}
if err := tx.ClearBucket(kv.MaxTxNum); err != nil {
if err := clearTables(context.Background(), db, tx,
kv.NonCanonicalTxs,
kv.EthTx,
kv.MaxTxNum,
); err != nil {
return err
}
if err := rawdb.ResetSequence(tx, kv.EthTx, 0); err != nil {
@ -123,153 +106,99 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, br
return nil
}
func ResetSenders(tx kv.RwTx) error {
if err := tx.ClearBucket(kv.Senders); err != nil {
return err
func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error {
if err := clearTables(ctx, db, tx, kv.Senders); err != nil {
return nil
}
if err := stages.SaveStageProgress(tx, stages.Senders, 0); err != nil {
return err
}
if err := stages.SaveStagePruneProgress(tx, stages.Senders, 0); err != nil {
return err
}
return nil
return clearStageProgress(tx, stages.Senders)
}
func ResetExec(tx kv.RwTx, chain string) (err error) {
if err = stages.SaveStageProgress(tx, stages.Execution, 0); err != nil {
return err
}
if err = stages.SaveStagePruneProgress(tx, stages.Execution, 0); err != nil {
return err
}
if err = stages.SaveStageProgress(tx, stages.HashState, 0); err != nil {
return err
}
if err = stages.SaveStagePruneProgress(tx, stages.HashState, 0); err != nil {
return err
}
if err = stages.SaveStageProgress(tx, stages.IntermediateHashes, 0); err != nil {
return err
}
if err = stages.SaveStagePruneProgress(tx, stages.IntermediateHashes, 0); err != nil {
return err
}
stateBuckets := []string{
kv.PlainState, kv.HashedAccounts, kv.HashedStorage, kv.TrieOfAccounts, kv.TrieOfStorage,
kv.Epoch, kv.PendingEpoch, kv.BorReceipts,
kv.Code, kv.PlainContractCode, kv.ContractCode, kv.IncarnationMap,
}
for _, b := range stateBuckets {
log.Info("Clear", "table", b)
if err := tx.ClearBucket(b); err != nil {
func ResetExec(ctx context.Context, db kv.RwDB, chain string) (err error) {
return db.Update(ctx, func(tx kv.RwTx) error {
if err := clearStageProgress(tx, stages.Execution, stages.HashState, stages.IntermediateHashes); err != nil {
return err
}
}
historyV3, err := rawdb.HistoryV3.Enabled(tx)
if err != nil {
return err
}
if historyV3 {
buckets := []string{
kv.AccountHistoryKeys, kv.AccountIdx, kv.AccountHistoryVals, kv.AccountSettings,
kv.StorageKeys, kv.StorageVals, kv.StorageHistoryKeys, kv.StorageHistoryVals, kv.StorageSettings, kv.StorageIdx,
kv.CodeKeys, kv.CodeVals, kv.CodeHistoryKeys, kv.CodeHistoryVals, kv.CodeSettings, kv.CodeIdx,
kv.AccountHistoryKeys, kv.AccountIdx, kv.AccountHistoryVals, kv.AccountSettings,
kv.StorageHistoryKeys, kv.StorageIdx, kv.StorageHistoryVals, kv.StorageSettings,
kv.CodeHistoryKeys, kv.CodeIdx, kv.CodeHistoryVals, kv.CodeSettings,
kv.LogAddressKeys, kv.LogAddressIdx,
kv.LogTopicsKeys, kv.LogTopicsIdx,
kv.TracesFromKeys, kv.TracesFromIdx,
kv.TracesToKeys, kv.TracesToIdx,
stateBuckets := []string{
kv.PlainState, kv.HashedAccounts, kv.HashedStorage, kv.TrieOfAccounts, kv.TrieOfStorage,
kv.Epoch, kv.PendingEpoch, kv.BorReceipts,
kv.Code, kv.PlainContractCode, kv.ContractCode, kv.IncarnationMap,
}
for _, b := range buckets {
log.Info("Clear", "table", b)
if err := clearTables(ctx, db, tx, stateBuckets...); err != nil {
return nil
}
for _, b := range stateBuckets {
if err := tx.ClearBucket(b); err != nil {
return err
}
}
} else {
if err := tx.ClearBucket(kv.AccountChangeSet); err != nil {
historyV3, err := rawdb.HistoryV3.Enabled(tx)
if err != nil {
return err
}
if err := tx.ClearBucket(kv.StorageChangeSet); err != nil {
return err
}
if err := tx.ClearBucket(kv.Receipts); err != nil {
return err
}
if err := tx.ClearBucket(kv.Log); err != nil {
return err
}
if err := tx.ClearBucket(kv.CallTraceSet); err != nil {
return err
if historyV3 {
buckets := []string{
kv.AccountHistoryKeys, kv.AccountIdx, kv.AccountHistoryVals, kv.AccountSettings,
kv.StorageKeys, kv.StorageVals, kv.StorageHistoryKeys, kv.StorageHistoryVals, kv.StorageSettings, kv.StorageIdx,
kv.CodeKeys, kv.CodeVals, kv.CodeHistoryKeys, kv.CodeHistoryVals, kv.CodeSettings, kv.CodeIdx,
kv.AccountHistoryKeys, kv.AccountIdx, kv.AccountHistoryVals, kv.AccountSettings,
kv.StorageHistoryKeys, kv.StorageIdx, kv.StorageHistoryVals, kv.StorageSettings,
kv.CodeHistoryKeys, kv.CodeIdx, kv.CodeHistoryVals, kv.CodeSettings,
kv.LogAddressKeys, kv.LogAddressIdx,
kv.LogTopicsKeys, kv.LogTopicsIdx,
kv.TracesFromKeys, kv.TracesFromIdx,
kv.TracesToKeys, kv.TracesToIdx,
}
if err := clearTables(ctx, db, tx, buckets...); err != nil {
return nil
}
} else {
if err := clearTables(ctx, db, tx,
kv.AccountChangeSet,
kv.StorageChangeSet,
kv.Receipts,
kv.Log,
kv.CallTraceSet,
); err != nil {
return nil
}
genesis := core.DefaultGenesisBlockByChainName(chain)
if _, _, err := genesis.WriteGenesisState(tx); err != nil {
return err
}
}
genesis := core.DefaultGenesisBlockByChainName(chain)
if _, _, err := genesis.WriteGenesisState(tx); err != nil {
return err
}
}
return nil
return nil
})
}
func ResetHistory(tx kv.RwTx) error {
if err := tx.ClearBucket(kv.AccountsHistory); err != nil {
return err
}
if err := tx.ClearBucket(kv.StorageHistory); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.AccountHistoryIndex, 0); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.StorageHistoryIndex, 0); err != nil {
return err
}
if err := stages.SaveStagePruneProgress(tx, stages.AccountHistoryIndex, 0); err != nil {
return err
}
if err := stages.SaveStagePruneProgress(tx, stages.StorageHistoryIndex, 0); err != nil {
return err
}
return nil
func ResetHistory(ctx context.Context, db kv.RwDB) error {
return db.Update(ctx, func(tx kv.RwTx) error {
if err := clearTables(ctx, db, tx, kv.AccountsHistory, kv.StorageHistory); err != nil {
return nil
}
return clearStageProgress(tx, stages.AccountHistoryIndex, stages.StorageHistoryIndex)
})
}
func ResetLogIndex(tx kv.RwTx) error {
if err := tx.ClearBucket(kv.LogAddressIndex); err != nil {
return err
}
if err := tx.ClearBucket(kv.LogTopicIndex); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.LogIndex, 0); err != nil {
return err
}
if err := stages.SaveStagePruneProgress(tx, stages.LogIndex, 0); err != nil {
return err
}
return nil
func ResetLogIndex(ctx context.Context, db kv.RwDB) error {
return db.Update(ctx, func(tx kv.RwTx) error {
if err := clearTables(ctx, db, tx, kv.LogAddressIndex, kv.LogTopicIndex); err != nil {
return nil
}
return clearStageProgress(tx, stages.LogIndex)
})
}
func ResetCallTraces(tx kv.RwTx) error {
if err := tx.ClearBucket(kv.CallFromIndex); err != nil {
return err
}
if err := tx.ClearBucket(kv.CallToIndex); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.CallTraces, 0); err != nil {
return err
}
if err := stages.SaveStagePruneProgress(tx, stages.CallTraces, 0); err != nil {
return err
}
return nil
func ResetCallTraces(ctx context.Context, db kv.RwDB) error {
return db.Update(ctx, func(tx kv.RwTx) error {
if err := clearTables(ctx, db, tx, kv.CallFromIndex, kv.CallToIndex); err != nil {
return nil
}
return clearStageProgress(tx, stages.CallTraces)
})
}
func ResetTxLookup(tx kv.RwTx) error {
@ -285,12 +214,71 @@ func ResetTxLookup(tx kv.RwTx) error {
return nil
}
func ResetFinish(tx kv.RwTx) error {
if err := stages.SaveStageProgress(tx, stages.Finish, 0); err != nil {
return err
func ResetFinish(ctx context.Context, db kv.RwDB) error {
return db.Update(ctx, func(tx kv.RwTx) error {
return clearStageProgress(tx, stages.Finish)
})
}
func ResetHashState(ctx context.Context, db kv.RwDB) error {
return db.Update(ctx, func(tx kv.RwTx) error {
if err := clearTables(ctx, db, tx, kv.HashedAccounts, kv.HashedStorage, kv.ContractCode); err != nil {
return nil
}
return clearStageProgress(tx, stages.HashState)
})
}
func ResetIH(ctx context.Context, db kv.RwDB) error {
return db.Update(ctx, func(tx kv.RwTx) error {
if err := clearTables(ctx, db, tx, kv.TrieOfAccounts, kv.TrieOfStorage); err != nil {
return nil
}
return clearStageProgress(tx, stages.IntermediateHashes)
})
}
func warmup(ctx context.Context, db kv.RoDB, bucket string) {
for i := 0; i < 256; i++ {
prefix := []byte{byte(i)}
go func(perfix []byte) {
_ = db.View(ctx, func(tx kv.Tx) error {
return tx.ForEach(bucket, prefix, func(k, v []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
})
})
}(prefix)
}
if err := stages.SaveStagePruneProgress(tx, stages.Finish, 0); err != nil {
return err
}
func clearTables(ctx context.Context, db kv.RoDB, tx kv.RwTx, tables ...string) error {
for _, tbl := range tables {
if err := clearTable(ctx, db, tx, tbl); err != nil {
return err
}
}
return nil
}
func clearTable(ctx context.Context, db kv.RoDB, tx kv.RwTx, table string) error {
warmup(ctx, db, table)
log.Info("Clear", "table", table)
return tx.ClearBucket(kv.HashedAccounts)
}
func clearStageProgress(tx kv.RwTx, stagesList ...stages.SyncStage) error {
for _, stage := range stagesList {
if err := stages.SaveStageProgress(tx, stage, 0); err != nil {
return err
}
if err := stages.SaveStagePruneProgress(tx, stage, 0); err != nil {
return err
}
}
return nil
}

View File

@ -18,7 +18,6 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/types/accounts"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/erigon/turbo/trie"
@ -682,42 +681,6 @@ func unwindIntermediateHashesStageImpl(logPrefix string, u *UnwindState, s *Stag
return nil
}
func ResetHashState(tx kv.RwTx) error {
if err := tx.ClearBucket(kv.HashedAccounts); err != nil {
return err
}
if err := tx.ClearBucket(kv.HashedStorage); err != nil {
return err
}
if err := tx.ClearBucket(kv.ContractCode); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.HashState, 0); err != nil {
return err
}
if err := stages.SaveStagePruneProgress(tx, stages.HashState, 0); err != nil {
return err
}
return nil
}
func ResetIH(tx kv.RwTx) error {
if err := tx.ClearBucket(kv.TrieOfAccounts); err != nil {
return err
}
if err := tx.ClearBucket(kv.TrieOfStorage); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.IntermediateHashes, 0); err != nil {
return err
}
if err := stages.SaveStagePruneProgress(tx, stages.IntermediateHashes, 0); err != nil {
return err
}
return nil
}
func assertSubset(a, b uint16) {
if (a & b) != a { // a & b == a - checks whether a is subset of b
panic(fmt.Errorf("invariant 'is subset' failed: %b, %b", a, b))

View File

@ -114,7 +114,7 @@ var resetBlocks4 = Migration{
return err
}
if err := rawdbreset.ResetSenders(tx); err != nil {
if err := rawdbreset.ResetSenders(context.Background(), db, tx); err != nil {
return err
}