diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index e21cd15b4..a3a4d659c 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -46,6 +46,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/recsplit" + "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" "github.com/ledgerwatch/log/v3" "github.com/spaolacci/murmur3" "golang.org/x/crypto/sha3" @@ -694,6 +695,9 @@ func (c *Changes) produceChangeSets(blockFrom, blockTo uint64, historyType, bitm if err = comp.AddWord(before); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets AddWord before: %w", err) } + //if historyType == AccountHistory { + // fmt.Printf("produce %s.%d-%d [%x]=>[%x]\n", historyType.String(), blockFrom, blockTo, txKey, before) + //} var bitmap *roaring64.Bitmap var ok bool if bitmap, ok = bitmaps[string(key)]; !ok { @@ -732,26 +736,28 @@ func (c *Changes) produceChangeSets(blockFrom, blockTo uint64, historyType, bitm bitmapC.Close() } }() - bitmapKeys := make([]string, len(bitmaps)) + idxKeys := make([]string, len(bitmaps)) i := 0 + var buf []byte for key := range bitmaps { - bitmapKeys[i] = key + idxKeys[i] = key i++ } - sort.Strings(bitmapKeys) - var bitmapKey []byte - var bitmapVal []byte - for _, key := range bitmapKeys { - bitmapKey = append(bitmapKey[:0], []byte(key)...) - bitmapKey = append(bitmapKey, blockSuffix[:]...) 
- if err = bitmapC.AddWord(bitmapKey); err != nil { + sort.Strings(idxKeys) + for _, key := range idxKeys { + if err = bitmapC.AddWord([]byte(key)); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap add key: %w", err) } - bitmaps[key].RunOptimize() - if bitmapVal, err = bitmaps[key].ToBytes(); err != nil { - return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmapVal production: %w", err) + bitmap := bitmaps[key] + ef := eliasfano32.NewEliasFano(bitmap.GetCardinality(), bitmap.Maximum()) + it := bitmap.Iterator() + for it.HasNext() { + v := it.Next() + ef.AddOffset(v) } - if err = bitmapC.AddWord(bitmapVal); err != nil { + ef.Build() + buf = ef.AppendBytes(buf[:0]) + if err = bitmapC.AddUncompressedWord(buf); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap add val: %w", err) } } @@ -765,7 +771,7 @@ func (c *Changes) produceChangeSets(blockFrom, blockTo uint64, historyType, bitm return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap decompressor: %w", err) } - bitmapI, err := buildIndex(bitmapD, bitmapIdxPath, c.dir, len(bitmapKeys)) + bitmapI, err := buildIndex(bitmapD, bitmapIdxPath, c.dir, len(idxKeys)) if err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap buildIndex: %w", err) } @@ -1444,14 +1450,18 @@ func (a *Aggregator) backgroundMerge() { } if len(toRemove[fType]) > 1 { var valTransform func([]byte, []byte) ([]byte, error) + var mergeFunc func([]byte, []byte, []byte) ([]byte, error) if fType == Commitment { valTransform = cvt.commitmentValTransform + mergeFunc = mergeCommitments + } else { + mergeFunc = mergeReplace } var prefixLen int if fType == Storage { prefixLen = length.Addr } - if newItems[fType], err = a.computeAggregation(fType, toRemove[fType], from, to, valTransform, true /* withIndex */, prefixLen); err != nil { + if newItems[fType], err = a.computeAggregation(fType, toRemove[fType], from, to, valTransform, mergeFunc, true /* valCompressed */, true 
/* withIndex */, prefixLen); err != nil { a.mergeError <- fmt.Errorf("computeAggreation %s: %w", fType.String(), err) return } @@ -1491,9 +1501,11 @@ func (a *Aggregator) reduceHistoryFiles(fType FileType, item *byEndBlockItem) er var val []byte var count int g.Reset(0) + var key []byte for g.HasNext() { g.Skip() // Skip key on on the first pass val, _ = g.Next(val[:0]) + //fmt.Printf("reduce1 [%s.%d-%d] [%x]=>[%x]\n", fType.String(), item.startBlock, item.endBlock, key, val) if err = comp.AddWord(val); err != nil { return fmt.Errorf("reduceHistoryFiles AddWord: %w", err) } @@ -1525,11 +1537,11 @@ func (a *Aggregator) reduceHistoryFiles(fType FileType, item *byEndBlockItem) er g.Reset(0) g1.Reset(0) var lastOffset uint64 - var key []byte for g.HasNext() { key, _ = g.Next(key[:0]) g.Skip() // Skip value _, pos := g1.Next(nil) + //fmt.Printf("reduce2 [%s.%d-%d] [%x]==>%d\n", fType.String(), item.startBlock, item.endBlock, key, lastOffset) if err = rs.AddKey(key, lastOffset); err != nil { return fmt.Errorf("reduceHistoryFiles %p AddKey: %w", rs, err) } @@ -1568,6 +1580,31 @@ func (a *Aggregator) reduceHistoryFiles(fType FileType, item *byEndBlockItem) er return nil } +func mergeReplace(preval, val, buf []byte) ([]byte, error) { + return append(buf, val...), nil +} + +func mergeBitmaps(preval, val, buf []byte) ([]byte, error) { + preef, _ := eliasfano32.ReadEliasFano(preval) + ef, _ := eliasfano32.ReadEliasFano(val) + //fmt.Printf("mergeBitmaps [%x] (count=%d,max=%d) + [%x] (count=%d,max=%d)\n", preval, preef.Count(), preef.Max(), val, ef.Count(), ef.Max()) + preIt := preef.Iterator() + efIt := ef.Iterator() + newEf := eliasfano32.NewEliasFano(preef.Count()+ef.Count(), ef.Max()) // NOTE(review): sized by ef.Max() only and preval is appended before val without interleaving; assumes every offset in preval is below those in val - confirm against caller ordering + for preIt.HasNext() { + newEf.AddOffset(preIt.Next()) + } + for efIt.HasNext() { + newEf.AddOffset(efIt.Next()) + } + newEf.Build() + return newEf.AppendBytes(buf), nil +} + +func mergeCommitments(preval, val, buf []byte) ([]byte, error) { + return commitment.MergeBranches(preval, val, buf)
+} + func (a *Aggregator) backgroundHistoryMerge() { defer a.historyWg.Done() for range a.historyChannel { @@ -1597,8 +1634,17 @@ func (a *Aggregator) backgroundHistoryMerge() { } } if len(toRemove[fType]) > 1 { - if newItems[fType], err = a.computeAggregation(fType, toRemove[fType], from, to, nil, /* valTransform */ - !finalMerge || fType == AccountBitmap || fType == StorageBitmap || fType == CodeBitmap /* withIndex */, 0 /* prefixLen */); err != nil { + isBitmap := fType == AccountBitmap || fType == StorageBitmap || fType == CodeBitmap + var mergeFunc func([]byte, []byte, []byte) ([]byte, error) + if isBitmap { + mergeFunc = mergeBitmaps + } else if fType == Commitment { + mergeFunc = mergeCommitments + } else { + mergeFunc = mergeReplace + } + if newItems[fType], err = a.computeAggregation(fType, toRemove[fType], from, to, nil /* valTransform */, mergeFunc, + !isBitmap /* valCompressed */, !finalMerge || isBitmap /* withIndex */, 0 /* prefixLen */); err != nil { a.historyError <- fmt.Errorf("computeAggreation %s: %w", fType.String(), err) return } @@ -2748,6 +2794,8 @@ func (a *Aggregator) findLargestMerge(fType FileType, maxTo uint64, maxSpan uint func (a *Aggregator) computeAggregation(fType FileType, toAggregate []*byEndBlockItem, aggFrom uint64, aggTo uint64, valTransform func(val []byte, transValBuf []byte) ([]byte, error), + mergeFunc func(preval, val, buf []byte) ([]byte, error), + valCompressed bool, withIndex bool, prefixLen int) (*byEndBlockItem, error) { var item2 = &byEndBlockItem{startBlock: aggFrom, endBlock: aggTo} var cp CursorHeap @@ -2763,7 +2811,7 @@ func (a *Aggregator) computeAggregation(fType FileType, } var err error var count int - if item2.decompressor, count, err = a.mergeIntoStateFile(&cp, prefixLen, fType, aggFrom, aggTo, a.diffDir, valTransform, fType == Commitment); err != nil { + if item2.decompressor, count, err = a.mergeIntoStateFile(&cp, prefixLen, fType, aggFrom, aggTo, a.diffDir, valTransform, mergeFunc, valCompressed); err != 
nil { return nil, fmt.Errorf("mergeIntoStateFile %s [%d-%d]: %w", fType.String(), aggFrom, aggTo, err) } item2.getter = item2.decompressor.MakeGetter() @@ -2861,7 +2909,8 @@ func (w *Writer) aggregateUpto(blockFrom, blockTo uint64) error { func (a *Aggregator) mergeIntoStateFile(cp *CursorHeap, prefixLen int, fType FileType, startBlock, endBlock uint64, dir string, valTransform func(val []byte, transValBuf []byte) ([]byte, error), - commitments bool, + mergeFunc func(preval, val, buf []byte) ([]byte, error), + valCompressed bool, ) (*compress.Decompressor, int, error) { datPath := filepath.Join(dir, fmt.Sprintf("%s.%d-%d.dat", fType.String(), startBlock, endBlock)) comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, dir, compress.MinPatternScore, 1) @@ -2896,20 +2945,22 @@ func (a *Aggregator) mergeIntoStateFile(cp *CursorHeap, prefixLen int, if ci1.t != FILE_CURSOR { return nil, 0, fmt.Errorf("mergeIntoStateFile: cursor of unexpected type: %d", ci1.t) } - if commitments { - if mergedOnce { - //fmt.Printf("mergeIntoStateFile pre-merge prefix [%x], [%x]+[%x]\n", commitment.CompactToHex(lastKey), ci1.val, lastVal) - if lastVal, err = commitment.MergeBranches(ci1.val, lastVal, nil); err != nil { - return nil, 0, fmt.Errorf("mergeIntoStateFile: merge commitments: %w", err) - } - //fmt.Printf("mergeIntoStateFile post-merge prefix [%x], [%x]\n", commitment.CompactToHex(lastKey), lastVal) - } else { - mergedOnce = true + if mergedOnce { + //fmt.Printf("mergeIntoStateFile pre-merge prefix [%x], [%x]+[%x]\n", commitment.CompactToHex(lastKey), ci1.val, lastVal) + if lastVal, err = mergeFunc(ci1.val, lastVal, nil); err != nil { + return nil, 0, fmt.Errorf("mergeIntoStateFile: merge values: %w", err) } + //fmt.Printf("mergeIntoStateFile post-merge prefix [%x], [%x]\n", commitment.CompactToHex(lastKey), lastVal) + } else { + mergedOnce = true } if ci1.dg.HasNext() { ci1.key, _ = ci1.dg.Next(ci1.key[:0]) - ci1.val, _ = ci1.dg.Next(ci1.val[:0]) + 
if valCompressed { + ci1.val, _ = ci1.dg.Next(ci1.val[:0]) + } else { + ci1.val, _ = ci1.dg.NextUncompressed() + } heap.Fix(cp, 0) } else { heap.Pop(cp) @@ -2929,6 +2980,8 @@ func (a *Aggregator) mergeIntoStateFile(cp *CursorHeap, prefixLen int, // Its bit are set for children that are present in the tree, and unset for those that are not (deleted, for example) // If all bits are zero (check below), this branch can be skipped, since it is empty skip = startBlock == 0 && len(lastVal) >= 4 && lastVal[2] == 0 && lastVal[3] == 0 + case AccountHistory, StorageHistory, CodeHistory: + skip = false default: // For the rest of types, empty value means deletion skip = startBlock == 0 && len(lastVal) == 0 @@ -2958,9 +3011,18 @@ func (a *Aggregator) mergeIntoStateFile(cp *CursorHeap, prefixLen int, if err = comp.AddWord(transValBuf); err != nil { return nil, 0, err } - } else if err = comp.AddWord(valBuf); err != nil { - return nil, 0, err + } else if valCompressed { + if err = comp.AddWord(valBuf); err != nil { + return nil, 0, err + } + } else { + if err = comp.AddUncompressedWord(valBuf); err != nil { + return nil, 0, err + } } + //if fType == AccountHistory { + // fmt.Printf("merge %s.%d-%d [%x]=>[%x]\n", fType.String(), startBlock, endBlock, keyBuf, valBuf) + //} } keyBuf = append(keyBuf[:0], lastKey...) valBuf = append(valBuf[:0], lastVal...) 
@@ -2983,9 +3045,18 @@ func (a *Aggregator) mergeIntoStateFile(cp *CursorHeap, prefixLen int, if err = comp.AddWord(transValBuf); err != nil { return nil, 0, err } - } else if err = comp.AddWord(valBuf); err != nil { - return nil, 0, err + } else if valCompressed { + if err = comp.AddWord(valBuf); err != nil { + return nil, 0, err + } + } else { + if err = comp.AddUncompressedWord(valBuf); err != nil { + return nil, 0, err + } } + //if fType == AccountHistory { + // fmt.Printf("merge %s.%d-%d [%x]=>[%x]\n", fType.String(), startBlock, endBlock, keyBuf, valBuf) + //} } if err = comp.Compress(); err != nil { return nil, 0, err diff --git a/aggregator/history.go b/aggregator/history.go index e563710ba..fe740215b 100644 --- a/aggregator/history.go +++ b/aggregator/history.go @@ -17,7 +17,6 @@ package aggregator import ( - "bytes" "encoding/binary" "fmt" "io/fs" @@ -27,11 +26,11 @@ import ( "strconv" "strings" - "github.com/RoaringBitmap/roaring/roaring64" "github.com/google/btree" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/recsplit" + "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" "github.com/ledgerwatch/log/v3" ) @@ -197,55 +196,32 @@ func (hr *HistoryReader) searchInHistory(bitmapType, historyType FileType, key [ searchTx := hr.txNum hr.search.endBlock = searchBlock hr.search.startBlock = searchBlock - (searchBlock % 500_000) - var lookupKey = make([]byte, len(key)+8) - copy(lookupKey, key) - var bitmapVal []byte - bm := roaring64.New() + var eliasVal []byte var err error var found bool var foundTxNum uint64 var foundEndBlock uint64 hr.h.files[bitmapType].AscendGreaterOrEqual(&hr.search, func(i btree.Item) bool { item := i.(*byEndBlockItem) + offset := item.indexReader.Lookup(key) g := item.getter - for chunkEnd := hr.h.aggregationStep*(searchBlock/hr.h.aggregationStep) + hr.h.aggregationStep - 1; chunkEnd <= item.endBlock; chunkEnd += hr.h.aggregationStep { - if chunkEnd < item.startBlock { 
- continue - } - binary.BigEndian.PutUint64(lookupKey[len(key):], chunkEnd) - offset := item.indexReader.Lookup(lookupKey) + g.Reset(offset) + if keyMatch, _ := g.Match(key); keyMatch { if trace { - fmt.Printf("Lookup [%x] in %s.[%d-%d].idx = %d\n", lookupKey, bitmapType.String(), item.startBlock, item.endBlock, offset) + fmt.Printf("Found bitmap for [%x] in %s.[%d-%d]\n", key, bitmapType.String(), item.startBlock, item.endBlock) } - g.Reset(offset) - if keyMatch, _ := g.Match(lookupKey); keyMatch { - if trace { - fmt.Printf("Found bitmap for [%x] in %s.[%d-%d]\n", lookupKey, bitmapType.String(), item.startBlock, item.endBlock) + eliasVal, _ = g.NextUncompressed() + ef, _ := eliasfano32.ReadEliasFano(eliasVal) + it := ef.Iterator() + if trace { + for it.HasNext() { + fmt.Printf(" %d", it.Next()) } - bitmapVal, _ = g.Next(bitmapVal[:0]) - bm.Clear() - if _, err = bm.ReadFrom(bytes.NewReader(bitmapVal)); err != nil { - return false - } - if searchTx == 0 { - foundTxNum = bm.Minimum() - foundEndBlock = item.endBlock - found = true - return false - } - searchRank := bm.Rank(searchTx - 1) - if trace { - fmt.Printf("searchRank = %d for searchTx = %d, cardinality = %d\n", searchRank, searchTx, bm.GetCardinality()) - } - if trace && bm.GetCardinality() > 0 { - fmt.Printf("min = %d, max = %d\n", bm.Minimum(), bm.Maximum()) - } - if searchRank >= bm.GetCardinality() { - continue - } - foundTxNum, _ = bm.Select(searchRank) + fmt.Printf("\n") + } + foundTxNum, found = ef.Search(searchTx) + if found { foundEndBlock = item.endBlock - found = true return false } } @@ -261,6 +237,7 @@ func (hr *HistoryReader) searchInHistory(bitmapType, historyType FileType, key [ if trace { fmt.Printf("found in tx %d, endBlock %d\n", foundTxNum, foundEndBlock) } + var lookupKey = make([]byte, len(key)+8) binary.BigEndian.PutUint64(lookupKey, foundTxNum) copy(lookupKey[8:], key) var historyItem *byEndBlockItem diff --git a/compress/decompress.go b/compress/decompress.go index ac9d23351..063978013 
100644 --- a/compress/decompress.go +++ b/compress/decompress.go @@ -280,6 +280,7 @@ func (g *Getter) NextUncompressed() ([]byte, uint64) { if l == 0 { return g.data[g.dataP:g.dataP], g.dataP } + g.nextPos(false) pos := g.dataP g.dataP += l return g.data[pos:g.dataP], g.dataP diff --git a/compress/decompress_test.go b/compress/decompress_test.go index baff44a0b..291cad917 100644 --- a/compress/decompress_test.go +++ b/compress/decompress_test.go @@ -17,6 +17,7 @@ package compress import ( + "bytes" "context" "fmt" "path/filepath" @@ -155,6 +156,47 @@ func TestDecompressMatchPrefix(t *testing.T) { } } +func prepareLoremDictUncompressed(t *testing.T) *Decompressor { + tmpDir := t.TempDir() + file := filepath.Join(tmpDir, "compressed") + t.Name() + c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2) + if err != nil { + t.Fatal(err) + } + defer c.Close() + for k, w := range loremStrings { + if err = c.AddUncompressedWord([]byte(fmt.Sprintf("%s %d", w, k))); err != nil { + t.Fatal(err) + } + } + if err = c.Compress(); err != nil { + t.Fatal(err) + } + var d *Decompressor + if d, err = NewDecompressor(file); err != nil { + t.Fatal(err) + } + return d +} + +func TestUncompressed(t *testing.T) { + d := prepareLoremDictUncompressed(t) + defer d.Close() + g := d.MakeGetter() + i := 0 + for g.HasNext() { + w := loremStrings[i] + expected := []byte(fmt.Sprintf("%s %d", w, i+1)) + expected = expected[:len(expected)/2] + actual, _ := g.NextUncompressed() + if bytes.Equal(expected, actual) { + t.Errorf("expected %s, actual %s", expected, actual) + } + i++ + } +} + const lorem = `Lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiusmod tempor incididunt ut labore et dolore magna aliqua Ut enim ad minim veniam quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur diff --git a/compress/parallel_compress.go 
b/compress/parallel_compress.go index dbc96e77c..23a92bc84 100644 --- a/compress/parallel_compress.go +++ b/compress/parallel_compress.go @@ -413,6 +413,17 @@ func reducedict(ctx context.Context, trace bool, logPrefix, segmentFilePath stri close(ch) // Drain the out queue if necessary if inCount > outCount { + for compressionQueue.Len() > 0 && compressionQueue[0].order == outCount { + compW := heap.Pop(&compressionQueue).(*CompressionWord) + outCount++ + if outCount == inCount { + close(out) + } + // Write to intermediate file + if _, e := intermediateW.Write(compW.word); e != nil { + return e + } + } for compW := range out { heap.Push(&compressionQueue, compW) for compressionQueue.Len() > 0 && compressionQueue[0].order == outCount { diff --git a/recsplit/eliasfano32/elias_fano.go b/recsplit/eliasfano32/elias_fano.go index 46f166df0..8abb577b4 100644 --- a/recsplit/eliasfano32/elias_fano.go +++ b/recsplit/eliasfano32/elias_fano.go @@ -1,5 +1,5 @@ /* - Copyright 2021 Erigon contributors + Copyright 2022 Erigon contributors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -22,6 +22,7 @@ import ( "io" "math" "math/bits" + "sort" "unsafe" "github.com/ledgerwatch/erigon-lib/common/bitutil" @@ -52,16 +53,11 @@ type EliasFano struct { u uint64 l uint64 maxOffset uint64 - minDelta uint64 i uint64 - delta uint64 wordsUpperBits int } -func NewEliasFano(count uint64, maxOffset, minDelta uint64) *EliasFano { - if minDelta > (1 << 32) { - panic(fmt.Sprintf("too big minDelat: %d", minDelta)) - } +func NewEliasFano(count uint64, maxOffset uint64) *EliasFano { if count == 0 { panic(fmt.Sprintf("too small count: %d", count)) } @@ -69,9 +65,8 @@ func NewEliasFano(count uint64, maxOffset, minDelta uint64) *EliasFano { ef := &EliasFano{ count: count - 1, maxOffset: maxOffset, - minDelta: minDelta, } - ef.u = maxOffset - ef.count*ef.minDelta + 1 + ef.u = maxOffset + 1 ef.wordsUpperBits = ef.deriveFields() return ef } @@ -79,13 +74,12 @@ func NewEliasFano(count uint64, maxOffset, minDelta uint64) *EliasFano { func (ef *EliasFano) AddOffset(offset uint64) { //fmt.Printf("0x%x,\n", offset) if ef.l != 0 { - set_bits(ef.lowerBits, ef.i*ef.l, int(ef.l), (offset-ef.delta)&ef.lowerBitsMask) + set_bits(ef.lowerBits, ef.i*ef.l, int(ef.l), offset&ef.lowerBitsMask) } //pos := ((offset - ef.delta) >> ef.l) + ef.i - set(ef.upperBits, ((offset-ef.delta)>>ef.l)+ef.i) + set(ef.upperBits, (offset>>ef.l)+ef.i) //fmt.Printf("add:%x, pos=%x, set=%x, res=%x\n", offset, pos, pos/64, uint64(1)<<(pos%64)) ef.i++ - ef.delta += ef.minDelta } func (ef EliasFano) jumpSizeWords() int { @@ -101,9 +95,7 @@ func (ef *EliasFano) deriveFields() int { ef.l = 0 } else { ef.l = 63 ^ uint64(bits.LeadingZeros64(ef.u/(ef.count+1))) // pos of first non-zero bit - //fmt.Printf("lllllllll: %d, %d\n", 63^uint64(bits.LeadingZeros64(24/7)), msb(ef.u/(ef.count+1))) } - //fmt.Printf("EF: %d, %d,%d\n", ef.count, ef.u, ef.l) ef.lowerBitsMask = (uint64(1) << ef.l) - 1 wordsLowerBits := int(((ef.count+1)*ef.l+63)/64 + 1) wordsUpperBits := int((ef.count + 1 + (ef.u >> ef.l) + 63) / 64) @@ -139,10 
+131,6 @@ func (ef *EliasFano) Build() { if offset >= (1 << 32) { fmt.Printf("ef.l=%x,ef.u=%x\n", ef.l, ef.u) fmt.Printf("offset=%x,lastSuperQ=%x,i=%x,b=%x,c=%x\n", offset, lastSuperQ, i, b, c) - fmt.Printf("ef.minDelta=%x\n", ef.minDelta) - //fmt.Printf("ef.upperBits=%x\n", ef.upperBits) - //fmt.Printf("ef.lowerBits=%x\n", ef.lowerBits) - //fmt.Printf("ef.wordsUpperBits=%b\n", ef.wordsUpperBits) panic("") } // c % superQ is the bit index inside the group of 4096 bits @@ -159,7 +147,7 @@ func (ef *EliasFano) Build() { } } -func (ef EliasFano) get(i uint64) (val uint64, window uint64, sel int, currWord uint64, lower uint64, delta uint64) { +func (ef EliasFano) get(i uint64) (val uint64, window uint64, sel int, currWord uint64, lower uint64) { lower = i * ef.l idx64 := lower / 64 shift := lower % 64 @@ -186,14 +174,13 @@ func (ef EliasFano) get(i uint64) (val uint64, window uint64, sel int, currWord } sel = bitutil.Select64(window, d) - delta = i * ef.minDelta - val = ((currWord*64+uint64(sel)-i)<>= ef.l - valNext = ((currWord*64+uint64(bits.TrailingZeros64(window))-i-1)<= offset + })) + if i <= ef.count { + return ef.Get(i), true + } + return 0, false +} + +func (ef EliasFano) Max() uint64 { + return ef.maxOffset +} + +func (ef EliasFano) Count() uint64 { + return ef.count + 1 +} + +func (ef *EliasFano) Iterator() *EliasFanoIter { + return &EliasFanoIter{ef: ef, upperMask: 1, upperStep: uint64(1) << ef.l} +} + +type EliasFanoIter struct { + ef *EliasFano + idx uint64 + lowerIdx uint64 + upperIdx uint64 + upperMask uint64 + upper uint64 + upperStep uint64 +} + +func (efi *EliasFanoIter) HasNext() bool { + return efi.idx <= efi.ef.count +} + +func (efi *EliasFanoIter) Next() uint64 { + idx64 := efi.lowerIdx >> 6 + shift := efi.lowerIdx & 63 + lower := efi.ef.lowerBits[idx64] >> shift + if shift > 0 { + lower |= efi.ef.lowerBits[idx64+1] << (64 - shift) + } + if efi.upperMask == 0 { + efi.upperIdx++ + efi.upperMask = 1 + } + for 
efi.ef.upperBits[efi.upperIdx]&efi.upperMask == 0 { + efi.upper += efi.upperStep + efi.upperMask <<= 1 + if efi.upperMask == 0 { + efi.upperIdx++ + efi.upperMask = 1 + } + } + efi.upperMask <<= 1 + efi.lowerIdx += efi.ef.l + efi.idx++ + val := (lower & efi.ef.lowerBitsMask) | efi.upper + return val +} + // Write outputs the state of golomb rice encoding into a writer, which can be recovered later by Read func (ef *EliasFano) Write(w io.Writer) error { var numBuf [8]byte binary.BigEndian.PutUint64(numBuf[:], ef.count) - //fmt.Printf("write: %d,%x\n", ef.count, numBuf) if _, e := w.Write(numBuf[:]); e != nil { return e } @@ -227,10 +276,6 @@ func (ef *EliasFano) Write(w io.Writer) error { if _, e := w.Write(numBuf[:]); e != nil { return e } - binary.BigEndian.PutUint64(numBuf[:], ef.minDelta) - if _, e := w.Write(numBuf[:]); e != nil { - return e - } p := (*[maxDataSize]byte)(unsafe.Pointer(&ef.data[0])) b := (*p)[:] if _, e := w.Write(b[:len(ef.data)*8]); e != nil { @@ -239,19 +284,31 @@ func (ef *EliasFano) Write(w io.Writer) error { return nil } +// AppendBytes appends the big-endian count and u, followed by the raw in-memory bytes of ef.data, to buf and returns the extended slice. +func (ef *EliasFano) AppendBytes(buf []byte) []byte { + var numBuf [8]byte + binary.BigEndian.PutUint64(numBuf[:], ef.count) + buf = append(buf, numBuf[:]...) + binary.BigEndian.PutUint64(numBuf[:], ef.u) + buf = append(buf, numBuf[:]...) + p := (*[maxDataSize]byte)(unsafe.Pointer(&ef.data[0])) + b := (*p)[:] + buf = append(buf, b[:len(ef.data)*8]...)
+ return buf +} + const maxDataSize = 0xFFFFFFFFFFFF // Read inputs the state of golomb rice encoding from a reader s func ReadEliasFano(r []byte) (*EliasFano, int) { ef := &EliasFano{} ef.count = binary.BigEndian.Uint64(r[:8]) - //fmt.Printf("read: %d,%x\n", ef.count, r[:8]) ef.u = binary.BigEndian.Uint64(r[8:16]) - ef.minDelta = binary.BigEndian.Uint64(r[16:24]) - p := (*[maxDataSize / 8]uint64)(unsafe.Pointer(&r[24])) + ef.maxOffset = ef.u - 1 + p := (*[maxDataSize / 8]uint64)(unsafe.Pointer(&r[16])) ef.data = p[:] ef.deriveFields() - return ef, 24 + 8*len(ef.data) + return ef, 16 + 8*len(ef.data) } // DoubleEliasFano can be used to encode two monotone sequences diff --git a/recsplit/eliasfano32/elias_fano_fuzz_test.go b/recsplit/eliasfano32/elias_fano_fuzz_test.go index caba53ce8..29bd4c6ce 100644 --- a/recsplit/eliasfano32/elias_fano_fuzz_test.go +++ b/recsplit/eliasfano32/elias_fano_fuzz_test.go @@ -23,7 +23,8 @@ import ( "testing" ) -// gotip test -trimpath -v -fuzz=FuzzEliasFano -fuzztime=10s ./recsplit +// gotip test -trimpath -v -fuzz=FuzzSingleEliasFano ./recsplit/eliasfano32 +// gotip test -trimpath -v -fuzz=FuzzDoubleEliasFano ./recsplit/eliasfano32 func FuzzSingleEliasFano(f *testing.F) { f.Fuzz(func(t *testing.T, in []byte) { @@ -40,14 +41,10 @@ func FuzzSingleEliasFano(f *testing.F) { // Treat each byte of the sequence as difference between previous value and the next count := len(in) keys := make([]uint64, count+1) - var minDeltaCumKeys uint64 for i, b := range in { keys[i+1] = keys[i] + uint64(b) - if i == 0 || uint64(b) < minDeltaCumKeys { - minDeltaCumKeys = uint64(b) - } } - ef := NewEliasFano(uint64(count+1), keys[count], minDeltaCumKeys) + ef := NewEliasFano(uint64(count+1), keys[count]) for _, c := range keys { ef.AddOffset(c) } @@ -78,26 +75,19 @@ func FuzzDoubleEliasFano(f *testing.F) { // Treat each byte of the sequence as difference between previous value and the next numBuckets := len(in) / 2 cumKeys := make([]uint64, numBuckets+1) - 
var minDeltaCumKeys, minDeltaPosition uint64 position := make([]uint64, numBuckets+1) for i, b := range in[:numBuckets] { cumKeys[i+1] = cumKeys[i] + uint64(b) - if i == 0 || uint64(b) < minDeltaCumKeys { - minDeltaCumKeys = uint64(b) - } } for i, b := range in[numBuckets:] { position[i+1] = position[i] + uint64(b) - if i == 0 || uint64(b) < minDeltaPosition { - minDeltaPosition = uint64(b) - } } - ef1 := NewEliasFano(uint64(numBuckets+1), cumKeys[numBuckets], minDeltaCumKeys) + ef1 := NewEliasFano(uint64(numBuckets+1), cumKeys[numBuckets]) for _, c := range cumKeys { ef1.AddOffset(c) } ef1.Build() - ef2 := NewEliasFano(uint64(numBuckets+1), position[numBuckets], minDeltaPosition) + ef2 := NewEliasFano(uint64(numBuckets+1), position[numBuckets]) for _, p := range position { ef2.AddOffset(p) } diff --git a/recsplit/eliasfano32/elias_fano_test.go b/recsplit/eliasfano32/elias_fano_test.go new file mode 100644 index 000000000..a5f782c28 --- /dev/null +++ b/recsplit/eliasfano32/elias_fano_test.go @@ -0,0 +1,76 @@ +/* + Copyright 2022 Erigon contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package eliasfano32 + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEliasFano(t *testing.T) { + offsets := []uint64{1, 4, 6, 8, 10, 14, 16, 19, 22, 34, 37, 39, 41, 43, 48, 51, 54, 58, 62} + count := uint64(len(offsets)) + maxOffset := offsets[0] + for _, offset := range offsets { + if offset > maxOffset { + maxOffset = offset + } + } + ef := NewEliasFano(count, maxOffset) + for _, offset := range offsets { + ef.AddOffset(offset) + } + ef.Build() + for i, offset := range offsets { + offset1 := ef.Get(uint64(i)) + assert.Equal(t, offset, offset1, "offset") + } + v, ok := ef.Search(37) + assert.True(t, ok, "search1") + assert.Equal(t, uint64(37), v, "search1") + v, ok = ef.Search(0) + assert.True(t, ok, "search2") + assert.Equal(t, uint64(1), v, "search2") + _, ok = ef.Search(100) + assert.False(t, ok, "search3") + v, ok = ef.Search(11) + assert.True(t, ok, "search4") + assert.Equal(t, uint64(14), v, "search4") +} + +func TestIterator(t *testing.T) { + offsets := []uint64{1, 4, 6, 8, 10, 14, 16, 19, 22, 34, 37, 39, 41, 43, 48, 51, 54, 58, 62} + count := uint64(len(offsets)) + maxOffset := offsets[0] + for _, offset := range offsets { + if offset > maxOffset { + maxOffset = offset + } + } + ef := NewEliasFano(count, maxOffset) + for _, offset := range offsets { + ef.AddOffset(offset) + } + ef.Build() + efi := ef.Iterator() + i := 0 + for efi.HasNext() { + assert.Equal(t, offsets[i], efi.Next(), "iter") + i++ + } +} diff --git a/recsplit/recsplit.go b/recsplit/recsplit.go index 7ef73cd85..5ba89d64f 100644 --- a/recsplit/recsplit.go +++ b/recsplit/recsplit.go @@ -570,7 +570,7 @@ func (rs *RecSplit) Build() error { } if rs.enums { - rs.offsetEf = eliasfano32.NewEliasFano(rs.keysAdded, rs.maxOffset, rs.minDelta) + rs.offsetEf = eliasfano32.NewEliasFano(rs.keysAdded, rs.maxOffset) defer rs.offsetCollector.Close() if err := rs.offsetCollector.Load(nil, "", rs.loadFuncOffset, etl.TransformArgs{}); err != nil { return err