diff --git a/cmd/integration/commands/reset_state.go b/cmd/integration/commands/reset_state.go index 2b6eb9563..9b6c7bfd4 100644 --- a/cmd/integration/commands/reset_state.go +++ b/cmd/integration/commands/reset_state.go @@ -229,7 +229,7 @@ func resetFinish(tx kv.RwTx) error { return nil } -func printStages(db kv.Getter) error { +func printStages(db kv.Tx) error { var err error var progress uint64 w := new(tabwriter.Writer) @@ -253,5 +253,16 @@ func printStages(db kv.Getter) error { } fmt.Fprintf(w, "--\n") fmt.Fprintf(w, "prune distance: %s\n\n", pm.String()) + + s1, err := db.ReadSequence(kv.EthTx) + if err != nil { + return err + } + s2, err := db.ReadSequence(kv.NonCanonicalTxs) + if err != nil { + return err + } + fmt.Fprintf(w, "sequence: EthTx=%d, NonCanonicalTx=%d\n\n", s1, s2) + return nil } diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 960cc3914..e9ea85b7e 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1063,10 +1063,7 @@ func allSnapshots(cc *params.ChainConfig) *snapshotsync.RoSnapshots { snapshotCfg := ethconfig.NewSnapshotCfg(enableSnapshot, true) dir.MustExist(filepath.Join(datadir, "snapshots")) _allSnapshotsSingleton = snapshotsync.NewRoSnapshots(snapshotCfg, filepath.Join(datadir, "snapshots")) - if err := _allSnapshotsSingleton.ReopenSegments(); err != nil { - panic(err) - } - if err := _allSnapshotsSingleton.ReopenSomeIndices(snapshotsync.AllSnapshotTypes...); err != nil { + if err := _allSnapshotsSingleton.Reopen(); err != nil { panic(err) } } diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 07f792ce9..49e19a537 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -345,8 +345,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, allSnapshots := snapshotsync.NewRoSnapshots(cfg.Snapshot, filepath.Join(cfg.DataDir, "snapshots")) allSnapshots.AsyncOpenAll(ctx) onNewSnapshot = func() { - allSnapshots.ReopenSegments() - allSnapshots.ReopenIndices() + allSnapshots.Reopen() } blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) } else { diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index ad26eca25..8ff95d668 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -358,13 +358,19 @@ func WriteTransactions(db kv.RwTx, txs []types.Transaction, baseTxId uint64) err return nil } -func WriteRawTransactions(db kv.StatelessWriteTx, txs [][]byte, baseTxId uint64) error { +func WriteRawTransactions(db kv.RwTx, txs [][]byte, baseTxId uint64) error { txId := baseTxId for _, tx := range txs { txIdKey := make([]byte, 8) binary.BigEndian.PutUint64(txIdKey, txId) // If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine. if err := db.Append(kv.EthTx, txIdKey, tx); err != nil { + c, err := db.Cursor(kv.EthTx) + if err != nil { + kk, _, _ := c.Last() + c.Close() + return fmt.Errorf("txId=%d, baseTxId=%d, lastInDb=%d, %w", txId, baseTxId, binary.BigEndian.Uint64(kk), err) + } return err } txId++ @@ -473,6 +479,18 @@ func RawTransactionsRange(db kv.Getter, from, to uint64) (res [][]byte, err erro // ResetSequence - allow set arbitrary value to sequence (for example to decrement it to exact value) func ResetSequence(tx kv.RwTx, bucket string, newValue uint64) error { + c, err := tx.Cursor(bucket) + if err != nil { + return err + } + k, _, err := c.Last() + if err != nil { + return err + } + if k != nil && binary.BigEndian.Uint64(k) >= newValue { + panic(fmt.Sprintf("must not happen. ResetSequence: %s, %d < lastInDB: %d\n", bucket, newValue, binary.BigEndian.Uint64(k))) + } + newVBytes := make([]byte, 8) binary.BigEndian.PutUint64(newVBytes, newValue) if err := tx.Put(kv.Sequence, []byte(bucket), newVBytes); err != nil { @@ -529,7 +547,7 @@ func ReadSenders(db kv.Getter, hash common.Hash, number uint64) ([]common.Addres return senders, nil } -func WriteRawBodyIfNotExists(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error { +func WriteRawBodyIfNotExists(db kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) error { exists, err := db.Has(kv.BlockBody, dbutils.BlockBodyKey(number, hash)) if err != nil { return err @@ -540,7 +558,7 @@ func WriteRawBodyIfNotExists(db kv.StatelessRwTx, hash common.Hash, number uint6 return WriteRawBody(db, hash, number, body) } -func WriteRawBody(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error { +func WriteRawBody(db kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) error { baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))+2) if err != nil { return err @@ -551,10 +569,10 @@ func WriteRawBody(db kv.StatelessRwTx, hash common.Hash, number uint64, body *ty Uncles: body.Uncles, } if err = WriteBodyForStorage(db, hash, number, &data); err != nil { - return fmt.Errorf("failed to write body: %w", err) + return fmt.Errorf("WriteBodyForStorage: %w", err) } if err = WriteRawTransactions(db, body.Transactions, baseTxId+1); err != nil { - return fmt.Errorf("failed to WriteRawTransactions: %w", err) + return fmt.Errorf("WriteRawTransactions: %w", err) } return nil } @@ -600,7 +618,7 @@ func DeleteBody(db kv.Deleter, hash common.Hash, number uint64) { } // MakeBodiesCanonical - move all txs of non-canonical blocks from NonCanonicalTxs table to EthTx table -func MakeBodiesCanonical(tx kv.StatelessRwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error { +func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error { for blockNum := from; ; blockNum++ { h, err := ReadCanonicalHash(tx, blockNum) if err != nil { @@ -717,6 +735,18 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPre // EthTx must have canonical id's - means need decrement it's sequence on unwind if firstMovedTxnIDIsSet { + c, err := tx.Cursor(kv.EthTx) + if err != nil { + return err + } + k, _, err := c.Last() + if err != nil { + return err + } + if k != nil && binary.BigEndian.Uint64(k) >= firstMovedTxnID { + panic(fmt.Sprintf("must not happen, ResetSequence: %d, lastInDB: %d\n", firstMovedTxnID, binary.BigEndian.Uint64(k))) + } + if err := ResetSequence(tx, kv.EthTx, firstMovedTxnID); err != nil { return err } diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 39a3e81c4..3b5a7d6fb 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -189,7 +189,7 @@ Loop: // Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call) if err = rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody); err != nil { - return fmt.Errorf("writing block body: %w", err) + return fmt.Errorf("WriteRawBodyIfNotExists: %w", err) } if blockHeight > bodyProgress { diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index e12f91c86..f12060b95 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -952,14 +952,14 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R if err := WaitForDownloader(ctx, tx, cfg); err != nil { return err } - if err := cfg.snapshots.ReopenSegments(); err != nil { + if err := cfg.snapshots.Reopen(); err != nil { return fmt.Errorf("ReopenSegments: %w", err) } expect := cfg.snapshotHashesCfg.ExpectBlocks if cfg.snapshots.SegmentsAvailable() < expect { return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.SegmentsAvailable()) } - if err := cfg.snapshots.ReopenIndices(); err != nil { + if err := cfg.snapshots.Reopen(); err != nil { return fmt.Errorf("ReopenIndices: %w", err) } @@ -977,7 +977,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R } } - if err := cfg.snapshots.ReopenIndices(); err != nil { + if err := cfg.snapshots.Reopen(); err != nil { return fmt.Errorf("ReopenIndices: %w", err) } } diff --git a/migrations/txs_begin_end.go b/migrations/txs_begin_end.go index 8be749d24..c857dd71f 100644 --- a/migrations/txs_begin_end.go +++ b/migrations/txs_begin_end.go @@ -224,7 +224,7 @@ var txsBeginEnd = Migration{ }, } -func writeRawBodyDeprecated(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error { +func writeRawBodyDeprecated(db kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) error { baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))) if err != nil { return err diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index 7c8ca0b83..0040d06bf 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -155,7 +155,7 @@ func doRetireCommand(cliCtx *cli.Context) error { chainConfig := tool.ChainConfigFromDB(chainDB) chainID, _ := uint256.FromBig(chainConfig.ChainID) snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir) - snapshots.ReopenSegments() + snapshots.Reopen() br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB, nil, nil) @@ -215,7 +215,7 @@ func rebuildIndices(ctx context.Context, chainDB kv.RoDB, cfg ethconfig.Snapshot chainID, _ := uint256.FromBig(chainConfig.ChainID) allSnapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir.Path) - if err := allSnapshots.ReopenSegments(); err != nil { + if err := allSnapshots.Reopen(); err != nil { return err } if err := snapshotsync.BuildIndices(ctx, allSnapshots, snapshotDir, *chainID, tmpDir, from, log.LvlInfo); err != nil { @@ -281,8 +281,7 @@ func checkBlockSnapshot(chaindata string) error { cfg := ethconfig.NewSnapshotCfg(true, true) snapshots := snapshotsync.NewRoSnapshots(cfg, filepath.Join(datadir, "snapshots")) - snapshots.ReopenSegments() - snapshots.ReopenIndices() + snapshots.Reopen() //if err := snapshots.BuildIndices(context.Background(), *chainID); err != nil { // panic(err) //} diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index 1862bfaa7..5ccc55fad 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -192,6 +192,13 @@ func (back *BlockReaderWithSnapshots) HeaderByNumber(ctx context.Context, tx kv. ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error { if segment.idxHeaderHash == nil { fmt.Printf("why? %d, %d, %d, %d, %d\n", blockHeight, segment.From, segment.To, back.sn.segmentsAvailable.Load(), back.sn.idxAvailable.Load()) + back.sn.PrintDebug() + for _, sn := range back.sn.Headers.segments { + if sn.idxHeaderHash == nil { + fmt.Printf("seg with nil idx: %d,%d\n", segment.From, segment.To) + } + } + fmt.Printf("==== end debug print ====\n") } h, err = back.headerFromSnapshot(blockHeight, segment, nil) if err != nil { diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index b5adba076..b3396c3ab 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -447,10 +447,7 @@ func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) { return default: } - if err := s.ReopenSegments(); err != nil && !errors.Is(err, os.ErrNotExist) && !errors.Is(err, ErrSnapshotMissed) { - log.Error("AsyncOpenAll", "err", err) - } - if err := s.ReopenIndices(); err != nil && !errors.Is(err, os.ErrNotExist) && !errors.Is(err, ErrSnapshotMissed) { + if err := s.Reopen(); err != nil && !errors.Is(err, os.ErrNotExist) && !errors.Is(err, ErrSnapshotMissed) { log.Error("AsyncOpenAll", "err", err) } time.Sleep(15 * time.Second) @@ -458,6 +455,96 @@ func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) { }() } +func (s *RoSnapshots) Reopen() error { + s.Headers.lock.Lock() + defer s.Headers.lock.Unlock() + s.Bodies.lock.Lock() + defer s.Bodies.lock.Unlock() + s.Txs.lock.Lock() + defer s.Txs.lock.Unlock() + s.closeSegmentsLocked() + files, err := segments2(s.dir) + if err != nil { + return err + } + for _, f := range files { + { + seg := &BodySegment{From: f.From, To: f.To} + fileName := SegmentFileName(f.From, f.To, Bodies) + seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + break + } + return err + } + s.Bodies.segments = append(s.Bodies.segments, seg) + } + { + seg := &HeaderSegment{From: f.From, To: f.To} + fileName := SegmentFileName(f.From, f.To, Headers) + seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + break + } + return err + } + s.Headers.segments = append(s.Headers.segments, seg) + } + { + seg := &TxnSegment{From: f.From, To: f.To} + fileName := SegmentFileName(f.From, f.To, Transactions) + seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + break + } + return err + } + s.Txs.segments = append(s.Txs.segments, seg) + } + + if f.To > 0 { + s.segmentsAvailable.Store(f.To - 1) + } else { + s.segmentsAvailable.Store(0) + } + } + s.segmentsReady.Store(true) + + for _, sn := range s.Headers.segments { + sn.idxHeaderHash, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Headers.String()))) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + } + for _, sn := range s.Bodies.segments { + sn.idxBodyNumber, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Bodies.String()))) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + } + for _, sn := range s.Txs.segments { + sn.IdxTxnHash, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Transactions.String()))) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + sn.IdxTxnId, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, TransactionsId.String()))) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + sn.IdxTxnHash2BlockNum, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Transactions2Block.String()))) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + } + + s.idxAvailable.Store(s.idxAvailability()) + s.indicesReady.Store(true) + + return nil +} func (s *RoSnapshots) ReopenSegments() error { s.Headers.lock.Lock() defer s.Headers.lock.Unlock() @@ -484,6 +571,7 @@ func (s *RoSnapshots) ReopenSegments() error { s.Bodies.segments = append(s.Bodies.segments, seg) } { + fmt.Printf("reopen segment: %d-%d\n", f.From, f.To) seg := &HeaderSegment{From: f.From, To: f.To} fileName := SegmentFileName(f.From, f.To, Headers) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) @@ -542,12 +630,12 @@ func (s *RoSnapshots) closeSegmentsLocked() { } } func (s *RoSnapshots) PrintDebug() { - s.Headers.lock.Lock() - defer s.Headers.lock.Unlock() - s.Bodies.lock.Lock() - defer s.Bodies.lock.Unlock() - s.Txs.lock.Lock() - defer s.Txs.lock.Unlock() + s.Headers.lock.RLock() + defer s.Headers.lock.RUnlock() + s.Bodies.lock.RLock() + defer s.Bodies.lock.RUnlock() + s.Txs.lock.RLock() + defer s.Txs.lock.RUnlock() fmt.Printf("sn: %d, %d\n", s.segmentsAvailable.Load(), s.idxAvailable.Load()) fmt.Println(" == Snapshots, Header") for _, sn := range s.Headers.segments { @@ -956,7 +1044,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil { return fmt.Errorf("DumpBlocks: %w", err) } - if err := snapshots.ReopenSegments(); err != nil { + if err := snapshots.Reopen(); err != nil { return fmt.Errorf("ReopenSegments: %w", err) } merger := NewMerger(tmpDir, workers, lvl, chainID) @@ -1799,7 +1887,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges if len(mergeRanges) == 0 { return nil } - logEvery := time.NewTicker(20 * time.Second) + logEvery := time.NewTicker(30 * time.Second) defer logEvery.Stop() log.Log(m.lvl, "[snapshots] Merge segments", "ranges", fmt.Sprintf("%v", mergeRanges)) for _, r := range mergeRanges { @@ -1809,7 +1897,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges } { segFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Bodies)) - if err := m.merge(ctx, toMergeBodies, segFilePath); err != nil { + if err := m.merge(ctx, toMergeBodies, segFilePath, logEvery); err != nil { return fmt.Errorf("mergeByAppendSegments: %w", err) } if doIndex { @@ -1821,7 +1909,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges { segFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Headers)) - if err := m.merge(ctx, toMergeHeaders, segFilePath); err != nil { + if err := m.merge(ctx, toMergeHeaders, segFilePath, logEvery); err != nil { return fmt.Errorf("mergeByAppendSegments: %w", err) } if doIndex { @@ -1833,7 +1921,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges { segFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Transactions)) - if err := m.merge(ctx, toMergeTxs, segFilePath); err != nil { + if err := m.merge(ctx, toMergeTxs, segFilePath, logEvery); err != nil { return fmt.Errorf("mergeByAppendSegments: %w", err) } if doIndex { @@ -1843,10 +1931,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges } } - if err := snapshots.ReopenSegments(); err != nil { - return fmt.Errorf("ReopenSegments: %w", err) - } - if err := snapshots.ReopenIndices(); err != nil { + if err := snapshots.Reopen(); err != nil { return fmt.Errorf("ReopenSegments: %w", err) } if err := m.removeOldFiles(toMergeHeaders, snapshotDir); err != nil { @@ -1865,27 +1950,30 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges return nil } -func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string) error { - fileNames := make([]string, len(toMerge)) - for i, f := range toMerge { - _, fName := filepath.Split(f) - fileNames[i] = fName - } +func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string, logEvery *time.Ticker) error { f, err := compress.NewCompressor(ctx, "merge", targetFile, m.tmpDir, compress.MinPatternScore, m.workers) if err != nil { return err } defer f.Close() var word = make([]byte, 0, 4096) - for _, cFile := range toMerge { + var cnt, total int + cList := make([]*compress.Decompressor, len(toMerge)) + for i, cFile := range toMerge { d, err := compress.NewDecompressor(cFile) if err != nil { return err } defer d.Close() + cList[i] = d + total += d.Count() + } + + for _, d := range cList { if err := d.WithReadAhead(func() error { g := d.MakeGetter() for g.HasNext() { + cnt++ word, _ = g.Next(word[:0]) if err := f.AddWord(word); err != nil { return err @@ -1893,6 +1981,9 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string) select { case <-ctx.Done(): return ctx.Err() + case <-logEvery.C: + _, fName := filepath.Split(targetFile) + log.Info("[snapshots] Merge", "progress", fmt.Sprintf("%.2f%%", 100*float64(cnt)/float64(total)), "to", fName) default: } } diff --git a/turbo/snapshotsync/block_snapshots_test.go b/turbo/snapshotsync/block_snapshots_test.go index 37bf04687..f8efdbe82 100644 --- a/turbo/snapshotsync/block_snapshots_test.go +++ b/turbo/snapshotsync/block_snapshots_test.go @@ -87,7 +87,7 @@ func TestMergeSnapshots(t *testing.T) { cfg := ethconfig.Snapshot{Enabled: true} s := NewRoSnapshots(cfg, dir) defer s.Close() - require.NoError(s.ReopenSegments()) + require.NoError(s.Reopen()) { merger := NewMerger(dir, 1, log.LvlInfo, uint256.Int{}) @@ -161,7 +161,7 @@ func TestOpenAllSnapshot(t *testing.T) { createFile := func(from, to uint64, name Type) { createTestSegmentFile(t, from, to, name, dir) } s := NewRoSnapshots(cfg, dir) defer s.Close() - err := s.ReopenSegments() + err := s.Reopen() require.NoError(err) require.Equal(0, len(s.Headers.segments)) s.Close() @@ -175,7 +175,7 @@ func TestOpenAllSnapshot(t *testing.T) { createFile(500_000, 1_000_000, Headers) createFile(500_000, 1_000_000, Transactions) s = NewRoSnapshots(cfg, dir) - err = s.ReopenSegments() + err = s.Reopen() require.Error(err) require.Equal(0, len(s.Headers.segments)) //because, no gaps are allowed (expect snapshots from block 0) s.Close() @@ -186,11 +186,8 @@ func TestOpenAllSnapshot(t *testing.T) { s = NewRoSnapshots(cfg, dir) defer s.Close() - err = s.ReopenSegments() + err = s.Reopen() require.NoError(err) - err = s.ReopenIndices() - require.NoError(err) - s.indicesReady.Store(true) require.Equal(2, len(s.Headers.segments)) ok, err := s.ViewTxs(10, func(sn *TxnSegment) error { @@ -217,7 +214,7 @@ func TestOpenAllSnapshot(t *testing.T) { // ExpectedBlocks - says only how much block must come from Torrent chainSnapshotCfg.ExpectBlocks = 500_000 - 1 s = NewRoSnapshots(cfg, dir) - err = s.ReopenSegments() + err = s.Reopen() require.NoError(err) defer s.Close() require.Equal(2, len(s.Headers.segments)) @@ -228,7 +225,7 @@ func TestOpenAllSnapshot(t *testing.T) { chainSnapshotCfg.ExpectBlocks = math.MaxUint64 s = NewRoSnapshots(cfg, dir) defer s.Close() - err = s.ReopenSegments() + err = s.Reopen() require.NoError(err) }