From 94fd30c5eb10e248e472f9e94f34f1d32848c2b9 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 2 Jan 2023 12:26:56 +0700 Subject: [PATCH] "erigon snapshots retire" to save progress (#6485) --- cmd/downloader/readme.md | 2 +- turbo/app/snapshots.go | 152 ++++++--------------------------------- 2 files changed, 22 insertions(+), 132 deletions(-) diff --git a/cmd/downloader/readme.md b/cmd/downloader/readme.md index 5afc7aaf7..14fcfd3a1 100644 --- a/cmd/downloader/readme.md +++ b/cmd/downloader/readme.md @@ -41,7 +41,7 @@ Flag `--snapshots` is compatible with `--prune` flag # Create new snapshots (can change snapshot size by: --from=0 --to=1_000_000 --segment.size=500_000) # It will dump blocks from Database to .seg files: -erigon snapshots create --datadir= +erigon snapshots retire --datadir= # Create .torrent files (Downloader will seed automatically all .torrent files) # output format is compatible with https://github.com/ledgerwatch/erigon-snapshot diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index 487f64aef..1fe2dc706 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -2,7 +2,6 @@ package app import ( "bufio" - "context" "encoding/binary" "errors" "fmt" @@ -34,14 +33,11 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/ethconfig/estimate" - "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/debug" "github.com/ledgerwatch/erigon/turbo/logging" "github.com/ledgerwatch/erigon/turbo/snapshotsync" ) -const ASSERT = false - func joinFlags(lists ...[]cli.Flag) (res []cli.Flag) { for _, list := range lists { res = append(res, list...) @@ -53,18 +49,6 @@ var snapshotCommand = cli.Command{ Name: "snapshots", Description: `Managing snapshots (historical data partitions)`, Subcommands: []*cli.Command{ - { - Name: "create", - Action: doSnapshotCommand, - Usage: "Create snapshots for given range of blocks", - Before: func(ctx *cli.Context) error { return debug.Setup(ctx) }, - Flags: joinFlags([]cli.Flag{ - &utils.DataDirFlag, - &SnapshotFromFlag, - &SnapshotToFlag, - &SnapshotSegmentSizeFlag, - }, debug.Flags, logging.Flags), - }, { Name: "index", Action: doIndicesCommand, @@ -208,25 +192,32 @@ func doRam(cliCtx *cli.Context) error { return nil } func doIndicesCommand(cliCtx *cli.Context) error { - ctx, cancel := common.RootContext() - defer cancel() + ctx := cliCtx.Context dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name)) rebuild := cliCtx.Bool(SnapshotRebuildFlag.Name) - from := cliCtx.Uint64(SnapshotFromFlag.Name) + //from := cliCtx.Uint64(SnapshotFromFlag.Name) chainDB := mdbx.NewMDBX(log.New()).Path(dirs.Chaindata).Readonly().MustOpen() defer chainDB.Close() dir.MustExist(dirs.SnapHistory) + chainConfig := fromdb.ChainConfig(chainDB) + chainID, _ := uint256.FromBig(chainConfig.ChainID) if rebuild { panic("not implemented") } cfg := ethconfig.NewSnapCfg(true, true, false) sem := semaphore.NewWeighted(int64(estimate.IndexSnapshot.Workers())) - if err := rebuildIndices("Indexing", ctx, chainDB, cfg, dirs, from, sem); err != nil { - log.Error("Error", "err", err) + + allSnapshots := snapshotsync.NewRoSnapshots(cfg, dirs.Snap) + if err := allSnapshots.ReopenFolder(); err != nil { + return err + } + allSnapshots.LogStat() + if err := snapshotsync.BuildMissedIndices("Indexing", ctx, dirs, *chainID, sem); err != nil { + return err } agg, err := libstate.NewAggregator22(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, chainDB) if err != nil { @@ -240,12 +231,13 @@ func doIndicesCommand(cliCtx *cli.Context) error { if err != nil { return err } + return nil } func doUncompress(cliCtx *cli.Context) error { - ctx, cancel := common.RootContext() - defer cancel() + ctx := cliCtx.Context + args := cliCtx.Args() if args.Len() != 1 { return fmt.Errorf("expecting .seg file path") @@ -293,8 +285,8 @@ func doUncompress(cliCtx *cli.Context) error { return nil } func doCompress(cliCtx *cli.Context) error { - ctx, cancel := common.RootContext() - defer cancel() + ctx := cliCtx.Context + args := cliCtx.Args() if args.Len() != 1 { return fmt.Errorf("expecting .seg file path") @@ -338,9 +330,7 @@ func doCompress(cliCtx *cli.Context) error { } func doRetireCommand(cliCtx *cli.Context) error { defer log.Info("Retire Done") - - ctx, cancel := common.RootContext() - defer cancel() + ctx := cliCtx.Context dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name)) from := cliCtx.Uint64(SnapshotFromFlag.Name) @@ -357,7 +347,6 @@ func doRetireCommand(cliCtx *cli.Context) error { } br := snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs.Tmp, snapshots, db, nil, nil) - agg, err := libstate.NewAggregator22(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db) if err != nil { return err @@ -436,6 +425,9 @@ func doRetireCommand(cliCtx *cli.Context) error { for i := 0; i < 1024; i++ { if err := db.Update(ctx, func(tx kv.RwTx) error { agg.SetTx(tx) + if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files(), agg.Files()); err != nil { + return err + } if err = agg.Prune(ctx, ethconfig.HistoryV3AggregationStep); err != nil { return err } @@ -447,105 +439,3 @@ func doRetireCommand(cliCtx *cli.Context) error { return nil } - -func doSnapshotCommand(cliCtx *cli.Context) error { - ctx, cancel := common.RootContext() - defer cancel() - - fromBlock := cliCtx.Uint64(SnapshotFromFlag.Name) - toBlock := cliCtx.Uint64(SnapshotToFlag.Name) - segmentSize := cliCtx.Uint64(SnapshotSegmentSizeFlag.Name) - if segmentSize < 1000 { - return fmt.Errorf("too small --segment.size %d", segmentSize) - } - dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name)) - dir.MustExist(dirs.Snap) - dir.MustExist(dirs.SnapHistory) - dir.MustExist(dirs.Tmp) - - db := mdbx.NewMDBX(log.New()).Label(kv.ChainDB).Path(dirs.Chaindata).MustOpen() - defer db.Close() - - { - if err := snapshotBlocks(ctx, db, fromBlock, toBlock, segmentSize, dirs.Snap, dirs.Tmp); err != nil { - log.Error("Error", "err", err) - } - allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, true, true), dirs.Snap) - if err := allSnapshots.ReopenFolder(); err != nil { - return err - } - - agg, err := libstate.NewAggregator22(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db) - if err != nil { - return err - } - err = agg.ReopenFiles() - if err != nil { - return err - } - agg.SetWorkers(estimate.CompressSnapshot.Workers()) - - if err := db.Update(ctx, func(tx kv.RwTx) error { - return rawdb.WriteSnapshots(tx, allSnapshots.Files(), agg.Files()) - }); err != nil { - return err - } - } - return nil -} - -func rebuildIndices(logPrefix string, ctx context.Context, db kv.RoDB, cfg ethconfig.Snapshot, dirs datadir.Dirs, from uint64, sem *semaphore.Weighted) error { - chainConfig := fromdb.ChainConfig(db) - chainID, _ := uint256.FromBig(chainConfig.ChainID) - - allSnapshots := snapshotsync.NewRoSnapshots(cfg, dirs.Snap) - if err := allSnapshots.ReopenFolder(); err != nil { - return err - } - allSnapshots.LogStat() - - if err := snapshotsync.BuildMissedIndices(logPrefix, ctx, dirs, *chainID, sem); err != nil { - return err - } - return nil -} - -func snapshotBlocks(ctx context.Context, db kv.RoDB, fromBlock, toBlock, blocksPerFile uint64, snapDir, tmpDir string) error { - var last uint64 - - if toBlock > 0 { - last = toBlock - } else { - lastChunk := func(tx kv.Tx, blocksPerFile uint64) (uint64, error) { - c, err := tx.Cursor(kv.BlockBody) - if err != nil { - return 0, err - } - k, _, err := c.Last() - if err != nil { - return 0, err - } - last := binary.BigEndian.Uint64(k) - if last > params.FullImmutabilityThreshold { - last -= params.FullImmutabilityThreshold - } else { - last = 0 - } - last = last - last%blocksPerFile - return last, nil - } - - if err := db.View(context.Background(), func(tx kv.Tx) (err error) { - last, err = lastChunk(tx, blocksPerFile) - return err - }); err != nil { - return err - } - } - - log.Info("Last body number", "last", last) - if err := snapshotsync.DumpBlocks(ctx, fromBlock, last, blocksPerFile, tmpDir, snapDir, db, estimate.CompressSnapshot.Workers(), log.LvlInfo); err != nil { - return fmt.Errorf("DumpBlocks: %w", err) - } - return nil -}