snapshots: merge segments strategy (#3645)

* save

* save

* save
This commit is contained in:
Alex Sharov 2022-03-04 14:16:16 +07:00 committed by GitHub
parent 902824a8cf
commit f5be8919ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 43 deletions

View File

@ -627,23 +627,16 @@ func RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
return fmt.Errorf("ReopenSegments: %w", err)
}
merger := NewMerger(tmpDir, workers, lvl)
toMergeHeaders, toMergeBodies, toMergeTxs, from, to, recommendedMerge := merger.FindCandidates(snapshots)
if !recommendedMerge {
ranges := merger.FindMergeRanges(snapshots)
if len(ranges) == 0 {
return nil
}
if err := merger.Merge(ctx, toMergeHeaders, toMergeBodies, toMergeTxs, from, to, &dir.Rw{Path: snapshots.Dir()}); err != nil {
if err := merger.Merge(ctx, snapshots, ranges, &dir.Rw{Path: snapshots.Dir()}); err != nil {
return err
}
snapshots.Close()
if err := merger.RemoveOldFiles(toMergeHeaders, toMergeBodies, toMergeTxs, &dir.Rw{Path: snapshots.Dir()}); err != nil {
return err
}
if err := snapshots.ReopenSegments(); err != nil {
return fmt.Errorf("ReopenSegments: %w", err)
}
log.Log(lvl, "[snapshots] Merge done. Indexing new segments", "from", from)
if err := BuildIndices(ctx, snapshots, &dir.Rw{Path: snapshots.Dir()}, chainID, tmpDir, from, lvl); err != nil {
log.Log(lvl, "[snapshots] Merge done. Indexing new segments", "from", ranges[0].from)
if err := BuildIndices(ctx, snapshots, &dir.Rw{Path: snapshots.Dir()}, chainID, tmpDir, ranges[0].from, lvl); err != nil {
return fmt.Errorf("BuildIndices: %w", err)
}
if err := snapshots.ReopenIndices(); err != nil {
@ -1253,41 +1246,100 @@ type Merger struct {
// NewMerger returns a Merger that carries the given temporary directory,
// worker count and log level.
func NewMerger(tmpDir string, workers int, lvl log.Lvl) *Merger {
	merger := new(Merger)
	merger.tmpDir = tmpDir
	merger.workers = workers
	merger.lvl = lvl
	return merger
}
func (*Merger) FindCandidates(snapshots *RoSnapshots) (toMergeHeaders, toMergeBodies, toMergeTxs []string, from, to uint64, mergeRecommended bool) {
var stopAt uint64
// merge segments
for _, sn := range snapshots.blocks {
/*
a.fileLocks[fType].RLock()
defer a.fileLocks[fType].RUnlock()
var maxEndBlock uint64
a.files[fType].Ascend(func(i btree.Item) bool {
item := i.(*byEndBlockItem)
if item.decompressor == nil {
return true // Skip B-tree based items
}
pre = append(pre, item)
if aggTo == 0 {
var doubleEnd uint64
nextDouble := item.endBlock
for nextDouble <= maxEndBlock && nextDouble-item.startBlock < maxSpan {
doubleEnd = nextDouble
nextDouble = doubleEnd + (doubleEnd - item.startBlock) + 1
}
if doubleEnd != item.endBlock {
aggFrom = item.startBlock
aggTo = doubleEnd
} else {
post = append(post, item)
return true
}
}
toAggregate = append(toAggregate, item)
return item.endBlock < aggTo
})
*/
// mergeRange describes one span of block numbers selected for merging.
// NOTE(review): whether `to` is inclusive or exclusive is not visible here —
// confirm against the segment naming convention.
type mergeRange struct {
	from uint64 // first block number of the span
	to   uint64 // end block number of the span
}
func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) {
for i := len(snapshots.blocks) - 1; i > 0; i-- {
sn := snapshots.blocks[i]
if sn.To-sn.From >= DEFAULT_SEGMENT_SIZE { // is complete .seg
continue
}
if from == 0 {
from = sn.From
stopAt = chooseSegmentEnd(from, from+DEFAULT_SEGMENT_SIZE, DEFAULT_SEGMENT_SIZE)
}
if sn.To > stopAt {
for _, span := range []uint64{500_000, 100_000, 10_000} {
if sn.To%span != 0 {
continue
}
if sn.To-sn.From == span {
break
}
aggFrom := sn.To - span
res = append(res, mergeRange{from: aggFrom, to: sn.To})
for snapshots.blocks[i].From > aggFrom {
i--
}
break
}
to = sn.To
}
sort.Slice(res, func(i, j int) bool { return res[i].from < res[j].from })
return res
}
// filesByRange collects the header, body and transaction segment file paths
// of every entry in snapshots.blocks that lies inside [from, to].
//
// Entries whose From is below `from` are skipped; iteration stops at the
// first entry whose To is above `to`.
// NOTE(review): the early stop presumably relies on snapshots.blocks being
// sorted in ascending block order — confirm against RoSnapshots.
//
// The three returned slices are parallel: index i of each slice refers to
// the same snapshot entry.
func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeHeaders, toMergeBodies, toMergeTxs []string) {
	for _, sn := range snapshots.blocks {
		if sn.From < from {
			continue
		}
		if sn.To > to {
			break
		}
		toMergeBodies = append(toMergeBodies, sn.Bodies.FilePath())
		toMergeHeaders = append(toMergeHeaders, sn.Headers.FilePath())
		toMergeTxs = append(toMergeTxs, sn.Transactions.FilePath())
	}
	// The merge-recommendation heuristics (enoughSmallSegments,
	// canFormNewCompleteSegment, mergeRecommended) were dropped: they wrote
	// to a result this function does not declare.
	return toMergeHeaders, toMergeBodies, toMergeTxs
}
func (m *Merger) Merge(ctx context.Context, toMergeHeaders, toMergeBodies, toMergeTxs []string, from, to uint64, snapshotDir *dir.Rw) error {
if err := m.merge(ctx, toMergeBodies, filepath.Join(snapshotDir.Path, SegmentFileName(from, to, Bodies))); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if err := m.merge(ctx, toMergeHeaders, filepath.Join(snapshotDir.Path, SegmentFileName(from, to, Headers))); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if err := m.merge(ctx, toMergeTxs, filepath.Join(snapshotDir.Path, SegmentFileName(from, to, Transactions))); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []mergeRange, snapshotDir *dir.Rw) error {
for _, r := range mergeRanges {
toMergeHeaders, toMergeBodies, toMergeTxs := m.filesByRange(snapshots, r.from, r.to)
if err := m.merge(ctx, toMergeBodies, filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Bodies))); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if err := m.merge(ctx, toMergeHeaders, filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Headers))); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if err := m.merge(ctx, toMergeTxs, filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Transactions))); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
snapshots.Close()
if err := m.RemoveOldFiles(toMergeHeaders, toMergeBodies, toMergeTxs, &dir.Rw{Path: snapshots.Dir()}); err != nil {
return err
}
if err := snapshots.ReopenSegments(); err != nil {
return fmt.Errorf("ReopenSegments: %w", err)
}
}
return nil
}

