diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index de7d002cc..6fd4b06ae 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -627,23 +627,16 @@ func RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 return fmt.Errorf("ReopenSegments: %w", err) } merger := NewMerger(tmpDir, workers, lvl) - toMergeHeaders, toMergeBodies, toMergeTxs, from, to, recommendedMerge := merger.FindCandidates(snapshots) - if !recommendedMerge { + ranges := merger.FindMergeRanges(snapshots) + if len(ranges) == 0 { return nil } - if err := merger.Merge(ctx, toMergeHeaders, toMergeBodies, toMergeTxs, from, to, &dir.Rw{Path: snapshots.Dir()}); err != nil { + if err := merger.Merge(ctx, snapshots, ranges, &dir.Rw{Path: snapshots.Dir()}); err != nil { return err } - snapshots.Close() - if err := merger.RemoveOldFiles(toMergeHeaders, toMergeBodies, toMergeTxs, &dir.Rw{Path: snapshots.Dir()}); err != nil { - return err - } - if err := snapshots.ReopenSegments(); err != nil { - return fmt.Errorf("ReopenSegments: %w", err) - } - log.Log(lvl, "[snapshots] Merge done. Indexing new segments", "from", from) - if err := BuildIndices(ctx, snapshots, &dir.Rw{Path: snapshots.Dir()}, chainID, tmpDir, from, lvl); err != nil { + log.Log(lvl, "[snapshots] Merge done. Indexing new segments", "from", ranges[0].from) + if err := BuildIndices(ctx, snapshots, &dir.Rw{Path: snapshots.Dir()}, chainID, tmpDir, ranges[0].from, lvl); err != nil { return fmt.Errorf("BuildIndices: %w", err) } if err := snapshots.ReopenIndices(); err != nil { @@ -1253,41 +1246,100 @@ type Merger struct { func NewMerger(tmpDir string, workers int, lvl log.Lvl) *Merger { return &Merger{tmpDir: tmpDir, workers: workers, lvl: lvl} } -func (*Merger) FindCandidates(snapshots *RoSnapshots) (toMergeHeaders, toMergeBodies, toMergeTxs []string, from, to uint64, mergeRecommended bool) { - var stopAt uint64 - // merge segments - for _, sn := range snapshots.blocks { + +/* + a.fileLocks[fType].RLock() + defer a.fileLocks[fType].RUnlock() + var maxEndBlock uint64 + a.files[fType].Ascend(func(i btree.Item) bool { + item := i.(*byEndBlockItem) + if item.decompressor == nil { + return true // Skip B-tree based items + } + pre = append(pre, item) + if aggTo == 0 { + var doubleEnd uint64 + nextDouble := item.endBlock + for nextDouble <= maxEndBlock && nextDouble-item.startBlock < maxSpan { + doubleEnd = nextDouble + nextDouble = doubleEnd + (doubleEnd - item.startBlock) + 1 + } + if doubleEnd != item.endBlock { + aggFrom = item.startBlock + aggTo = doubleEnd + } else { + post = append(post, item) + return true + } + } + toAggregate = append(toAggregate, item) + return item.endBlock < aggTo + }) +*/ +type mergeRange struct { + from, to uint64 +} + +func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) { + for i := len(snapshots.blocks) - 1; i > 0; i-- { + sn := snapshots.blocks[i] if sn.To-sn.From >= DEFAULT_SEGMENT_SIZE { // is complete .seg continue } - if from == 0 { - from = sn.From - stopAt = chooseSegmentEnd(from, from+DEFAULT_SEGMENT_SIZE, DEFAULT_SEGMENT_SIZE) - } - if sn.To > stopAt { + for _, span := range []uint64{500_000, 100_000, 10_000} { + if sn.To%span != 0 { + continue + } + if sn.To-sn.From == span { + break + } + aggFrom := sn.To - span + res = append(res, mergeRange{from: aggFrom, to: sn.To}) + for snapshots.blocks[i].From > aggFrom { + i-- + } break } - to = sn.To + } + sort.Slice(res, func(i, j int) bool { return res[i].from < res[j].from }) + return res +} +func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeHeaders, toMergeBodies, toMergeTxs []string) { + for _, sn := range snapshots.blocks { + if sn.From < from { + continue + } + if sn.To > to { + break + } + toMergeBodies = append(toMergeBodies, sn.Bodies.FilePath()) toMergeHeaders = append(toMergeHeaders, sn.Headers.FilePath()) toMergeTxs = append(toMergeTxs, sn.Transactions.FilePath()) } - enoughSmallSegments := len(toMergeHeaders) >= MERGE_THRESHOLD - canFormNewCompleteSegment := to-from == DEFAULT_SEGMENT_SIZE && len(toMergeHeaders) > 1 - mergeRecommended = enoughSmallSegments || canFormNewCompleteSegment return } -func (m *Merger) Merge(ctx context.Context, toMergeHeaders, toMergeBodies, toMergeTxs []string, from, to uint64, snapshotDir *dir.Rw) error { - if err := m.merge(ctx, toMergeBodies, filepath.Join(snapshotDir.Path, SegmentFileName(from, to, Bodies))); err != nil { - return fmt.Errorf("mergeByAppendSegments: %w", err) - } - if err := m.merge(ctx, toMergeHeaders, filepath.Join(snapshotDir.Path, SegmentFileName(from, to, Headers))); err != nil { - return fmt.Errorf("mergeByAppendSegments: %w", err) - } - if err := m.merge(ctx, toMergeTxs, filepath.Join(snapshotDir.Path, SegmentFileName(from, to, Transactions))); err != nil { - return fmt.Errorf("mergeByAppendSegments: %w", err) +func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []mergeRange, snapshotDir *dir.Rw) error { + for _, r := range mergeRanges { + toMergeHeaders, toMergeBodies, toMergeTxs := m.filesByRange(snapshots, r.from, r.to) + if err := m.merge(ctx, toMergeBodies, filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Bodies))); err != nil { + return fmt.Errorf("mergeByAppendSegments: %w", err) + } + if err := m.merge(ctx, toMergeHeaders, filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Headers))); err != nil { + return fmt.Errorf("mergeByAppendSegments: %w", err) + } + if err := m.merge(ctx, toMergeTxs, filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Transactions))); err != nil { + return fmt.Errorf("mergeByAppendSegments: %w", err) + } + snapshots.Close() + if err := m.RemoveOldFiles(toMergeHeaders, toMergeBodies, toMergeTxs, &dir.Rw{Path: snapshots.Dir()}); err != nil { + return err + } + if err := snapshots.ReopenSegments(); err != nil { + return fmt.Errorf("ReopenSegments: %w", err) + } } return nil } diff --git a/turbo/snapshotsync/block_snapshots_test.go b/turbo/snapshotsync/block_snapshots_test.go index f9895ea99..0a40a2719 100644 --- a/turbo/snapshotsync/block_snapshots_test.go +++ b/turbo/snapshotsync/block_snapshots_test.go @@ -77,9 +77,9 @@ func TestMerge(t *testing.T) { { merger := NewMerger(dir, 1, log.LvlInfo) - toMergeHeaders, toMergeBodies, toMergeTxs, from, to, recommendedMerge := merger.FindCandidates(s) - require.True(recommendedMerge) - err := merger.Merge(context.Background(), toMergeHeaders, toMergeBodies, toMergeTxs, from, to, &dir2.Rw{Path: s.Dir()}) + ranges := merger.FindMergeRanges(s) + require.True(len(ranges) > 0) + err := merger.Merge(context.Background(), s, ranges, &dir2.Rw{Path: s.Dir()}) require.NoError(err) require.NoError(s.ReopenSegments()) } @@ -93,19 +93,18 @@ func TestMerge(t *testing.T) { { merger := NewMerger(dir, 1, log.LvlInfo) - toMergeHeaders, toMergeBodies, toMergeTxs, from, to, recommendedMerge := merger.FindCandidates(s) - require.True(recommendedMerge) - err := merger.Merge(context.Background(), toMergeHeaders, toMergeBodies, toMergeTxs, from, to, &dir2.Rw{Path: s.Dir()}) + ranges := merger.FindMergeRanges(s) + require.True(len(ranges) == 0) + err := merger.Merge(context.Background(), s, ranges, &dir2.Rw{Path: s.Dir()}) require.NoError(err) - require.NoError(s.ReopenSegments()) } - expectedFileName = SegmentFileName(1_000_000, 1_200_000, Transactions) + expectedFileName = SegmentFileName(1_100_000, 1_200_000, Transactions) d, err = compress.NewDecompressor(filepath.Join(dir, expectedFileName)) require.NoError(err) defer d.Close() a = d.Count() - require.Equal(2, a) + require.Equal(1, a) } func TestRecompress(t *testing.T) {