From 6fd002eb5af7a519d768a22dd2fede583508bc4a Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 20 Jan 2022 12:01:02 +0700 Subject: [PATCH] Snapshot: move old blocks to snapshots (non-async version) (#3309) * save * save --- cmd/integration/commands/stages.go | 2 +- cmd/rpcdaemon/cli/config.go | 2 +- cmd/utils/flags.go | 7 +++ core/rawdb/accessors_chain.go | 60 ++++++++++++++++++++++ eth/backend.go | 2 +- eth/ethconfig/config.go | 4 +- eth/stagedsync/stage_senders.go | 32 ++++++++++-- turbo/cli/default_flags.go | 1 + turbo/cli/snapshots.go | 17 +++--- turbo/snapshotsync/block_snapshots.go | 44 ++++++++-------- turbo/snapshotsync/block_snapshots_test.go | 22 ++++---- turbo/snapshotsync/wrapdb.go | 4 +- turbo/stages/stageloop.go | 2 +- 13 files changed, 150 insertions(+), 49 deletions(-) diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index f6e9773b6..936a59fcd 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1041,7 +1041,7 @@ func allSnapshots(cc *params.ChainConfig) *snapshotsync.AllSnapshots { Enabled: true, Dir: path.Join(datadir, "snapshots"), } - _allSnapshotsSingleton = snapshotsync.NewAllSnapshots(snapshotCfg.Dir, snapshothashes.KnownConfig(cc.ChainName)) + _allSnapshotsSingleton = snapshotsync.NewAllSnapshots(snapshotCfg, snapshothashes.KnownConfig(cc.ChainName)) if err := _allSnapshotsSingleton.ReopenSegments(); err != nil { panic(err) } diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 171a9021d..dac024046 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -278,7 +278,7 @@ func RemoteServices(ctx context.Context, cfg Flags, logger log.Logger, rootCance return nil, nil, nil, nil, nil, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db") } - allSnapshots := snapshotsync.NewAllSnapshots(cfg.Snapshot.Dir, snapshothashes.KnownConfig(cc.ChainName)) + allSnapshots := snapshotsync.NewAllSnapshots(cfg.Snapshot, snapshothashes.KnownConfig(cc.ChainName)) if err := allSnapshots.ReopenSegments(); err != nil { return nil, nil, nil, nil, nil, nil, err } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index dcc94fcf0..6f64cf51f 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -547,6 +547,10 @@ var ( Name: "experimental.snapshot", Usage: "Enabling experimental snapshot sync", } + SnapshotRetireFlag = cli.BoolFlag{ + Name: "experimental.snapshot.retire", + Usage: "Delete(!) old blocks from DB, by move them to snapshots", + } HealthCheckFlag = cli.BoolFlag{ Name: "healthcheck", @@ -1271,6 +1275,9 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Conf cfg.Snapshot.Enabled = true cfg.Snapshot.Dir = path.Join(nodeConfig.DataDir, "snapshots") } + if ctx.GlobalBool(SnapshotRetireFlag.Name) { + cfg.Snapshot.RetireEnabled = true + } CheckExclusive(ctx, MinerSigningKeyFileFlag, MinerEtherbaseFlag) setEtherbase(ctx, cfg) diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index f63069fc9..d5f149162 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -482,6 +482,22 @@ func ResetSequence(tx kv.RwTx, bucket string, newValue uint64) error { return nil } +func ReadBodyForStorageByKey(db kv.Getter, k []byte) (*types.BodyForStorage, error) { + bodyRlp, err := db.GetOne(kv.BlockBody, k) + if err != nil { + return nil, err + } + if len(bodyRlp) == 0 { + return nil, nil + } + bodyForStorage := new(types.BodyForStorage) + if err := rlp.DecodeBytes(bodyRlp, bodyForStorage); err != nil { + return nil, err + } + + return bodyForStorage, nil +} + func ReadBody(db kv.Getter, hash common.Hash, number uint64) (*types.Body, uint64, uint32) { data := ReadStorageBodyRLP(db, hash, number) if len(data) == 0 { @@ -1022,6 +1038,50 @@ func WriteBlock(db kv.RwTx, block *types.Block) error { return nil } +// DeleteAncientBlocks - delete old block after moving it to snapshots. [from, to) +func DeleteAncientBlocks(db kv.RwTx, blockFrom, blockTo uint64) error { + //doesn't delete Receipts - because Receipts are not in snapshots yet + + for n := blockFrom; n < blockTo; n++ { + canonicalHash, err := ReadCanonicalHash(db, n) + if err != nil { + return err + } + if err := db.ForPrefix(kv.Headers, dbutils.EncodeBlockNumber(n), func(k, v []byte) error { + isCanonical := bytes.Equal(k[8:], canonicalHash[:]) + if err := db.Delete(kv.Headers, k, nil); err != nil { + return err + } + b, err := ReadBodyForStorageByKey(db, k) + if err != nil { + return err + } + txIDBytes := make([]byte, 8) + for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ { + binary.BigEndian.PutUint64(txIDBytes, txID) + bucket := kv.EthTx + if !isCanonical { + bucket = kv.NonCanonicalTxs + } + if err := db.Delete(bucket, txIDBytes, nil); err != nil { + return err + } + } + if err := db.Delete(kv.BlockBody, k, nil); err != nil { + return err + } + if err := db.Delete(kv.Senders, k, nil); err != nil { + return err + } + return nil + }); err != nil { + return err + } + } + + return nil +} + // DeleteBlock removes all block data associated with a hash. func DeleteBlock(db kv.RwTx, hash common.Hash, number uint64) error { if err := DeleteReceipts(db, number); err != nil { diff --git a/eth/backend.go b/eth/backend.go index 4b399ece0..07780e306 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -337,7 +337,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere return nil, err } - allSnapshots := snapshotsync.NewAllSnapshots(config.Snapshot.Dir, snConfig) + allSnapshots := snapshotsync.NewAllSnapshots(config.Snapshot, snConfig) if err != nil { return nil, err } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 74de615fd..ed6328226 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -18,7 +18,6 @@ package ethconfig import ( - "github.com/ledgerwatch/erigon/consensus/parlia" "math/big" "os" "os/user" @@ -26,6 +25,8 @@ import ( "runtime" "time" + "github.com/ledgerwatch/erigon/consensus/parlia" + "github.com/c2h5oh/datasize" "github.com/davecgh/go-spew/spew" "github.com/ledgerwatch/erigon/consensus/aura" @@ -118,6 +119,7 @@ func init() { type Snapshot struct { Enabled bool + RetireEnabled bool Dir string ChainSnapshotConfig *snapshothashes.Config } diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index d11d3db89..7465273fc 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -368,13 +368,35 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co } defer tx.Rollback() } - if !cfg.prune.TxIndex.Enabled() { - return nil - } - if err = PruneTable(tx, kv.Senders, s.LogPrefix(), to, logEvery, ctx); err != nil { - return err + + if cfg.snapshots != nil && cfg.snapshots.Cfg().RetireEnabled { + if err := cfg.snapshots.EnsureExpectedBlocksAreAvailable(); err != nil { + return err + } + blockFrom := cfg.snapshots.BlocksAvailable() + 1 + blockTo := s.ForwardProgress - params.FullImmutabilityThreshold + if blockTo-blockFrom > 10_000 { + // in future we will do it in background + if err := snapshotsync.DumpBlocks(ctx, blockFrom, blockTo, snapshotsync.DEFAULT_SEGMENT_SIZE, cfg.tmpdir, cfg.snapshots.Dir(), cfg.db, 1); err != nil { + return err + } + if err := cfg.snapshots.ReopenSegments(); err != nil { + return err + } + if err := cfg.snapshots.ReopenIndices(); err != nil { + return err + } + if err := rawdb.DeleteAncientBlocks(tx, blockFrom, blockTo); err != nil { + return nil + } + } } + if cfg.prune.TxIndex.Enabled() { + if err = PruneTable(tx, kv.Senders, s.LogPrefix(), to, logEvery, ctx); err != nil { + return err + } + } if !useExternalTx { if err = tx.Commit(); err != nil { return err diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index b72266d40..e53806c77 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -48,6 +48,7 @@ var DefaultFlags = []cli.Flag{ SyncLoopThrottleFlag, BadBlockFlag, utils.SnapshotSyncFlag, + utils.SnapshotRetireFlag, utils.ListenPortFlag, utils.NATFlag, utils.NoDiscoverFlag, diff --git a/turbo/cli/snapshots.go b/turbo/cli/snapshots.go index 06376641c..d7f5bace4 100644 --- a/turbo/cli/snapshots.go +++ b/turbo/cli/snapshots.go @@ -17,6 +17,7 @@ import ( "github.com/ledgerwatch/erigon/cmd/hack/tool" "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/internal/debug" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/snapshotsync" @@ -70,7 +71,7 @@ var ( SnapshotSegmentSizeFlag = cli.Uint64Flag{ Name: "segment.size", Usage: "Amount of blocks in each segment", - Value: 500_000, + Value: snapshotsync.DEFAULT_SEGMENT_SIZE, } SnapshotRebuildFlag = cli.BoolFlag{ Name: "rebuild", @@ -91,7 +92,8 @@ func doIndicesCommand(cliCtx *cli.Context) error { defer chainDB.Close() if rebuild { - if err := rebuildIndices(ctx, chainDB, snapshotDir, tmpDir); err != nil { + cfg := ethconfig.Snapshot{Dir: snapshotDir, RetireEnabled: true, Enabled: true} + if err := rebuildIndices(ctx, chainDB, cfg, tmpDir); err != nil { log.Error("Error", "err", err) } } @@ -121,17 +123,17 @@ func doSnapshotCommand(cliCtx *cli.Context) error { return nil } -func rebuildIndices(ctx context.Context, chainDB kv.RoDB, snapshotDir, tmpDir string) error { +func rebuildIndices(ctx context.Context, chainDB kv.RoDB, cfg ethconfig.Snapshot, tmpDir string) error { chainConfig := tool.ChainConfigFromDB(chainDB) chainID, _ := uint256.FromBig(chainConfig.ChainID) _ = chainID - _ = os.MkdirAll(snapshotDir, 0744) + _ = os.MkdirAll(cfg.Dir, 0744) - allSnapshots := snapshotsync.NewAllSnapshots(snapshotDir, snapshothashes.KnownConfig(chainConfig.ChainName)) + allSnapshots := snapshotsync.NewAllSnapshots(cfg, snapshothashes.KnownConfig(chainConfig.ChainName)) if err := allSnapshots.ReopenSegments(); err != nil { return err } - idxFilesList, err := snapshotsync.IdxFiles(snapshotDir) + idxFilesList, err := snapshotsync.IdxFiles(cfg.Dir) if err != nil { return err } @@ -202,7 +204,8 @@ func checkBlockSnapshot(chaindata string) error { chainID, _ := uint256.FromBig(chainConfig.ChainID) _ = chainID - snapshots := snapshotsync.NewAllSnapshots(path.Join(dataDir, "snapshots"), snapshothashes.KnownConfig(chainConfig.ChainName)) + cfg := ethconfig.Snapshot{Dir: path.Join(dataDir, "snapshots"), Enabled: true, RetireEnabled: true} + snapshots := snapshotsync.NewAllSnapshots(cfg, snapshothashes.KnownConfig(chainConfig.ChainName)) snapshots.ReopenSegments() snapshots.ReopenIndices() //if err := snapshots.BuildIndices(context.Background(), *chainID); err != nil { diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index e256fd854..ce60b8dd4 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -28,6 +28,7 @@ import ( "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes" "github.com/ledgerwatch/log/v3" @@ -64,21 +65,12 @@ var ( ErrInvalidCompressedFileName = fmt.Errorf("invalid compressed file name") ) -func FileName(from, to uint64, name SnapshotType) string { - return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, name) -} - -func SegmentFileName(from, to uint64, name SnapshotType) string { - return FileName(from, to, name) + ".seg" -} - -func TmpFileName(from, to uint64, name SnapshotType) string { - return FileName(from, to, name) + ".dat" -} - -func IdxFileName(from, to uint64, name SnapshotType) string { - return FileName(from, to, name) + ".idx" +func FileName(from, to uint64, t SnapshotType) string { + return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, t) } +func SegmentFileName(from, to uint64, t SnapshotType) string { return FileName(from, to, t) + ".seg" } +func DatFileName(from, to uint64, t SnapshotType) string { return FileName(from, to, t) + ".dat" } +func IdxFileName(from, to uint64, t SnapshotType) string { return FileName(from, to, t) + ".idx" } func (s BlocksSnapshot) Has(block uint64) bool { return block >= s.From && block < s.To } @@ -89,7 +81,8 @@ type AllSnapshots struct { segmentsAvailable uint64 idxAvailable uint64 blocks []*BlocksSnapshot - cfg *snapshothashes.Config + chainSnapshotCfg *snapshothashes.Config + cfg ethconfig.Snapshot } // NewAllSnapshots - opens all snapshots. But to simplify everything: @@ -97,14 +90,16 @@ type AllSnapshots struct { // - all snapshots of given blocks range must exist - to make this blocks range available // - gaps are not allowed // - segment have [from:to) semantic -func NewAllSnapshots(dir string, cfg *snapshothashes.Config) *AllSnapshots { - if err := os.MkdirAll(dir, 0744); err != nil { +func NewAllSnapshots(cfg ethconfig.Snapshot, snCfg *snapshothashes.Config) *AllSnapshots { + if err := os.MkdirAll(cfg.Dir, 0744); err != nil { panic(err) } - return &AllSnapshots{dir: dir, cfg: cfg} + return &AllSnapshots{dir: cfg.Dir, chainSnapshotCfg: snCfg, cfg: cfg} } -func (s *AllSnapshots) ChainSnapshotConfig() *snapshothashes.Config { return s.cfg } +func (s *AllSnapshots) ChainSnapshotConfig() *snapshothashes.Config { return s.chainSnapshotCfg } +func (s *AllSnapshots) Cfg() ethconfig.Snapshot { return s.cfg } +func (s *AllSnapshots) Dir() string { return s.dir } func (s *AllSnapshots) AllSegmentsAvailable() bool { return s.allSegmentsAvailable } func (s *AllSnapshots) SetAllSegmentsAvailable(v bool) { s.allSegmentsAvailable = v } func (s *AllSnapshots) BlocksAvailable() uint64 { return s.segmentsAvailable } @@ -112,6 +107,13 @@ func (s *AllSnapshots) AllIdxAvailable() bool { return s.a func (s *AllSnapshots) SetAllIdxAvailable(v bool) { s.allIdxAvailable = v } func (s *AllSnapshots) IndicesAvailable() uint64 { return s.idxAvailable } +func (s *AllSnapshots) EnsureExpectedBlocksAreAvailable() error { + if s.BlocksAvailable() < s.ChainSnapshotConfig().ExpectBlocks { + return fmt.Errorf("app must wait until all expected snapshots are available. Expected: %d, Available: %d", s.ChainSnapshotConfig().ExpectBlocks, s.BlocksAvailable()) + } + return nil +} + func (s *AllSnapshots) SegmentsAvailability() (headers, bodies, txs uint64, err error) { if headers, err = latestSegment(s.dir, Headers); err != nil { return @@ -213,7 +215,7 @@ func (s *AllSnapshots) ReopenSegments() error { if to == prevTo { continue } - if from > s.cfg.ExpectBlocks { + if from > s.chainSnapshotCfg.ExpectBlocks { log.Debug("[open snapshots] skip snapshot because node expect less blocks in snapshots", "file", f) continue } @@ -500,6 +502,8 @@ func ParseFileName(name, expectedExt string) (from, to uint64, snapshotType Snap return from * 1_000, to * 1_000, snapshotType, nil } +const DEFAULT_SEGMENT_SIZE = 500_000 + func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, tmpDir, snapshotDir string, chainDB kv.RoDB, workers int) error { for i := blockFrom; i < blockTo; i += blocksPerFile { if err := dumpBlocksRange(ctx, i, i+blocksPerFile, tmpDir, snapshotDir, chainDB, workers); err != nil { diff --git a/turbo/snapshotsync/block_snapshots_test.go b/turbo/snapshotsync/block_snapshots_test.go index 014fcb936..af9e88bbc 100644 --- a/turbo/snapshotsync/block_snapshots_test.go +++ b/turbo/snapshotsync/block_snapshots_test.go @@ -8,6 +8,7 @@ import ( "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon/common/math" + "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/params/networkname" "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes" "github.com/stretchr/testify/require" @@ -15,8 +16,9 @@ import ( func TestOpenAllSnapshot(t *testing.T) { dir, require := t.TempDir(), require.New(t) - cfg := snapshothashes.KnownConfig(networkname.MainnetChainName) - cfg.ExpectBlocks = math.MaxUint64 + chainSnapshotCfg := snapshothashes.KnownConfig(networkname.MainnetChainName) + chainSnapshotCfg.ExpectBlocks = math.MaxUint64 + cfg := ethconfig.Snapshot{Dir: dir, Enabled: true} createFile := func(from, to uint64, name SnapshotType) { c, err := compress.NewCompressor(context.Background(), "test", path.Join(dir, SegmentFileName(from, to, name)), dir, 100, 1) require.NoError(err) @@ -38,7 +40,7 @@ func TestOpenAllSnapshot(t *testing.T) { err = idx.Build() require.NoError(err) } - s := NewAllSnapshots(dir, cfg) + s := NewAllSnapshots(cfg, chainSnapshotCfg) defer s.Close() err := s.ReopenSegments() require.NoError(err) @@ -46,14 +48,14 @@ func TestOpenAllSnapshot(t *testing.T) { s.Close() createFile(500_000, 1_000_000, Bodies) - s = NewAllSnapshots(dir, cfg) + s = NewAllSnapshots(cfg, chainSnapshotCfg) defer s.Close() require.Equal(0, len(s.blocks)) //because, no headers and transactions snapshot files are created s.Close() createFile(500_000, 1_000_000, Headers) createFile(500_000, 1_000_000, Transactions) - s = NewAllSnapshots(dir, cfg) + s = NewAllSnapshots(cfg, chainSnapshotCfg) err = s.ReopenSegments() require.Error(err) require.Equal(0, len(s.blocks)) //because, no gaps are allowed (expect snapshots from block 0) @@ -62,7 +64,7 @@ func TestOpenAllSnapshot(t *testing.T) { createFile(0, 500_000, Bodies) createFile(0, 500_000, Headers) createFile(0, 500_000, Transactions) - s = NewAllSnapshots(dir, cfg) + s = NewAllSnapshots(cfg, chainSnapshotCfg) err = s.ReopenSegments() require.NoError(err) defer s.Close() @@ -80,8 +82,8 @@ func TestOpenAllSnapshot(t *testing.T) { require.False(ok) // user must be able to limit amount of blocks which read from snapshot - cfg.ExpectBlocks = 500_000 - 1 - s = NewAllSnapshots(dir, cfg) + chainSnapshotCfg.ExpectBlocks = 500_000 - 1 + s = NewAllSnapshots(cfg, chainSnapshotCfg) err = s.ReopenSegments() require.NoError(err) defer s.Close() @@ -90,8 +92,8 @@ func TestOpenAllSnapshot(t *testing.T) { createFile(500_000, 900_000, Headers) createFile(500_000, 900_000, Bodies) createFile(500_000, 900_000, Transactions) - cfg.ExpectBlocks = math.MaxUint64 - s = NewAllSnapshots(dir, cfg) + chainSnapshotCfg.ExpectBlocks = math.MaxUint64 + s = NewAllSnapshots(cfg, chainSnapshotCfg) defer s.Close() err = s.ReopenSegments() require.Error(err) diff --git a/turbo/snapshotsync/wrapdb.go b/turbo/snapshotsync/wrapdb.go index e13442ab0..819ebcf9f 100644 --- a/turbo/snapshotsync/wrapdb.go +++ b/turbo/snapshotsync/wrapdb.go @@ -35,9 +35,9 @@ func WrapBySnapshotsFromDownloader(db kv.RwDB, snapshots map[snapshotsync.Snapsh snKV := snapshotdb.NewSnapshotKV().DB(db) for k, v := range snapshots { log.Info("Wrap db by", "snapshot", k.String(), "dir", v.Dbpath) - cfg := BucketConfigs[k] + chainSnapshotCfg := BucketConfigs[k] snapshotKV, err := kv2.NewMDBX(log.New()).Readonly().Path(v.Dbpath).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return cfg + return chainSnapshotCfg }).Open() if err != nil { diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index fb11d85b0..b9b40da09 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -257,7 +257,7 @@ func NewStagedSync( var blockReader interfaces.FullBlockReader var allSnapshots *snapshotsync.AllSnapshots if cfg.Snapshot.Enabled { - allSnapshots = snapshotsync.NewAllSnapshots(cfg.Snapshot.Dir, snapshothashes.KnownConfig(controlServer.ChainConfig.ChainName)) + allSnapshots = snapshotsync.NewAllSnapshots(cfg.Snapshot, snapshothashes.KnownConfig(controlServer.ChainConfig.ChainName)) blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) } else { blockReader = snapshotsync.NewBlockReader()