diff --git a/cmd/erigon-el/stages/stages.go b/cmd/erigon-el/stages/stages.go index df1d5d283..63642b745 100644 --- a/cmd/erigon-el/stages/stages.go +++ b/cmd/erigon-el/stages/stages.go @@ -47,7 +47,7 @@ func NewStagedSync( blockWriter *blockio.BlockWriter, ) (*stagedsync.Sync, error) { dirs := cfg.Dirs - blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, db, snapDownloader, notifications.Events, logger) + blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, blockWriter, db, snapDownloader, notifications.Events, logger) // During Import we don't want other services like header requests, body requests etc. to be running. // Hence we run it in the test mode. diff --git a/cmd/hack/tool/fromdb/tool.go b/cmd/hack/tool/fromdb/tool.go index 0a8d90871..edf18e616 100644 --- a/cmd/hack/tool/fromdb/tool.go +++ b/cmd/hack/tool/fromdb/tool.go @@ -36,6 +36,19 @@ func PruneMode(db kv.RoDB) (pm prune.Mode) { return } func TxsV3(db kv.RoDB) (enabled bool) { + if err := db.View(context.Background(), func(tx kv.Tx) error { + var err error + enabled, err = kvcfg.TransactionsV3.Enabled(tx) + if err != nil { + return err + } + return nil + }); err != nil { + panic(err) + } + return +} +func HistV3(db kv.RoDB) (enabled bool) { if err := db.View(context.Background(), func(tx kv.Tx) error { var err error enabled, err = kvcfg.HistoryV3.Enabled(tx) diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index d00ae44fa..4d4165344 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -868,7 +868,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error { var blockRetire *snapshotsync.BlockRetire if sn.Cfg().Enabled { - blockRetire = snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), tmpdir, br, db, nil, nil, logger) + blockRetire = snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), tmpdir, br, bw, db, nil, nil, logger) } pm, err := prune.Get(tx) diff --git a/cmd/state/commands/check_change_sets.go b/cmd/state/commands/check_change_sets.go index 852cc249d..03d8d111c 100644 --- a/cmd/state/commands/check_change_sets.go +++ b/cmd/state/commands/check_change_sets.go @@ -16,6 +16,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2" + "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/log/v3" "github.com/spf13/cobra" @@ -41,7 +42,7 @@ func init() { withSnapshotBlocks(checkChangeSetsCmd) checkChangeSetsCmd.Flags().StringVar(&historyfile, "historyfile", "", "path to the file where the changesets and history are expected to be. If omitted, the same as /erion/chaindata") checkChangeSetsCmd.Flags().BoolVar(&nocheck, "nocheck", false, "set to turn off the changeset checking and only execute transaction (for performance testing)") - checkChangeSetsCmd.Flags().BoolVar(&transactionsV3, "experimental.transactions.v3", false, "(this flag is in testing stage) Not recommended yet: Can't change this flag after node creation. New DB table for transactions allows keeping multiple branches of block bodies in the DB simultaneously") + checkChangeSetsCmd.Flags().BoolVar(&transactionsV3, utils.TransactionV3Flag.Name, utils.TransactionV3Flag.Value, utils.TransactionV3Flag.Usage) rootCmd.AddCommand(checkChangeSetsCmd) } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index aa1254834..10ebbc2da 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -1291,12 +1291,15 @@ func DeleteAncientBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) erro // Copying k because otherwise the same memory will be reused // for the next key and Delete below will end up deleting 1 more record than required kCopy := common.CopyBytes(k) - if err = tx.Delete(kv.Headers, kCopy); err != nil { + if err = tx.Delete(kv.Senders, kCopy); err != nil { return err } if err = tx.Delete(kv.BlockBody, kCopy); err != nil { return err } + if err = tx.Delete(kv.Headers, kCopy); err != nil { + return err + } } return nil diff --git a/core/rawdb/blockio/block_writer.go b/core/rawdb/blockio/block_writer.go index e789b53d6..e683c60b3 100644 --- a/core/rawdb/blockio/block_writer.go +++ b/core/rawdb/blockio/block_writer.go @@ -19,6 +19,10 @@ import ( "github.com/ledgerwatch/log/v3" ) +//Naming: +// Prune: delete old data +// Unwind: delete recent data + // BlockReader can read blocks from db and snapshots type BlockWriter struct { historyV3 bool @@ -114,6 +118,11 @@ func (w *BlockWriter) MakeBodiesNonCanonical(tx kv.RwTx, from uint64, deleteBodi return nil } + //if deleteBodies { + //if err := rawdb.MakeBodiesNonCanonical(tx, from, deleteBodies, ctx, logPrefix, logEvery); err != nil { + // return err + //} + //} if w.historyV3 { if err := rawdbv3.TxNums.Truncate(tx, from); err != nil { return err @@ -122,7 +131,7 @@ func (w *BlockWriter) MakeBodiesNonCanonical(tx kv.RwTx, from uint64, deleteBodi return nil } -func extractHeaders(k []byte, v []byte, next etl.ExtractNextFunc) error { +func extractHeaders(k []byte, _ []byte, next etl.ExtractNextFunc) error { // We only want to extract entries composed by Block Number + Header Hash if len(k) != 40 { return nil @@ -169,10 +178,10 @@ func (w *BlockWriter) ResetSenders(ctx context.Context, db kv.RoDB, tx kv.RwTx) return backup.ClearTables(ctx, db, tx, kv.Senders) } -// Prune - [1, to) old blocks after moving it to snapshots. +// PruneBlocks - [1, to) old blocks after moving it to snapshots. // keeps genesis in db // doesn't change sequences of kv.EthTx and kv.NonCanonicalTxs // doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty -func (w *BlockWriter) Prune(ctx context.Context, tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) error { +func (w *BlockWriter) PruneBlocks(ctx context.Context, tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) error { return rawdb.DeleteAncientBlocks(tx, blockTo, blocksDeleteLimit) } diff --git a/eth/stagedsync/default_stages.go b/eth/stagedsync/default_stages.go index 7ee8152e1..c5d25bf76 100644 --- a/eth/stagedsync/default_stages.go +++ b/eth/stagedsync/default_stages.go @@ -368,7 +368,6 @@ var StateUnwindOrder = UnwindOrder{ var DefaultPruneOrder = PruneOrder{ stages.Finish, - stages.Snapshots, stages.TxLookup, stages.LogIndex, stages.StorageHistoryIndex, @@ -386,6 +385,7 @@ var DefaultPruneOrder = PruneOrder{ stages.Bodies, stages.BlockHashes, stages.Headers, + stages.Snapshots, } var MiningUnwindOrder = UnwindOrder{} // nothing to unwind in mining - because mining does not commit db changes diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index b51c17d12..74d59368c 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -711,7 +711,9 @@ func saveDownloadedPoSHeaders(tx kv.RwTx, cfg HeadersCfg, headerInserter *header logger.Info("PoS headers verified and saved", "requestId", cfg.hd.RequestId(), "fork head", lastValidHash) } - cfg.hd.HeadersCollector().Close() + if cfg.hd.HeadersCollector() != nil { + cfg.hd.HeadersCollector().Close() + } cfg.hd.SetHeadersCollector(nil) cfg.hd.SetPosStatus(headerdownload.Idle) } diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index b93819a86..fae2c617b 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -75,7 +75,6 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R if cfg.blockRetire != nil && cfg.blockRetire.Snapshots() != nil && cfg.blockRetire.Snapshots().Cfg().Enabled && s.BlockNumber < cfg.blockRetire.Snapshots().BlocksAvailable() { s.BlockNumber = cfg.blockRetire.Snapshots().BlocksAvailable() } - txsV3Enabled := cfg.blockWriter.TxsV3Enabled() // allow stor senders for non-canonical blocks quitCh := ctx.Done() useExternalTx := tx != nil @@ -108,40 +107,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R logEvery := time.NewTicker(30 * time.Second) defer logEvery.Stop() - canonicalC, err := tx.Cursor(kv.HeaderCanonical) - if err != nil { - return err - } - defer canonicalC.Close() - startFrom := s.BlockNumber + 1 - currentHeaderIdx := uint64(0) - canonical := make([]libcommon.Hash, to-s.BlockNumber) - - if !txsV3Enabled { - for k, v, err := canonicalC.Seek(hexutility.EncodeTs(startFrom)); k != nil; k, v, err = canonicalC.Next() { - if err != nil { - return err - } - if err := libcommon.Stopped(quitCh); err != nil { - return err - } - - if currentHeaderIdx >= to-s.BlockNumber { // if header stage is ehead of body stage - break - } - - copy(canonical[currentHeaderIdx][:], v) - currentHeaderIdx++ - - select { - default: - case <-logEvery.C: - logger.Info(fmt.Sprintf("[%s] Preload headers", logPrefix), "block_number", binary.BigEndian.Uint64(k)) - } - } - } - logger.Trace(fmt.Sprintf("[%s] Read canonical hashes", logPrefix), "amount", len(canonical)) jobs := make(chan *senderRecoveryJob, cfg.batchSize) out := make(chan *senderRecoveryJob, cfg.batchSize) @@ -214,14 +180,14 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R return nil } - bodiesC, err := tx.Cursor(kv.BlockBody) + bodiesC, err := tx.Cursor(kv.HeaderCanonical) if err != nil { return err } defer bodiesC.Close() Loop: - for k, _, err := bodiesC.Seek(hexutility.EncodeTs(startFrom)); k != nil; k, _, err = bodiesC.Next() { + for k, v, err := bodiesC.Seek(hexutility.EncodeTs(startFrom)); k != nil; k, v, err = bodiesC.Next() { if err != nil { return err } @@ -229,32 +195,28 @@ Loop: return err } - blockNumber := binary.BigEndian.Uint64(k[:8]) - blockHash := libcommon.BytesToHash(k[8:]) + blockNumber := binary.BigEndian.Uint64(k) + blockHash := libcommon.BytesToHash(v) if blockNumber > to { break } + has, err := cfg.blockReader.HasSenders(ctx, tx, blockHash, blockNumber) + if err != nil { + return err + } + if has { + continue + } + var body *types.Body - if txsV3Enabled { - if body, err = cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber); err != nil { - return err - } - if body == nil { - logger.Warn(fmt.Sprintf("[%s] blockReader.BodyWithTransactions can't find block", logPrefix), "num", blockNumber, "hash", blockHash) - continue - } - } else { - if canonical[blockNumber-s.BlockNumber-1] != blockHash { - // non-canonical case - continue - } - body = rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blockNumber) - if body == nil { - logger.Warn(fmt.Sprintf("[%s] ReadCanonicalBodyWithTransactions can't find block", logPrefix), "num", blockNumber, "hash", blockHash) - continue - } + if body, err = cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber); err != nil { + return err + } + if body == nil { + logger.Warn(fmt.Sprintf("[%s] ReadBodyWithTransactions can't find block", logPrefix), "num", blockNumber, "hash", blockHash) + continue } select { @@ -397,8 +359,6 @@ func UnwindSendersStage(s *UnwindState, tx kv.RwTx, cfg SendersCfg, ctx context. } func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error) { - logEvery := time.NewTicker(logInterval) - defer logEvery.Stop() useExternalTx := tx != nil if !useExternalTx { tx, err = cfg.db.BeginRw(ctx) @@ -408,9 +368,11 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co defer tx.Rollback() } sn := cfg.blockRetire.Snapshots() - if !(sn != nil && sn.Cfg().Enabled && sn.Cfg().Produce) && cfg.prune.TxIndex.Enabled() { + if sn.Cfg().Enabled { + // noop. in this case senders will be deleted by BlockRetire.PruneAncientBlocks after data-freezing. + } else if cfg.prune.TxIndex.Enabled() { to := cfg.prune.TxIndex.PruneTo(s.ForwardProgress) - if err = rawdb.PruneTable(tx, kv.Senders, to, ctx, 1_000); err != nil { + if err = rawdb.PruneTable(tx, kv.Senders, to, ctx, 100); err != nil { return err } } diff --git a/eth/stagedsync/stage_senders_test.go b/eth/stagedsync/stage_senders_test.go index 7bb2ab2f6..5fc03d731 100644 --- a/eth/stagedsync/stage_senders_test.go +++ b/eth/stagedsync/stage_senders_test.go @@ -130,7 +130,7 @@ func TestSenders(t *testing.T) { require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3)) - blockRetire := snapshotsync.NewBlockRetire(1, "", br, db, nil, nil, logger) + blockRetire := snapshotsync.NewBlockRetire(1, "", br, bw, db, nil, nil, logger) cfg := stagedsync.StageSendersCfg(db, params.TestChainConfig, false, "", prune.Mode{}, blockRetire, bw, br, nil) err = stagedsync.SpawnRecoverSendersStage(cfg, &stagedsync.StageState{ID: stages.Senders}, nil, tx, 3, m.Ctx, log.New()) require.NoError(err) diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index bb3eeaa64..e9c312631 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -14,7 +14,6 @@ import ( "github.com/ledgerwatch/erigon-lib/chain" libcommon "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/common/cmp" "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/erigon-lib/downloader/snaptype" @@ -269,23 +268,11 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, return err } if historyV3 { - var toBlock uint64 - if sn != nil { - toBlock = sn.BlocksAvailable() - } - toBlock = cmp.Max(toBlock, progress) - _ = tx.ClearBucket(kv.MaxTxNum) - if err := rawdbv3.TxNums.WriteForGenesis(tx, 1); err != nil { - return err - } type IterBody interface { - IterateBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error + IterateFrozenBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error } - if err := blockReader.(IterBody).IterateBodies(func(blockNum, baseTxNum, txAmount uint64) error { - if blockNum == 0 || blockNum > toBlock { - return nil - } + if err := blockReader.(IterBody).IterateFrozenBodies(func(blockNum, baseTxNum, txAmount uint64) error { select { case <-ctx.Done(): return ctx.Err() @@ -302,6 +289,15 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, }); err != nil { return fmt.Errorf("build txNum => blockNum mapping: %w", err) } + if blockReader.Snapshots().BlocksAvailable() > 0 { + if err := rawdb.AppendCanonicalTxNums(tx, blockReader.Snapshots().BlocksAvailable()+1); err != nil { + return err + } + } else { + if err := rawdb.AppendCanonicalTxNums(tx, 0); err != nil { + return err + } + } } if err := rawdb.WriteSnapshots(tx, sn.Files(), agg.Files()); err != nil { return err @@ -498,13 +494,14 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv. defer tx.Rollback() } - sn := cfg.blockRetire.Snapshots() - if sn != nil && sn.Cfg().Enabled && sn.Cfg().Produce { - br := cfg.blockRetire + br := cfg.blockRetire + sn := br.Snapshots() + if sn.Cfg().Enabled { if err := br.PruneAncientBlocks(tx, 100); err != nil { return err } - + } + if sn.Cfg().Enabled && sn.Cfg().Produce { //TODO: initialSync maybe save files progress here if cfg.agg.NeedSaveFilesListInDB() || br.NeedSaveFilesListInDB() { if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files(), cfg.agg.Files()); err != nil { diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go index 6d14e4bd3..64bc3cd52 100644 --- a/eth/stagedsync/stage_txlookup.go +++ b/eth/stagedsync/stage_txlookup.go @@ -117,14 +117,12 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c // txnLookupTransform - [startKey, endKey) func txnLookupTransform(logPrefix string, tx kv.RwTx, blockFrom, blockTo uint64, ctx context.Context, cfg TxLookupCfg, logger log.Logger) (err error) { bigNum := new(big.Int) - txsV3Enabled := cfg.blockReader.TxsV3Enabled() - if txsV3Enabled { - panic("implement me. need iterate by kv.BlockID instead of kv.HeaderCanonical") - } - return etl.Transform(logPrefix, tx, kv.HeaderCanonical, kv.TxLookup, cfg.tmpdir, func(k, v []byte, next etl.ExtractNextFunc) error { blocknum, blockHash := binary.BigEndian.Uint64(k), libcommon.CastToHash(v) - body := rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blocknum) + body, err := cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blocknum) + if err != nil { + return err + } if body == nil { return fmt.Errorf("transform: empty block body %d, hash %x", blocknum, v) } @@ -236,10 +234,8 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte } else if blockSnapshots != nil && blockSnapshots.Cfg().Enabled { blockTo = snapshotsync.CanDeleteTo(s.ForwardProgress, blockSnapshots) } - - if !initialCycle { // limit time for pruning - blockTo = cmp.Min(blockTo, blockFrom+100) - } + // can't prune much here: because tx_lookup index has crypto-hashed-keys, and 1 block producing hundreds of deletes + blockTo = cmp.Min(blockTo, blockFrom+10) if blockFrom < blockTo { if err = deleteTxLookupRange(tx, logPrefix, blockFrom, blockTo, ctx, cfg, logger); err != nil { @@ -269,7 +265,10 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte func deleteTxLookupRange(tx kv.RwTx, logPrefix string, blockFrom, blockTo uint64, ctx context.Context, cfg TxLookupCfg, logger log.Logger) error { return etl.Transform(logPrefix, tx, kv.HeaderCanonical, kv.TxLookup, cfg.tmpdir, func(k, v []byte, next etl.ExtractNextFunc) error { blocknum, blockHash := binary.BigEndian.Uint64(k), libcommon.CastToHash(v) - body := rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blocknum) + body, err := cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blocknum) + if err != nil { + return err + } if body == nil { log.Debug("TxLookup pruning, empty block body", "height", blocknum) return nil diff --git a/metrics/exp/exp.go b/metrics/exp/exp.go index e4e5dbc0c..b66c2c854 100644 --- a/metrics/exp/exp.go +++ b/metrics/exp/exp.go @@ -17,7 +17,7 @@ import ( func Setup(address string) { http.HandleFunc("/debug/metrics/prometheus", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - metrics2.WritePrometheus(w, false) + metrics2.WritePrometheus(w, true) contentType := expfmt.Negotiate(r.Header) enc := expfmt.NewEncoder(w, contentType) mf, err := prometheus.DefaultGatherer.Gather() diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index d145c4111..b4e62b048 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -28,6 +28,7 @@ import ( "github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb" "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/rawdb/blockio" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/ethconfig/estimate" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" @@ -415,8 +416,9 @@ func doRetireCommand(cliCtx *cli.Context) error { return err } blockReader := snapshotsync.NewBlockReader(snapshots, fromdb.TxsV3(db)) + blockWriter := blockio.NewBlockWriter(fromdb.HistV3(db), fromdb.TxsV3(db)) - br := snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs.Tmp, blockReader, db, nil, nil, logger) + br := snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs.Tmp, blockReader, blockWriter, db, nil, nil, logger) agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger) if err != nil { return err diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index 83e6d4618..d468f157e 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -788,7 +788,7 @@ func (r *BlockReader) FirstTxNumNotInSnapshots() uint64 { return lastTxnID } -func (r *BlockReader) IterateBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error { +func (r *BlockReader) IterateFrozenBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error { view := r.sn.View() defer view.Close() diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 270ca04ac..b70346b15 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -36,6 +36,7 @@ import ( types2 "github.com/ledgerwatch/erigon-lib/types" "github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/rawdb/blockio" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/crypto/cryptopool" @@ -1008,10 +1009,11 @@ type BlockRetire struct { notifier DBEventNotifier logger log.Logger blockReader services.FullBlockReader + blockWriter *blockio.BlockWriter } -func NewBlockRetire(workers int, tmpDir string, blockReader services.FullBlockReader, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier, logger log.Logger) *BlockRetire { - return &BlockRetire{workers: workers, tmpDir: tmpDir, blockReader: blockReader, db: db, downloader: downloader, notifier: notifier, logger: logger} +func NewBlockRetire(workers int, tmpDir string, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier, logger log.Logger) *BlockRetire { + return &BlockRetire{workers: workers, tmpDir: tmpDir, blockReader: blockReader, blockWriter: blockWriter, db: db, downloader: downloader, notifier: notifier, logger: logger} } func (br *BlockRetire) Snapshots() *RoSnapshots { return br.blockReader.Snapshots().(*RoSnapshots) } func (br *BlockRetire) NeedSaveFilesListInDB() bool { @@ -1122,12 +1124,9 @@ func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int) error { return err } canDeleteTo := CanDeleteTo(currentProgress, blockSnapshots) - if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, limit); err != nil { + if err := br.blockWriter.PruneBlocks(context.Background(), tx, canDeleteTo, limit); err != nil { return nil } - if err := rawdb.PruneTable(tx, kv.Senders, canDeleteTo, context.Background(), limit); err != nil { - return err - } return nil } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 494f846d6..0b67040ff 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -370,7 +370,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK var snapshotsDownloader proto_downloader.DownloaderClient - blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, mock.DB, snapshotsDownloader, mock.Notifications.Events, logger) + blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, blockWriter, mock.DB, snapshotsDownloader, mock.Notifications.Events, logger) mock.Sync = stagedsync.New( stagedsync.DefaultStages(mock.Ctx, stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, dirs, blockRetire, snapshotsDownloader, blockReader, mock.Notifications.Events, mock.HistoryV3, mock.agg), diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index cb67819be..91b1471f8 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -406,7 +406,7 @@ func NewDefaultStages(ctx context.Context, ) []*stagedsync.Stage { dirs := cfg.Dirs blockWriter := blockio.NewBlockWriter(cfg.HistoryV3, cfg.TransactionsV3) - blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, db, snapDownloader, notifications.Events, logger) + blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, blockWriter, db, snapDownloader, notifications.Events, logger) // During Import we don't want other services like header requests, body requests etc. to be running. // Hence we run it in the test mode.