"erigon snapshots retire" to save progress (#6485)

Alex Sharov 2023-01-02 12:26:56 +07:00 committed by GitHub
parent 06de4aeb91
commit 94fd30c5eb
2 changed files with 22 additions and 132 deletions


@@ -41,7 +41,7 @@ Flag `--snapshots` is compatible with `--prune` flag
 # Create new snapshots (can change snapshot size by: --from=0 --to=1_000_000 --segment.size=500_000)
 # It will dump blocks from Database to .seg files:
-erigon snapshots create --datadir=<your_datadir>
+erigon snapshots retire --datadir=<your_datadir>
 # Create .torrent files (Downloader will seed automatically all .torrent files)
 # output format is compatible with https://github.com/ledgerwatch/erigon-snapshot


@@ -2,7 +2,6 @@ package app
 import (
 	"bufio"
 	"context"
-	"encoding/binary"
 	"errors"
 	"fmt"
@@ -34,14 +33,11 @@ import (
 	"github.com/ledgerwatch/erigon/core/rawdb"
 	"github.com/ledgerwatch/erigon/eth/ethconfig"
 	"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
-	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/turbo/debug"
 	"github.com/ledgerwatch/erigon/turbo/logging"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
 )
-const ASSERT = false
 func joinFlags(lists ...[]cli.Flag) (res []cli.Flag) {
 	for _, list := range lists {
 		res = append(res, list...)
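The `joinFlags` helper kept in this hunk composes a command's flag list from shared groups (`debug.Flags`, `logging.Flags`) by variadic append. A minimal standalone sketch of the same pattern, with illustrative flag strings rather than erigon's `cli.Flag` values:

```go
package main

import "fmt"

// join concatenates any number of slices, mirroring joinFlags above.
func join[T any](lists ...[]T) (res []T) {
	for _, list := range lists {
		res = append(res, list...)
	}
	return res
}

func main() {
	// Compose one command's flags from reusable groups, as the CLI does.
	fmt.Println(join([]string{"--datadir"}, []string{"--verbosity"}, []string{"--pprof"}))
	// Output: [--datadir --verbosity --pprof]
}
```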
@@ -53,18 +49,6 @@ var snapshotCommand = cli.Command{
 	Name:        "snapshots",
 	Description: `Managing snapshots (historical data partitions)`,
 	Subcommands: []*cli.Command{
-		{
-			Name:   "create",
-			Action: doSnapshotCommand,
-			Usage:  "Create snapshots for given range of blocks",
-			Before: func(ctx *cli.Context) error { return debug.Setup(ctx) },
-			Flags: joinFlags([]cli.Flag{
-				&utils.DataDirFlag,
-				&SnapshotFromFlag,
-				&SnapshotToFlag,
-				&SnapshotSegmentSizeFlag,
-			}, debug.Flags, logging.Flags),
-		},
 		{
 			Name:   "index",
 			Action: doIndicesCommand,
@@ -208,25 +192,32 @@ func doRam(cliCtx *cli.Context) error {
 	return nil
 }
 func doIndicesCommand(cliCtx *cli.Context) error {
-	ctx, cancel := common.RootContext()
-	defer cancel()
+	ctx := cliCtx.Context
 	dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
 	rebuild := cliCtx.Bool(SnapshotRebuildFlag.Name)
-	from := cliCtx.Uint64(SnapshotFromFlag.Name)
+	//from := cliCtx.Uint64(SnapshotFromFlag.Name)
 	chainDB := mdbx.NewMDBX(log.New()).Path(dirs.Chaindata).Readonly().MustOpen()
 	defer chainDB.Close()
+	dir.MustExist(dirs.SnapHistory)
+	chainConfig := fromdb.ChainConfig(chainDB)
+	chainID, _ := uint256.FromBig(chainConfig.ChainID)
 	if rebuild {
 		panic("not implemented")
 	}
 	cfg := ethconfig.NewSnapCfg(true, true, false)
 	sem := semaphore.NewWeighted(int64(estimate.IndexSnapshot.Workers()))
-	if err := rebuildIndices("Indexing", ctx, chainDB, cfg, dirs, from, sem); err != nil {
-		log.Error("Error", "err", err)
-	}
+	allSnapshots := snapshotsync.NewRoSnapshots(cfg, dirs.Snap)
+	if err := allSnapshots.ReopenFolder(); err != nil {
+		return err
+	}
+	allSnapshots.LogStat()
+	if err := snapshotsync.BuildMissedIndices("Indexing", ctx, dirs, *chainID, sem); err != nil {
+		return err
+	}
 	agg, err := libstate.NewAggregator22(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, chainDB)
 	if err != nil {
@@ -240,12 +231,13 @@ func doIndicesCommand(cliCtx *cli.Context) error {
 	if err != nil {
 		return err
 	}
 	return nil
 }
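With the `create` subcommand gone, `doIndicesCommand` now opens the snapshot folder itself (`NewRoSnapshots` + `ReopenFolder`) and calls `snapshotsync.BuildMissedIndices` directly, with parallelism capped by a weighted semaphore sized from `estimate.IndexSnapshot.Workers()`. A minimal sketch of that bounding pattern using golang.org/x/sync; the segment names and the `NumCPU` cap are illustrative stand-ins, not erigon's values:

```go
package main

import (
	"context"
	"fmt"
	"runtime"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func main() {
	segments := []string{"v1-000000-000500-headers.seg", "v1-000000-000500-bodies.seg"} // illustrative
	sem := semaphore.NewWeighted(int64(runtime.NumCPU()))                               // stand-in for estimate.IndexSnapshot.Workers()
	g, ctx := errgroup.WithContext(context.Background())
	for _, seg := range segments {
		seg := seg
		g.Go(func() error {
			if err := sem.Acquire(ctx, 1); err != nil { // blocks once all worker slots are taken
				return err
			}
			defer sem.Release(1)
			fmt.Println("building index for", seg) // stand-in for writing one .idx file
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```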
 func doUncompress(cliCtx *cli.Context) error {
-	ctx, cancel := common.RootContext()
-	defer cancel()
+	ctx := cliCtx.Context
 	args := cliCtx.Args()
 	if args.Len() != 1 {
 		return fmt.Errorf("expecting .seg file path")
@@ -293,8 +285,8 @@ func doUncompress(cliCtx *cli.Context) error {
 	return nil
 }
 func doCompress(cliCtx *cli.Context) error {
-	ctx, cancel := common.RootContext()
-	defer cancel()
+	ctx := cliCtx.Context
 	args := cliCtx.Args()
 	if args.Len() != 1 {
 		return fmt.Errorf("expecting .seg file path")
@@ -338,9 +330,7 @@ func doCompress(cliCtx *cli.Context) error {
 }
 func doRetireCommand(cliCtx *cli.Context) error {
 	defer log.Info("Retire Done")
-	ctx, cancel := common.RootContext()
-	defer cancel()
+	ctx := cliCtx.Context
 	dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
 	from := cliCtx.Uint64(SnapshotFromFlag.Name)
@@ -357,7 +347,6 @@ func doRetireCommand(cliCtx *cli.Context) error {
 	}
 	br := snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs.Tmp, snapshots, db, nil, nil)
 	agg, err := libstate.NewAggregator22(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db)
 	if err != nil {
 		return err
@@ -436,6 +425,9 @@ func doRetireCommand(cliCtx *cli.Context) error {
 	for i := 0; i < 1024; i++ {
 		if err := db.Update(ctx, func(tx kv.RwTx) error {
 			agg.SetTx(tx)
+			if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files(), agg.Files()); err != nil {
+				return err
+			}
 			if err = agg.Prune(ctx, ethconfig.HistoryV3AggregationStep); err != nil {
 				return err
 			}
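This added `rawdb.WriteSnapshots` call is the point of the commit: every iteration of the prune loop now persists the list of already-completed snapshot files inside the same transaction, so an interrupted `erigon snapshots retire` run can resume instead of redoing finished segments. A minimal sketch of the checkpoint-per-transaction pattern; the toy `kvStore` and the progress key are illustrative, not erigon's kv API:

```go
package main

import (
	"context"
	"encoding/binary"
	"fmt"
)

// kvStore is a toy stand-in for erigon's kv.RwDB; Update runs one "transaction".
type kvStore map[string][]byte

func (db kvStore) Update(_ context.Context, f func(tx kvStore) error) error { return f(db) }

func main() {
	db, ctx := kvStore{}, context.Background()
	for step := uint64(0); step < 4; step++ {
		if err := db.Update(ctx, func(tx kvStore) error {
			// ... prune one batch of history here ...
			// then record how far we got, inside the same transaction:
			v := make([]byte, 8)
			binary.BigEndian.PutUint64(v, step+1)
			tx["retire_progress"] = v
			return nil
		}); err != nil {
			panic(err)
		}
	}
	// A restarted run reads the checkpoint and skips finished batches.
	fmt.Println("resumable from step", binary.BigEndian.Uint64(db["retire_progress"]))
}
```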
@@ -447,105 +439,3 @@ func doRetireCommand(cliCtx *cli.Context) error {
 	return nil
 }
-func doSnapshotCommand(cliCtx *cli.Context) error {
-	ctx, cancel := common.RootContext()
-	defer cancel()
-	fromBlock := cliCtx.Uint64(SnapshotFromFlag.Name)
-	toBlock := cliCtx.Uint64(SnapshotToFlag.Name)
-	segmentSize := cliCtx.Uint64(SnapshotSegmentSizeFlag.Name)
-	if segmentSize < 1000 {
-		return fmt.Errorf("too small --segment.size %d", segmentSize)
-	}
-	dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
-	dir.MustExist(dirs.Snap)
-	dir.MustExist(dirs.SnapHistory)
-	dir.MustExist(dirs.Tmp)
-	db := mdbx.NewMDBX(log.New()).Label(kv.ChainDB).Path(dirs.Chaindata).MustOpen()
-	defer db.Close()
-	{
-		if err := snapshotBlocks(ctx, db, fromBlock, toBlock, segmentSize, dirs.Snap, dirs.Tmp); err != nil {
-			log.Error("Error", "err", err)
-		}
-		allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, true, true), dirs.Snap)
-		if err := allSnapshots.ReopenFolder(); err != nil {
-			return err
-		}
-		agg, err := libstate.NewAggregator22(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db)
-		if err != nil {
-			return err
-		}
-		err = agg.ReopenFiles()
-		if err != nil {
-			return err
-		}
-		agg.SetWorkers(estimate.CompressSnapshot.Workers())
-		if err := db.Update(ctx, func(tx kv.RwTx) error {
-			return rawdb.WriteSnapshots(tx, allSnapshots.Files(), agg.Files())
-		}); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-func rebuildIndices(logPrefix string, ctx context.Context, db kv.RoDB, cfg ethconfig.Snapshot, dirs datadir.Dirs, from uint64, sem *semaphore.Weighted) error {
-	chainConfig := fromdb.ChainConfig(db)
-	chainID, _ := uint256.FromBig(chainConfig.ChainID)
-	allSnapshots := snapshotsync.NewRoSnapshots(cfg, dirs.Snap)
-	if err := allSnapshots.ReopenFolder(); err != nil {
-		return err
-	}
-	allSnapshots.LogStat()
-	if err := snapshotsync.BuildMissedIndices(logPrefix, ctx, dirs, *chainID, sem); err != nil {
-		return err
-	}
-	return nil
-}
-func snapshotBlocks(ctx context.Context, db kv.RoDB, fromBlock, toBlock, blocksPerFile uint64, snapDir, tmpDir string) error {
-	var last uint64
-	if toBlock > 0 {
-		last = toBlock
-	} else {
-		lastChunk := func(tx kv.Tx, blocksPerFile uint64) (uint64, error) {
-			c, err := tx.Cursor(kv.BlockBody)
-			if err != nil {
-				return 0, err
-			}
-			k, _, err := c.Last()
-			if err != nil {
-				return 0, err
-			}
-			last := binary.BigEndian.Uint64(k)
-			if last > params.FullImmutabilityThreshold {
-				last -= params.FullImmutabilityThreshold
-			} else {
-				last = 0
-			}
-			last = last - last%blocksPerFile
-			return last, nil
-		}
-		if err := db.View(context.Background(), func(tx kv.Tx) (err error) {
-			last, err = lastChunk(tx, blocksPerFile)
-			return err
-		}); err != nil {
-			return err
-		}
-	}
-	log.Info("Last body number", "last", last)
-	if err := snapshotsync.DumpBlocks(ctx, fromBlock, last, blocksPerFile, tmpDir, snapDir, db, estimate.CompressSnapshot.Workers(), log.LvlInfo); err != nil {
-		return fmt.Errorf("DumpBlocks: %w", err)
-	}
-	return nil
-}
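The removed `snapshotBlocks` helper chose its upper bound by stepping back `params.FullImmutabilityThreshold` (90_000 blocks, the depth beyond which reorgs are assumed impossible) from the latest body in the DB, then rounding down to a whole segment. A small worked example of that arithmetic; the tip value is made up:

```go
package main

import "fmt"

// fullImmutabilityThreshold mirrors params.FullImmutabilityThreshold (90_000).
const fullImmutabilityThreshold = 90_000

// lastSnapshottableBlock reproduces the removed lastChunk rounding.
func lastSnapshottableBlock(tip, blocksPerFile uint64) uint64 {
	last := tip
	if last > fullImmutabilityThreshold {
		last -= fullImmutabilityThreshold // skip blocks that could still reorg
	} else {
		last = 0
	}
	return last - last%blocksPerFile // align down to a whole .seg segment
}

func main() {
	// Example: tip at 16_100_000 with 500_000 blocks per file:
	// 16_100_000 - 90_000 = 16_010_000, aligned down -> 16_000_000.
	fmt.Println(lastSnapshottableBlock(16_100_000, 500_000))
}
```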