mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-23 04:03:49 +00:00
Snapshots: start seed new large .seg files (#3724)
This commit is contained in:
parent
d0db4ed62d
commit
f314222180
@ -204,6 +204,25 @@ func CalcStats(prevStats AggStats, interval time.Duration, client *torrent.Clien
|
||||
return result
|
||||
}
|
||||
|
||||
func AddTorrentFile(ctx context.Context, torrentFilePath string, torrentClient *torrent.Client) (mi *metainfo.MetaInfo, err error) {
|
||||
mi, err = metainfo.LoadFromFile(torrentFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mi.AnnounceList = Trackers
|
||||
|
||||
t := time.Now()
|
||||
_, err = torrentClient.AddTorrent(mi)
|
||||
if err != nil {
|
||||
return mi, err
|
||||
}
|
||||
took := time.Since(t)
|
||||
if took > 3*time.Second {
|
||||
log.Info("[torrent] Check validity", "file", torrentFilePath, "took", took)
|
||||
}
|
||||
return mi, nil
|
||||
}
|
||||
|
||||
// AddTorrentFiles - adding .torrent files to torrentClient (and checking their hashes), if .torrent file
|
||||
// added first time - pieces verification process will start (disk IO heavy) - Progress
|
||||
// kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again.
|
||||
@ -214,27 +233,15 @@ func AddTorrentFiles(ctx context.Context, snapshotsDir *dir.Rw, torrentClient *t
|
||||
return err
|
||||
}
|
||||
for _, torrentFilePath := range files {
|
||||
if _, err := AddTorrentFile(ctx, torrentFilePath, torrentClient); err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
mi, err := metainfo.LoadFromFile(torrentFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mi.AnnounceList = Trackers
|
||||
|
||||
t := time.Now()
|
||||
_, err = torrentClient.AddTorrent(mi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
took := time.Since(t)
|
||||
if took > 3*time.Second {
|
||||
log.Info("[torrent] Check validity", "file", torrentFilePath, "took", took)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -3,6 +3,7 @@ package downloader
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
@ -58,8 +59,18 @@ type GrpcServer struct {
|
||||
func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
|
||||
infoHashes := make([]metainfo.Hash, len(request.Items))
|
||||
for i, it := range request.Items {
|
||||
//TODO: if hash is empty - create .torrent file from path file (if it exists)
|
||||
infoHashes[i] = gointerfaces.ConvertH160toAddress(it.TorrentHash)
|
||||
if it.TorrentHash == nil {
|
||||
if err := BuildTorrentFileIfNeed(ctx, it.Path, s.snapshotDir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metaInfo, err := AddTorrentFile(ctx, filepath.Join(s.snapshotDir.Path, it.Path+".torrent"), s.t.TorrentClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
infoHashes[i] = metaInfo.HashInfoBytes()
|
||||
} else {
|
||||
infoHashes[i] = gointerfaces.ConvertH160toAddress(it.TorrentHash)
|
||||
}
|
||||
}
|
||||
if err := ResolveAbsentTorrents(ctx, s.t.TorrentClient, infoHashes, s.snapshotDir, s.silent); err != nil {
|
||||
return nil, err
|
@ -85,6 +85,24 @@ func allSegmentFiles(dir string) ([]string, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// BuildTorrentFileIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
|
||||
func BuildTorrentFileIfNeed(ctx context.Context, originalFileName string, root *dir.Rw) (err error) {
|
||||
torrentFilePath := filepath.Join(root.Path, originalFileName+".torrent")
|
||||
if _, err := os.Stat(torrentFilePath); err != nil {
|
||||
if !errors.Is(err, os.ErrNotExist) {
|
||||
return err
|
||||
}
|
||||
info, err := BuildInfoBytesForFile(root.Path, originalFileName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := CreateTorrentFile(root, info, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
|
||||
func BuildTorrentFilesIfNeed(ctx context.Context, root *dir.Rw) error {
|
||||
logEvery := time.NewTicker(20 * time.Second)
|
||||
@ -95,18 +113,8 @@ func BuildTorrentFilesIfNeed(ctx context.Context, root *dir.Rw) error {
|
||||
return err
|
||||
}
|
||||
for i, f := range files {
|
||||
torrentFileName := filepath.Join(root.Path, f+".torrent")
|
||||
if _, err := os.Stat(torrentFileName); err != nil {
|
||||
if !errors.Is(err, os.ErrNotExist) {
|
||||
return err
|
||||
}
|
||||
info, err := BuildInfoBytesForFile(root.Path, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := CreateTorrentFile(root, info, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := BuildTorrentFileIfNeed(ctx, f, root); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
|
@ -1 +1 @@
|
||||
Subproject commit c378dbeac796719b7b0210d4a97d575a6e8dc66f
|
||||
Subproject commit 0a2affcd4120a141e0d0ac8c7c43e2eaa7fee894
|
@ -579,7 +579,7 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db))
|
||||
cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db, nil))
|
||||
if unwind > 0 {
|
||||
u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
|
||||
if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
|
||||
@ -1058,6 +1058,7 @@ func allSnapshots(cc *params.ChainConfig) *snapshotsync.RoSnapshots {
|
||||
openSnapshotOnce.Do(func() {
|
||||
if enableSnapshot {
|
||||
snapshotCfg := ethconfig.NewSnapshotCfg(enableSnapshot, true)
|
||||
dir.MustExist(filepath.Join(datadir, "snapshots"))
|
||||
_allSnapshotsSingleton = snapshotsync.NewRoSnapshots(snapshotCfg, filepath.Join(datadir, "snapshots"))
|
||||
if err := _allSnapshotsSingleton.ReopenSegments(); err != nil {
|
||||
panic(err)
|
||||
|
@ -33,11 +33,9 @@ import (
|
||||
"github.com/ledgerwatch/erigon/core/types"
|
||||
"github.com/ledgerwatch/erigon/core/types/accounts"
|
||||
"github.com/ledgerwatch/erigon/core/vm"
|
||||
"github.com/ledgerwatch/erigon/eth"
|
||||
"github.com/ledgerwatch/erigon/eth/ethconfig"
|
||||
"github.com/ledgerwatch/erigon/params"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -182,11 +180,6 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
|
||||
engine := initConsensusEngine(chainConfig, logger)
|
||||
var blockReader interfaces.FullBlockReader
|
||||
if snapshotBlocks {
|
||||
snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
|
||||
snConfig.ExpectBlocks, err = eth.RestoreExpectedExternalSnapshot(historyDb, snConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots"))
|
||||
defer allSnapshots.Close()
|
||||
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
|
||||
|
@ -19,7 +19,6 @@ package eth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
@ -77,7 +76,6 @@ import (
|
||||
"github.com/ledgerwatch/erigon/rpc"
|
||||
"github.com/ledgerwatch/erigon/turbo/shards"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshotsynccli"
|
||||
stages2 "github.com/ledgerwatch/erigon/turbo/stages"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
@ -313,12 +311,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
|
||||
var blockReader interfaces.FullBlockReader
|
||||
var allSnapshots *snapshotsync.RoSnapshots
|
||||
if config.Snapshot.Enabled {
|
||||
snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
|
||||
snConfig.ExpectBlocks, err = RestoreExpectedExternalSnapshot(chainKv, snConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
allSnapshots = snapshotsync.NewRoSnapshots(config.Snapshot, config.SnapshotDir.Path)
|
||||
allSnapshots.AsyncOpenAll(ctx)
|
||||
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
|
||||
@ -564,49 +556,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
|
||||
return backend, nil
|
||||
}
|
||||
|
||||
func RestoreExpectedExternalSnapshot(db kv.RwDB, snConfig *snapshothashes.Config) (uint64, error) {
|
||||
const SyncedWithSnapshot = "synced_with_snapshot"
|
||||
var snapshotToBlockInDB *uint64
|
||||
// Check if we have an already initialized chain and fall back to
|
||||
// that if so. Otherwise we need to generate a new genesis spec.
|
||||
if err := db.View(context.Background(), func(tx kv.Tx) error {
|
||||
v, err := tx.GetOne(kv.DatabaseInfo, []byte(SyncedWithSnapshot))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if v != nil {
|
||||
valueInDB := binary.BigEndian.Uint64(v)
|
||||
snapshotToBlockInDB = &valueInDB
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if snapshotToBlockInDB != nil {
|
||||
if *snapshotToBlockInDB != snConfig.ExpectBlocks {
|
||||
return *snapshotToBlockInDB, nil //
|
||||
//log.Warn(fmt.Sprintf("'incremental snapshots feature' not implemented yet. New snapshots available up to block %d, but this node was synced to snapshot %d and will keep other blocks in db. (it's safe, re-sync may reduce db size)", snapshotToBlockInDB, snConfig.ExpectBlocks))
|
||||
//snConfig.ExpectBlocks = *snapshotToBlockInDB
|
||||
}
|
||||
}
|
||||
|
||||
if err := db.Update(context.Background(), func(tx kv.RwTx) error {
|
||||
num := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(num, snConfig.ExpectBlocks)
|
||||
if err := tx.Put(kv.DatabaseInfo, []byte(SyncedWithSnapshot), num); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return snConfig.ExpectBlocks, nil
|
||||
}
|
||||
|
||||
func (s *Ethereum) APIs() []rpc.API {
|
||||
return []rpc.API{}
|
||||
}
|
||||
|
@ -1009,7 +1009,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
|
||||
expect := cfg.snapshotHashesCfg.ExpectBlocks
|
||||
if headers >= expect && bodies >= expect && txs >= expect {
|
||||
if err := cfg.snapshots.ReopenSegments(); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("ReopenSegments: %w", err)
|
||||
}
|
||||
if expect > cfg.snapshots.BlocksAvailable() {
|
||||
return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.BlocksAvailable())
|
||||
@ -1031,31 +1031,21 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
|
||||
}
|
||||
|
||||
// Create .idx files
|
||||
if !cfg.snapshots.IndicesReady() {
|
||||
if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() {
|
||||
if !cfg.snapshots.SegmentsReady() {
|
||||
return fmt.Errorf("not all snapshot segments are available")
|
||||
}
|
||||
|
||||
// wait for Downloader service to download all expected snapshots
|
||||
logEvery := time.NewTicker(logInterval)
|
||||
defer logEvery.Stop()
|
||||
headers, bodies, txs, err := cfg.snapshots.IdxAvailability()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expect := cfg.snapshotHashesCfg.ExpectBlocks
|
||||
if headers < expect || bodies < expect || txs < expect {
|
||||
if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() {
|
||||
chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
|
||||
if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, cfg.snapshotDir, *chainID, cfg.tmpdir, 0, log.LvlInfo); err != nil {
|
||||
if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, cfg.snapshotDir, *chainID, cfg.tmpdir, cfg.snapshots.IndicesAvailable(), log.LvlInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := cfg.snapshots.ReopenIndices(); err != nil {
|
||||
return err
|
||||
}
|
||||
if expect > cfg.snapshots.IndicesAvailable() {
|
||||
return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.BlocksAvailable())
|
||||
return fmt.Errorf("ReopenIndices: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -401,16 +401,17 @@ func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context
|
||||
}
|
||||
}
|
||||
if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks {
|
||||
// TODO: remove this check for the release
|
||||
if err := cfg.blockRetire.Snapshots().EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
canDeleteTo := cfg.blockRetire.CanDeleteTo(s.ForwardProgress)
|
||||
if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: remove this check for the release
|
||||
if err := cfg.blockRetire.Snapshots().EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil {
|
||||
return err
|
||||
}
|
||||
blockFrom, blockTo, ok := cfg.blockRetire.CanRetire(s.ForwardProgress)
|
||||
if !ok {
|
||||
return nil
|
||||
|
@ -109,7 +109,7 @@ func TestSenders(t *testing.T) {
|
||||
|
||||
require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))
|
||||
|
||||
cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db))
|
||||
cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db, nil))
|
||||
err := SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
|
@ -157,7 +157,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
|
||||
snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir)
|
||||
snapshots.ReopenSegments()
|
||||
|
||||
br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB)
|
||||
br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB, nil)
|
||||
|
||||
for i := from; i < to; i += every {
|
||||
br.RetireBlocksInBackground(ctx, i, i+every, *chainID, log.LvlInfo)
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
common2 "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/common/dir"
|
||||
"github.com/ledgerwatch/erigon-lib/compress"
|
||||
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/recsplit"
|
||||
"github.com/ledgerwatch/erigon-lib/txpool"
|
||||
@ -346,12 +347,15 @@ func NewRoSnapshots(cfg ethconfig.Snapshot, snapshotDir string) *RoSnapshots {
|
||||
return &RoSnapshots{dir: snapshotDir, cfg: cfg, Headers: &headerSegments{}, Bodies: &bodySegments{}, Txs: &txnSegments{}}
|
||||
}
|
||||
|
||||
func (s *RoSnapshots) Cfg() ethconfig.Snapshot { return s.cfg }
|
||||
func (s *RoSnapshots) Dir() string { return s.dir }
|
||||
func (s *RoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() }
|
||||
func (s *RoSnapshots) BlocksAvailable() uint64 { return s.segmentsAvailable.Load() }
|
||||
func (s *RoSnapshots) IndicesReady() bool { return s.indicesReady.Load() }
|
||||
func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable.Load() }
|
||||
func (s *RoSnapshots) Cfg() ethconfig.Snapshot { return s.cfg }
|
||||
func (s *RoSnapshots) Dir() string { return s.dir }
|
||||
func (s *RoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() }
|
||||
func (s *RoSnapshots) IndicesReady() bool { return s.indicesReady.Load() }
|
||||
func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable.Load() }
|
||||
func (s *RoSnapshots) SegmentsAvailable() uint64 { return s.segmentsAvailable.Load() }
|
||||
func (s *RoSnapshots) BlocksAvailable() uint64 {
|
||||
return min(s.segmentsAvailable.Load(), s.idxAvailable.Load())
|
||||
}
|
||||
|
||||
func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapshothashes.Config) error {
|
||||
if s.BlocksAvailable() < cfg.ExpectBlocks {
|
||||
@ -396,18 +400,28 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
|
||||
defer s.Bodies.lock.Unlock()
|
||||
s.Txs.lock.Lock()
|
||||
defer s.Txs.lock.Unlock()
|
||||
Loop:
|
||||
for _, t := range types {
|
||||
switch t {
|
||||
case Headers:
|
||||
if err := s.Headers.reopen(s.dir); err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
break Loop
|
||||
}
|
||||
return err
|
||||
}
|
||||
case Bodies:
|
||||
if err := s.Bodies.reopen(s.dir); err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
break Loop
|
||||
}
|
||||
return err
|
||||
}
|
||||
case Transactions:
|
||||
if err := s.Txs.reopen(s.dir); err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
break Loop
|
||||
}
|
||||
return err
|
||||
}
|
||||
default:
|
||||
@ -530,19 +544,19 @@ func (s *RoSnapshots) closeSegmentsLocked() {
|
||||
}
|
||||
}
|
||||
func (s *RoSnapshots) ViewHeaders(blockNum uint64, f func(sn *HeaderSegment) error) (found bool, err error) {
|
||||
if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
|
||||
if !s.indicesReady.Load() || blockNum > s.BlocksAvailable() {
|
||||
return false, nil
|
||||
}
|
||||
return s.Headers.ViewSegment(blockNum, f)
|
||||
}
|
||||
func (s *RoSnapshots) ViewBodies(blockNum uint64, f func(sn *BodySegment) error) (found bool, err error) {
|
||||
if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
|
||||
if !s.indicesReady.Load() || blockNum > s.BlocksAvailable() {
|
||||
return false, nil
|
||||
}
|
||||
return s.Bodies.ViewSegment(blockNum, f)
|
||||
}
|
||||
func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (found bool, err error) {
|
||||
if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
|
||||
if !s.indicesReady.Load() || blockNum > s.BlocksAvailable() {
|
||||
return false, nil
|
||||
}
|
||||
return s.Txs.ViewSegment(blockNum, f)
|
||||
@ -701,7 +715,7 @@ func noGaps(in []FileInfo) (out []FileInfo, err error) {
|
||||
continue
|
||||
}
|
||||
if f.From != prevTo { // no gaps
|
||||
return nil, fmt.Errorf("[open snapshots] snapshot missed: from %d to %d", prevTo, f.From)
|
||||
return nil, fmt.Errorf("snapshot missed: from %d to %d", prevTo, f.From)
|
||||
}
|
||||
prevTo = f.To
|
||||
out = append(out, f)
|
||||
@ -884,6 +898,8 @@ type BlockRetire struct {
|
||||
tmpDir string
|
||||
snapshots *RoSnapshots
|
||||
db kv.RoDB
|
||||
|
||||
snapshotDownloader proto_downloader.DownloaderClient
|
||||
}
|
||||
|
||||
type BlockRetireResult struct {
|
||||
@ -891,8 +907,8 @@ type BlockRetireResult struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB) *BlockRetire {
|
||||
return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db}
|
||||
func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, snapshotDownloader proto_downloader.DownloaderClient) *BlockRetire {
|
||||
return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db, snapshotDownloader: snapshotDownloader}
|
||||
}
|
||||
func (br *BlockRetire) Snapshots() *RoSnapshots { return br.snapshots }
|
||||
func (br *BlockRetire) Working() bool { return br.working.Load() }
|
||||
@ -909,7 +925,16 @@ func (br *BlockRetire) CanRetire(curBlockNum uint64) (blockFrom, blockTo uint64,
|
||||
func canRetire(from, to uint64) (blockFrom, blockTo uint64, can bool) {
|
||||
blockFrom = (from / 1_000) * 1_000
|
||||
roundedTo1K := (to / 1_000) * 1_000
|
||||
jump := roundedTo1K - blockFrom
|
||||
var maxJump uint64 = 1_000
|
||||
if blockFrom%500_000 == 0 {
|
||||
maxJump = 500_000
|
||||
} else if blockFrom%100_000 == 0 {
|
||||
maxJump = 100_000
|
||||
} else if blockFrom%10_000 == 0 {
|
||||
maxJump = 10_000
|
||||
}
|
||||
//roundedTo1K := (to / 1_000) * 1_000
|
||||
jump := min(maxJump, roundedTo1K-blockFrom)
|
||||
switch { // only next segment sizes are allowed
|
||||
case jump >= 500_000:
|
||||
blockTo = blockFrom + 500_000
|
||||
@ -940,7 +965,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom,
|
||||
defer br.working.Store(false)
|
||||
defer br.wg.Done()
|
||||
|
||||
err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, lvl)
|
||||
err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.snapshotDownloader, lvl)
|
||||
br.result = &BlockRetireResult{
|
||||
BlockFrom: blockFrom,
|
||||
BlockTo: blockTo,
|
||||
@ -949,7 +974,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom,
|
||||
}()
|
||||
}
|
||||
|
||||
func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, lvl log.Lvl) error {
|
||||
func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, snapshotDownloader proto_downloader.DownloaderClient, lvl log.Lvl) error {
|
||||
log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
|
||||
// in future we will do it in background
|
||||
if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil {
|
||||
@ -974,6 +999,20 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
|
||||
return fmt.Errorf("ReopenIndices: %w", err)
|
||||
}
|
||||
|
||||
// start seed large .seg of large size
|
||||
if blockTo-blockFrom == DEFAULT_SEGMENT_SIZE {
|
||||
req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, len(AllSnapshotTypes))}
|
||||
for _, t := range AllSnapshotTypes {
|
||||
req.Items = append(req.Items, &proto_downloader.DownloadItem{
|
||||
Path: SegmentFileName(blockFrom, blockTo, t),
|
||||
})
|
||||
}
|
||||
if snapshotDownloader != nil {
|
||||
if _, err := snapshotDownloader.Download(ctx, req); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -117,6 +117,7 @@ func TestCanRetire(t *testing.T) {
|
||||
{1_000_000, 1_120_000, 1_000_000, 1_100_000, true},
|
||||
{2_500_000, 4_100_000, 2_500_000, 3_000_000, true},
|
||||
{2_500_000, 2_500_100, 2_500_000, 2_500_000, false},
|
||||
{1_001_000, 2_000_000, 1_001_000, 1_002_000, true},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
from, to, can := canRetire(tc.inFrom, tc.inTo)
|
||||
@ -174,6 +175,8 @@ func TestOpenAllSnapshot(t *testing.T) {
|
||||
|
||||
err = s.ReopenSegments()
|
||||
require.NoError(err)
|
||||
err = s.ReopenIndices()
|
||||
require.NoError(err)
|
||||
s.indicesReady.Store(true)
|
||||
require.Equal(2, len(s.Headers.segments))
|
||||
|
||||
|
@ -38,10 +38,7 @@ var (
|
||||
)
|
||||
|
||||
func newConfig(preverified Preverified) *Config {
|
||||
return &Config{
|
||||
ExpectBlocks: maxBlockNum(preverified),
|
||||
Preverified: preverified,
|
||||
}
|
||||
return &Config{ExpectBlocks: maxBlockNum(preverified), Preverified: preverified}
|
||||
}
|
||||
|
||||
func maxBlockNum(preverified Preverified) uint64 {
|
||||
|
@ -326,7 +326,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
|
||||
blockReader,
|
||||
),
|
||||
stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true),
|
||||
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB)),
|
||||
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB, snapshotsDownloader)),
|
||||
stagedsync.StageExecuteBlocksCfg(
|
||||
mock.DB,
|
||||
prune,
|
||||
|
@ -302,7 +302,7 @@ func NewStagedSync(
|
||||
blockReader,
|
||||
),
|
||||
stagedsync.StageIssuanceCfg(db, controlServer.ChainConfig, blockReader, cfg.EnabledIssuance),
|
||||
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db)),
|
||||
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db, snapshotDownloader)),
|
||||
stagedsync.StageExecuteBlocksCfg(
|
||||
db,
|
||||
cfg.Prune,
|
||||
|
Loading…
Reference in New Issue
Block a user