Snapshot: move old blocks to snapshots (non-async version) (#3309)

* save

* save
Alex Sharov 2022-01-20 12:01:02 +07:00 committed by GitHub
parent 70607a849f
commit 6fd002eb5a
13 changed files with 150 additions and 49 deletions


@ -1041,7 +1041,7 @@ func allSnapshots(cc *params.ChainConfig) *snapshotsync.AllSnapshots {
Enabled: true,
Dir: path.Join(datadir, "snapshots"),
}
_allSnapshotsSingleton = snapshotsync.NewAllSnapshots(snapshotCfg.Dir, snapshothashes.KnownConfig(cc.ChainName))
_allSnapshotsSingleton = snapshotsync.NewAllSnapshots(snapshotCfg, snapshothashes.KnownConfig(cc.ChainName))
if err := _allSnapshotsSingleton.ReopenSegments(); err != nil {
panic(err)
}
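
For reference, a minimal sketch of how the changed constructor is wired up after this commit, based only on the signatures visible in this diff; the package name, the openSnapshots helper and its arguments are illustrative assumptions, not code from the commit:

package example

import (
    "path"

    "github.com/ledgerwatch/erigon/eth/ethconfig"
    "github.com/ledgerwatch/erigon/turbo/snapshotsync"
    "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
)

// openSnapshots builds the new ethconfig.Snapshot value and passes it to
// NewAllSnapshots, which now takes the whole config instead of a bare directory.
func openSnapshots(datadir, chainName string) (*snapshotsync.AllSnapshots, error) {
    cfg := ethconfig.Snapshot{
        Enabled: true,
        Dir:     path.Join(datadir, "snapshots"),
    }
    allSnapshots := snapshotsync.NewAllSnapshots(cfg, snapshothashes.KnownConfig(chainName))
    // Segments still have to be opened explicitly, as in the call sites in this diff.
    if err := allSnapshots.ReopenSegments(); err != nil {
        return nil, err
    }
    return allSnapshots, nil
}
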


@ -278,7 +278,7 @@ func RemoteServices(ctx context.Context, cfg Flags, logger log.Logger, rootCance
return nil, nil, nil, nil, nil, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
}
allSnapshots := snapshotsync.NewAllSnapshots(cfg.Snapshot.Dir, snapshothashes.KnownConfig(cc.ChainName))
allSnapshots := snapshotsync.NewAllSnapshots(cfg.Snapshot, snapshothashes.KnownConfig(cc.ChainName))
if err := allSnapshots.ReopenSegments(); err != nil {
return nil, nil, nil, nil, nil, nil, err
}


@ -547,6 +547,10 @@ var (
Name: "experimental.snapshot",
Usage: "Enabling experimental snapshot sync",
}
SnapshotRetireFlag = cli.BoolFlag{
Name: "experimental.snapshot.retire",
Usage: "Delete(!) old blocks from DB, by move them to snapshots",
}
HealthCheckFlag = cli.BoolFlag{
Name: "healthcheck",
@ -1271,6 +1275,9 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Conf
cfg.Snapshot.Enabled = true
cfg.Snapshot.Dir = path.Join(nodeConfig.DataDir, "snapshots")
}
if ctx.GlobalBool(SnapshotRetireFlag.Name) {
cfg.Snapshot.RetireEnabled = true
}
CheckExclusive(ctx, MinerSigningKeyFileFlag, MinerEtherbaseFlag)
setEtherbase(ctx, cfg)
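
Taken together, the two hunks above make block retirement opt-in: presumably the node is started with both --experimental.snapshot and --experimental.snapshot.retire, the first enabling snapshot sync and the second additionally allowing already-snapshotted blocks to be deleted from the DB, which is what sets Snapshot.Enabled and Snapshot.RetireEnabled in ethconfig.
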


@ -482,6 +482,22 @@ func ResetSequence(tx kv.RwTx, bucket string, newValue uint64) error {
return nil
}
func ReadBodyForStorageByKey(db kv.Getter, k []byte) (*types.BodyForStorage, error) {
bodyRlp, err := db.GetOne(kv.BlockBody, k)
if err != nil {
return nil, err
}
if len(bodyRlp) == 0 {
return nil, nil
}
bodyForStorage := new(types.BodyForStorage)
if err := rlp.DecodeBytes(bodyRlp, bodyForStorage); err != nil {
return nil, err
}
return bodyForStorage, nil
}
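
A minimal sketch of how the new helper could be called for a canonical block; the key layout (8-byte big-endian block number followed by the block hash) is inferred from the ForPrefix/EncodeBlockNumber usage in DeleteAncientBlocks below, and the readCanonicalBodyForStorage name and package are illustrative:

package example

import (
    "github.com/ledgerwatch/erigon-lib/kv"
    "github.com/ledgerwatch/erigon/common/dbutils"
    "github.com/ledgerwatch/erigon/core/rawdb"
    "github.com/ledgerwatch/erigon/core/types"
)

// readCanonicalBodyForStorage resolves the canonical hash for a block number,
// rebuilds the block-number+hash key and reads the raw BodyForStorage entry.
func readCanonicalBodyForStorage(tx kv.Tx, blockNum uint64) (*types.BodyForStorage, error) {
    hash, err := rawdb.ReadCanonicalHash(tx, blockNum)
    if err != nil {
        return nil, err
    }
    k := append(dbutils.EncodeBlockNumber(blockNum), hash[:]...)
    return rawdb.ReadBodyForStorageByKey(tx, k)
}
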
func ReadBody(db kv.Getter, hash common.Hash, number uint64) (*types.Body, uint64, uint32) {
data := ReadStorageBodyRLP(db, hash, number)
if len(data) == 0 {
@ -1022,6 +1038,50 @@ func WriteBlock(db kv.RwTx, block *types.Block) error {
return nil
}
// DeleteAncientBlocks - delete old blocks after moving them to snapshots. [from, to)
func DeleteAncientBlocks(db kv.RwTx, blockFrom, blockTo uint64) error {
//doesn't delete Receipts - because Receipts are not in snapshots yet
for n := blockFrom; n < blockTo; n++ {
canonicalHash, err := ReadCanonicalHash(db, n)
if err != nil {
return err
}
if err := db.ForPrefix(kv.Headers, dbutils.EncodeBlockNumber(n), func(k, v []byte) error {
isCanonical := bytes.Equal(k[8:], canonicalHash[:])
if err := db.Delete(kv.Headers, k, nil); err != nil {
return err
}
b, err := ReadBodyForStorageByKey(db, k)
if err != nil {
return err
}
txIDBytes := make([]byte, 8)
for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ {
binary.BigEndian.PutUint64(txIDBytes, txID)
bucket := kv.EthTx
if !isCanonical {
bucket = kv.NonCanonicalTxs
}
if err := db.Delete(bucket, txIDBytes, nil); err != nil {
return err
}
}
if err := db.Delete(kv.BlockBody, k, nil); err != nil {
return err
}
if err := db.Delete(kv.Senders, k, nil); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
return nil
}
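
A sketch of invoking the new deletion helper from a read-write transaction, assuming a kv.RwDB handle; the wrapper name and package are illustrative. Note the half-open [from, to) range and that, as the comment above says, Receipts are intentionally left in place:

package example

import (
    "context"

    "github.com/ledgerwatch/erigon-lib/kv"
    "github.com/ledgerwatch/erigon/core/rawdb"
)

// deleteRetiredBlocks removes blocks [from, to) that have already been moved
// into snapshot segments, inside a single read-write transaction.
func deleteRetiredBlocks(ctx context.Context, db kv.RwDB, from, to uint64) error {
    return db.Update(ctx, func(tx kv.RwTx) error {
        return rawdb.DeleteAncientBlocks(tx, from, to)
    })
}
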
// DeleteBlock removes all block data associated with a hash.
func DeleteBlock(db kv.RwTx, hash common.Hash, number uint64) error {
if err := DeleteReceipts(db, number); err != nil {


@ -337,7 +337,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
return nil, err
}
allSnapshots := snapshotsync.NewAllSnapshots(config.Snapshot.Dir, snConfig)
allSnapshots := snapshotsync.NewAllSnapshots(config.Snapshot, snConfig)
if err != nil {
return nil, err
}


@ -18,7 +18,6 @@
package ethconfig
import (
"github.com/ledgerwatch/erigon/consensus/parlia"
"math/big"
"os"
"os/user"
@ -26,6 +25,8 @@ import (
"runtime"
"time"
"github.com/ledgerwatch/erigon/consensus/parlia"
"github.com/c2h5oh/datasize"
"github.com/davecgh/go-spew/spew"
"github.com/ledgerwatch/erigon/consensus/aura"
@ -118,6 +119,7 @@ func init() {
type Snapshot struct {
Enabled bool
RetireEnabled bool
Dir string
ChainSnapshotConfig *snapshothashes.Config
}


@ -368,13 +368,35 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
}
defer tx.Rollback()
}
if !cfg.prune.TxIndex.Enabled() {
return nil
}
if err = PruneTable(tx, kv.Senders, s.LogPrefix(), to, logEvery, ctx); err != nil {
return err
if cfg.snapshots != nil && cfg.snapshots.Cfg().RetireEnabled {
if err := cfg.snapshots.EnsureExpectedBlocksAreAvailable(); err != nil {
return err
}
blockFrom := cfg.snapshots.BlocksAvailable() + 1
blockTo := s.ForwardProgress - params.FullImmutabilityThreshold
if blockTo-blockFrom > 10_000 {
// in the future this will be done in the background
if err := snapshotsync.DumpBlocks(ctx, blockFrom, blockTo, snapshotsync.DEFAULT_SEGMENT_SIZE, cfg.tmpdir, cfg.snapshots.Dir(), cfg.db, 1); err != nil {
return err
}
if err := cfg.snapshots.ReopenSegments(); err != nil {
return err
}
if err := cfg.snapshots.ReopenIndices(); err != nil {
return err
}
if err := rawdb.DeleteAncientBlocks(tx, blockFrom, blockTo); err != nil {
return err
}
}
}
if cfg.prune.TxIndex.Enabled() {
if err = PruneTable(tx, kv.Senders, s.LogPrefix(), to, logEvery, ctx); err != nil {
return err
}
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
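
To make the new retire branch concrete with a worked example: if the snapshots already cover blocks up to 1,000,000 (BlocksAvailable) and the stage's ForwardProgress is 1,200,000, then blockFrom is 1,000,001 and blockTo is 1,200,000 minus params.FullImmutabilityThreshold (assuming the usual value of 90,000), i.e. 1,110,000. The gap of roughly 110,000 blocks exceeds the 10,000-block threshold, so the stage dumps that range via DumpBlocks with DEFAULT_SEGMENT_SIZE segments, reopens segments and indices, and finally deletes the same range from the DB with rawdb.DeleteAncientBlocks.
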


@ -48,6 +48,7 @@ var DefaultFlags = []cli.Flag{
SyncLoopThrottleFlag,
BadBlockFlag,
utils.SnapshotSyncFlag,
utils.SnapshotRetireFlag,
utils.ListenPortFlag,
utils.NATFlag,
utils.NoDiscoverFlag,


@ -17,6 +17,7 @@ import (
"github.com/ledgerwatch/erigon/cmd/hack/tool"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/internal/debug"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
@ -70,7 +71,7 @@ var (
SnapshotSegmentSizeFlag = cli.Uint64Flag{
Name: "segment.size",
Usage: "Amount of blocks in each segment",
Value: 500_000,
Value: snapshotsync.DEFAULT_SEGMENT_SIZE,
}
SnapshotRebuildFlag = cli.BoolFlag{
Name: "rebuild",
@ -91,7 +92,8 @@ func doIndicesCommand(cliCtx *cli.Context) error {
defer chainDB.Close()
if rebuild {
if err := rebuildIndices(ctx, chainDB, snapshotDir, tmpDir); err != nil {
cfg := ethconfig.Snapshot{Dir: snapshotDir, RetireEnabled: true, Enabled: true}
if err := rebuildIndices(ctx, chainDB, cfg, tmpDir); err != nil {
log.Error("Error", "err", err)
}
}
@ -121,17 +123,17 @@ func doSnapshotCommand(cliCtx *cli.Context) error {
return nil
}
func rebuildIndices(ctx context.Context, chainDB kv.RoDB, snapshotDir, tmpDir string) error {
func rebuildIndices(ctx context.Context, chainDB kv.RoDB, cfg ethconfig.Snapshot, tmpDir string) error {
chainConfig := tool.ChainConfigFromDB(chainDB)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
_ = chainID
_ = os.MkdirAll(snapshotDir, 0744)
_ = os.MkdirAll(cfg.Dir, 0744)
allSnapshots := snapshotsync.NewAllSnapshots(snapshotDir, snapshothashes.KnownConfig(chainConfig.ChainName))
allSnapshots := snapshotsync.NewAllSnapshots(cfg, snapshothashes.KnownConfig(chainConfig.ChainName))
if err := allSnapshots.ReopenSegments(); err != nil {
return err
}
idxFilesList, err := snapshotsync.IdxFiles(snapshotDir)
idxFilesList, err := snapshotsync.IdxFiles(cfg.Dir)
if err != nil {
return err
}
@ -202,7 +204,8 @@ func checkBlockSnapshot(chaindata string) error {
chainID, _ := uint256.FromBig(chainConfig.ChainID)
_ = chainID
snapshots := snapshotsync.NewAllSnapshots(path.Join(dataDir, "snapshots"), snapshothashes.KnownConfig(chainConfig.ChainName))
cfg := ethconfig.Snapshot{Dir: path.Join(dataDir, "snapshots"), Enabled: true, RetireEnabled: true}
snapshots := snapshotsync.NewAllSnapshots(cfg, snapshothashes.KnownConfig(chainConfig.ChainName))
snapshots.ReopenSegments()
snapshots.ReopenIndices()
//if err := snapshots.BuildIndices(context.Background(), *chainID); err != nil {


@ -28,6 +28,7 @@ import (
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
"github.com/ledgerwatch/log/v3"
@ -64,21 +65,12 @@ var (
ErrInvalidCompressedFileName = fmt.Errorf("invalid compressed file name")
)
func FileName(from, to uint64, name SnapshotType) string {
return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, name)
}
func SegmentFileName(from, to uint64, name SnapshotType) string {
return FileName(from, to, name) + ".seg"
}
func TmpFileName(from, to uint64, name SnapshotType) string {
return FileName(from, to, name) + ".dat"
}
func IdxFileName(from, to uint64, name SnapshotType) string {
return FileName(from, to, name) + ".idx"
func FileName(from, to uint64, t SnapshotType) string {
return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, t)
}
func SegmentFileName(from, to uint64, t SnapshotType) string { return FileName(from, to, t) + ".seg" }
func DatFileName(from, to uint64, t SnapshotType) string { return FileName(from, to, t) + ".dat" }
func IdxFileName(from, to uint64, t SnapshotType) string { return FileName(from, to, t) + ".idx" }
func (s BlocksSnapshot) Has(block uint64) bool { return block >= s.From && block < s.To }
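
As a worked example of the renamed helpers: assuming the Transactions snapshot type renders as the string "transactions", FileName(500_000, 1_000_000, Transactions) formats to v1-000500-001000-transactions, and SegmentFileName, DatFileName and IdxFileName append .seg, .dat and .idx respectively, so the segment file for that range would be v1-000500-001000-transactions.seg.
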
@ -89,7 +81,8 @@ type AllSnapshots struct {
segmentsAvailable uint64
idxAvailable uint64
blocks []*BlocksSnapshot
cfg *snapshothashes.Config
chainSnapshotCfg *snapshothashes.Config
cfg ethconfig.Snapshot
}
// NewAllSnapshots - opens all snapshots. But to simplify everything:
@ -97,14 +90,16 @@ type AllSnapshots struct {
// - all snapshots of given blocks range must exist - to make this blocks range available
// - gaps are not allowed
// - segment have [from:to) semantic
func NewAllSnapshots(dir string, cfg *snapshothashes.Config) *AllSnapshots {
if err := os.MkdirAll(dir, 0744); err != nil {
func NewAllSnapshots(cfg ethconfig.Snapshot, snCfg *snapshothashes.Config) *AllSnapshots {
if err := os.MkdirAll(cfg.Dir, 0744); err != nil {
panic(err)
}
return &AllSnapshots{dir: dir, cfg: cfg}
return &AllSnapshots{dir: cfg.Dir, chainSnapshotCfg: snCfg, cfg: cfg}
}
func (s *AllSnapshots) ChainSnapshotConfig() *snapshothashes.Config { return s.cfg }
func (s *AllSnapshots) ChainSnapshotConfig() *snapshothashes.Config { return s.chainSnapshotCfg }
func (s *AllSnapshots) Cfg() ethconfig.Snapshot { return s.cfg }
func (s *AllSnapshots) Dir() string { return s.dir }
func (s *AllSnapshots) AllSegmentsAvailable() bool { return s.allSegmentsAvailable }
func (s *AllSnapshots) SetAllSegmentsAvailable(v bool) { s.allSegmentsAvailable = v }
func (s *AllSnapshots) BlocksAvailable() uint64 { return s.segmentsAvailable }
@ -112,6 +107,13 @@ func (s *AllSnapshots) AllIdxAvailable() bool { return s.a
func (s *AllSnapshots) SetAllIdxAvailable(v bool) { s.allIdxAvailable = v }
func (s *AllSnapshots) IndicesAvailable() uint64 { return s.idxAvailable }
func (s *AllSnapshots) EnsureExpectedBlocksAreAvailable() error {
if s.BlocksAvailable() < s.ChainSnapshotConfig().ExpectBlocks {
return fmt.Errorf("app must wait until all expected snapshots are available. Expected: %d, Available: %d", s.ChainSnapshotConfig().ExpectBlocks, s.BlocksAvailable())
}
return nil
}
func (s *AllSnapshots) SegmentsAvailability() (headers, bodies, txs uint64, err error) {
if headers, err = latestSegment(s.dir, Headers); err != nil {
return
@ -213,7 +215,7 @@ func (s *AllSnapshots) ReopenSegments() error {
if to == prevTo {
continue
}
if from > s.cfg.ExpectBlocks {
if from > s.chainSnapshotCfg.ExpectBlocks {
log.Debug("[open snapshots] skip snapshot because node expect less blocks in snapshots", "file", f)
continue
}
@ -500,6 +502,8 @@ func ParseFileName(name, expectedExt string) (from, to uint64, snapshotType Snap
return from * 1_000, to * 1_000, snapshotType, nil
}
const DEFAULT_SEGMENT_SIZE = 500_000
func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, tmpDir, snapshotDir string, chainDB kv.RoDB, workers int) error {
for i := blockFrom; i < blockTo; i += blocksPerFile {
if err := dumpBlocksRange(ctx, i, i+blocksPerFile, tmpDir, snapshotDir, chainDB, workers); err != nil {
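
A minimal sketch of driving the exported DumpBlocks entry point with the new DEFAULT_SEGMENT_SIZE constant; the dumpFirstMillion wrapper and package are illustrative. With these arguments the loop above would presumably produce two segments, covering [0, 500_000) and [500_000, 1_000_000):

package example

import (
    "context"

    "github.com/ledgerwatch/erigon-lib/kv"
    "github.com/ledgerwatch/erigon/turbo/snapshotsync"
)

// dumpFirstMillion writes blocks [0, 1_000_000) into DEFAULT_SEGMENT_SIZE-sized
// segment files under snapshotDir, using tmpDir for intermediate files and a
// single worker, mirroring the call made from PruneSendersStage in this commit.
func dumpFirstMillion(ctx context.Context, chainDB kv.RoDB, tmpDir, snapshotDir string) error {
    return snapshotsync.DumpBlocks(ctx, 0, 1_000_000,
        snapshotsync.DEFAULT_SEGMENT_SIZE, tmpDir, snapshotDir, chainDB, 1)
}
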


@ -8,6 +8,7 @@ import (
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/recsplit"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/params/networkname"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
"github.com/stretchr/testify/require"
@ -15,8 +16,9 @@ import (
func TestOpenAllSnapshot(t *testing.T) {
dir, require := t.TempDir(), require.New(t)
cfg := snapshothashes.KnownConfig(networkname.MainnetChainName)
cfg.ExpectBlocks = math.MaxUint64
chainSnapshotCfg := snapshothashes.KnownConfig(networkname.MainnetChainName)
chainSnapshotCfg.ExpectBlocks = math.MaxUint64
cfg := ethconfig.Snapshot{Dir: dir, Enabled: true}
createFile := func(from, to uint64, name SnapshotType) {
c, err := compress.NewCompressor(context.Background(), "test", path.Join(dir, SegmentFileName(from, to, name)), dir, 100, 1)
require.NoError(err)
@ -38,7 +40,7 @@ func TestOpenAllSnapshot(t *testing.T) {
err = idx.Build()
require.NoError(err)
}
s := NewAllSnapshots(dir, cfg)
s := NewAllSnapshots(cfg, chainSnapshotCfg)
defer s.Close()
err := s.ReopenSegments()
require.NoError(err)
@ -46,14 +48,14 @@ func TestOpenAllSnapshot(t *testing.T) {
s.Close()
createFile(500_000, 1_000_000, Bodies)
s = NewAllSnapshots(dir, cfg)
s = NewAllSnapshots(cfg, chainSnapshotCfg)
defer s.Close()
require.Equal(0, len(s.blocks)) //because, no headers and transactions snapshot files are created
s.Close()
createFile(500_000, 1_000_000, Headers)
createFile(500_000, 1_000_000, Transactions)
s = NewAllSnapshots(dir, cfg)
s = NewAllSnapshots(cfg, chainSnapshotCfg)
err = s.ReopenSegments()
require.Error(err)
require.Equal(0, len(s.blocks)) //because, no gaps are allowed (expect snapshots from block 0)
@ -62,7 +64,7 @@ func TestOpenAllSnapshot(t *testing.T) {
createFile(0, 500_000, Bodies)
createFile(0, 500_000, Headers)
createFile(0, 500_000, Transactions)
s = NewAllSnapshots(dir, cfg)
s = NewAllSnapshots(cfg, chainSnapshotCfg)
err = s.ReopenSegments()
require.NoError(err)
defer s.Close()
@ -80,8 +82,8 @@ func TestOpenAllSnapshot(t *testing.T) {
require.False(ok)
// user must be able to limit amount of blocks which read from snapshot
cfg.ExpectBlocks = 500_000 - 1
s = NewAllSnapshots(dir, cfg)
chainSnapshotCfg.ExpectBlocks = 500_000 - 1
s = NewAllSnapshots(cfg, chainSnapshotCfg)
err = s.ReopenSegments()
require.NoError(err)
defer s.Close()
@ -90,8 +92,8 @@ func TestOpenAllSnapshot(t *testing.T) {
createFile(500_000, 900_000, Headers)
createFile(500_000, 900_000, Bodies)
createFile(500_000, 900_000, Transactions)
cfg.ExpectBlocks = math.MaxUint64
s = NewAllSnapshots(dir, cfg)
chainSnapshotCfg.ExpectBlocks = math.MaxUint64
s = NewAllSnapshots(cfg, chainSnapshotCfg)
defer s.Close()
err = s.ReopenSegments()
require.Error(err)


@ -35,9 +35,9 @@ func WrapBySnapshotsFromDownloader(db kv.RwDB, snapshots map[snapshotsync.Snapsh
snKV := snapshotdb.NewSnapshotKV().DB(db)
for k, v := range snapshots {
log.Info("Wrap db by", "snapshot", k.String(), "dir", v.Dbpath)
cfg := BucketConfigs[k]
chainSnapshotCfg := BucketConfigs[k]
snapshotKV, err := kv2.NewMDBX(log.New()).Readonly().Path(v.Dbpath).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
return cfg
return chainSnapshotCfg
}).Open()
if err != nil {


@ -257,7 +257,7 @@ func NewStagedSync(
var blockReader interfaces.FullBlockReader
var allSnapshots *snapshotsync.AllSnapshots
if cfg.Snapshot.Enabled {
allSnapshots = snapshotsync.NewAllSnapshots(cfg.Snapshot.Dir, snapshothashes.KnownConfig(controlServer.ChainConfig.ChainName))
allSnapshots = snapshotsync.NewAllSnapshots(cfg.Snapshot, snapshothashes.KnownConfig(controlServer.ChainConfig.ChainName))
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
} else {
blockReader = snapshotsync.NewBlockReader()