diff --git a/cmd/downloader/downloader/downloader.go b/cmd/downloader/downloader/downloader.go index d8593e162..a0f5c7ea6 100644 --- a/cmd/downloader/downloader/downloader.go +++ b/cmd/downloader/downloader/downloader.go @@ -204,6 +204,25 @@ func CalcStats(prevStats AggStats, interval time.Duration, client *torrent.Clien return result } +func AddTorrentFile(ctx context.Context, torrentFilePath string, torrentClient *torrent.Client) (mi *metainfo.MetaInfo, err error) { + mi, err = metainfo.LoadFromFile(torrentFilePath) + if err != nil { + return nil, err + } + mi.AnnounceList = Trackers + + t := time.Now() + _, err = torrentClient.AddTorrent(mi) + if err != nil { + return mi, err + } + took := time.Since(t) + if took > 3*time.Second { + log.Info("[torrent] Check validity", "file", torrentFilePath, "took", took) + } + return mi, nil +} + // AddTorrentFiles - adding .torrent files to torrentClient (and checking their hashes), if .torrent file // added first time - pieces verification process will start (disk IO heavy) - Progress // kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again. @@ -214,27 +233,15 @@ func AddTorrentFiles(ctx context.Context, snapshotsDir *dir.Rw, torrentClient *t return err } for _, torrentFilePath := range files { + if _, err := AddTorrentFile(ctx, torrentFilePath, torrentClient); err != nil { + return err + } select { case <-ctx.Done(): return ctx.Err() default: } - mi, err := metainfo.LoadFromFile(torrentFilePath) - if err != nil { - return err - } - mi.AnnounceList = Trackers - - t := time.Now() - _, err = torrentClient.AddTorrent(mi) - if err != nil { - return err - } - took := time.Since(t) - if took > 3*time.Second { - log.Info("[torrent] Check validity", "file", torrentFilePath, "took", took) - } } return nil diff --git a/cmd/downloader/downloader/server.go b/cmd/downloader/downloader/grpc_server.go similarity index 87% rename from cmd/downloader/downloader/server.go rename to cmd/downloader/downloader/grpc_server.go index dfb86f425..f3fa18def 100644 --- a/cmd/downloader/downloader/server.go +++ b/cmd/downloader/downloader/grpc_server.go @@ -3,6 +3,7 @@ package downloader import ( "context" "errors" + "path/filepath" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/metainfo" @@ -58,8 +59,18 @@ type GrpcServer struct { func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) { infoHashes := make([]metainfo.Hash, len(request.Items)) for i, it := range request.Items { - //TODO: if hash is empty - create .torrent file from path file (if it exists) - infoHashes[i] = gointerfaces.ConvertH160toAddress(it.TorrentHash) + if it.TorrentHash == nil { + if err := BuildTorrentFileIfNeed(ctx, it.Path, s.snapshotDir); err != nil { + return nil, err + } + metaInfo, err := AddTorrentFile(ctx, filepath.Join(s.snapshotDir.Path, it.Path+".torrent"), s.t.TorrentClient) + if err != nil { + return nil, err + } + infoHashes[i] = metaInfo.HashInfoBytes() + } else { + infoHashes[i] = gointerfaces.ConvertH160toAddress(it.TorrentHash) + } } if err := ResolveAbsentTorrents(ctx, s.t.TorrentClient, infoHashes, s.snapshotDir, s.silent); err != nil { return nil, err diff --git a/cmd/downloader/downloader/util.go b/cmd/downloader/downloader/util.go index ad055ff87..57e2b709a 100644 --- a/cmd/downloader/downloader/util.go +++ b/cmd/downloader/downloader/util.go @@ -85,6 +85,24 @@ func allSegmentFiles(dir string) ([]string, error) { return res, nil } +// BuildTorrentFileIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually +func BuildTorrentFileIfNeed(ctx context.Context, originalFileName string, root *dir.Rw) (err error) { + torrentFilePath := filepath.Join(root.Path, originalFileName+".torrent") + if _, err := os.Stat(torrentFilePath); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + info, err := BuildInfoBytesForFile(root.Path, originalFileName) + if err != nil { + return err + } + if err := CreateTorrentFile(root, info, nil); err != nil { + return err + } + } + return nil +} + // BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually func BuildTorrentFilesIfNeed(ctx context.Context, root *dir.Rw) error { logEvery := time.NewTicker(20 * time.Second) @@ -95,18 +113,8 @@ func BuildTorrentFilesIfNeed(ctx context.Context, root *dir.Rw) error { return err } for i, f := range files { - torrentFileName := filepath.Join(root.Path, f+".torrent") - if _, err := os.Stat(torrentFileName); err != nil { - if !errors.Is(err, os.ErrNotExist) { - return err - } - info, err := BuildInfoBytesForFile(root.Path, f) - if err != nil { - return err - } - if err := CreateTorrentFile(root, info, nil); err != nil { - return err - } + if err := BuildTorrentFileIfNeed(ctx, f, root); err != nil { + return err } select { diff --git a/cmd/downloader/trackers/trackerslist b/cmd/downloader/trackers/trackerslist index c378dbeac..0a2affcd4 160000 --- a/cmd/downloader/trackers/trackerslist +++ b/cmd/downloader/trackers/trackerslist @@ -1 +1 @@ -Subproject commit c378dbeac796719b7b0210d4a97d575a6e8dc66f +Subproject commit 0a2affcd4120a141e0d0ac8c7c43e2eaa7fee894 diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index ac0acf5df..05af41ff7 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -579,7 +579,7 @@ func stageSenders(db kv.RwDB, ctx context.Context) error { if err != nil { return err } - cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db)) + cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db, nil)) if unwind > 0 { u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber) if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil { @@ -1058,6 +1058,7 @@ func allSnapshots(cc *params.ChainConfig) *snapshotsync.RoSnapshots { openSnapshotOnce.Do(func() { if enableSnapshot { snapshotCfg := ethconfig.NewSnapshotCfg(enableSnapshot, true) + dir.MustExist(filepath.Join(datadir, "snapshots")) _allSnapshotsSingleton = snapshotsync.NewRoSnapshots(snapshotCfg, filepath.Join(datadir, "snapshots")) if err := _allSnapshotsSingleton.ReopenSegments(); err != nil { panic(err) diff --git a/cmd/state/commands/erigon2.go b/cmd/state/commands/erigon2.go index fd7e1baf7..f92e2322b 100644 --- a/cmd/state/commands/erigon2.go +++ b/cmd/state/commands/erigon2.go @@ -33,11 +33,9 @@ import ( "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/types/accounts" "github.com/ledgerwatch/erigon/core/vm" - "github.com/ledgerwatch/erigon/eth" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/snapshotsync" - "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes" ) const ( @@ -182,11 +180,6 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log. engine := initConsensusEngine(chainConfig, logger) var blockReader interfaces.FullBlockReader if snapshotBlocks { - snConfig := snapshothashes.KnownConfig(chainConfig.ChainName) - snConfig.ExpectBlocks, err = eth.RestoreExpectedExternalSnapshot(historyDb, snConfig) - if err != nil { - return err - } allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots")) defer allSnapshots.Close() blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) diff --git a/eth/backend.go b/eth/backend.go index f75d9e68e..5538239c6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -19,7 +19,6 @@ package eth import ( "context" - "encoding/binary" "errors" "fmt" "math/big" @@ -77,7 +76,6 @@ import ( "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/snapshotsync" - "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes" "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshotsynccli" stages2 "github.com/ledgerwatch/erigon/turbo/stages" "github.com/ledgerwatch/log/v3" @@ -313,12 +311,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l var blockReader interfaces.FullBlockReader var allSnapshots *snapshotsync.RoSnapshots if config.Snapshot.Enabled { - snConfig := snapshothashes.KnownConfig(chainConfig.ChainName) - snConfig.ExpectBlocks, err = RestoreExpectedExternalSnapshot(chainKv, snConfig) - if err != nil { - return nil, err - } - allSnapshots = snapshotsync.NewRoSnapshots(config.Snapshot, config.SnapshotDir.Path) allSnapshots.AsyncOpenAll(ctx) blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) @@ -564,49 +556,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l return backend, nil } -func RestoreExpectedExternalSnapshot(db kv.RwDB, snConfig *snapshothashes.Config) (uint64, error) { - const SyncedWithSnapshot = "synced_with_snapshot" - var snapshotToBlockInDB *uint64 - // Check if we have an already initialized chain and fall back to - // that if so. Otherwise we need to generate a new genesis spec. - if err := db.View(context.Background(), func(tx kv.Tx) error { - v, err := tx.GetOne(kv.DatabaseInfo, []byte(SyncedWithSnapshot)) - if err != nil { - return err - } - if v != nil { - valueInDB := binary.BigEndian.Uint64(v) - snapshotToBlockInDB = &valueInDB - return nil - } - - return nil - }); err != nil { - return 0, err - } - - if snapshotToBlockInDB != nil { - if *snapshotToBlockInDB != snConfig.ExpectBlocks { - return *snapshotToBlockInDB, nil // - //log.Warn(fmt.Sprintf("'incremental snapshots feature' not implemented yet. New snapshots available up to block %d, but this node was synced to snapshot %d and will keep other blocks in db. (it's safe, re-sync may reduce db size)", snapshotToBlockInDB, snConfig.ExpectBlocks)) - //snConfig.ExpectBlocks = *snapshotToBlockInDB - } - } - - if err := db.Update(context.Background(), func(tx kv.RwTx) error { - num := make([]byte, 8) - binary.BigEndian.PutUint64(num, snConfig.ExpectBlocks) - if err := tx.Put(kv.DatabaseInfo, []byte(SyncedWithSnapshot), num); err != nil { - return err - } - return nil - }); err != nil { - return 0, err - } - - return snConfig.ExpectBlocks, nil -} - func (s *Ethereum) APIs() []rpc.API { return []rpc.API{} } diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index c069c0676..0d0ba7916 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -1009,7 +1009,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R expect := cfg.snapshotHashesCfg.ExpectBlocks if headers >= expect && bodies >= expect && txs >= expect { if err := cfg.snapshots.ReopenSegments(); err != nil { - return err + return fmt.Errorf("ReopenSegments: %w", err) } if expect > cfg.snapshots.BlocksAvailable() { return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.BlocksAvailable()) @@ -1031,31 +1031,21 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R } // Create .idx files - if !cfg.snapshots.IndicesReady() { + if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() { if !cfg.snapshots.SegmentsReady() { return fmt.Errorf("not all snapshot segments are available") } // wait for Downloader service to download all expected snapshots - logEvery := time.NewTicker(logInterval) - defer logEvery.Stop() - headers, bodies, txs, err := cfg.snapshots.IdxAvailability() - if err != nil { - return err - } - expect := cfg.snapshotHashesCfg.ExpectBlocks - if headers < expect || bodies < expect || txs < expect { + if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() { chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) - if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, cfg.snapshotDir, *chainID, cfg.tmpdir, 0, log.LvlInfo); err != nil { + if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, cfg.snapshotDir, *chainID, cfg.tmpdir, cfg.snapshots.IndicesAvailable(), log.LvlInfo); err != nil { return err } } if err := cfg.snapshots.ReopenIndices(); err != nil { - return err - } - if expect > cfg.snapshots.IndicesAvailable() { - return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.BlocksAvailable()) + return fmt.Errorf("ReopenIndices: %w", err) } } diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index fcede85ba..fc972862b 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -401,16 +401,17 @@ func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context } } if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks { + // TODO: remove this check for the release + if err := cfg.blockRetire.Snapshots().EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil { + return err + } + canDeleteTo := cfg.blockRetire.CanDeleteTo(s.ForwardProgress) if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil { return nil } } - // TODO: remove this check for the release - if err := cfg.blockRetire.Snapshots().EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil { - return err - } blockFrom, blockTo, ok := cfg.blockRetire.CanRetire(s.ForwardProgress) if !ok { return nil diff --git a/eth/stagedsync/stage_senders_test.go b/eth/stagedsync/stage_senders_test.go index d549449ca..36288852a 100644 --- a/eth/stagedsync/stage_senders_test.go +++ b/eth/stagedsync/stage_senders_test.go @@ -109,7 +109,7 @@ func TestSenders(t *testing.T) { require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3)) - cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db)) + cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db, nil)) err := SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx) assert.NoError(t, err) diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index c9eb05b1a..c3bf37690 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -157,7 +157,7 @@ func doRetireCommand(cliCtx *cli.Context) error { snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir) snapshots.ReopenSegments() - br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB) + br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB, nil) for i := from; i < to; i += every { br.RetireBlocksInBackground(ctx, i, i+every, *chainID, log.LvlInfo) diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index a5e916d86..cc2a1d8a5 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -22,6 +22,7 @@ import ( common2 "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/compress" + proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/txpool" @@ -346,12 +347,15 @@ func NewRoSnapshots(cfg ethconfig.Snapshot, snapshotDir string) *RoSnapshots { return &RoSnapshots{dir: snapshotDir, cfg: cfg, Headers: &headerSegments{}, Bodies: &bodySegments{}, Txs: &txnSegments{}} } -func (s *RoSnapshots) Cfg() ethconfig.Snapshot { return s.cfg } -func (s *RoSnapshots) Dir() string { return s.dir } -func (s *RoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() } -func (s *RoSnapshots) BlocksAvailable() uint64 { return s.segmentsAvailable.Load() } -func (s *RoSnapshots) IndicesReady() bool { return s.indicesReady.Load() } -func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable.Load() } +func (s *RoSnapshots) Cfg() ethconfig.Snapshot { return s.cfg } +func (s *RoSnapshots) Dir() string { return s.dir } +func (s *RoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() } +func (s *RoSnapshots) IndicesReady() bool { return s.indicesReady.Load() } +func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable.Load() } +func (s *RoSnapshots) SegmentsAvailable() uint64 { return s.segmentsAvailable.Load() } +func (s *RoSnapshots) BlocksAvailable() uint64 { + return min(s.segmentsAvailable.Load(), s.idxAvailable.Load()) +} func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapshothashes.Config) error { if s.BlocksAvailable() < cfg.ExpectBlocks { @@ -396,18 +400,28 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) { defer s.Bodies.lock.Unlock() s.Txs.lock.Lock() defer s.Txs.lock.Unlock() +Loop: for _, t := range types { switch t { case Headers: if err := s.Headers.reopen(s.dir); err != nil { + if errors.Is(err, os.ErrNotExist) { + break Loop + } return err } case Bodies: if err := s.Bodies.reopen(s.dir); err != nil { + if errors.Is(err, os.ErrNotExist) { + break Loop + } return err } case Transactions: if err := s.Txs.reopen(s.dir); err != nil { + if errors.Is(err, os.ErrNotExist) { + break Loop + } return err } default: @@ -530,19 +544,19 @@ func (s *RoSnapshots) closeSegmentsLocked() { } } func (s *RoSnapshots) ViewHeaders(blockNum uint64, f func(sn *HeaderSegment) error) (found bool, err error) { - if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() { + if !s.indicesReady.Load() || blockNum > s.BlocksAvailable() { return false, nil } return s.Headers.ViewSegment(blockNum, f) } func (s *RoSnapshots) ViewBodies(blockNum uint64, f func(sn *BodySegment) error) (found bool, err error) { - if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() { + if !s.indicesReady.Load() || blockNum > s.BlocksAvailable() { return false, nil } return s.Bodies.ViewSegment(blockNum, f) } func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (found bool, err error) { - if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() { + if !s.indicesReady.Load() || blockNum > s.BlocksAvailable() { return false, nil } return s.Txs.ViewSegment(blockNum, f) @@ -701,7 +715,7 @@ func noGaps(in []FileInfo) (out []FileInfo, err error) { continue } if f.From != prevTo { // no gaps - return nil, fmt.Errorf("[open snapshots] snapshot missed: from %d to %d", prevTo, f.From) + return nil, fmt.Errorf("snapshot missed: from %d to %d", prevTo, f.From) } prevTo = f.To out = append(out, f) @@ -884,6 +898,8 @@ type BlockRetire struct { tmpDir string snapshots *RoSnapshots db kv.RoDB + + snapshotDownloader proto_downloader.DownloaderClient } type BlockRetireResult struct { @@ -891,8 +907,8 @@ type BlockRetireResult struct { Err error } -func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB) *BlockRetire { - return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db} +func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, snapshotDownloader proto_downloader.DownloaderClient) *BlockRetire { + return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db, snapshotDownloader: snapshotDownloader} } func (br *BlockRetire) Snapshots() *RoSnapshots { return br.snapshots } func (br *BlockRetire) Working() bool { return br.working.Load() } @@ -909,7 +925,16 @@ func (br *BlockRetire) CanRetire(curBlockNum uint64) (blockFrom, blockTo uint64, func canRetire(from, to uint64) (blockFrom, blockTo uint64, can bool) { blockFrom = (from / 1_000) * 1_000 roundedTo1K := (to / 1_000) * 1_000 - jump := roundedTo1K - blockFrom + var maxJump uint64 = 1_000 + if blockFrom%500_000 == 0 { + maxJump = 500_000 + } else if blockFrom%100_000 == 0 { + maxJump = 100_000 + } else if blockFrom%10_000 == 0 { + maxJump = 10_000 + } + //roundedTo1K := (to / 1_000) * 1_000 + jump := min(maxJump, roundedTo1K-blockFrom) switch { // only next segment sizes are allowed case jump >= 500_000: blockTo = blockFrom + 500_000 @@ -940,7 +965,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom, defer br.working.Store(false) defer br.wg.Done() - err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, lvl) + err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.snapshotDownloader, lvl) br.result = &BlockRetireResult{ BlockFrom: blockFrom, BlockTo: blockTo, @@ -949,7 +974,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom, }() } -func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, lvl log.Lvl) error { +func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, snapshotDownloader proto_downloader.DownloaderClient, lvl log.Lvl) error { log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000)) // in future we will do it in background if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil { @@ -974,6 +999,20 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 return fmt.Errorf("ReopenIndices: %w", err) } + // start seed large .seg of large size + if blockTo-blockFrom == DEFAULT_SEGMENT_SIZE { + req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, len(AllSnapshotTypes))} + for _, t := range AllSnapshotTypes { + req.Items = append(req.Items, &proto_downloader.DownloadItem{ + Path: SegmentFileName(blockFrom, blockTo, t), + }) + } + if snapshotDownloader != nil { + if _, err := snapshotDownloader.Download(ctx, req); err != nil { + return err + } + } + } return nil } diff --git a/turbo/snapshotsync/block_snapshots_test.go b/turbo/snapshotsync/block_snapshots_test.go index f7d429822..272d62488 100644 --- a/turbo/snapshotsync/block_snapshots_test.go +++ b/turbo/snapshotsync/block_snapshots_test.go @@ -117,6 +117,7 @@ func TestCanRetire(t *testing.T) { {1_000_000, 1_120_000, 1_000_000, 1_100_000, true}, {2_500_000, 4_100_000, 2_500_000, 3_000_000, true}, {2_500_000, 2_500_100, 2_500_000, 2_500_000, false}, + {1_001_000, 2_000_000, 1_001_000, 1_002_000, true}, } for _, tc := range cases { from, to, can := canRetire(tc.inFrom, tc.inTo) @@ -174,6 +175,8 @@ func TestOpenAllSnapshot(t *testing.T) { err = s.ReopenSegments() require.NoError(err) + err = s.ReopenIndices() + require.NoError(err) s.indicesReady.Store(true) require.Equal(2, len(s.Headers.segments)) diff --git a/turbo/snapshotsync/snapshothashes/embed.go b/turbo/snapshotsync/snapshothashes/embed.go index c129cf325..79e617627 100644 --- a/turbo/snapshotsync/snapshothashes/embed.go +++ b/turbo/snapshotsync/snapshothashes/embed.go @@ -38,10 +38,7 @@ var ( ) func newConfig(preverified Preverified) *Config { - return &Config{ - ExpectBlocks: maxBlockNum(preverified), - Preverified: preverified, - } + return &Config{ExpectBlocks: maxBlockNum(preverified), Preverified: preverified} } func maxBlockNum(preverified Preverified) uint64 { diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index efc711312..75b43729f 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -326,7 +326,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey blockReader, ), stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true), - stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB)), + stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB, snapshotsDownloader)), stagedsync.StageExecuteBlocksCfg( mock.DB, prune, diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 15c266fe3..43a25a122 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -302,7 +302,7 @@ func NewStagedSync( blockReader, ), stagedsync.StageIssuanceCfg(db, controlServer.ChainConfig, blockReader, cfg.EnabledIssuance), - stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db)), + stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db, snapshotDownloader)), stagedsync.StageExecuteBlocksCfg( db, cfg.Prune,