RecSplit: store BaseDataID in .idx file (helps to navigate over non-blockNum-based entries) (#180)

Alex Sharov authored on 2021-11-21 21:52:23 +07:00, committed by GitHub
parent 06fb85a0ed
commit bb3f510d16
2 changed files with 19 additions and 4 deletions

recsplit/index.go

@@ -42,6 +42,7 @@ type Index struct {
 	ef eliasfano16.DoubleEliasFano
 	enums bool
 	offsetEf *eliasfano32.EliasFano
+	baseDataID uint64
 	bucketCount uint64 // Number of buckets
 	hasher murmur3.Hash128 // Salted hash function to use for splitting into initial buckets and mapping to 64-bit fingerprints
 	bucketSize int
@@ -80,10 +81,12 @@ func OpenIndex(indexFile string) (*Index, error) {
 	}
 	idx.data = idx.mmapHandle1[:size]
 	// Read number of keys and bytes per record
-	idx.keyCount = binary.BigEndian.Uint64(idx.data[:8])
-	idx.bytesPerRec = int(idx.data[8])
+	idx.baseDataID = binary.BigEndian.Uint64(idx.data[:8])
+	idx.keyCount = binary.BigEndian.Uint64(idx.data[8:16])
+	idx.bytesPerRec = int(idx.data[16])
 	idx.recMask = (uint64(1) << (8 * idx.bytesPerRec)) - 1
-	offset := 9 + int(idx.keyCount)*idx.bytesPerRec
+	offset := 16 + 1 + int(idx.keyCount)*idx.bytesPerRec
 	// Bucket count, bucketSize, leafSize
 	idx.bucketCount = binary.BigEndian.Uint64(idx.data[offset:])
 	offset += 8
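With this change the fixed .idx header grows from 9 to 17 bytes: 8 bytes of baseDataID now precede the key count and the bytes-per-record byte. A minimal standalone sketch of the new layout (the parseIdxHeader helper is hypothetical and skips the mmap handling OpenIndex does):

package main

import (
	"encoding/binary"
	"fmt"
)

// parseIdxHeader mirrors what OpenIndex now reads from the file header:
//
//	[0:8)  baseDataID (big-endian uint64) - new in this commit
//	[8:16) keyCount   (big-endian uint64)
//	[16]   bytesPerRec
func parseIdxHeader(data []byte) (baseDataID, keyCount uint64, bytesPerRec int) {
	baseDataID = binary.BigEndian.Uint64(data[:8])
	keyCount = binary.BigEndian.Uint64(data[8:16])
	bytesPerRec = int(data[16])
	return
}

func main() {
	hdr := make([]byte, 17)
	binary.BigEndian.PutUint64(hdr[:8], 500_000) // baseDataID
	binary.BigEndian.PutUint64(hdr[8:16], 1024)  // keyCount
	hdr[16] = 3                                  // bytesPerRec
	fmt.Println(parseIdxHeader(hdr)) // 500000 1024 3
}

The matching change in Lookup below shifts every record read by the 8 bytes the new field occupies.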
@@ -138,6 +141,8 @@ func OpenIndex(indexFile string) (*Index, error) {
 	return idx, nil
 }
 
+func (idx *Index) BaseDataID() uint64 { return idx.baseDataID }
+
 func (idx *Index) Close() error {
 	if err := mmap.Munmap(idx.mmapHandle1, idx.mmapHandle2); err != nil {
 		return err
@@ -229,7 +234,7 @@ func (idx Index) Lookup(key []byte) uint64 {
 	}
 	b := gr.ReadNext(idx.golombParam(m))
 	rec := int(cumKeys) + int(remap16(remix(fingerprint+idx.startSeed[level]+b), m))
-	return binary.BigEndian.Uint64(idx.data[1+idx.bytesPerRec*(rec+1):]) & idx.recMask
+	return binary.BigEndian.Uint64(idx.data[1+8+idx.bytesPerRec*(rec+1):]) & idx.recMask
 }
 
 func (idx Index) Lookup2(i uint64) uint64 {
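For context, a hedged usage sketch of the new accessor (the file name and key are hypothetical; the import path assumes erigon-lib):

package main

import (
	"fmt"
	"log"

	"github.com/ledgerwatch/erigon-lib/recsplit"
)

func main() {
	// Hypothetical shard file, assumed to have been built with BaseDataID set.
	idx, err := recsplit.OpenIndex("some-shard.idx")
	if err != nil {
		log.Fatal(err)
	}
	defer idx.Close()
	// Lookup behaves as before; BaseDataID tells the application which entry
	// range this shard covers (e.g. its first block number), so lookup
	// results can be interpreted relative to the shard.
	rec := idx.Lookup([]byte("some-key"))
	fmt.Printf("shard base dataID: %d, record: %d\n", idx.BaseDataID(), rec)
}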

recsplit/recsplit.go

@@ -61,6 +61,7 @@ type RecSplit struct {
 	bucketSize int
 	keyExpectedCount uint64 // Number of keys in the hash table
 	keysAdded uint64 // Number of keys actually added to the recSplit (to check the match with keyExpectedCount)
+	baseDataID uint64 // Minimal app-specific ID of entries of this index - helps the app understand what data is stored in a given shard - persistent field
 	bucketCount uint64 // Number of buckets
 	hasher murmur3.Hash128 // Salted hash function to use for splitting into initial buckets and mapping to 64-bit fingerprints
 	bucketCollector *etl.Collector // Collector that sorts by buckets
@@ -108,6 +109,7 @@ type RecSplitArgs struct {
 	TmpDir string
 	StartSeed []uint64 // For each level of recursive split, the hash seed (salt) used for that level - needs to be generated randomly and be large enough to accommodate all the levels
 	Enums bool // Whether a two-level index needs to be built, where the perfect hash map points to an enumeration, and the enumeration points to offsets
+	BaseDataID uint64
 }
 
 // NewRecSplit creates a new RecSplit instance with given number of keys and given bucket size
@@ -126,6 +128,7 @@ func NewRecSplit(args RecSplitArgs) (*RecSplit, error) {
 	rs.hasher = murmur3.New128WithSeed(rs.salt)
 	rs.tmpDir = args.TmpDir
 	rs.indexFile = args.IndexFile
+	rs.baseDataID = args.BaseDataID
 	rs.bucketCollector = etl.NewCollector(RecSplitLogPrefix, rs.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
 	rs.enums = args.Enums
 	if args.Enums {
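A hedged sketch of plumbing the new argument through NewRecSplit: BaseDataID, Enums, StartSeed, TmpDir and IndexFile appear in this diff, while KeyCount, BucketSize, LeafSize and Salt are assumed from the package's existing RecSplitArgs; all values are illustrative.

package main

import (
	"path/filepath"

	"github.com/ledgerwatch/erigon-lib/recsplit"
)

// buildShardIndex is a hypothetical helper showing where BaseDataID plugs in.
func buildShardIndex(keyCount int, tmpDir string, baseDataID uint64) (*recsplit.RecSplit, error) {
	return recsplit.NewRecSplit(recsplit.RecSplitArgs{
		KeyCount:   keyCount,
		BucketSize: 2000, // illustrative
		LeafSize:   8,
		Salt:       0,
		TmpDir:     tmpDir,
		IndexFile:  filepath.Join(tmpDir, "some-shard.idx"), // hypothetical name
		StartSeed:  []uint64{0x0123456789abcdef, 0xfedcba9876543210}, // illustrative; needs one random seed per split level
		Enums:      false,
		BaseDataID: baseDataID, // e.g. the first block number covered by this shard
	})
}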
@@ -504,6 +507,12 @@ func (rs *RecSplit) Build() error {
 	defer rs.indexF.Close()
 	rs.indexW = bufio.NewWriterSize(rs.indexF, etl.BufIOSize)
 	defer rs.indexW.Flush()
+	// Write minimal app-specific dataID in this index file
+	binary.BigEndian.PutUint64(rs.numBuf[:], rs.baseDataID)
+	if _, err = rs.indexW.Write(rs.numBuf[:]); err != nil {
+		return fmt.Errorf("write baseDataID: %w", err)
+	}
+
 	// Write number of keys
 	binary.BigEndian.PutUint64(rs.numBuf[:], rs.keysAdded)
 	if _, err = rs.indexW.Write(rs.numBuf[:]); err != nil {
@@ -514,6 +523,7 @@
 	if err = rs.indexW.WriteByte(byte(rs.bytesPerRec)); err != nil {
 		return fmt.Errorf("write bytes per record: %w", err)
 	}
 	rs.currentBucketIdx = math.MaxUint64 // To make sure 0 bucket is detected
 	defer rs.bucketCollector.Close()
 	if err := rs.bucketCollector.Load(nil, "", rs.loadFuncBucket, etl.TransformArgs{}); err != nil {
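Finally, a hedged round-trip sketch (test-style, hypothetical names, construction details elided) of the property this commit establishes: the baseDataID written by Build is exactly what OpenIndex reads back.

package recsplit_test

import (
	"path/filepath"
	"testing"

	"github.com/ledgerwatch/erigon-lib/recsplit"
)

func TestBaseDataIDRoundTrip(t *testing.T) {
	indexFile := filepath.Join(t.TempDir(), "test.idx")
	rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
		// ... KeyCount, BucketSize, LeafSize, Salt, TmpDir, StartSeed as in the sketch above ...
		IndexFile:  indexFile,
		BaseDataID: 500_000,
	})
	if err != nil {
		t.Fatal(err)
	}
	// ... add the expected number of keys via the package's key-adding API, then:
	if err := rs.Build(); err != nil {
		t.Fatal(err)
	}
	idx, err := recsplit.OpenIndex(indexFile)
	if err != nil {
		t.Fatal(err)
	}
	defer idx.Close()
	if got := idx.BaseDataID(); got != 500_000 {
		t.Fatalf("BaseDataID: got %d, want 500000", got)
	}
}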