Snapsthos: build indices on retire blocks (#3823)

* grpc up

* grpc up

* grpc up
This commit is contained in:
Alex Sharov 2022-04-05 16:22:11 +07:00 committed by GitHub
parent 4cdc38ca58
commit 80bd44fce5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 40 additions and 25 deletions

View File

@ -576,11 +576,18 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
s := stage(sync, tx, nil, stages.Senders)
log.Info("Stage", "name", s.ID, "progress", s.BlockNumber)
snapshots := allSnapshots(chainConfig)
d, err := dir.OpenRw(snapshots.Dir())
if err != nil {
return err
}
pm, err := prune.Get(tx)
if err != nil {
return err
}
cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db, nil, nil))
cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, snapshots, d, db, nil, nil))
if unwind > 0 {
u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
@ -1166,7 +1173,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg,
chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir,
&stagedsync.Notifications{}, nil, allSn,
&stagedsync.Notifications{}, nil, allSn, cfg.SnapshotDir,
)
if err != nil {
panic(err)

View File

@ -31,6 +31,7 @@ import (
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon-lib/etl"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
@ -481,11 +482,14 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
if err := backend.StartMining(context.Background(), backend.chainDB, mining, backend.config.Miner, backend.gasPrice, backend.quitMining); err != nil {
return nil, err
}
d, err := dir.OpenRw(allSnapshots.Dir())
if err != nil {
return nil, err
}
backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB,
stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
backend.sentryControlServer, tmpdir, backend.notifications,
backend.downloaderClient, allSnapshots)
backend.downloaderClient, allSnapshots, d)
if err != nil {
return nil, err
}

View File

@ -109,7 +109,7 @@ func TestSenders(t *testing.T) {
require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))
cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db, nil, nil))
cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, nil, db, nil, nil))
err := SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx)
assert.NoError(t, err)

View File

@ -245,7 +245,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir)
snapshots.Reopen()
br := snapshotsync.NewBlockRetire(runtime.NumCPU()-1, tmpDir, snapshots, chainDB, nil, nil)
br := snapshotsync.NewBlockRetire(runtime.NumCPU()-1, tmpDir, snapshots, rwSnapshotDir, chainDB, nil, nil)
for i := from; i < to; i += every {
br.RetireBlocksInBackground(ctx, i, i+every, *chainID, log.LvlInfo)

View File

@ -953,10 +953,11 @@ type BlockRetire struct {
wg *sync.WaitGroup
result *BlockRetireResult
workers int
tmpDir string
snapshots *RoSnapshots
db kv.RoDB
workers int
tmpDir string
snapshots *RoSnapshots
snapshotDir *dir.Rw
db kv.RoDB
downloader proto_downloader.DownloaderClient
notifier DBEventNotifier
@ -967,8 +968,8 @@ type BlockRetireResult struct {
Err error
}
func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier) *BlockRetire {
return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db, downloader: downloader, notifier: notifier}
func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, snapshotDir *dir.Rw, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier) *BlockRetire {
return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, snapshotDir: snapshotDir, wg: &sync.WaitGroup{}, db: db, downloader: downloader, notifier: notifier}
}
func (br *BlockRetire) Snapshots() *RoSnapshots { return br.snapshots }
func (br *BlockRetire) Working() bool { return br.working.Load() }
@ -1025,7 +1026,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom,
defer br.working.Store(false)
defer br.wg.Done()
err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier)
err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.snapshotDir, br.db, br.workers, br.downloader, lvl, br.notifier)
br.result = &BlockRetireResult{
BlockFrom: blockFrom,
BlockTo: blockTo,
@ -1038,7 +1039,7 @@ type DBEventNotifier interface {
OnNewSnapshot()
}
func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, downloader proto_downloader.DownloaderClient, lvl log.Lvl, notifier DBEventNotifier) error {
func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, rwSnapshotDir *dir.Rw, db kv.RoDB, workers int, downloader proto_downloader.DownloaderClient, lvl log.Lvl, notifier DBEventNotifier) error {
log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
// in future we will do it in background
if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil {
@ -1047,18 +1048,18 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("ReopenSegments: %w", err)
}
if err := BuildIndices(ctx, snapshots, rwSnapshotDir, chainID, tmpDir, snapshots.IndicesAvailable(), log.LvlInfo); err != nil {
return err
}
merger := NewMerger(tmpDir, workers, lvl, chainID)
ranges := merger.FindMergeRanges(snapshots)
if len(ranges) == 0 {
return nil
}
err := merger.Merge(ctx, snapshots, ranges, &dir.Rw{Path: snapshots.Dir()}, true)
err := merger.Merge(ctx, snapshots, ranges, rwSnapshotDir, true)
if err != nil {
return err
}
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("ReopenSegments: %w", err)
}
if notifier != nil { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}

View File

@ -316,7 +316,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
blockReader,
),
stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB, snapshotsDownloader, mock.Notifications.Events)),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, snapshotDir, mock.DB, snapshotsDownloader, mock.Notifications.Events)),
stagedsync.StageExecuteBlocksCfg(
mock.DB,
prune,

View File

@ -10,6 +10,7 @@ import (
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/common/dir"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
@ -249,14 +250,16 @@ func NewStagedSync(
tmpdir string,
notifications *stagedsync.Notifications,
snapshotDownloader proto_downloader.DownloaderClient,
allSnapshots *snapshotsync.RoSnapshots,
snapshots *snapshotsync.RoSnapshots,
snapshotDir *dir.Rw,
) (*stagedsync.Sync, error) {
var blockReader interfaces.FullBlockReader
if cfg.Snapshot.Enabled {
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
blockReader = snapshotsync.NewBlockReaderWithSnapshots(snapshots)
} else {
blockReader = snapshotsync.NewBlockReader()
}
blockRetire := snapshotsync.NewBlockRetire(1, tmpdir, snapshots, snapshotDir, db, snapshotDownloader, notifications.Events)
// During Import we don't want other services like header requests, body requests etc. to be running.
// Hence we run it in the test mode.
@ -274,7 +277,7 @@ func NewStagedSync(
controlServer.Penalize,
cfg.BatchSize,
p2pCfg.NoDiscovery,
allSnapshots,
snapshots,
snapshotDownloader,
blockReader,
tmpdir,
@ -291,11 +294,11 @@ func NewStagedSync(
cfg.BodyDownloadTimeoutSeconds,
*controlServer.ChainConfig,
cfg.BatchSize,
allSnapshots,
snapshots,
blockReader,
),
stagedsync.StageIssuanceCfg(db, controlServer.ChainConfig, blockReader, cfg.EnabledIssuance),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db, snapshotDownloader, notifications.Events)),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, blockRetire),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,
@ -315,7 +318,7 @@ func NewStagedSync(
stagedsync.StageHistoryCfg(db, cfg.Prune, tmpdir),
stagedsync.StageLogIndexCfg(db, cfg.Prune, tmpdir),
stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, tmpdir),
stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir, allSnapshots, isBor),
stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir, snapshots, isBor),
stagedsync.StageFinishCfg(db, tmpdir, logger), runInTestMode),
stagedsync.DefaultUnwindOrder,
stagedsync.DefaultPruneOrder,