Snapshots: retire blocks by default (#3707)

Alex Sharov 2022-03-16 09:57:48 +07:00 committed by GitHub
parent 10aee02e2e
commit 469b75c3d3
20 changed files with 937 additions and 519 deletions

View File

@ -24,6 +24,8 @@ downloader --downloader.api.addr=127.0.0.1:9093 --torrent.port=42068 --datadir=<
erigon --experimental.snapshot --downloader.api.addr=127.0.0.1:9093 --datadir=<your_datadir>
```
Use `--experimental.snapshot.keepblocks=true` to keep retired blocks in the DB instead of deleting them
## How to create new network or bootnode
```shell

View File

@ -90,7 +90,7 @@ func withBucket(cmd *cobra.Command) {
}
func withDataDir2(cmd *cobra.Command) {
cmd.Flags().String(utils.DataDirFlag.Name, paths.DefaultDataDir(), utils.DataDirFlag.Usage)
cmd.Flags().StringVar(&datadir, utils.DataDirFlag.Name, paths.DefaultDataDir(), utils.DataDirFlag.Usage)
must(cmd.MarkFlagDirname(utils.DataDirFlag.Name))
must(cmd.MarkFlagRequired(utils.DataDirFlag.Name))
cmd.Flags().IntVar(&databaseVerbosity, "database.verbosity", 2, "Enabling internal db logs. Very high verbosity levels may require recompile db. Default: 2, means warning.")

View File

@ -5,12 +5,14 @@ import (
"context"
"fmt"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"github.com/c2h5oh/datasize"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
@ -551,20 +553,26 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
if err != nil {
return err
}
cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, allSnapshots(chainConfig))
cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db))
if unwind > 0 {
u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx)
if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
return err
}
} else if pruneTo > 0 {
p, err := sync.PruneStageState(stages.Senders, s.BlockNumber, tx, db)
if err != nil {
return err
}
if err = stagedsync.PruneSendersStage(p, tx, cfg, ctx); err != nil {
return err
}
return nil
} else {
err = stagedsync.SpawnRecoverSendersStage(cfg, s, sync, tx, block, ctx)
if err != nil {
if err = stagedsync.SpawnRecoverSendersStage(cfg, s, sync, tx, block, ctx); err != nil {
return err
}
}
return tx.Commit()
}
@ -1023,7 +1031,7 @@ var _allSnapshotsSingleton *snapshotsync.RoSnapshots
func allSnapshots(cc *params.ChainConfig) *snapshotsync.RoSnapshots {
openSnapshotOnce.Do(func() {
if enableSnapshot {
snapshotCfg := ethconfig.NewSnapshotCfg(true, false)
snapshotCfg := ethconfig.NewSnapshotCfg(enableSnapshot, true)
_allSnapshotsSingleton = snapshotsync.NewRoSnapshots(snapshotCfg, filepath.Join(datadir, "snapshots"))
if err := _allSnapshotsSingleton.ReopenSegments(); err != nil {
panic(err)
@ -1036,12 +1044,18 @@ func allSnapshots(cc *params.ChainConfig) *snapshotsync.RoSnapshots {
return _allSnapshotsSingleton
}
var openBlockReaderOnce sync.Once
var _blockReaderSingleton interfaces.FullBlockReader
func getBlockReader(cc *params.ChainConfig) (blockReader interfaces.FullBlockReader) {
blockReader = snapshotsync.NewBlockReader()
if sn := allSnapshots(cc); sn != nil {
blockReader = snapshotsync.NewBlockReaderWithSnapshots(sn)
}
return blockReader
openBlockReaderOnce.Do(func() {
_blockReaderSingleton = snapshotsync.NewBlockReader()
if sn := allSnapshots(cc); sn != nil {
x := snapshotsync.NewBlockReaderWithSnapshots(sn)
_blockReaderSingleton = x
}
})
return _blockReaderSingleton
}
func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) (prune.Mode, consensus.Engine, *params.ChainConfig, *vm.Config, *stagedsync.Sync, *stagedsync.Sync, stagedsync.MiningState) {
@ -1099,8 +1113,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))
br := getBlockReader(chainConfig)
blockDownloaderWindow := 65536
sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, getBlockReader(chainConfig))
sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br)
if err != nil {
panic(err)
}
@ -1112,10 +1127,19 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
if miningConfig != nil {
cfg.Miner = *miningConfig
}
cfg.Snapshot = allSnapshots(chainConfig).Cfg()
if cfg.Snapshot.Enabled {
snDir, err := dir.OpenRw(filepath.Join(datadir, "snapshots"))
if err != nil {
panic(err)
}
cfg.SnapshotDir = snDir
}
sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg,
chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir,
nil, nil, nil, nil, nil,
allSnapshots(chainConfig),
)
if err != nil {
panic(err)
@ -1127,7 +1151,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, tmpdir),
stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, tmpdir),
stagedsync.StageHashStateCfg(db, tmpdir),
stagedsync.StageTrieCfg(db, false, true, tmpdir, getBlockReader(chainConfig)),
stagedsync.StageTrieCfg(db, false, true, tmpdir, br),
stagedsync.StageMiningFinishCfg(db, *chainConfig, engine, miner, ctx.Done()),
),
stagedsync.MiningUnwindOrder,

View File

@ -115,7 +115,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
if cfg.Chaindata == "" {
cfg.Chaindata = filepath.Join(cfg.DataDir, "chaindata")
}
cfg.Snapshot = ethconfig.NewSnapshotCfg(cfg.Snapshot.Enabled, cfg.Snapshot.RetireEnabled)
cfg.Snapshot = ethconfig.NewSnapshotCfg(cfg.Snapshot.Enabled, cfg.Snapshot.KeepBlocks)
}
if cfg.TxPoolApiAddr == "" {
cfg.TxPoolApiAddr = cfg.PrivateApiAddr

View File

@ -130,7 +130,7 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
if err != nil {
return err
}
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots"))
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
} else {

View File

@ -619,9 +619,9 @@ var (
Name: "experimental.snapshot",
Usage: "Enabling experimental snapshot sync",
}
SnapshotRetireFlag = cli.BoolFlag{
Name: ethconfig.FlagSnapshotRetire,
Usage: "Delete(!) old blocks from DB, by moving them to snapshots",
SnapshotKeepBlocksFlag = cli.BoolFlag{
Name: ethconfig.FlagSnapshotKeepBlocks,
Usage: "Keep ancient blocks in db (useful for debug)",
}
TorrentVerbosityFlag = cli.StringFlag{
Name: "torrent.verbosity",
@ -1333,9 +1333,7 @@ func CheckExclusive(ctx *cli.Context, args ...interface{}) {
// SetEthConfig applies eth-related command line flags to the config.
func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Config) {
if ctx.GlobalBool(SnapshotSyncFlag.Name) {
cfg.Snapshot.Enabled = true
}
cfg.Snapshot.Enabled = ctx.GlobalBool(SnapshotSyncFlag.Name)
if cfg.Snapshot.Enabled {
snDir, err := dir.OpenRw(filepath.Join(nodeConfig.DataDir, "snapshots"))
if err != nil {
@ -1343,9 +1341,7 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Conf
}
cfg.SnapshotDir = snDir
}
if ctx.GlobalBool(SnapshotRetireFlag.Name) {
cfg.Snapshot.RetireEnabled = true
}
cfg.Snapshot.KeepBlocks = ctx.GlobalBool(SnapshotKeepBlocksFlag.Name)
torrentVerbosity := lg.Warning
if ctx.GlobalIsSet(TorrentVerbosityFlag.Name) {
torrentVerbosity = torrentcfg.String2LogLevel[ctx.GlobalString(TorrentVerbosityFlag.Name)]

View File

@ -1042,43 +1042,69 @@ func WriteBlock(db kv.RwTx, block *types.Block) error {
return nil
}
// DeleteAncientBlocks - delete old block after moving it to snapshots. [from, to)
func DeleteAncientBlocks(db kv.RwTx, blockFrom, blockTo uint64) error {
//doesn't delete Receipts - because Receipts are not in snapshots yet
func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}
// DeleteAncientBlocks - delete old block after moving it to snapshots. [from, to)
// doesn't delete receipts
func DeleteAncientBlocks(db kv.RwTx, blockTo uint64, blocksDeleteLimit int) error {
c, err := db.Cursor(kv.Headers)
if err != nil {
return err
}
defer c.Close()
var stopAtBlock uint64
{
k, _, err := c.First()
if err != nil {
return err
}
firstBlock := binary.BigEndian.Uint64(k)
stopAtBlock = min(blockTo, firstBlock+uint64(blocksDeleteLimit))
}
for k, _, err := c.First(); k != nil; k, _, err = c.Next() {
if err != nil {
return err
}
n := binary.BigEndian.Uint64(k)
if n >= stopAtBlock {
break
}
for n := blockFrom; n < blockTo; n++ {
canonicalHash, err := ReadCanonicalHash(db, n)
if err != nil {
return err
}
if err := db.ForPrefix(kv.Headers, dbutils.EncodeBlockNumber(n), func(k, v []byte) error {
isCanonical := bytes.Equal(k[8:], canonicalHash[:])
if err := db.Delete(kv.Headers, k, nil); err != nil {
isCanonical := bytes.Equal(k[8:], canonicalHash[:])
b, err := ReadBodyForStorageByKey(db, k)
if err != nil {
return err
}
txIDBytes := make([]byte, 8)
for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ {
binary.BigEndian.PutUint64(txIDBytes, txID)
bucket := kv.EthTx
if !isCanonical {
bucket = kv.NonCanonicalTxs
}
if err := db.Delete(bucket, txIDBytes, nil); err != nil {
return err
}
b, err := ReadBodyForStorageByKey(db, k)
if err != nil {
return err
}
txIDBytes := make([]byte, 8)
for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ {
binary.BigEndian.PutUint64(txIDBytes, txID)
bucket := kv.EthTx
if !isCanonical {
bucket = kv.NonCanonicalTxs
}
if err := db.Delete(bucket, txIDBytes, nil); err != nil {
return err
}
}
if err := db.Delete(kv.BlockBody, k, nil); err != nil {
return err
}
if err := db.Delete(kv.Senders, k, nil); err != nil {
return err
}
return nil
}); err != nil {
}
if err := db.Delete(kv.Headers, k, nil); err != nil {
return err
}
if err := db.Delete(kv.BlockBody, k, nil); err != nil {
return err
}
if err := db.Delete(kv.Senders, k, nil); err != nil {
return err
}
}
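
The rewritten helper no longer takes a `blockFrom`: the cursor starts at the first header still present in the DB, and `blocksDeleteLimit` caps how many blocks one call removes. Below is a minimal sketch of the intended call site, assuming the `BlockRetire` type introduced later in this commit; the wrapper name `pruneOldBlocks` is invented for illustration:

```go
package example

import (
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/core/rawdb"
	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)

// pruneOldBlocks deletes already-retired blocks in bounded batches so that a
// single RwTx never has to remove an unbounded range.
func pruneOldBlocks(tx kv.RwTx, br *snapshotsync.BlockRetire, forwardProgress uint64) error {
	if br.Snapshots().Cfg().KeepBlocks { // --experimental.snapshot.keepblocks=true
		return nil
	}
	// Never delete beyond what snapshots already cover, nor inside the immutability window.
	canDeleteTo := br.CanDeleteTo(forwardProgress)
	// At most 1_000 blocks per call; the cursor starts at the first header still in the DB.
	return rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000)
}
```

The limit is what lets pruning run inside the normal staged-sync commit cycle without holding a huge write transaction.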

View File

@ -311,6 +311,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
}
var blockReader interfaces.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
if config.Snapshot.Enabled {
snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
snConfig.ExpectBlocks, err = RestoreExpectedExternalSnapshot(chainKv, snConfig)
@ -318,7 +319,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
return nil, err
}
allSnapshots := snapshotsync.NewRoSnapshots(config.Snapshot, config.SnapshotDir.Path)
allSnapshots = snapshotsync.NewRoSnapshots(config.Snapshot, config.SnapshotDir.Path)
allSnapshots.AsyncOpenAll(ctx)
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
@ -502,7 +503,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
backend.sentryControlServer, tmpdir, backend.notifications.Accumulator,
backend.newPayloadCh, backend.forkChoiceCh, &backend.waitingForBeaconChain,
backend.downloaderClient)
backend.downloaderClient, allSnapshots)
if err != nil {
return nil, err
}

View File

@ -123,8 +123,8 @@ func init() {
//go:generate gencodec -type Config -formats toml -out gen_config.go
type Snapshot struct {
Enabled bool
RetireEnabled bool
Enabled bool
KeepBlocks bool
}
func (s Snapshot) String() string {
@ -132,19 +132,19 @@ func (s Snapshot) String() string {
if s.Enabled {
out = append(out, "--"+FlagSnapshot+"=true")
}
if s.RetireEnabled {
out = append(out, "--"+FlagSnapshotRetire+"=true")
if s.KeepBlocks {
out = append(out, "--"+FlagSnapshotKeepBlocks+"=true")
}
return strings.Join(out, " ")
}
var (
FlagSnapshot = "experimental.snapshot"
FlagSnapshotRetire = "experimental.snapshot.retire"
FlagSnapshot = "experimental.snapshot"
FlagSnapshotKeepBlocks = "experimental.snapshot.keepblocks"
)
func NewSnapshotCfg(enabled, retireEnabled bool) Snapshot {
return Snapshot{Enabled: enabled, RetireEnabled: retireEnabled}
func NewSnapshotCfg(enabled, keepBlocks bool) Snapshot {
return Snapshot{Enabled: enabled, KeepBlocks: keepBlocks}
}
// Config contains configuration options for ETH protocol.
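
A small usage sketch (not from the commit) showing what the renamed config prints; with the new default of `KeepBlocks=false`, retired blocks are removed from the DB unless the user opts out:

```go
package main

import (
	"fmt"

	"github.com/ledgerwatch/erigon/eth/ethconfig"
)

func main() {
	// New default behaviour: snapshots enabled, retired blocks deleted from the DB.
	fmt.Println(ethconfig.NewSnapshotCfg(true, false))
	// --experimental.snapshot=true

	// Opt-in debug mode: keep ancient blocks in the DB after retiring them.
	fmt.Println(ethconfig.NewSnapshotCfg(true, true))
	// --experimental.snapshot=true --experimental.snapshot.keepblocks=true
}
```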

View File

@ -1082,16 +1082,20 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
return err
}
sn, ok := cfg.snapshots.Blocks(cfg.snapshots.BlocksAvailable())
// ResetSequence - allow set arbitrary value to sequence (for example to decrement it to exact value)
ok, err := cfg.snapshots.ViewTxs(cfg.snapshots.BlocksAvailable(), func(sn *snapshotsync.TxnSegment) error {
lastTxnID := sn.IdxTxnHash.BaseDataID() + uint64(sn.Seg.Count())
if err := rawdb.ResetSequence(tx, kv.EthTx, lastTxnID+1); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
if !ok {
return fmt.Errorf("snapshot not found for block: %d", cfg.snapshots.BlocksAvailable())
}
// ResetSequence - allow set arbitrary value to sequence (for example to decrement it to exact value)
lastTxnID := sn.TxnHashIdx.BaseDataID() + uint64(sn.Transactions.Count())
if err := rawdb.ResetSequence(tx, kv.EthTx, lastTxnID+1); err != nil {
return err
}
}
// Add last headers from snapshots to HeaderDownloader (as persistent links)

View File

@ -38,11 +38,11 @@ type SendersCfg struct {
tmpdir string
prune prune.Mode
chainConfig *params.ChainConfig
snapshots *snapshotsync.RoSnapshots
blockRetire *snapshotsync.BlockRetire
snapshotHashesCfg *snapshothashes.Config
}
func StageSendersCfg(db kv.RwDB, chainCfg *params.ChainConfig, tmpdir string, prune prune.Mode, snapshots *snapshotsync.RoSnapshots) SendersCfg {
func StageSendersCfg(db kv.RwDB, chainCfg *params.ChainConfig, tmpdir string, prune prune.Mode, br *snapshotsync.BlockRetire) SendersCfg {
const sendersBatchSize = 10000
const sendersBlockSize = 4096
@ -56,7 +56,7 @@ func StageSendersCfg(db kv.RwDB, chainCfg *params.ChainConfig, tmpdir string, pr
tmpdir: tmpdir,
chainConfig: chainCfg,
prune: prune,
snapshots: snapshots,
blockRetire: br,
snapshotHashesCfg: snapshothashes.KnownConfig(chainCfg.ChainName),
}
}
@ -103,8 +103,8 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
defer canonicalC.Close()
startFrom := s.BlockNumber + 1
if cfg.snapshots != nil && startFrom < cfg.snapshots.BlocksAvailable() {
startFrom = cfg.snapshots.BlocksAvailable()
if cfg.blockRetire.Snapshots() != nil && startFrom < cfg.blockRetire.Snapshots().BlocksAvailable() {
startFrom = cfg.blockRetire.Snapshots().BlocksAvailable()
}
for k, v, err := canonicalC.Seek(dbutils.EncodeBlockNumber(startFrom)); k != nil; k, v, err = canonicalC.Next() {
@ -373,17 +373,16 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
defer tx.Rollback()
}
if cfg.snapshots != nil && cfg.snapshots.Cfg().RetireEnabled {
if cfg.blockRetire.Snapshots() != nil && cfg.blockRetire.Snapshots().Cfg().Enabled {
if err := retireBlocks(s, tx, cfg, ctx); err != nil {
return fmt.Errorf("retireBlocks: %w", err)
}
}
if cfg.prune.TxIndex.Enabled() {
} else if cfg.prune.TxIndex.Enabled() {
if err = PruneTable(tx, kv.Senders, s.LogPrefix(), to, logEvery, ctx); err != nil {
return err
}
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
@ -393,43 +392,32 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
}
func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error) {
if err := cfg.snapshots.EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil {
return err
}
blockFrom := cfg.snapshots.BlocksAvailable() + 1
blockTo := s.ForwardProgress - params.FullImmutabilityThreshold
if blockTo-blockFrom < 1000 {
if cfg.blockRetire.Working() {
return nil
}
if res := cfg.blockRetire.Result(); res != nil {
if res.Err != nil {
return fmt.Errorf("[%s] retire blocks last error: %w", s.LogPrefix(), res.Err)
}
}
if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks {
canDeleteTo := cfg.blockRetire.CanDeleteTo(s.ForwardProgress)
if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil {
return nil
}
}
// TODO: remove this check for the release
if err := cfg.blockRetire.Snapshots().EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil {
return err
}
blockFrom, blockTo, ok := cfg.blockRetire.CanRetire(s.ForwardProgress)
if !ok {
return nil
}
//TODO: avoid too large deletes
chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
wg := sync.WaitGroup{}
wg.Add(1)
go func() { // move to own goroutine, because in this goroutine already living RwTx
// in future we will do it in background
if err := snapshotsync.RetireBlocks(ctx, blockFrom, blockTo, *chainID, cfg.tmpdir, cfg.snapshots, cfg.db, 1, log.LvlDebug); err != nil {
panic(err)
//return err
}
if err := cfg.snapshots.ReopenSegments(); err != nil {
panic(err)
//return err
}
if err := cfg.snapshots.ReopenIndices(); err != nil {
panic(err)
cfg.blockRetire.RetireBlocksInBackground(ctx, blockFrom, blockTo, *chainID, log.LvlInfo)
//return err
}
// RoSnapshots must be atomic? Or we can create new instance?
// seed new 500K files
//if err := rawdb.DeleteAncientBlocks(tx, blockFrom, blockTo); err != nil {
// return nil
//}
defer wg.Done()
}()
wg.Wait()
return nil
}
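
The prune step of the Senders stage no longer retires blocks inline: it launches a background job and only inspects the previous run's outcome. A condensed sketch of that cycle using the `BlockRetire` API from this commit (the helper name `retireCycle` is invented; error handling is trimmed):

```go
package example

import (
	"context"
	"fmt"

	"github.com/holiman/uint256"
	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
	"github.com/ledgerwatch/log/v3"
)

// retireCycle shows the non-blocking flow: surface the last background result,
// then, if the worker is idle and a full segment is ready, start the next range.
func retireCycle(ctx context.Context, br *snapshotsync.BlockRetire, forwardProgress uint64, chainID uint256.Int) error {
	if br.Working() {
		return nil // a previous retire job is still running
	}
	if res := br.Result(); res != nil && res.Err != nil {
		return fmt.Errorf("retire blocks last error: %w", res.Err)
	}
	blockFrom, blockTo, ok := br.CanRetire(forwardProgress)
	if !ok {
		return nil // less than one full 1k segment is ready to retire
	}
	// Dumps blocks to .seg files in a goroutine; Result() reports the outcome on the next pass.
	br.RetireBlocksInBackground(ctx, blockFrom, blockTo, chainID, log.LvlInfo)
	return nil
}
```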

View File

@ -13,6 +13,7 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -108,7 +109,7 @@ func TestSenders(t *testing.T) {
require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))
cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, nil)
cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db))
err := SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx)
assert.NoError(t, err)

View File

@ -157,10 +157,14 @@ func doRetireCommand(cliCtx *cli.Context) error {
snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir)
snapshots.ReopenSegments()
br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB)
for i := from; i < to; i += every {
if err := snapshotsync.RetireBlocks(ctx, i, i+every, *chainID, tmpDir, snapshots, chainDB, runtime.NumCPU()/2, log.LvlInfo); err != nil {
panic(err)
//return err
br.RetireBlocksInBackground(ctx, i, i+every, *chainID, log.LvlInfo)
br.Wait()
res := br.Result()
if res.Err != nil {
panic(res.Err)
}
}
return nil

View File

@ -70,7 +70,7 @@ var DefaultFlags = []cli.Flag{
utils.TraceMaxtracesFlag,
utils.SnapshotSyncFlag,
utils.SnapshotRetireFlag,
utils.SnapshotKeepBlocksFlag,
utils.DbPageSizeFlag,
utils.TorrentPortFlag,
utils.TorrentUploadRateFlag,

View File

@ -5,7 +5,6 @@ import (
"context"
"encoding/binary"
"fmt"
"sync"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
@ -183,25 +182,33 @@ func (back *RemoteBlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash c
// BlockReaderWithSnapshots can read blocks from db and snapshots
type BlockReaderWithSnapshots struct {
sn *RoSnapshots
lock sync.RWMutex
sn *RoSnapshots
}
func NewBlockReaderWithSnapshots(snapshots *RoSnapshots) *BlockReaderWithSnapshots {
return &BlockReaderWithSnapshots{sn: snapshots}
}
func (back *BlockReaderWithSnapshots) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error) {
sn, ok := back.sn.Blocks(blockHeight)
if !ok {
h := rawdb.ReadHeaderByNumber(tx, blockHeight)
func (back *BlockReaderWithSnapshots) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (h *types.Header, err error) {
ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error {
h, err = back.headerFromSnapshot(blockHeight, segment, nil)
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
if ok {
return h, nil
}
return back.headerFromSnapshot(blockHeight, sn, nil)
return rawdb.ReadHeaderByNumber(tx, blockHeight), nil
}
// HeaderByHash - will search header in all snapshots starting from recent
func (back *BlockReaderWithSnapshots) HeaderByHash(ctx context.Context, tx kv.Getter, hash common.Hash) (*types.Header, error) {
h, err := rawdb.ReadHeaderByHash(tx, hash)
func (back *BlockReaderWithSnapshots) HeaderByHash(ctx context.Context, tx kv.Getter, hash common.Hash) (h *types.Header, err error) {
h, err = rawdb.ReadHeaderByHash(tx, hash)
if err != nil {
return nil, err
}
@ -210,65 +217,109 @@ func (back *BlockReaderWithSnapshots) HeaderByHash(ctx context.Context, tx kv.Ge
}
buf := make([]byte, 128)
for i := len(back.sn.blocks) - 1; i >= 0; i-- {
h, err := back.headerFromSnapshotByHash(hash, back.sn.blocks[i], buf)
if err := back.sn.Headers.View(func(segments []*HeaderSegment) error {
for i := len(segments) - 1; i >= 0; i-- {
h, err = back.headerFromSnapshotByHash(hash, segments[i], buf)
if err != nil {
return err
}
}
return nil
}); err != nil {
return nil, err
}
return h, nil
}
func (back *BlockReaderWithSnapshots) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (h common.Hash, err error) {
ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error {
header, err := back.headerFromSnapshot(blockHeight, segment, nil)
if err != nil {
return nil, nil
return err
}
if h != nil {
return h, nil
if header == nil {
return nil
}
}
return nil, nil
}
func (back *BlockReaderWithSnapshots) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (common.Hash, error) {
sn, ok := back.sn.Blocks(blockHeight)
if !ok {
return rawdb.ReadCanonicalHash(tx, blockHeight)
}
h, err := back.headerFromSnapshot(blockHeight, sn, nil)
h = header.Hash()
return nil
})
if err != nil {
return common.Hash{}, err
return h, err
}
if h == nil {
return common.Hash{}, err
}
return h.Hash(), nil
}
func (back *BlockReaderWithSnapshots) Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (*types.Header, error) {
sn, ok := back.sn.Blocks(blockHeight)
if !ok {
h := rawdb.ReadHeader(tx, hash, blockHeight)
if ok {
return h, nil
}
return back.headerFromSnapshot(blockHeight, sn, nil)
return rawdb.ReadCanonicalHash(tx, blockHeight)
}
func (back *BlockReaderWithSnapshots) ReadHeaderByNumber(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (*types.Header, error) {
sn, ok := back.sn.Blocks(blockHeight)
if !ok {
h := rawdb.ReadHeader(tx, hash, blockHeight)
func (back *BlockReaderWithSnapshots) Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (h *types.Header, err error) {
ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error {
h, err = back.headerFromSnapshot(blockHeight, segment, nil)
if err != nil {
return err
}
return nil
})
if ok {
return h, nil
}
return back.headerFromSnapshot(blockHeight, sn, nil)
h = rawdb.ReadHeader(tx, hash, blockHeight)
return h, nil
}
func (back *BlockReaderWithSnapshots) ReadHeaderByNumber(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (h *types.Header, err error) {
ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error {
h, err = back.headerFromSnapshot(blockHeight, segment, nil)
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
if !ok {
return h, nil
}
h = rawdb.ReadHeader(tx, hash, blockHeight)
return h, nil
}
func (back *BlockReaderWithSnapshots) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) {
sn, ok := back.sn.Blocks(blockHeight)
if !ok {
body, err := rawdb.ReadBodyWithTransactions(tx, hash, blockHeight)
var baseTxnID uint64
var txsAmount uint32
ok, err := back.sn.ViewBodies(blockHeight, func(seg *BodySegment) error {
body, baseTxnID, txsAmount, err = back.bodyFromSnapshot(blockHeight, seg, nil)
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
if ok {
ok, err = back.sn.ViewTxs(blockHeight, func(seg *TxnSegment) error {
txs, senders, err := back.txsFromSnapshot(baseTxnID, txsAmount, seg, nil)
if err != nil {
return err
}
body.Transactions = txs
body.SendersToTxs(senders)
return nil
})
if err != nil {
return nil, err
}
return body, nil
if ok {
return body, nil
}
}
body, _, _, _, err = back.bodyWithTransactionsFromSnapshot(blockHeight, sn, nil)
body, err = rawdb.ReadBodyWithTransactions(tx, hash, blockHeight)
if err != nil {
return nil, err
}
@ -288,99 +339,127 @@ func (back *BlockReaderWithSnapshots) BodyRlp(ctx context.Context, tx kv.Getter,
}
func (back *BlockReaderWithSnapshots) Body(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) {
sn, ok := back.sn.Blocks(blockHeight)
if !ok {
body, _, _ := rawdb.ReadBody(tx, hash, blockHeight)
return body, nil
}
body, _, _, err = back.bodyFromSnapshot(blockHeight, sn, nil)
ok, err := back.sn.ViewBodies(blockHeight, func(seg *BodySegment) error {
body, _, _, err = back.bodyFromSnapshot(blockHeight, seg, nil)
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
if ok {
return body, nil
}
body, _, _ = rawdb.ReadBody(tx, hash, blockHeight)
return body, nil
}
func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error) {
sn, ok := back.sn.Blocks(blockHeight)
if !ok {
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
return nil, nil, fmt.Errorf("requested non-canonical hash %x. canonical=%x", hash, canonicalHash)
var buf []byte
var h *types.Header
ok, err := back.sn.ViewHeaders(blockHeight, func(seg *HeaderSegment) error {
headerOffset := seg.idxHeaderHash.Lookup2(blockHeight - seg.idxHeaderHash.BaseDataID())
gg := seg.seg.MakeGetter()
gg.Reset(headerOffset)
buf, _ = gg.Next(buf[:0])
h = &types.Header{}
if err = rlp.DecodeBytes(buf[1:], h); err != nil {
return err
}
if canonicalHash == hash {
block, senders, err = rawdb.ReadBlockWithSenders(tx, hash, blockHeight)
if err != nil {
return nil, nil, err
}
return block, senders, nil
}
return rawdb.NonCanonicalBlockWithSenders(tx, hash, blockHeight)
return nil
})
if err != nil {
return
}
buf := make([]byte, 16)
back.lock.Lock()
defer back.lock.Unlock()
headerOffset := sn.HeaderHashIdx.Lookup2(blockHeight - sn.HeaderHashIdx.BaseDataID())
bodyOffset := sn.BodyNumberIdx.Lookup2(blockHeight - sn.BodyNumberIdx.BaseDataID())
gg := sn.Headers.MakeGetter()
gg.Reset(headerOffset)
buf, _ = gg.Next(buf[:0])
h := &types.Header{}
if err = rlp.DecodeBytes(buf[1:], h); err != nil {
return nil, nil, err
}
gg = sn.Bodies.MakeGetter()
gg.Reset(bodyOffset)
buf, _ = gg.Next(buf[:0])
b := &types.BodyForStorage{}
reader := bytes.NewReader(buf)
if err = rlp.Decode(reader, b); err != nil {
return nil, nil, err
}
if b.BaseTxId < sn.TxnHashIdx.BaseDataID() {
return nil, nil, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.TxnHashIdx.BaseDataID(), sn.Transactions.FilePath())
}
txs := make([]types.Transaction, b.TxAmount-2)
senders = make([]common.Address, b.TxAmount-2)
if b.TxAmount > 2 {
r := recsplit.NewIndexReader(sn.TxnIdsIdx)
binary.BigEndian.PutUint64(buf[:8], b.BaseTxId-sn.TxnIdsIdx.BaseDataID())
txnOffset := r.Lookup(buf[:8])
gg = sn.Transactions.MakeGetter()
gg.Reset(txnOffset)
stream := rlp.NewStream(reader, 0)
buf, _ = gg.Next(buf[:0]) //first system-tx
for i := uint32(0); i < b.TxAmount-2; i++ {
if ok {
var b *types.BodyForStorage
ok, err = back.sn.ViewBodies(blockHeight, func(seg *BodySegment) error {
bodyOffset := seg.idxBodyNumber.Lookup2(blockHeight - seg.idxBodyNumber.BaseDataID())
gg := seg.seg.MakeGetter()
gg.Reset(bodyOffset)
buf, _ = gg.Next(buf[:0])
senders[i].SetBytes(buf[1 : 1+20])
txRlp := buf[1+20:]
reader.Reset(txRlp)
stream.Reset(reader, 0)
txs[i], err = types.DecodeTransaction(stream)
b = &types.BodyForStorage{}
reader := bytes.NewReader(buf)
if err = rlp.Decode(reader, b); err != nil {
return err
}
return nil
})
if err != nil {
return
}
if ok {
if b.TxAmount <= 2 {
block = types.NewBlockFromStorage(hash, h, nil, b.Uncles)
if len(senders) != block.Transactions().Len() {
return block, senders, nil // no senders is fine - will recover them on the fly
}
block.SendersToTxs(senders)
return block, senders, nil
}
reader := bytes.NewReader(nil)
txs := make([]types.Transaction, b.TxAmount-2)
senders = make([]common.Address, b.TxAmount-2)
ok, err = back.sn.ViewTxs(blockHeight, func(seg *TxnSegment) error {
if b.BaseTxId < seg.IdxTxnHash.BaseDataID() {
return fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, seg.IdxTxnHash.BaseDataID(), seg.Seg.FilePath())
}
r := recsplit.NewIndexReader(seg.IdxTxnId)
binary.BigEndian.PutUint64(buf[:8], b.BaseTxId-seg.IdxTxnId.BaseDataID())
txnOffset := r.Lookup(buf[:8])
gg := seg.Seg.MakeGetter()
gg.Reset(txnOffset)
stream := rlp.NewStream(reader, 0)
buf, _ = gg.Next(buf[:0]) //first system-tx
for i := uint32(0); i < b.TxAmount-2; i++ {
buf, _ = gg.Next(buf[:0])
if len(buf) < 1+20 {
return fmt.Errorf("segment %s has too short record: len(buf)=%d < 21", seg.Seg.FilePath(), len(buf))
}
senders[i].SetBytes(buf[1 : 1+20])
txRlp := buf[1+20:]
reader.Reset(txRlp)
stream.Reset(reader, 0)
txs[i], err = types.DecodeTransaction(stream)
if err != nil {
return err
}
txs[i].SetSender(senders[i])
}
return nil
})
if err != nil {
return nil, nil, err
}
if ok {
block = types.NewBlockFromStorage(hash, h, txs, b.Uncles)
if len(senders) != block.Transactions().Len() {
return block, senders, nil // no senders is fine - will recover them on the fly
}
block.SendersToTxs(senders)
return block, senders, nil
}
}
}
block = types.NewBlockFromStorage(hash, h, txs, b.Uncles)
if len(senders) != block.Transactions().Len() {
return block, senders, nil // no senders is fine - will recover them on the fly
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
return nil, nil, fmt.Errorf("requested non-canonical hash %x. canonical=%x", hash, canonicalHash)
}
block.SendersToTxs(senders)
return block, senders, nil
if canonicalHash == hash {
block, senders, err = rawdb.ReadBlockWithSenders(tx, hash, blockHeight)
if err != nil {
return nil, nil, err
}
return block, senders, nil
}
return rawdb.NonCanonicalBlockWithSenders(tx, hash, blockHeight)
}
func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn *BlocksSnapshot, buf []byte) (*types.Header, error) {
headerOffset := sn.HeaderHashIdx.Lookup2(blockHeight - sn.HeaderHashIdx.BaseDataID())
gg := sn.Headers.MakeGetter()
func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn *HeaderSegment, buf []byte) (*types.Header, error) {
headerOffset := sn.idxHeaderHash.Lookup2(blockHeight - sn.idxHeaderHash.BaseDataID())
gg := sn.seg.MakeGetter()
gg.Reset(headerOffset)
buf, _ = gg.Next(buf[:0])
h := &types.Header{}
@ -394,11 +473,11 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn
// because HeaderByHash method will search header in all snapshots - and may request header which doesn't exists
// but because our indices are based on PerfectHashMap, no way to know is given key exists or not, only way -
// to make sure is to fetch it and compare hash
func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash, sn *BlocksSnapshot, buf []byte) (*types.Header, error) {
reader := recsplit.NewIndexReader(sn.HeaderHashIdx)
func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash, sn *HeaderSegment, buf []byte) (*types.Header, error) {
reader := recsplit.NewIndexReader(sn.idxHeaderHash)
localID := reader.Lookup(hash[:])
headerOffset := sn.HeaderHashIdx.Lookup2(localID)
gg := sn.Headers.MakeGetter()
headerOffset := sn.idxHeaderHash.Lookup2(localID)
gg := sn.seg.MakeGetter()
gg.Reset(headerOffset)
buf, _ = gg.Next(buf[:0])
if hash[0] != buf[0] {
@ -415,10 +494,10 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash,
return h, nil
}
func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *BlocksSnapshot, buf []byte) (*types.Body, uint64, uint32, error) {
bodyOffset := sn.BodyNumberIdx.Lookup2(blockHeight - sn.BodyNumberIdx.BaseDataID())
func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.Body, uint64, uint32, error) {
bodyOffset := sn.idxBodyNumber.Lookup2(blockHeight - sn.idxBodyNumber.BaseDataID())
gg := sn.Bodies.MakeGetter()
gg := sn.seg.MakeGetter()
gg.Reset(bodyOffset)
buf, _ = gg.Next(buf[:0])
b := &types.BodyForStorage{}
@ -427,8 +506,8 @@ func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *B
return nil, 0, 0, err
}
if b.BaseTxId < sn.TxnHashIdx.BaseDataID() {
return nil, 0, 0, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.TxnHashIdx.BaseDataID(), sn.Transactions.FilePath())
if b.BaseTxId < sn.idxBodyNumber.BaseDataID() {
return nil, 0, 0, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.idxBodyNumber.BaseDataID(), sn.seg.FilePath())
}
body := new(types.Body)
@ -436,19 +515,15 @@ func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *B
return body, b.BaseTxId + 1, b.TxAmount - 2, nil // empty txs in the beginning and end of block
}
func (back *BlockReaderWithSnapshots) bodyWithTransactionsFromSnapshot(blockHeight uint64, sn *BlocksSnapshot, buf []byte) (*types.Body, []common.Address, uint64, uint32, error) {
body, baseTxnID, txsAmount, err := back.bodyFromSnapshot(blockHeight, sn, buf)
if err != nil {
return nil, nil, 0, 0, err
}
func (back *BlockReaderWithSnapshots) txsFromSnapshot(baseTxnID uint64, txsAmount uint32, txsSeg *TxnSegment, buf []byte) ([]types.Transaction, []common.Address, error) {
txs := make([]types.Transaction, txsAmount)
senders := make([]common.Address, txsAmount)
reader := bytes.NewReader(buf)
if txsAmount > 0 {
r := recsplit.NewIndexReader(sn.TxnIdsIdx)
binary.BigEndian.PutUint64(buf[:8], baseTxnID-sn.TxnIdsIdx.BaseDataID())
r := recsplit.NewIndexReader(txsSeg.IdxTxnId)
binary.BigEndian.PutUint64(buf[:8], baseTxnID-txsSeg.IdxTxnId.BaseDataID())
txnOffset := r.Lookup(buf[:8])
gg := sn.Transactions.MakeGetter()
gg := txsSeg.Seg.MakeGetter()
gg.Reset(txnOffset)
stream := rlp.NewStream(reader, 0)
for i := uint32(0); i < txsAmount; i++ {
@ -460,30 +535,29 @@ func (back *BlockReaderWithSnapshots) bodyWithTransactionsFromSnapshot(blockHeig
var err error
txs[i], err = types.DecodeTransaction(stream)
if err != nil {
return nil, nil, 0, 0, err
return nil, nil, err
}
}
}
return body, senders, baseTxnID, txsAmount, nil
return txs, senders, nil
}
func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) {
for i := len(back.sn.blocks) - 1; i >= 0; i-- {
sn := back.sn.blocks[i]
func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, segments []*TxnSegment, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) {
for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
reader := recsplit.NewIndexReader(sn.TxnHashIdx)
reader := recsplit.NewIndexReader(sn.IdxTxnHash)
offset := reader.Lookup(txnHash[:])
gg := sn.Transactions.MakeGetter()
gg := sn.Seg.MakeGetter()
gg.Reset(offset)
//fmt.Printf("try: %d, %d, %d, %d\n", i, sn.From, localID, blockNum)
buf, _ = gg.Next(buf[:0])
// first byte txnHash check - reducing false-positives 256 times. Allows don't store and don't calculate full hash of entity - when checking many snapshots.
if txnHash[0] != buf[0] {
continue
}
reader2 := recsplit.NewIndexReader(sn.TxnHash2BlockNumIdx)
reader2 := recsplit.NewIndexReader(sn.IdxTxnHash2BlockNum)
blockNum = reader2.Lookup(txnHash[:])
sender := buf[1 : 1+20]
txn, err = types.DecodeTransaction(rlp.NewStream(bytes.NewReader(buf[1+20:]), uint64(len(buf))))
@ -493,10 +567,8 @@ func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, buf []byte)
txn.SetSender(common.BytesToAddress(sender))
// final txnHash check - completely avoid false-positives
if txn.Hash() == txnHash {
//fmt.Printf("try_succeed: %d, %d, %d, %d\n", i, sn.From, localID, blockNum)
return
}
//fmt.Printf("try_failed: %x, %x\n", txn.Hash(), txnHash)
}
return
}
@ -511,8 +583,18 @@ func (back *BlockReaderWithSnapshots) TxnLookup(ctx context.Context, tx kv.Gette
return *n, true, nil
}
txn, blockNum, _, err := back.txnByHash(txnHash, nil)
if err != nil {
var txn types.Transaction
var blockNum uint64
if err := back.sn.Txs.View(func(segments []*TxnSegment) error {
txn, blockNum, _, err = back.txnByHash(txnHash, segments, nil)
if err != nil {
return err
}
if txn == nil {
return nil
}
return nil
}); err != nil {
return 0, false, err
}
if txn == nil {
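
Every reader method now follows the same shape: resolve the requested height through a `View*` callback, which keeps the matching segment open under a read lock only for the duration of the callback, and fall back to the DB when no segment covers the height. A condensed sketch of that shape, written as if it lived inside package `snapshotsync` (it reads the unexported `HeaderSegment` fields shown in the next file); the function name `headerByNumberSketch` is invented:

```go
package snapshotsync

import (
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/core/rawdb"
	"github.com/ledgerwatch/erigon/core/types"
	"github.com/ledgerwatch/erigon/rlp"
)

// headerByNumberSketch condenses the snapshot-first read path used by the methods above.
func headerByNumberSketch(tx kv.Getter, sn *RoSnapshots, blockHeight uint64) (h *types.Header, err error) {
	ok, err := sn.ViewHeaders(blockHeight, func(seg *HeaderSegment) error {
		// The segment and its index stay open, under a read lock, only inside this callback.
		offset := seg.idxHeaderHash.Lookup2(blockHeight - seg.idxHeaderHash.BaseDataID())
		gg := seg.seg.MakeGetter()
		gg.Reset(offset)
		buf, _ := gg.Next(nil)
		h = &types.Header{}
		return rlp.DecodeBytes(buf[1:], h) // first byte is the header-hash prefix, skip it
	})
	if err != nil {
		return nil, err
	}
	if ok {
		return h, nil // served from an immutable .seg file
	}
	return rawdb.ReadHeaderByNumber(tx, blockHeight), nil // not retired yet: read from the DB
}
```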

View File

@ -31,6 +31,7 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
"github.com/ledgerwatch/log/v3"
@ -109,13 +110,230 @@ func IdxFileName(from, to uint64, fType string) string { return FileName(from, t
func (s BlocksSnapshot) Has(block uint64) bool { return block >= s.From && block < s.To }
type HeaderSegment struct {
seg *compress.Decompressor // value: first_byte_of_header_hash + header_rlp
idxHeaderHash *recsplit.Index // header_hash -> headers_segment_offset
From, To uint64
}
func (sn *HeaderSegment) close() {
if sn.seg != nil {
sn.seg.Close()
sn.seg = nil
}
if sn.idxHeaderHash != nil {
sn.idxHeaderHash.Close()
sn.idxHeaderHash = nil
}
}
func (sn *HeaderSegment) reopen(dir string) (err error) {
sn.close()
fileName := SegmentFileName(sn.From, sn.To, Headers)
sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName))
if err != nil {
return err
}
sn.idxHeaderHash, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Headers.String())))
if err != nil {
return err
}
return nil
}
type BodySegment struct {
seg *compress.Decompressor // value: rlp(types.BodyForStorage)
idxBodyNumber *recsplit.Index // block_num_u64 -> bodies_segment_offset
From, To uint64
}
func (sn *BodySegment) close() {
if sn.seg != nil {
sn.seg.Close()
sn.seg = nil
}
if sn.idxBodyNumber != nil {
sn.idxBodyNumber.Close()
sn.idxBodyNumber = nil
}
}
func (sn *BodySegment) reopen(dir string) (err error) {
sn.close()
fileName := SegmentFileName(sn.From, sn.To, Bodies)
sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName))
if err != nil {
return err
}
sn.idxBodyNumber, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Bodies.String())))
if err != nil {
return err
}
return nil
}
type TxnSegment struct {
Seg *compress.Decompressor // value: first_byte_of_transaction_hash + transaction_rlp
IdxTxnHash *recsplit.Index // transaction_hash -> transactions_segment_offset
IdxTxnId *recsplit.Index // transaction_id -> transactions_segment_offset
IdxTxnHash2BlockNum *recsplit.Index // transaction_hash -> block_number
From, To uint64
}
func (sn *TxnSegment) close() {
if sn.Seg != nil {
sn.Seg.Close()
sn.Seg = nil
}
if sn.IdxTxnHash != nil {
sn.IdxTxnHash.Close()
sn.IdxTxnHash = nil
}
if sn.IdxTxnId != nil {
sn.IdxTxnId.Close()
sn.IdxTxnId = nil
}
if sn.IdxTxnHash2BlockNum != nil {
sn.IdxTxnHash2BlockNum.Close()
sn.IdxTxnHash2BlockNum = nil
}
}
func (sn *TxnSegment) reopen(dir string) (err error) {
sn.close()
fileName := SegmentFileName(sn.From, sn.To, Transactions)
sn.Seg, err = compress.NewDecompressor(path.Join(dir, fileName))
if err != nil {
return err
}
sn.IdxTxnHash, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Transactions.String())))
if err != nil {
return err
}
sn.IdxTxnId, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId.String())))
if err != nil {
return err
}
sn.IdxTxnHash2BlockNum, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Transactions2Block.String())))
if err != nil {
return err
}
return nil
}
type headerSegments struct {
lock sync.RWMutex
segments []*HeaderSegment
}
func (s *headerSegments) closeLocked() {
for i := range s.segments {
s.segments[i].close()
}
}
func (s *headerSegments) reopen(dir string) error {
for i := range s.segments {
if err := s.segments[i].reopen(dir); err != nil {
return err
}
}
return nil
}
func (s *headerSegments) View(f func(segments []*HeaderSegment) error) error {
s.lock.RLock()
defer s.lock.RUnlock()
return f(s.segments)
}
func (s *headerSegments) ViewSegment(blockNum uint64, f func(sn *HeaderSegment) error) (found bool, err error) {
s.lock.RLock()
defer s.lock.RUnlock()
for _, seg := range s.segments {
if !(blockNum >= seg.From && blockNum < seg.To) {
continue
}
return true, f(seg)
}
return false, nil
}
type bodySegments struct {
lock sync.RWMutex
segments []*BodySegment
}
func (s *bodySegments) closeLocked() {
for i := range s.segments {
s.segments[i].close()
}
}
func (s *bodySegments) reopen(dir string) error {
for i := range s.segments {
if err := s.segments[i].reopen(dir); err != nil {
return err
}
}
return nil
}
func (s *bodySegments) View(f func([]*BodySegment) error) error {
s.lock.RLock()
defer s.lock.RUnlock()
return f(s.segments)
}
func (s *bodySegments) ViewSegment(blockNum uint64, f func(*BodySegment) error) (found bool, err error) {
s.lock.RLock()
defer s.lock.RUnlock()
for _, seg := range s.segments {
if !(blockNum >= seg.From && blockNum < seg.To) {
continue
}
return true, f(seg)
}
return false, nil
}
type txnSegments struct {
lock sync.RWMutex
segments []*TxnSegment
}
func (s *txnSegments) closeLocked() {
for i := range s.segments {
s.segments[i].close()
}
}
func (s *txnSegments) reopen(dir string) error {
for i := range s.segments {
if err := s.segments[i].reopen(dir); err != nil {
return err
}
}
return nil
}
func (s *txnSegments) View(f func([]*TxnSegment) error) error {
s.lock.RLock()
defer s.lock.RUnlock()
return f(s.segments)
}
func (s *txnSegments) ViewSegment(blockNum uint64, f func(*TxnSegment) error) (found bool, err error) {
s.lock.RLock()
defer s.lock.RUnlock()
for _, seg := range s.segments {
if !(blockNum >= seg.From && blockNum < seg.To) {
continue
}
return true, f(seg)
}
return false, nil
}
type RoSnapshots struct {
indicesReady atomic.Bool
segmentsReady atomic.Bool
blocks []*BlocksSnapshot
indicesReady atomic.Bool
segmentsReady atomic.Bool
Headers *headerSegments
Bodies *bodySegments
Txs *txnSegments
dir string
segmentsAvailable uint64
idxAvailable uint64
segmentsAvailable atomic.Uint64
idxAvailable atomic.Uint64
cfg ethconfig.Snapshot
}
@ -125,15 +343,15 @@ type RoSnapshots struct {
// - gaps are not allowed
// - segment have [from:to) semantic
func NewRoSnapshots(cfg ethconfig.Snapshot, snapshotDir string) *RoSnapshots {
return &RoSnapshots{dir: snapshotDir, cfg: cfg}
return &RoSnapshots{dir: snapshotDir, cfg: cfg, Headers: &headerSegments{}, Bodies: &bodySegments{}, Txs: &txnSegments{}}
}
func (s *RoSnapshots) Cfg() ethconfig.Snapshot { return s.cfg }
func (s *RoSnapshots) Dir() string { return s.dir }
func (s *RoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() }
func (s *RoSnapshots) BlocksAvailable() uint64 { return s.segmentsAvailable }
func (s *RoSnapshots) BlocksAvailable() uint64 { return s.segmentsAvailable.Load() }
func (s *RoSnapshots) IndicesReady() bool { return s.indicesReady.Load() }
func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable }
func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable.Load() }
func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapshothashes.Config) error {
if s.BlocksAvailable() < cfg.ExpectBlocks {
@ -168,70 +386,43 @@ func (s *RoSnapshots) IdxAvailability() (headers, bodies, txs uint64, err error)
}
func (s *RoSnapshots) ReopenIndices() error {
s.closeIndices()
return s.ReopenSomeIndices(AllSnapshotTypes...)
}
func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
for _, bs := range s.blocks {
for _, snapshotType := range types {
switch snapshotType {
case Headers:
if bs.HeaderHashIdx != nil {
bs.HeaderHashIdx.Close()
bs.HeaderHashIdx = nil
}
bs.HeaderHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Headers.String())))
if err != nil {
return err
}
case Bodies:
if bs.BodyNumberIdx != nil {
bs.BodyNumberIdx.Close()
bs.BodyNumberIdx = nil
}
bs.BodyNumberIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Bodies.String())))
if err != nil {
return err
}
case Transactions:
if bs.TxnHashIdx != nil {
bs.TxnHashIdx.Close()
bs.TxnHashIdx = nil
}
bs.TxnHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions.String())))
if err != nil {
return err
}
if bs.TxnIdsIdx != nil {
bs.TxnIdsIdx.Close()
bs.TxnIdsIdx = nil
}
bs.TxnIdsIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, TransactionsId.String())))
if err != nil {
return err
}
if bs.TxnHash2BlockNumIdx != nil {
bs.TxnHash2BlockNumIdx.Close()
bs.TxnHash2BlockNumIdx = nil
}
bs.TxnHash2BlockNumIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions2Block.String())))
if err != nil {
return err
}
default:
panic(fmt.Sprintf("unknown snapshot type: %s", snapshotType))
s.Headers.lock.Lock()
defer s.Headers.lock.Unlock()
s.Bodies.lock.Lock()
defer s.Bodies.lock.Unlock()
s.Txs.lock.Lock()
defer s.Txs.lock.Unlock()
for _, t := range types {
switch t {
case Headers:
if err := s.Headers.reopen(s.dir); err != nil {
return err
}
}
if bs.To > 0 {
s.idxAvailable = bs.To - 1
} else {
s.idxAvailable = 0
case Bodies:
if err := s.Bodies.reopen(s.dir); err != nil {
return err
}
case Transactions:
if err := s.Txs.reopen(s.dir); err != nil {
return err
}
default:
panic(fmt.Sprintf("unknown snapshot type: %s", t))
}
}
//TODO: make calculable?
segments := s.Headers.segments
if len(segments) > 0 && segments[len(segments)-1].To > 0 {
s.idxAvailable.Store(segments[len(segments)-1].To - 1)
} else {
s.idxAvailable.Store(0)
}
s.indicesReady.Store(true)
return nil
}
@ -256,51 +447,59 @@ func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) {
}
func (s *RoSnapshots) ReopenSegments() error {
s.closeSegements()
s.closeIndices()
s.blocks = nil
s.Headers.lock.Lock()
defer s.Headers.lock.Unlock()
s.Bodies.lock.Lock()
defer s.Bodies.lock.Unlock()
s.Txs.lock.Lock()
defer s.Txs.lock.Unlock()
s.closeSegmentsLocked()
files, err := segmentsOfType(s.dir, Headers)
if err != nil {
return err
}
for _, f := range files {
blocksSnapshot := &BlocksSnapshot{From: f.From, To: f.To}
{
seg := &BodySegment{From: f.From, To: f.To}
fileName := SegmentFileName(f.From, f.To, Bodies)
blocksSnapshot.Bodies, err = compress.NewDecompressor(path.Join(s.dir, fileName))
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
break
}
return err
}
s.Bodies.segments = append(s.Bodies.segments, seg)
}
{
seg := &HeaderSegment{From: f.From, To: f.To}
fileName := SegmentFileName(f.From, f.To, Headers)
blocksSnapshot.Headers, err = compress.NewDecompressor(path.Join(s.dir, fileName))
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
break
}
return err
}
s.Headers.segments = append(s.Headers.segments, seg)
}
{
seg := &TxnSegment{From: f.From, To: f.To}
fileName := SegmentFileName(f.From, f.To, Transactions)
blocksSnapshot.Transactions, err = compress.NewDecompressor(path.Join(s.dir, fileName))
seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
break
}
return err
}
s.Txs.segments = append(s.Txs.segments, seg)
}
s.blocks = append(s.blocks, blocksSnapshot)
if blocksSnapshot.To > 0 {
s.segmentsAvailable = blocksSnapshot.To - 1
if f.To > 0 {
s.segmentsAvailable.Store(f.To - 1)
} else {
s.segmentsAvailable = 0
s.segmentsAvailable.Store(0)
}
}
s.segmentsReady.Store(true)
@ -308,116 +507,127 @@ func (s *RoSnapshots) ReopenSegments() error {
}
func (s *RoSnapshots) Close() {
s.closeSegements()
s.closeIndices()
s.blocks = nil
s.Headers.lock.Lock()
defer s.Headers.lock.Unlock()
s.Bodies.lock.Lock()
defer s.Bodies.lock.Unlock()
s.Txs.lock.Lock()
defer s.Txs.lock.Unlock()
s.closeSegmentsLocked()
}
func (s *RoSnapshots) closeSegements() {
for _, s := range s.blocks {
if s.Headers != nil {
s.Headers.Close()
}
if s.Bodies != nil {
s.Bodies.Close()
}
if s.Transactions != nil {
s.Transactions.Close()
}
func (s *RoSnapshots) closeSegmentsLocked() {
if s.Headers != nil {
s.Headers.closeLocked()
s.Headers.segments = nil
}
if s.Bodies != nil {
s.Bodies.closeLocked()
s.Bodies.segments = nil
}
if s.Txs != nil {
s.Txs.closeLocked()
s.Txs.segments = nil
}
}
func (s *RoSnapshots) closeIndices() {
for _, s := range s.blocks {
if s.HeaderHashIdx != nil {
s.HeaderHashIdx.Close()
}
if s.BodyNumberIdx != nil {
s.BodyNumberIdx.Close()
}
if s.TxnHashIdx != nil {
s.TxnHashIdx.Close()
}
if s.TxnIdsIdx != nil {
s.TxnIdsIdx.Close()
}
if s.TxnHash2BlockNumIdx != nil {
s.TxnHash2BlockNumIdx.Close()
}
func (s *RoSnapshots) ViewHeaders(blockNum uint64, f func(sn *HeaderSegment) error) (found bool, err error) {
if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
return false, nil
}
return s.Headers.ViewSegment(blockNum, f)
}
func (s *RoSnapshots) Blocks(blockNumber uint64) (snapshot *BlocksSnapshot, found bool) {
if !s.indicesReady.Load() {
return nil, false
func (s *RoSnapshots) ViewBodies(blockNum uint64, f func(sn *BodySegment) error) (found bool, err error) {
if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
return false, nil
}
if blockNumber > s.segmentsAvailable {
return snapshot, false
return s.Bodies.ViewSegment(blockNum, f)
}
func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (found bool, err error) {
if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
return false, nil
}
for _, blocksSnapshot := range s.blocks {
if blocksSnapshot.Has(blockNumber) {
return blocksSnapshot, true
}
}
return snapshot, false
return s.Txs.ViewSegment(blockNum, f)
}
func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir *dir.Rw, chainID uint256.Int, tmpDir string, from uint64, lvl log.Lvl) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
for _, sn := range s.blocks {
if sn.From < from {
continue
}
f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Headers))
if err := HeadersHashIdx(ctx, f, sn.From, tmpDir, logEvery, lvl); err != nil {
return err
if err := s.Headers.View(func(segments []*HeaderSegment) error {
for _, sn := range segments {
if sn.From < from {
continue
}
f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Headers))
if err := HeadersHashIdx(ctx, f, sn.From, tmpDir, logEvery, lvl); err != nil {
return err
}
}
return nil
}); err != nil {
return nil
}
for _, sn := range s.blocks {
if sn.From < from {
continue
}
f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Bodies))
if err := BodiesIdx(ctx, f, sn.From, tmpDir, logEvery, lvl); err != nil {
return err
if err := s.Bodies.View(func(segments []*BodySegment) error {
for _, sn := range segments {
if sn.From < from {
continue
}
f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Bodies))
if err := BodiesIdx(ctx, f, sn.From, tmpDir, logEvery, lvl); err != nil {
return err
}
}
return nil
}); err != nil {
return nil
}
// hack to read first block body - to get baseTxId from there
if err := s.ReopenSomeIndices(Headers, Bodies); err != nil {
return err
}
for _, sn := range s.blocks {
if sn.From < from {
continue
}
// build txs idx
gg := sn.Bodies.MakeGetter()
buf, _ := gg.Next(nil)
firstBody := &types.BodyForStorage{}
if err := rlp.DecodeBytes(buf, firstBody); err != nil {
return err
}
var expectedTxsAmount uint64
{
off := sn.BodyNumberIdx.Lookup2(sn.To - 1 - sn.From)
gg.Reset(off)
buf, _ = gg.Next(buf[:0])
lastBody := new(types.BodyForStorage)
err := rlp.DecodeBytes(buf, lastBody)
if err != nil {
return err
if err := s.Txs.View(func(segments []*TxnSegment) error {
for i, sn := range segments {
if sn.From < from {
continue
}
expectedTxsAmount = lastBody.BaseTxId + uint64(lastBody.TxAmount) - firstBody.BaseTxId
}
f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Transactions))
if err := TransactionsHashIdx(ctx, chainID, sn, firstBody.BaseTxId, sn.From, expectedTxsAmount, f, tmpDir, logEvery, lvl); err != nil {
return err
if err := s.Bodies.View(func(bodySegments []*BodySegment) error {
// build txs idx
gg := bodySegments[i].seg.MakeGetter()
buf, _ := gg.Next(nil)
firstBody := &types.BodyForStorage{}
if err := rlp.DecodeBytes(buf, firstBody); err != nil {
return err
}
var expectedTxsAmount uint64
{
off := bodySegments[i].idxBodyNumber.Lookup2(sn.To - 1 - sn.From)
gg.Reset(off)
buf, _ = gg.Next(buf[:0])
lastBody := new(types.BodyForStorage)
err := rlp.DecodeBytes(buf, lastBody)
if err != nil {
return err
}
expectedTxsAmount = lastBody.BaseTxId + uint64(lastBody.TxAmount) - firstBody.BaseTxId
}
f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Transactions))
if err := TransactionsHashIdx(ctx, chainID, sn, bodySegments[i], firstBody.BaseTxId, sn.From, expectedTxsAmount, f, tmpDir, logEvery, lvl); err != nil {
return err
}
return nil
}); err != nil {
return nil
}
}
return nil
}); err != nil {
return nil
}
return nil
@ -665,7 +875,81 @@ func min(a, b uint64) uint64 {
return b
}
func RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, lvl log.Lvl) error {
type BlockRetire struct {
working atomic.Bool
wg *sync.WaitGroup
result *BlockRetireResult
workers int
tmpDir string
snapshots *RoSnapshots
db kv.RoDB
}
type BlockRetireResult struct {
BlockFrom, BlockTo uint64
Err error
}
func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB) *BlockRetire {
return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db}
}
func (br *BlockRetire) Snapshots() *RoSnapshots { return br.snapshots }
func (br *BlockRetire) Working() bool { return br.working.Load() }
func (br *BlockRetire) Wait() { br.wg.Wait() }
func (br *BlockRetire) Result() *BlockRetireResult {
r := br.result
br.result = nil
return r
}
func (br *BlockRetire) CanRetire(curBlockNum uint64) (blockFrom, blockTo uint64, can bool) {
blockFrom = br.snapshots.BlocksAvailable() + 1
return canRetire(blockFrom, curBlockNum-params.FullImmutabilityThreshold)
}
func canRetire(from, to uint64) (blockFrom, blockTo uint64, can bool) {
blockFrom = (from / 1_000) * 1_000
roundedTo1K := (to / 1_000) * 1_000
jump := roundedTo1K - blockFrom
switch { // only next segment sizes are allowed
case jump >= 500_000:
blockTo = blockFrom + 500_000
case jump >= 100_000:
blockTo = blockFrom + 100_000
case jump >= 10_000:
blockTo = blockFrom + 10_000
case jump >= 1_000:
blockTo = blockFrom + 1_000
default:
blockTo = blockFrom
}
return blockFrom, blockTo, blockTo-blockFrom >= 1_000
}
func (br *BlockRetire) CanDeleteTo(curBlockNum uint64) (blockTo uint64) {
hardLimit := (curBlockNum/1_000)*1_000 - params.FullImmutabilityThreshold
return min(hardLimit, br.snapshots.BlocksAvailable()+1)
}
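
A runnable illustration of the two heuristics above; `canRetire` is copied verbatim from this commit, while the sample heights and the 90_000 value assumed for `params.FullImmutabilityThreshold` are only for the example:

```go
package main

import "fmt"

const fullImmutabilityThreshold = 90_000 // assumed value of params.FullImmutabilityThreshold

// canRetire (copied from the commit): round both ends down to 1k and pick the
// largest allowed segment size that fits into [from, to).
func canRetire(from, to uint64) (blockFrom, blockTo uint64, can bool) {
	blockFrom = (from / 1_000) * 1_000
	roundedTo1K := (to / 1_000) * 1_000
	jump := roundedTo1K - blockFrom
	switch { // only these segment sizes are allowed
	case jump >= 500_000:
		blockTo = blockFrom + 500_000
	case jump >= 100_000:
		blockTo = blockFrom + 100_000
	case jump >= 10_000:
		blockTo = blockFrom + 10_000
	case jump >= 1_000:
		blockTo = blockFrom + 1_000
	default:
		blockTo = blockFrom
	}
	return blockFrom, blockTo, blockTo-blockFrom >= 1_000
}

func main() {
	snapshotsAvailable := uint64(13_500_000) // highest block already in snapshots (example value)
	head := uint64(14_200_000)               // ForwardProgress of the Senders stage (example value)

	from, to, ok := canRetire(snapshotsAvailable+1, head-fullImmutabilityThreshold)
	fmt.Println(from, to, ok) // 13500000 14000000 true -> a full 500k segment can be retired

	// CanDeleteTo: never delete past the snapshots, nor inside the immutability window.
	hardLimit := (head/1_000)*1_000 - fullImmutabilityThreshold
	canDeleteTo := minU64(hardLimit, snapshotsAvailable+1)
	fmt.Println(canDeleteTo) // 13500001
}

func minU64(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}
```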
func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, lvl log.Lvl) {
br.result = nil
if br.working.Load() {
return
}
br.wg.Add(1)
go func() {
br.working.Store(true)
defer br.working.Store(false)
defer br.wg.Done()
err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, lvl)
br.result = &BlockRetireResult{
BlockFrom: blockFrom,
BlockTo: blockTo,
Err: err,
}
}()
}
func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, lvl log.Lvl) error {
log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
// in future we will do it in background
if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil {
@ -1001,7 +1285,7 @@ func DumpBodies(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string,
var EmptyTxHash = common.Hash{}
func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSnapshot, firstTxID, firstBlockNum, expectedCount uint64, segmentFilePath, tmpDir string, logEvery *time.Ticker, lvl log.Lvl) error {
func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, txsSegment *TxnSegment, bodiesSegment *BodySegment, firstTxID, firstBlockNum, expectedCount uint64, segmentFilePath, tmpDir string, logEvery *time.Ticker, lvl log.Lvl) error {
dir, _ := filepath.Split(segmentFilePath)
d, err := compress.NewDecompressor(segmentFilePath)
@ -1018,7 +1302,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions.String())),
IndexFile: filepath.Join(dir, IdxFileName(txsSegment.From, txsSegment.To, Transactions.String())),
BaseDataID: firstTxID,
})
if err != nil {
@ -1030,7 +1314,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId.String())),
IndexFile: filepath.Join(dir, IdxFileName(txsSegment.From, txsSegment.To, TransactionsId.String())),
BaseDataID: firstTxID,
})
if err != nil {
@ -1042,7 +1326,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions2Block.String())),
IndexFile: filepath.Join(dir, IdxFileName(txsSegment.From, txsSegment.To, Transactions2Block.String())),
BaseDataID: firstBlockNum,
})
if err != nil {
@ -1149,8 +1433,8 @@ RETRY:
defer wg.Done()
blockNum := firstBlockNum
body := &types.BodyForStorage{}
if err := sn.Bodies.WithReadAhead(func() error {
bodyGetter := sn.Bodies.MakeGetter()
if err := bodiesSegment.seg.WithReadAhead(func() error {
bodyGetter := bodiesSegment.seg.MakeGetter()
bodyGetter.Reset(0)
buf, _ = bodyGetter.Next(buf[:0])
if err := rlp.DecodeBytes(buf, body); err != nil {
@ -1364,23 +1648,30 @@ RETRY:
func ForEachHeader(ctx context.Context, s *RoSnapshots, walker func(header *types.Header) error) error {
r := bytes.NewReader(nil)
for _, sn := range s.blocks {
ch := forEachAsync(ctx, sn.Headers)
for it := range ch {
if it.err != nil {
return nil
}
err := s.Headers.View(func(snapshots []*HeaderSegment) error {
for _, sn := range snapshots {
ch := forEachAsync(ctx, sn.seg)
for it := range ch {
if it.err != nil {
return it.err
}
header := new(types.Header)
r.Reset(it.word[1:])
if err := rlp.Decode(r, header); err != nil {
return err
}
if err := walker(header); err != nil {
return err
header := new(types.Header)
r.Reset(it.word[1:])
if err := rlp.Decode(r, header); err != nil {
return err
}
if err := walker(header); err != nil {
return err
}
}
}
return nil
})
if err != nil {
return err
}
return nil
}
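A small sketch of how a caller might use ForEachHeader with the new View-based iteration (the helper name and import paths are illustrative assumptions):

```go
package example

import (
	"context"

	"github.com/ledgerwatch/erigon/core/types"
	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)

// countSnapshotHeaders is a hypothetical walker: it counts every header
// stored in the currently open header segments.
func countSnapshotHeaders(ctx context.Context, s *snapshotsync.RoSnapshots) (uint64, error) {
	var n uint64
	err := snapshotsync.ForEachHeader(ctx, s, func(h *types.Header) error {
		n++
		return nil
	})
	return n, err
}
```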
@ -1394,35 +1685,6 @@ func NewMerger(tmpDir string, workers int, lvl log.Lvl) *Merger {
return &Merger{tmpDir: tmpDir, workers: workers, lvl: lvl}
}
/*
a.fileLocks[fType].RLock()
defer a.fileLocks[fType].RUnlock()
var maxEndBlock uint64
a.files[fType].Ascend(func(i btree.Item) bool {
item := i.(*byEndBlockItem)
if item.decompressor == nil {
return true // Skip B-tree based items
}
pre = append(pre, item)
if aggTo == 0 {
var doubleEnd uint64
nextDouble := item.endBlock
for nextDouble <= maxEndBlock && nextDouble-item.startBlock < maxSpan {
doubleEnd = nextDouble
nextDouble = doubleEnd + (doubleEnd - item.startBlock) + 1
}
if doubleEnd != item.endBlock {
aggFrom = item.startBlock
aggTo = doubleEnd
} else {
post = append(post, item)
return true
}
}
toAggregate = append(toAggregate, item)
return item.endBlock < aggTo
})
*/
type mergeRange struct {
from, to uint64
}
@ -1430,8 +1692,8 @@ type mergeRange struct {
func (r mergeRange) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) }
func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) {
for i := len(snapshots.blocks) - 1; i > 0; i-- {
sn := snapshots.blocks[i]
for i := len(snapshots.Headers.segments) - 1; i > 0; i-- {
sn := snapshots.Headers.segments[i]
if sn.To-sn.From >= DEFAULT_SEGMENT_SIZE { // is complete .seg
continue
}
@ -1445,7 +1707,7 @@ func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) {
}
aggFrom := sn.To - span
res = append(res, mergeRange{from: aggFrom, to: sn.To})
for snapshots.blocks[i].From > aggFrom {
for snapshots.Headers.segments[i].From > aggFrom {
i--
}
break
@ -1455,17 +1717,27 @@ func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) {
return res
}
func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeHeaders, toMergeBodies, toMergeTxs []string) {
for _, sn := range snapshots.blocks {
if sn.From < from {
continue
}
if sn.To > to {
break
}
if err := snapshots.Headers.View(func(hSegments []*HeaderSegment) error {
return snapshots.Bodies.View(func(bSegments []*BodySegment) error {
return snapshots.Txs.View(func(tSegments []*TxnSegment) error {
for i, sn := range hSegments {
if sn.From < from {
continue
}
if sn.To > to {
break
}
toMergeBodies = append(toMergeBodies, sn.Bodies.FilePath())
toMergeHeaders = append(toMergeHeaders, sn.Headers.FilePath())
toMergeTxs = append(toMergeTxs, sn.Transactions.FilePath())
toMergeHeaders = append(toMergeHeaders, hSegments[i].seg.FilePath())
toMergeBodies = append(toMergeBodies, bSegments[i].seg.FilePath())
toMergeTxs = append(toMergeTxs, tSegments[i].Seg.FilePath())
}
return nil
})
})
}); err != nil {
panic(err)
}
return
}

View File

@ -107,6 +107,24 @@ func TestMergeSnapshots(t *testing.T) {
require.Equal(1, a)
}
func TestCanRetire(t *testing.T) {
require := require.New(t)
cases := []struct {
inFrom, inTo, outFrom, outTo uint64
can bool
}{
{0, 1234, 0, 1000, true},
{1_000_000, 1_120_000, 1_000_000, 1_100_000, true},
{2_500_000, 4_100_000, 2_500_000, 3_000_000, true},
{2_500_000, 2_500_100, 2_500_000, 2_500_000, false},
}
for _, tc := range cases {
from, to, can := canRetire(tc.inFrom, tc.inTo)
require.Equal(int(tc.outFrom), int(from))
require.Equal(int(tc.outTo), int(to))
require.Equal(tc.can, can, tc.inFrom, tc.inTo)
}
}
func TestRecompress(t *testing.T) {
dir, require := t.TempDir(), require.New(t)
createFile := func(from, to uint64) { createTestSegmentFile(t, from, to, Headers, dir) }
@ -131,13 +149,13 @@ func TestOpenAllSnapshot(t *testing.T) {
defer s.Close()
err := s.ReopenSegments()
require.NoError(err)
require.Equal(0, len(s.blocks))
require.Equal(0, len(s.Headers.segments))
s.Close()
createFile(500_000, 1_000_000, Bodies)
s = NewRoSnapshots(cfg, dir)
defer s.Close()
require.Equal(0, len(s.blocks)) //because, no headers and transactions snapshot files are created
require.Equal(0, len(s.Bodies.segments)) // because no header or transaction snapshot files were created
s.Close()
createFile(500_000, 1_000_000, Headers)
@ -145,7 +163,7 @@ func TestOpenAllSnapshot(t *testing.T) {
s = NewRoSnapshots(cfg, dir)
err = s.ReopenSegments()
require.Error(err)
require.Equal(0, len(s.blocks)) //because, no gaps are allowed (expect snapshots from block 0)
require.Equal(0, len(s.Headers.segments)) // because no gaps are allowed (snapshots are expected to start at block 0)
s.Close()
createFile(0, 500_000, Bodies)
@ -157,17 +175,26 @@ func TestOpenAllSnapshot(t *testing.T) {
err = s.ReopenSegments()
require.NoError(err)
s.indicesReady.Store(true)
require.Equal(2, len(s.blocks))
require.Equal(2, len(s.Headers.segments))
sn, ok := s.Blocks(10)
ok, err := s.ViewTxs(10, func(sn *TxnSegment) error {
require.Equal(int(sn.To), 500_000)
return nil
})
require.NoError(err)
require.True(ok)
require.Equal(int(sn.To), 500_000)
sn, ok = s.Blocks(500_000)
ok, err = s.ViewTxs(500_000, func(sn *TxnSegment) error {
require.Equal(int(sn.To), 1_000_000) // [from:to)
return nil
})
require.NoError(err)
require.True(ok)
require.Equal(int(sn.To), 1_000_000) // [from:to)
_, ok = s.Blocks(1_000_000)
ok, err = s.ViewTxs(1_000_000, func(sn *TxnSegment) error {
return nil
})
require.NoError(err)
require.False(ok)
// Erigon may create new snapshots by itself - with ranges bigger than the hardcoded ExpectedBlocks
@ -177,7 +204,7 @@ func TestOpenAllSnapshot(t *testing.T) {
err = s.ReopenSegments()
require.NoError(err)
defer s.Close()
require.Equal(2, len(s.blocks))
require.Equal(2, len(s.Headers.segments))
createFile(500_000, 900_000, Headers)
createFile(500_000, 900_000, Bodies)

View File

@ -8,8 +8,7 @@ import (
)
var (
blockSnapshotEnabledKey = []byte("blocksSnapshotEnabled")
blockSnapshotRetireEnabledKey = []byte("blocksSnapshotRetireEnabled")
blockSnapshotEnabledKey = []byte("blocksSnapshotEnabled")
)
func EnsureNotChanged(tx kv.GetPut, cfg ethconfig.Snapshot) error {
@ -20,12 +19,5 @@ func EnsureNotChanged(tx kv.GetPut, cfg ethconfig.Snapshot) error {
if !ok {
return fmt.Errorf("node was started with --%s=%v, can't change it", ethconfig.FlagSnapshot, v)
}
ok, v, err = kv.EnsureNotChangedBool(tx, kv.DatabaseInfo, blockSnapshotRetireEnabledKey, cfg.RetireEnabled)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("node was started with --%s=%v, can't change it", ethconfig.FlagSnapshotRetire, v)
}
return nil
}

View File

@ -326,7 +326,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
blockReader,
),
stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, allSnapshots),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB)),
stagedsync.StageExecuteBlocksCfg(
mock.DB,
prune,

View File

@ -253,11 +253,10 @@ func NewStagedSync(
forkChoiceCh chan privateapi.ForkChoiceMessage,
waitingForBeaconChain *uint32,
snapshotDownloader proto_downloader.DownloaderClient,
allSnapshots *snapshotsync.RoSnapshots,
) (*stagedsync.Sync, error) {
var blockReader interfaces.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
if cfg.Snapshot.Enabled {
allSnapshots = snapshotsync.NewRoSnapshots(cfg.Snapshot, cfg.SnapshotDir.Path)
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
} else {
blockReader = snapshotsync.NewBlockReader()
@ -303,7 +302,7 @@ func NewStagedSync(
blockReader,
),
stagedsync.StageIssuanceCfg(db, controlServer.ChainConfig, blockReader, cfg.EnabledIssuance),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, allSnapshots),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db)),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,