"erigon snapshots recompress" to apply new compression rules without db access (#3571)

* mainnet

* save

* save

* save

* save
This commit is contained in:
Alex Sharov 2022-02-22 15:35:04 +07:00 committed by GitHub
parent f624d1cc22
commit dbd8a93fa8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,13 +4,16 @@ import (
"context"
"encoding/binary"
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"time"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
@ -43,6 +46,15 @@ var snapshotCommand = cli.Command{
SnapshotSegmentSizeFlag,
}, debug.Flags...),
},
{
Name: "recompress",
Action: doRecompressCommand,
Usage: "Recompress existing .seg files to apply new compression rules",
Before: func(ctx *cli.Context) error { return debug.Setup(ctx) },
Flags: append([]cli.Flag{
utils.DataDirFlag,
}, debug.Flags...),
},
{
Name: "index",
Action: doIndicesCommand,
@ -129,6 +141,23 @@ func doSnapshotCommand(cliCtx *cli.Context) error {
}
return nil
}
// doRecompressCommand re-compresses all existing .seg files under
// <datadir>/snapshots with the current compression rules. It requires no
// database access: each segment is read, re-encoded word-by-word, and
// swapped into place by recompressSegments.
func doRecompressCommand(cliCtx *cli.Context) error {
	ctx, cancel := common.RootContext()
	defer cancel()

	dataDir := cliCtx.String(utils.DataDirFlag.Name)
	snapshotDir, err := dir.OpenRw(filepath.Join(dataDir, "snapshots"))
	if err != nil {
		return err
	}
	tmpDir := filepath.Join(dataDir, etl.TmpDirName)
	dir.MustExist(tmpDir)

	if err := recompressSegments(ctx, snapshotDir, tmpDir); err != nil {
		// Log for operator visibility, but also propagate the error so the
		// CLI exits non-zero (previously the error was swallowed and the
		// command reported success even on failure).
		log.Error("Error", "err", err)
		return err
	}
	return nil
}
func rebuildIndices(ctx context.Context, chainDB kv.RoDB, cfg ethconfig.Snapshot, snapshotDir *dir.Rw, tmpDir string, from uint64) error {
chainConfig := tool.ChainConfigFromDB(chainDB)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
@ -143,6 +172,74 @@ func rebuildIndices(ctx context.Context, chainDB kv.RoDB, cfg ethconfig.Snapshot
return nil
}
// recompressSegments re-encodes every segment file found in snapshotDir with
// the current compression rules. Each segment is first written to a sibling
// "<name>.tmp2" file and only then swapped into place, so a failure mid-file
// leaves the original segment intact.
func recompressSegments(ctx context.Context, snapshotDir *dir.Rw, tmpDir string) error {
	allFiles, err := snapshotsync.Segments(snapshotDir.Path)
	if err != nil {
		return err
	}
	for _, f := range allFiles {
		f = filepath.Join(snapshotDir.Path, f)
		outFile := f + ".tmp2"
		if err := cpSegmentByWords(ctx, f, outFile, tmpDir); err != nil {
			// Best-effort cleanup: don't leave a partial .tmp2 behind
			// (previously it was abandoned on error). Ignore the removal
			// error — the copy error is the one worth reporting.
			_ = os.Remove(outFile)
			return err
		}
		// Remove-then-rename rather than a bare rename — presumably to avoid
		// platform-specific rename-over-existing semantics; TODO confirm.
		// Note: between these two calls a crash would lose the segment,
		// leaving only the .tmp2 copy.
		if err = os.Remove(f); err != nil {
			return err
		}
		if err = os.Rename(outFile, f); err != nil {
			return err
		}
	}
	return nil
}
// cpSegmentByWords streams every word of segment srcF through a fresh
// Compressor into dstF, re-applying the current compression rules. tmpDir is
// used by the compressor for its intermediate files. The copy respects ctx
// cancellation and logs progress every 10 seconds.
func cpSegmentByWords(ctx context.Context, srcF, dstF, tmpDir string) error {
	logEvery := time.NewTicker(10 * time.Second)
	defer logEvery.Stop()

	// Leave one CPU for the rest of the process; never go below 1 worker.
	workers := runtime.NumCPU() - 1
	if workers < 1 {
		workers = 1
	}
	buf := make([]byte, 4096)
	d, err := compress.NewDecompressor(srcF)
	if err != nil {
		return err
	}
	defer d.Close()
	out, err := compress.NewCompressor(ctx, "", dstF, tmpDir, compress.MinPatternScore, workers)
	if err != nil {
		return err
	}
	defer out.Close()

	i := 0
	if err := d.WithReadAhead(func() error {
		g := d.MakeGetter()
		for g.HasNext() {
			// Reuse buf's backing storage across words to avoid reallocating.
			buf, _ = g.Next(buf[:0])
			if err := out.AddWord(buf); err != nil {
				return err
			}
			i++ // bug fix: counter was never incremented, so progress always logged 0.00%
			select {
			default:
			case <-ctx.Done():
				return ctx.Err()
			case <-logEvery.C:
				log.Info("[snapshots] Recompress", "file", srcF, "progress", fmt.Sprintf("%.2f%%", 100*float64(i)/float64(d.Count())))
			}
		}
		return nil
	}); err != nil {
		return err
	}
	if err := out.Compress(); err != nil {
		return err
	}
	return nil
}
func snapshotBlocks(ctx context.Context, chainDB kv.RoDB, fromBlock, toBlock, blocksPerFile uint64, snapshotDir, tmpDir string) error {
var last uint64