2021-09-13 17:31:09 +00:00
/ *
Copyright 2021 Erigon contributors
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package recsplit
import (
2021-10-16 09:43:41 +00:00
"bufio"
2022-02-13 09:14:04 +00:00
"crypto/rand"
2021-09-13 17:31:09 +00:00
"encoding/binary"
"fmt"
2022-08-01 04:03:48 +00:00
"io"
2021-09-13 17:31:09 +00:00
"math"
"math/bits"
2021-10-16 09:43:41 +00:00
"os"
2022-08-04 05:31:17 +00:00
"path/filepath"
2021-09-13 17:31:09 +00:00
2022-05-30 02:06:11 +00:00
"github.com/c2h5oh/datasize"
2021-09-13 17:31:09 +00:00
"github.com/ledgerwatch/erigon-lib/etl"
2021-11-05 10:04:17 +00:00
"github.com/ledgerwatch/erigon-lib/recsplit/eliasfano16"
"github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32"
2022-03-21 04:22:17 +00:00
"github.com/ledgerwatch/log/v3"
2021-09-13 17:31:09 +00:00
"github.com/spaolacci/murmur3"
)
2021-11-08 07:27:21 +00:00
var ASSERT = false
2021-11-07 02:54:48 +00:00
var ErrCollision = fmt . Errorf ( "duplicate key" )
2021-09-13 17:31:09 +00:00
const RecSplitLogPrefix = "recsplit"
const MaxLeafSize = 24
2021-09-20 11:14:49 +00:00
/ * * David Stafford ' s ( http : //zimbry.blogspot.com/2011/09/better-bit-mixing-improving-on.html)
* 13 th variant of the 64 - bit finalizer function in Austin Appleby ' s
* MurmurHash3 ( https : //github.com/aappleby/smhasher).
*
* @ param z a 64 - bit integer .
* @ return a 64 - bit integer obtained by mixing the bits of ` z ` .
* /
2021-09-13 17:31:09 +00:00
2021-09-20 11:14:49 +00:00
func remix ( z uint64 ) uint64 {
z = ( z ^ ( z >> 30 ) ) * 0xbf58476d1ce4e5b9
z = ( z ^ ( z >> 27 ) ) * 0x94d049bb133111eb
return z ^ ( z >> 31 )
2021-09-13 17:31:09 +00:00
}
// RecSplit is the implementation of Recursive Split algorithm for constructing perfect hash mapping, described in
// https://arxiv.org/pdf/1910.06416.pdf Emmanuel Esposito, Thomas Mueller Graf, and Sebastiano Vigna.
// Recsplit: Minimal perfect hashing via recursive splitting. In 2020 Proceedings of the Symposium on Algorithm Engineering and Experiments (ALENEX),
// pages 175− 185. SIAM, 2020.
type RecSplit struct {
2021-10-16 09:43:41 +00:00
bucketSize int
keyExpectedCount uint64 // Number of keys in the hash table
keysAdded uint64 // Number of keys actually added to the recSplit (to check the match with keyExpectedCount)
2021-11-21 14:52:23 +00:00
baseDataID uint64 // Minimal app-specific ID of entries of this index - helps app understand what data stored in given shard - persistent field
2021-10-16 09:43:41 +00:00
bucketCount uint64 // Number of buckets
hasher murmur3 . Hash128 // Salted hash function to use for splitting into initial buckets and mapping to 64-bit fingerprints
2022-05-30 02:06:11 +00:00
etlBufLimit datasize . ByteSize
2022-08-04 05:31:17 +00:00
lvl log . Lvl
2022-05-30 02:06:11 +00:00
bucketCollector * etl . Collector // Collector that sorts by buckets
enums bool // Whether to build two level index with perfect hash table pointing to enumeration and enumeration pointing to offsets
offsetCollector * etl . Collector // Collector that sorts by offsets
built bool // Flag indicating that the hash function has been built and no more keys can be added
currentBucketIdx uint64 // Current bucket being accumulated
currentBucket [ ] uint64 // 64-bit fingerprints of keys in the current bucket accumulated before the recsplit is performed for that bucket
currentBucketOffs [ ] uint64 // Index offsets for the current bucket
maxOffset uint64 // Maximum value of index offset to later decide how many bytes to use for the encoding
gr GolombRice // Helper object to encode the tree of hash function salts using Golomb-Rice code.
2021-09-18 21:59:27 +00:00
// Helper object to encode the sequence of cumulative number of keys in the buckets
// and the sequence of of cumulative bit offsets of buckets in the Golomb-Rice code.
2021-11-05 10:04:17 +00:00
ef eliasfano16 . DoubleEliasFano
offsetEf * eliasfano32 . EliasFano // Elias Fano instance for encoding the offsets
bucketSizeAcc [ ] uint64 // Bucket size accumulator
bucketPosAcc [ ] uint64 // Accumulator for position of every bucket in the encoding of the hash function
leafSize uint16 // Leaf size for recursive split algorithm
primaryAggrBound uint16 // The lower bound for primary key aggregation (computed from leafSize)
secondaryAggrBound uint16 // The lower bound for secondary key aggregation (computed from leadSize)
2021-09-20 11:14:49 +00:00
startSeed [ ] uint64
2021-09-18 21:59:27 +00:00
golombRice [ ] uint32
2021-09-20 11:14:49 +00:00
buffer [ ] uint64
2021-10-16 09:43:41 +00:00
offsetBuffer [ ] uint64
2021-09-20 16:39:32 +00:00
count [ ] uint16
2021-09-20 11:14:49 +00:00
salt uint32 // Murmur3 hash used for converting keys to 64-bit values and assigning to buckets
collision bool
tmpDir string
2021-10-16 09:43:41 +00:00
indexFile string
2022-08-04 05:31:17 +00:00
indexFileName string
2021-10-16 09:43:41 +00:00
indexF * os . File
indexW * bufio . Writer
bytesPerRec int
numBuf [ 8 ] byte
2022-02-09 06:22:45 +00:00
bucketKeyBuf [ 16 ] byte
2021-09-20 11:14:49 +00:00
trace bool
2021-10-16 09:43:41 +00:00
prevOffset uint64 // Previously added offset (for calculating minDelta for Elias Fano encoding of "enum -> offset" index)
minDelta uint64 // minDelta for Elias Fano encoding of "enum -> offset" index
2021-09-13 17:31:09 +00:00
}
type RecSplitArgs struct {
2022-05-30 02:06:11 +00:00
KeyCount int
BucketSize int
Salt uint32 // Hash seed (salt) for the hash function used for allocating the initial buckets - need to be generated randomly
LeafSize uint16
IndexFile string // File name where the index and the minimal perfect hash function will be written to
TmpDir string
StartSeed [ ] uint64 // For each level of recursive split, the hash seed (salt) used for that level - need to be generated randomly and be large enough to accomodate all the levels
Enums bool // Whether two level index needs to be built, where perfect hash map points to an enumeration, and enumeration points to offsets
BaseDataID uint64
EtlBufLimit datasize . ByteSize
2021-09-13 17:31:09 +00:00
}
// NewRecSplit creates a new RecSplit instance with given number of keys and given bucket size
// Typical bucket size is 100 - 2000, larger bucket sizes result in smaller representations of hash functions, at a cost of slower access
// salt parameters is used to randomise the hash function construction, to ensure that different Erigon instances (nodes)
// are likely to use different hash function, to collision attacks are unlikely to slow down any meaningful number of nodes at the same time
func NewRecSplit ( args RecSplitArgs ) ( * RecSplit , error ) {
bucketCount := ( args . KeyCount + args . BucketSize - 1 ) / args . BucketSize
2021-09-18 21:59:27 +00:00
rs := & RecSplit { bucketSize : args . BucketSize , keyExpectedCount : uint64 ( args . KeyCount ) , bucketCount : uint64 ( bucketCount ) }
2021-11-19 15:00:55 +00:00
if len ( args . StartSeed ) == 0 {
args . StartSeed = [ ] uint64 { 0x106393c187cae21a , 0x6453cec3f7376937 , 0x643e521ddbd2be98 , 0x3740c6412f6572cb , 0x717d47562f1ce470 , 0x4cd6eb4c63befb7c , 0x9bfd8c5e18c8da73 ,
0x082f20e10092a9a3 , 0x2ada2ce68d21defc , 0xe33cb4f3e7c6466b , 0x3980be458c509c59 , 0xc466fd9584828e8c , 0x45f0aabe1a61ede6 , 0xf6e7b8b33ad9b98d ,
0x4ef95e25f4b4983d , 0x81175195173b92d3 , 0x4e50927d8dd15978 , 0x1ea2099d1fafae7f , 0x425c8a06fbaaa815 , 0xcd4216006c74052a }
}
2021-09-20 11:14:49 +00:00
rs . salt = args . Salt
2022-02-13 09:14:04 +00:00
if rs . salt == 0 {
seedBytes := make ( [ ] byte , 4 )
if _ , err := rand . Read ( seedBytes ) ; err != nil {
return nil , err
}
rs . salt = binary . BigEndian . Uint32 ( seedBytes )
}
2021-09-20 16:39:32 +00:00
rs . hasher = murmur3 . New128WithSeed ( rs . salt )
2021-09-20 11:14:49 +00:00
rs . tmpDir = args . TmpDir
2021-10-16 09:43:41 +00:00
rs . indexFile = args . IndexFile
2022-08-04 05:31:17 +00:00
_ , fname := filepath . Split ( rs . indexFile )
rs . indexFileName = fname
2021-11-21 14:52:23 +00:00
rs . baseDataID = args . BaseDataID
2022-05-30 02:06:11 +00:00
rs . etlBufLimit = args . EtlBufLimit
if rs . etlBufLimit == 0 {
rs . etlBufLimit = etl . BufferOptimalSize
}
rs . bucketCollector = etl . NewCollector ( RecSplitLogPrefix , rs . tmpDir , etl . NewSortableBuffer ( rs . etlBufLimit ) )
2022-08-04 05:31:17 +00:00
rs . bucketCollector . LogLvl ( log . LvlDebug )
2021-10-16 09:43:41 +00:00
rs . enums = args . Enums
if args . Enums {
2022-05-30 02:06:11 +00:00
rs . offsetCollector = etl . NewCollector ( RecSplitLogPrefix , rs . tmpDir , etl . NewSortableBuffer ( rs . etlBufLimit ) )
2022-08-04 05:31:17 +00:00
rs . offsetCollector . LogLvl ( log . LvlDebug )
2021-10-16 09:43:41 +00:00
}
2021-09-20 11:14:49 +00:00
rs . currentBucket = make ( [ ] uint64 , 0 , args . BucketSize )
2021-10-16 09:43:41 +00:00
rs . currentBucketOffs = make ( [ ] uint64 , 0 , args . BucketSize )
rs . maxOffset = 0
2021-09-18 21:59:27 +00:00
rs . bucketSizeAcc = make ( [ ] uint64 , 1 , bucketCount + 1 )
rs . bucketPosAcc = make ( [ ] uint64 , 1 , bucketCount + 1 )
2021-09-13 17:31:09 +00:00
if args . LeafSize > MaxLeafSize {
return nil , fmt . Errorf ( "exceeded max leaf size %d: %d" , MaxLeafSize , args . LeafSize )
}
rs . leafSize = args . LeafSize
2021-09-20 16:39:32 +00:00
rs . primaryAggrBound = rs . leafSize * uint16 ( math . Max ( 2 , math . Ceil ( 0.35 * float64 ( rs . leafSize ) + 1. / 2. ) ) )
2021-09-13 17:31:09 +00:00
if rs . leafSize < 7 {
rs . secondaryAggrBound = rs . primaryAggrBound * 2
} else {
2021-09-20 16:39:32 +00:00
rs . secondaryAggrBound = rs . primaryAggrBound * uint16 ( math . Ceil ( 0.21 * float64 ( rs . leafSize ) + 9. / 10. ) )
2021-09-13 17:31:09 +00:00
}
rs . startSeed = args . StartSeed
2021-09-20 16:39:32 +00:00
rs . count = make ( [ ] uint16 , rs . secondaryAggrBound )
2021-09-13 17:31:09 +00:00
return rs , nil
}
2021-11-15 14:19:56 +00:00
func ( rs * RecSplit ) Close ( ) {
if rs . indexF != nil {
rs . indexF . Close ( )
}
if rs . bucketCollector != nil {
rs . bucketCollector . Close ( )
}
if rs . offsetCollector != nil {
rs . offsetCollector . Close ( )
}
}
2022-08-04 05:31:17 +00:00
func ( rs * RecSplit ) LogLvl ( lvl log . Lvl ) { rs . lvl = lvl }
2022-03-21 04:22:17 +00:00
2021-09-20 11:14:49 +00:00
func ( rs * RecSplit ) SetTrace ( trace bool ) {
rs . trace = trace
}
2021-09-13 17:31:09 +00:00
// remap converts the number x which is assumed to be uniformly distributed over the range [0..2^64) to the number that is uniformly
// distributed over the range [0..n)
func remap ( x uint64 , n uint64 ) uint64 {
hi , _ := bits . Mul64 ( x , n )
return hi
}
const mask48 uint64 = ( 1 << 48 ) - 1
// remap converts the number x which is assumed to be uniformly distributed over the range [0..2^64) to the number that is uniformly
// distributed over the range [0..n), under assumption that n is less than 2^16
2021-09-20 16:39:32 +00:00
func remap16 ( x uint64 , n uint16 ) uint16 {
return uint16 ( ( ( x & mask48 ) * uint64 ( n ) ) >> 48 )
2021-09-13 17:31:09 +00:00
}
2021-09-20 11:14:49 +00:00
// ResetNextSalt resets the RecSplit and uses the next salt value to try to avoid collisions
// when mapping keys to 64-bit values
func ( rs * RecSplit ) ResetNextSalt ( ) {
2022-02-10 15:30:55 +00:00
rs . built = false
2021-09-20 11:14:49 +00:00
rs . collision = false
rs . keysAdded = 0
rs . salt ++
2021-09-20 16:39:32 +00:00
rs . hasher = murmur3 . New128WithSeed ( rs . salt )
2022-03-10 07:48:37 +00:00
if rs . bucketCollector != nil {
rs . bucketCollector . Close ( )
}
2022-05-30 02:06:11 +00:00
rs . bucketCollector = etl . NewCollector ( RecSplitLogPrefix , rs . tmpDir , etl . NewSortableBuffer ( rs . etlBufLimit ) )
2022-02-10 07:40:24 +00:00
if rs . offsetCollector != nil {
2022-03-10 07:48:37 +00:00
rs . offsetCollector . Close ( )
2022-05-30 02:06:11 +00:00
rs . offsetCollector = etl . NewCollector ( RecSplitLogPrefix , rs . tmpDir , etl . NewSortableBuffer ( rs . etlBufLimit ) )
2022-02-10 07:40:24 +00:00
}
2021-09-20 11:14:49 +00:00
rs . currentBucket = rs . currentBucket [ : 0 ]
2021-10-16 09:43:41 +00:00
rs . currentBucketOffs = rs . currentBucketOffs [ : 0 ]
rs . maxOffset = 0
2021-09-20 11:14:49 +00:00
rs . bucketSizeAcc = rs . bucketSizeAcc [ : 1 ] // First entry is always zero
2022-02-16 16:44:00 +00:00
rs . bucketPosAcc = rs . bucketPosAcc [ : 1 ] // First entry is always zero
2021-09-20 11:14:49 +00:00
}
2022-03-19 04:38:37 +00:00
func splitParams ( m , leafSize , primaryAggrBound , secondaryAggrBound uint16 ) ( fanout , unit uint16 ) {
2021-10-16 09:43:41 +00:00
if m > secondaryAggrBound { // High-level aggregation (fanout 2)
unit = secondaryAggrBound * ( ( ( m + 1 ) / 2 + secondaryAggrBound - 1 ) / secondaryAggrBound )
2021-09-18 21:59:27 +00:00
fanout = 2
2021-10-16 09:43:41 +00:00
} else if m > primaryAggrBound { // Second-level aggregation
unit = primaryAggrBound
fanout = ( m + primaryAggrBound - 1 ) / primaryAggrBound
2021-09-18 21:59:27 +00:00
} else { // First-level aggregation
2021-10-16 09:43:41 +00:00
unit = leafSize
fanout = ( m + leafSize - 1 ) / leafSize
2021-09-18 21:59:27 +00:00
}
return
}
2022-03-19 04:38:37 +00:00
func computeGolombRice ( m uint16 , table [ ] uint32 , leafSize , primaryAggrBound , secondaryAggrBound uint16 ) {
2021-10-16 09:43:41 +00:00
fanout , unit := splitParams ( m , leafSize , primaryAggrBound , secondaryAggrBound )
2021-09-20 16:39:32 +00:00
k := make ( [ ] uint16 , fanout )
2021-09-18 21:59:27 +00:00
k [ fanout - 1 ] = m
2021-09-20 16:39:32 +00:00
for i := uint16 ( 0 ) ; i < fanout - 1 ; i ++ {
2021-09-18 21:59:27 +00:00
k [ i ] = unit
k [ fanout - 1 ] -= k [ i ]
}
2022-03-19 04:38:37 +00:00
sqrtProd := float64 ( 1 )
2021-09-20 16:39:32 +00:00
for i := uint16 ( 0 ) ; i < fanout ; i ++ {
2022-03-19 04:38:37 +00:00
sqrtProd *= math . Sqrt ( float64 ( k [ i ] ) )
2021-09-18 21:59:27 +00:00
}
2022-03-19 04:38:37 +00:00
p := math . Sqrt ( float64 ( m ) ) / ( math . Pow ( 2 * math . Pi , ( float64 ( fanout ) - 1. ) / 2.0 ) * sqrtProd )
2021-09-18 21:59:27 +00:00
golombRiceLength := uint32 ( math . Ceil ( math . Log2 ( - math . Log ( ( math . Sqrt ( 5 ) + 1.0 ) / 2.0 ) / math . Log1p ( - p ) ) ) ) // log2 Golomb modulus
if golombRiceLength > 0x1F {
panic ( "golombRiceLength > 0x1F" )
}
table [ m ] = golombRiceLength << 27
2021-09-20 16:39:32 +00:00
for i := uint16 ( 0 ) ; i < fanout ; i ++ {
2021-09-18 21:59:27 +00:00
golombRiceLength += table [ k [ i ] ] & 0xFFFF
}
if golombRiceLength > 0xFFFF {
panic ( "golombRiceLength > 0xFFFF" )
}
table [ m ] |= golombRiceLength // Sum of Golomb-Rice codeslengths in the subtree, stored in the lower 16 bits
nodes := uint32 ( 1 )
2021-09-20 16:39:32 +00:00
for i := uint16 ( 0 ) ; i < fanout ; i ++ {
2021-09-18 21:59:27 +00:00
nodes += ( table [ k [ i ] ] >> 16 ) & 0x7FF
}
2021-10-16 09:43:41 +00:00
if leafSize >= 3 && nodes > 0x7FF {
2021-09-18 21:59:27 +00:00
panic ( "rs.leafSize >= 3 && nodes > 0x7FF" )
}
table [ m ] |= nodes << 16
}
// golombParam returns the optimal Golomb parameter to use for encoding
// salt for the part of the hash function separating m elements. It is based on
// calculations with assumptions that we draw hash functions at random
2021-09-20 16:39:32 +00:00
func ( rs * RecSplit ) golombParam ( m uint16 ) int {
s := uint16 ( len ( rs . golombRice ) )
2021-09-18 21:59:27 +00:00
for m >= s {
rs . golombRice = append ( rs . golombRice , 0 )
// For the case where bucket is larger than planned
if s == 0 {
rs . golombRice [ 0 ] = ( bijMemo [ 0 ] << 27 ) | bijMemo [ 0 ]
} else if s <= rs . leafSize {
rs . golombRice [ s ] = ( bijMemo [ s ] << 27 ) | ( uint32 ( 1 ) << 16 ) | bijMemo [ s ]
} else {
2021-10-16 09:43:41 +00:00
computeGolombRice ( s , rs . golombRice , rs . leafSize , rs . primaryAggrBound , rs . secondaryAggrBound )
2021-09-18 21:59:27 +00:00
}
s ++
}
return int ( rs . golombRice [ m ] >> 27 )
}
2021-09-13 17:31:09 +00:00
// Add key to the RecSplit. There can be many more keys than what fits in RAM, and RecSplit
// spills data onto disk to accomodate that. The key gets copied by the collector, therefore
// the slice underlying key is not getting accessed by RecSplit after this invocation.
2021-10-16 09:43:41 +00:00
func ( rs * RecSplit ) AddKey ( key [ ] byte , offset uint64 ) error {
2021-09-13 17:31:09 +00:00
if rs . built {
return fmt . Errorf ( "cannot add keys after perfect hash function had been built" )
}
rs . hasher . Reset ( )
rs . hasher . Write ( key ) //nolint:errcheck
2021-09-20 16:39:32 +00:00
hi , lo := rs . hasher . Sum128 ( )
2022-02-09 06:22:45 +00:00
binary . BigEndian . PutUint64 ( rs . bucketKeyBuf [ : ] , remap ( hi , rs . bucketCount ) )
binary . BigEndian . PutUint64 ( rs . bucketKeyBuf [ 8 : ] , lo )
binary . BigEndian . PutUint64 ( rs . numBuf [ : ] , offset )
2021-10-16 09:43:41 +00:00
if offset > rs . maxOffset {
rs . maxOffset = offset
}
if rs . keysAdded > 0 {
delta := offset - rs . prevOffset
if rs . keysAdded == 1 || delta < rs . minDelta {
rs . minDelta = delta
}
}
if rs . enums {
2022-02-09 06:22:45 +00:00
if err := rs . offsetCollector . Collect ( rs . numBuf [ : ] , nil ) ; err != nil {
2021-10-16 09:43:41 +00:00
return err
}
2022-02-09 06:22:45 +00:00
binary . BigEndian . PutUint64 ( rs . numBuf [ : ] , rs . keysAdded )
if err := rs . bucketCollector . Collect ( rs . bucketKeyBuf [ : ] , rs . numBuf [ : ] ) ; err != nil {
2021-10-16 09:43:41 +00:00
return err
}
} else {
2022-02-09 06:22:45 +00:00
if err := rs . bucketCollector . Collect ( rs . bucketKeyBuf [ : ] , rs . numBuf [ : ] ) ; err != nil {
2021-10-16 09:43:41 +00:00
return err
}
}
2021-09-13 17:31:09 +00:00
rs . keysAdded ++
2021-11-01 02:23:38 +00:00
rs . prevOffset = offset
2021-10-16 09:43:41 +00:00
return nil
2021-09-13 17:31:09 +00:00
}
2021-09-18 21:59:27 +00:00
func ( rs * RecSplit ) recsplitCurrentBucket ( ) error {
2021-09-13 17:31:09 +00:00
// Extend rs.bucketSizeAcc to accomodate current bucket index + 1
for len ( rs . bucketSizeAcc ) <= int ( rs . currentBucketIdx ) + 1 {
rs . bucketSizeAcc = append ( rs . bucketSizeAcc , rs . bucketSizeAcc [ len ( rs . bucketSizeAcc ) - 1 ] )
}
2021-09-18 21:59:27 +00:00
rs . bucketSizeAcc [ int ( rs . currentBucketIdx ) + 1 ] += uint64 ( len ( rs . currentBucket ) )
2021-11-08 07:27:21 +00:00
// Sets of size 0 and 1 are not further processed, just write them to index
2021-09-13 17:31:09 +00:00
if len ( rs . currentBucket ) > 1 {
for i , key := range rs . currentBucket [ 1 : ] {
if key == rs . currentBucket [ i ] {
2021-09-20 11:14:49 +00:00
rs . collision = true
2021-11-07 02:54:48 +00:00
return fmt . Errorf ( "%w: %x" , ErrCollision , key )
2021-09-13 17:31:09 +00:00
}
}
2021-09-20 11:14:49 +00:00
bitPos := rs . gr . bitCount
if rs . buffer == nil {
rs . buffer = make ( [ ] uint64 , len ( rs . currentBucket ) )
2021-10-16 09:43:41 +00:00
rs . offsetBuffer = make ( [ ] uint64 , len ( rs . currentBucketOffs ) )
2021-09-20 11:14:49 +00:00
} else {
for len ( rs . buffer ) < len ( rs . currentBucket ) {
rs . buffer = append ( rs . buffer , 0 )
2021-10-16 09:43:41 +00:00
rs . offsetBuffer = append ( rs . offsetBuffer , 0 )
2021-09-20 11:14:49 +00:00
}
}
2021-10-16 09:43:41 +00:00
unary , err := rs . recsplit ( 0 /* level */ , rs . currentBucket , rs . currentBucketOffs , nil /* unary */ )
if err != nil {
return err
}
2021-09-18 21:59:27 +00:00
rs . gr . appendUnaryAll ( unary )
2021-09-20 11:14:49 +00:00
if rs . trace {
fmt . Printf ( "recsplitBucket(%d, %d, bitsize = %d)\n" , rs . currentBucketIdx , len ( rs . currentBucket ) , rs . gr . bitCount - bitPos )
}
2021-11-08 07:27:21 +00:00
} else {
for _ , offset := range rs . currentBucketOffs {
binary . BigEndian . PutUint64 ( rs . numBuf [ : ] , offset )
if _ , err := rs . indexW . Write ( rs . numBuf [ 8 - rs . bytesPerRec : ] ) ; err != nil {
return err
}
}
2021-09-13 17:31:09 +00:00
}
// Extend rs.bucketPosAcc to accomodate current bucket index + 1
for len ( rs . bucketPosAcc ) <= int ( rs . currentBucketIdx ) + 1 {
rs . bucketPosAcc = append ( rs . bucketPosAcc , rs . bucketPosAcc [ len ( rs . bucketPosAcc ) - 1 ] )
}
2021-09-20 11:14:49 +00:00
rs . bucketPosAcc [ int ( rs . currentBucketIdx ) + 1 ] = uint64 ( rs . gr . Bits ( ) )
2021-09-13 17:31:09 +00:00
// clear for the next buckey
rs . currentBucket = rs . currentBucket [ : 0 ]
2021-10-16 09:43:41 +00:00
rs . currentBucketOffs = rs . currentBucketOffs [ : 0 ]
2021-09-13 17:31:09 +00:00
return nil
}
// recsplit applies recSplit algorithm to the given bucket
2021-10-16 09:43:41 +00:00
func ( rs * RecSplit ) recsplit ( level int , bucket [ ] uint64 , offsets [ ] uint64 , unary [ ] uint64 ) ( [ ] uint64 , error ) {
2021-09-20 11:14:49 +00:00
if rs . trace {
fmt . Printf ( "recsplit(%d, %d, %x)\n" , level , len ( bucket ) , bucket )
}
2021-09-13 17:31:09 +00:00
// Pick initial salt for this level of recursive split
salt := rs . startSeed [ level ]
2021-09-20 16:39:32 +00:00
m := uint16 ( len ( bucket ) )
2021-09-13 17:31:09 +00:00
if m <= rs . leafSize {
// No need to build aggregation levels - just find find bijection
var mask uint32
for {
mask = 0
var fail bool
2021-09-20 16:39:32 +00:00
for i := uint16 ( 0 ) ; ! fail && i < m ; i ++ {
2021-09-20 11:14:49 +00:00
bit := uint32 ( 1 ) << remap16 ( remix ( bucket [ i ] + salt ) , m )
2021-09-13 17:31:09 +00:00
if mask & bit != 0 {
fail = true
} else {
mask |= bit
}
}
if ! fail {
break
}
salt ++
}
2021-10-16 09:43:41 +00:00
for i := uint16 ( 0 ) ; i < m ; i ++ {
j := remap16 ( remix ( bucket [ i ] + salt ) , m )
rs . offsetBuffer [ j ] = offsets [ i ]
}
for _ , offset := range rs . offsetBuffer [ : m ] {
binary . BigEndian . PutUint64 ( rs . numBuf [ : ] , offset )
if _ , err := rs . indexW . Write ( rs . numBuf [ 8 - rs . bytesPerRec : ] ) ; err != nil {
return nil , err
}
}
2021-09-13 17:31:09 +00:00
salt -= rs . startSeed [ level ]
2021-09-18 21:59:27 +00:00
log2golomb := rs . golombParam ( m )
2021-09-20 11:14:49 +00:00
if rs . trace {
fmt . Printf ( "encode bij %d with log2golomn %d at p = %d\n" , salt , log2golomb , rs . gr . bitCount )
}
2021-09-18 21:59:27 +00:00
rs . gr . appendFixed ( salt , log2golomb )
unary = append ( unary , salt >> log2golomb )
2021-09-13 17:31:09 +00:00
} else {
2021-10-16 09:43:41 +00:00
fanout , unit := splitParams ( m , rs . leafSize , rs . primaryAggrBound , rs . secondaryAggrBound )
2021-09-20 16:39:32 +00:00
count := rs . count
2021-09-13 17:31:09 +00:00
for {
2021-09-20 16:39:32 +00:00
for i := uint16 ( 0 ) ; i < fanout - 1 ; i ++ {
count [ i ] = 0
}
2021-09-13 17:31:09 +00:00
var fail bool
2021-09-20 16:39:32 +00:00
for i := uint16 ( 0 ) ; i < m ; i ++ {
count [ remap16 ( remix ( bucket [ i ] + salt ) , m ) / unit ] ++
}
for i := uint16 ( 0 ) ; i < fanout - 1 ; i ++ {
fail = fail || ( count [ i ] != unit )
2021-09-13 17:31:09 +00:00
}
2021-09-20 16:39:32 +00:00
if ! fail {
2021-09-13 17:31:09 +00:00
break
}
salt ++
}
2021-09-20 16:39:32 +00:00
for i , c := uint16 ( 0 ) , uint16 ( 0 ) ; i < fanout ; i ++ {
2021-09-20 11:14:49 +00:00
count [ i ] = c
c += unit
}
2021-09-20 16:39:32 +00:00
for i := uint16 ( 0 ) ; i < m ; i ++ {
j := remap16 ( remix ( bucket [ i ] + salt ) , m ) / unit
rs . buffer [ count [ j ] ] = bucket [ i ]
2021-10-16 09:43:41 +00:00
rs . offsetBuffer [ count [ j ] ] = offsets [ i ]
2021-09-20 11:14:49 +00:00
count [ j ] ++
}
copy ( bucket , rs . buffer )
2021-10-16 09:43:41 +00:00
copy ( offsets , rs . offsetBuffer )
2021-09-13 17:31:09 +00:00
salt -= rs . startSeed [ level ]
2021-09-18 21:59:27 +00:00
log2golomb := rs . golombParam ( m )
2021-09-20 11:14:49 +00:00
if rs . trace {
fmt . Printf ( "encode fanout %d: %d with log2golomn %d at p = %d\n" , fanout , salt , log2golomb , rs . gr . bitCount )
}
2021-09-18 21:59:27 +00:00
rs . gr . appendFixed ( salt , log2golomb )
unary = append ( unary , salt >> log2golomb )
2021-10-16 09:43:41 +00:00
var err error
2021-09-20 16:39:32 +00:00
var i uint16
2021-09-18 21:59:27 +00:00
for i = 0 ; i < m - unit ; i += unit {
2021-10-16 09:43:41 +00:00
if unary , err = rs . recsplit ( level + 1 , bucket [ i : i + unit ] , offsets [ i : i + unit ] , unary ) ; err != nil {
return nil , err
}
2021-09-13 17:31:09 +00:00
}
if m - i > 1 {
2021-10-16 09:43:41 +00:00
if unary , err = rs . recsplit ( level + 1 , bucket [ i : ] , offsets [ i : ] , unary ) ; err != nil {
return nil , err
}
} else if m - i == 1 {
binary . BigEndian . PutUint64 ( rs . numBuf [ : ] , offsets [ i ] )
if _ , err := rs . indexW . Write ( rs . numBuf [ 8 - rs . bytesPerRec : ] ) ; err != nil {
return nil , err
}
2021-09-13 17:31:09 +00:00
}
}
2021-10-16 09:43:41 +00:00
return unary , nil
2021-09-13 17:31:09 +00:00
}
2021-10-16 09:43:41 +00:00
// loadFuncBucket is required to satisfy the type etl.LoadFunc type, to use with collector.Load
func ( rs * RecSplit ) loadFuncBucket ( k , v [ ] byte , _ etl . CurrentTableReader , _ etl . LoadNextFunc ) error {
2021-09-13 17:31:09 +00:00
// k is the BigEndian encoding of the bucket number, and the v is the key that is assigned into that bucket
bucketIdx := binary . BigEndian . Uint64 ( k )
if rs . currentBucketIdx != bucketIdx {
if rs . currentBucketIdx != math . MaxUint64 {
if err := rs . recsplitCurrentBucket ( ) ; err != nil {
return err
}
}
rs . currentBucketIdx = bucketIdx
}
2021-09-20 11:14:49 +00:00
rs . currentBucket = append ( rs . currentBucket , binary . BigEndian . Uint64 ( k [ 8 : ] ) )
2021-10-16 09:43:41 +00:00
rs . currentBucketOffs = append ( rs . currentBucketOffs , binary . BigEndian . Uint64 ( v ) )
return nil
}
func ( rs * RecSplit ) loadFuncOffset ( k , _ [ ] byte , _ etl . CurrentTableReader , _ etl . LoadNextFunc ) error {
offset := binary . BigEndian . Uint64 ( k )
rs . offsetEf . AddOffset ( offset )
2021-09-13 17:31:09 +00:00
return nil
}
// Build has to be called after all the keys have been added, and it initiates the process
2021-10-16 09:43:41 +00:00
// of building the perfect hash function and writing index into a file
2021-09-13 17:31:09 +00:00
func ( rs * RecSplit ) Build ( ) error {
2022-02-09 06:22:45 +00:00
tmpIdxFilePath := rs . indexFile + ".tmp"
2022-01-07 07:27:26 +00:00
2021-09-13 17:31:09 +00:00
if rs . built {
return fmt . Errorf ( "already built" )
}
if rs . keysAdded != rs . keyExpectedCount {
return fmt . Errorf ( "expected keys %d, got %d" , rs . keyExpectedCount , rs . keysAdded )
}
2021-10-16 09:43:41 +00:00
var err error
2022-01-07 07:37:27 +00:00
if rs . indexF , err = os . Create ( tmpIdxFilePath ) ; err != nil {
2021-10-16 09:43:41 +00:00
return fmt . Errorf ( "create index file %s: %w" , rs . indexFile , err )
}
2021-11-19 15:00:55 +00:00
defer rs . indexF . Sync ( )
2021-10-16 09:43:41 +00:00
defer rs . indexF . Close ( )
2021-10-26 04:19:26 +00:00
rs . indexW = bufio . NewWriterSize ( rs . indexF , etl . BufIOSize )
2021-11-19 15:00:55 +00:00
defer rs . indexW . Flush ( )
2021-11-21 14:52:23 +00:00
// Write minimal app-specific dataID in this index file
binary . BigEndian . PutUint64 ( rs . numBuf [ : ] , rs . baseDataID )
if _ , err = rs . indexW . Write ( rs . numBuf [ : ] ) ; err != nil {
return fmt . Errorf ( "write number of keys: %w" , err )
}
2021-10-16 09:43:41 +00:00
// Write number of keys
binary . BigEndian . PutUint64 ( rs . numBuf [ : ] , rs . keysAdded )
if _ , err = rs . indexW . Write ( rs . numBuf [ : ] ) ; err != nil {
return fmt . Errorf ( "write number of keys: %w" , err )
}
// Write number of bytes per index record
rs . bytesPerRec = ( bits . Len64 ( rs . maxOffset ) + 7 ) / 8
if err = rs . indexW . WriteByte ( byte ( rs . bytesPerRec ) ) ; err != nil {
return fmt . Errorf ( "write bytes per record: %w" , err )
}
2021-11-21 14:52:23 +00:00
2021-09-13 17:31:09 +00:00
rs . currentBucketIdx = math . MaxUint64 // To make sure 0 bucket is detected
2021-10-25 02:12:00 +00:00
defer rs . bucketCollector . Close ( )
2022-08-04 05:31:17 +00:00
log . Log ( rs . lvl , "[index] calculating" , "file" , rs . indexFileName )
2021-10-25 02:12:00 +00:00
if err := rs . bucketCollector . Load ( nil , "" , rs . loadFuncBucket , etl . TransformArgs { } ) ; err != nil {
2021-09-13 17:31:09 +00:00
return err
}
if len ( rs . currentBucket ) > 0 {
if err := rs . recsplitCurrentBucket ( ) ; err != nil {
return err
}
}
2021-11-08 07:27:21 +00:00
if ASSERT {
rs . indexW . Flush ( )
rs . indexF . Seek ( 0 , 0 )
2022-08-01 04:03:48 +00:00
b , _ := io . ReadAll ( rs . indexF )
2021-11-08 07:27:21 +00:00
if len ( b ) != 9 + int ( rs . keysAdded ) * rs . bytesPerRec {
panic ( fmt . Errorf ( "expected: %d, got: %d; rs.keysAdded=%d, rs.bytesPerRec=%d, %s" , 9 + int ( rs . keysAdded ) * rs . bytesPerRec , len ( b ) , rs . keysAdded , rs . bytesPerRec , rs . indexFile ) )
}
}
2022-08-04 05:31:17 +00:00
log . Log ( rs . lvl , "[index] write" , "file" , rs . indexFileName )
2021-10-16 09:43:41 +00:00
if rs . enums {
2022-03-13 22:46:17 +00:00
rs . offsetEf = eliasfano32 . NewEliasFano ( rs . keysAdded , rs . maxOffset )
2021-10-25 02:12:00 +00:00
defer rs . offsetCollector . Close ( )
if err := rs . offsetCollector . Load ( nil , "" , rs . loadFuncOffset , etl . TransformArgs { } ) ; err != nil {
2021-10-16 09:43:41 +00:00
return err
}
2021-11-01 02:23:38 +00:00
rs . offsetEf . Build ( )
2021-10-16 09:43:41 +00:00
}
2021-09-18 21:59:27 +00:00
rs . gr . appendFixed ( 1 , 1 ) // Sentinel (avoids checking for parts of size 1)
// Construct Elias Fano index
rs . ef . Build ( rs . bucketSizeAcc , rs . bucketPosAcc )
2021-09-13 17:31:09 +00:00
rs . built = true
2021-10-16 09:43:41 +00:00
// Write out bucket count, bucketSize, leafSize
binary . BigEndian . PutUint64 ( rs . numBuf [ : ] , rs . bucketCount )
if _ , err := rs . indexW . Write ( rs . numBuf [ : 8 ] ) ; err != nil {
return fmt . Errorf ( "writing bucketCount: %w" , err )
2021-09-18 21:59:27 +00:00
}
2021-10-16 09:43:41 +00:00
binary . BigEndian . PutUint16 ( rs . numBuf [ : ] , uint16 ( rs . bucketSize ) )
if _ , err := rs . indexW . Write ( rs . numBuf [ : 2 ] ) ; err != nil {
return fmt . Errorf ( "writing bucketSize: %w" , err )
2021-09-18 21:59:27 +00:00
}
2021-10-16 09:43:41 +00:00
binary . BigEndian . PutUint16 ( rs . numBuf [ : ] , rs . leafSize )
if _ , err := rs . indexW . Write ( rs . numBuf [ : 2 ] ) ; err != nil {
return fmt . Errorf ( "writing leafSize: %w" , err )
}
// Write out salt
binary . BigEndian . PutUint32 ( rs . numBuf [ : ] , rs . salt )
if _ , err := rs . indexW . Write ( rs . numBuf [ : 4 ] ) ; err != nil {
return fmt . Errorf ( "writing salt: %w" , err )
}
// Write out start seeds
if err := rs . indexW . WriteByte ( byte ( len ( rs . startSeed ) ) ) ; err != nil {
return fmt . Errorf ( "writing len of start seeds: %w" , err )
}
for _ , s := range rs . startSeed {
binary . BigEndian . PutUint64 ( rs . numBuf [ : ] , s )
if _ , err := rs . indexW . Write ( rs . numBuf [ : 8 ] ) ; err != nil {
return fmt . Errorf ( "writing start seed: %w" , err )
2021-09-20 11:14:49 +00:00
}
2021-10-16 09:43:41 +00:00
}
if rs . enums {
if err := rs . indexW . WriteByte ( 1 ) ; err != nil {
return fmt . Errorf ( "writing enums = true: %w" , err )
2021-09-20 11:14:49 +00:00
}
2021-10-16 09:43:41 +00:00
} else {
if err := rs . indexW . WriteByte ( 0 ) ; err != nil {
return fmt . Errorf ( "writing enums = true: %w" , err )
2021-09-18 21:59:27 +00:00
}
2021-10-16 09:43:41 +00:00
}
if rs . enums {
// Write out elias fano for offsets
if err := rs . offsetEf . Write ( rs . indexW ) ; err != nil {
return fmt . Errorf ( "writing elias fano for offsets: %w" , err )
2021-09-18 21:59:27 +00:00
}
}
2021-10-16 09:43:41 +00:00
// Write out the size of golomb rice params
binary . BigEndian . PutUint16 ( rs . numBuf [ : ] , uint16 ( len ( rs . golombRice ) ) )
if _ , err := rs . indexW . Write ( rs . numBuf [ : 4 ] ) ; err != nil {
return fmt . Errorf ( "writing golomb rice param size: %w" , err )
}
// Write out golomb rice
if err := rs . gr . Write ( rs . indexW ) ; err != nil {
return fmt . Errorf ( "writing golomb rice: %w" , err )
2021-09-20 11:14:49 +00:00
}
2021-10-16 09:43:41 +00:00
// Write out elias fano
if err := rs . ef . Write ( rs . indexW ) ; err != nil {
return fmt . Errorf ( "writing elias fano: %w" , err )
2021-09-20 11:14:49 +00:00
}
2022-01-07 07:37:27 +00:00
_ = rs . indexW . Flush ( )
_ = rs . indexF . Sync ( )
_ = rs . indexF . Close ( )
if err := os . Rename ( tmpIdxFilePath , rs . indexFile ) ; err != nil {
return err
}
2021-10-16 09:43:41 +00:00
return nil
2021-09-20 11:14:49 +00:00
}
// Stats returns the size of golomb rice encoding and ellias fano encoding
func ( rs RecSplit ) Stats ( ) ( int , int ) {
return len ( rs . gr . Data ( ) ) , len ( rs . ef . Data ( ) )
}
// Collision returns true if there was a collision detected during mapping of keys
// into 64-bit values
// RecSplit needs to be reset, re-populated with keys, and rebuilt
func ( rs RecSplit ) Collision ( ) bool {
return rs . collision
2021-09-18 21:59:27 +00:00
}