View File

@ -77,9 +77,9 @@ func TestMerge(t *testing.T) {
{
merger := NewMerger(dir, 1, log.LvlInfo)
toMergeHeaders, toMergeBodies, toMergeTxs, from, to, recommendedMerge := merger.FindCandidates(s)
require.True(recommendedMerge)
err := merger.Merge(context.Background(), toMergeHeaders, toMergeBodies, toMergeTxs, from, to, &dir2.Rw{Path: s.Dir()})
ranges := merger.FindMergeRanges(s)
require.True(len(ranges) > 0)
err := merger.Merge(context.Background(), s, ranges, &dir2.Rw{Path: s.Dir()})
require.NoError(err)
require.NoError(s.ReopenSegments())
}
@ -93,19 +93,18 @@ func TestMerge(t *testing.T) {
{
merger := NewMerger(dir, 1, log.LvlInfo)
toMergeHeaders, toMergeBodies, toMergeTxs, from, to, recommendedMerge := merger.FindCandidates(s)
require.True(recommendedMerge)
err := merger.Merge(context.Background(), toMergeHeaders, toMergeBodies, toMergeTxs, from, to, &dir2.Rw{Path: s.Dir()})
ranges := merger.FindMergeRanges(s)
require.True(len(ranges) == 0)
err := merger.Merge(context.Background(), s, ranges, &dir2.Rw{Path: s.Dir()})
require.NoError(err)
require.NoError(s.ReopenSegments())
}
expectedFileName = SegmentFileName(1_000_000, 1_200_000, Transactions)
expectedFileName = SegmentFileName(1_100_000, 1_200_000, Transactions)
d, err = compress.NewDecompressor(filepath.Join(dir, expectedFileName))
require.NoError(err)
defer d.Close()
a = d.Count()
require.Equal(2, a)
require.Equal(1, a)
}
func TestRecompress(t *testing.T) {