From e649f7ea91aad8cf5c09ed0a889394a4f71b9f9d Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 9 Feb 2022 13:22:45 +0700 Subject: [PATCH] Less alloc etl recsplit (#307) * less allocs recsplit * save * save --- compress/decompress.go | 9 +++++---- etl/dataprovider.go | 5 +++-- recsplit/recsplit.go | 26 +++++++++----------------- 3 files changed, 17 insertions(+), 23 deletions(-) diff --git a/compress/decompress.go b/compress/decompress.go index f4feb67c6..2c24c0394 100644 --- a/compress/decompress.go +++ b/compress/decompress.go @@ -116,6 +116,7 @@ type Getter struct { mask byte uncovered []int // Buffer for uncovered portions of the word word []byte + fName string } func (g *Getter) zero() bool { @@ -212,7 +213,7 @@ func (d *Decompressor) Count() int { return int(d.count) } // Getter is not thread-safe, but there can be multiple getters used simultaneously and concurrently // for the same decompressor func (d *Decompressor) MakeGetter() *Getter { - return &Getter{patternDict: &d.dict, posDict: &d.posDict, data: d.data[d.wordsStart:], uncovered: make([]int, 0, 128)} + return &Getter{patternDict: &d.dict, posDict: &d.posDict, data: d.data[d.wordsStart:], uncovered: make([]int, 0, 128), fName: d.compressedFile} } func (g *Getter) Reset(offset uint64) { @@ -247,7 +248,7 @@ func (g *Getter) Next(buf []byte) ([]byte, uint64) { lastPos = intPos pattern := g.nextPattern() if len(g.word) < intPos { - panic("likely .idx is invalid") + panic(fmt.Sprintf("likely .idx is invalid: %s", g.fName)) } copy(g.word[intPos:], pattern) if intPos > lastUncovered { @@ -284,7 +285,7 @@ func (g *Getter) Skip() uint64 { intPos := lastPos + int(pos) - 1 lastPos = intPos if wordLen < intPos { - panic("likely .idx is invalid") + panic(fmt.Sprintf("likely .idx is invalid: %s", g.fName)) } if intPos > lastUncovered { add += uint64(intPos - lastUncovered) @@ -402,7 +403,7 @@ func (g *Getter) MatchPrefix(buf []byte) bool { intPos := lastPos + int(pos) - 1 lastPos = intPos if wordLen < intPos { - panic("likely .idx is invalid") + panic(fmt.Sprintf("likely .idx is invalid: %s", g.fName)) } pattern = g.nextPattern() if strings.HasPrefix(string(pattern), string(buf)) { diff --git a/etl/dataprovider.go b/etl/dataprovider.go index 03dd782fc..237a3a32b 100644 --- a/etl/dataprovider.go +++ b/etl/dataprovider.go @@ -110,10 +110,11 @@ func (p *fileDataProvider) String() string { } func writeToDisk(encoder Encoder, entries []sortableBufferEntry) error { - pair := [2][]byte{} + pair := make([][]byte, 2) + pairInterface := interface{}(pair) // to avoid interface cast on each iteration for i := range entries { pair[0], pair[1] = entries[i].key, entries[i].value - if err := encoder.Encode(pair); err != nil { + if err := encoder.Encode(pairInterface); err != nil { return fmt.Errorf("error writing entries to disk: %w", err) } } diff --git a/recsplit/recsplit.go b/recsplit/recsplit.go index f9e24487f..8f6cd0ec9 100644 --- a/recsplit/recsplit.go +++ b/recsplit/recsplit.go @@ -24,9 +24,7 @@ import ( "math" "math/bits" "os" - "path/filepath" - "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano16" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" @@ -97,6 +95,7 @@ type RecSplit struct { indexW *bufio.Writer bytesPerRec int numBuf [8]byte + bucketKeyBuf [16]byte trace bool prevOffset uint64 // Previously added offset (for calculating minDelta for Elias Fano encoding of "enum -> offset" index) minDelta uint64 // minDelta for Elias Fano encoding of "enum -> offset" index @@ -281,11 +280,9 @@ func (rs *RecSplit) AddKey(key []byte, offset uint64) error { rs.hasher.Reset() rs.hasher.Write(key) //nolint:errcheck hi, lo := rs.hasher.Sum128() - var bucketKey [16]byte - binary.BigEndian.PutUint64(bucketKey[:], remap(hi, rs.bucketCount)) - binary.BigEndian.PutUint64(bucketKey[8:], lo) - var offsetVal [8]byte - binary.BigEndian.PutUint64(offsetVal[:], offset) + binary.BigEndian.PutUint64(rs.bucketKeyBuf[:], remap(hi, rs.bucketCount)) + binary.BigEndian.PutUint64(rs.bucketKeyBuf[8:], lo) + binary.BigEndian.PutUint64(rs.numBuf[:], offset) if offset > rs.maxOffset { rs.maxOffset = offset } @@ -297,16 +294,15 @@ func (rs *RecSplit) AddKey(key []byte, offset uint64) error { } if rs.enums { - if err := rs.offsetCollector.Collect(offsetVal[:], nil); err != nil { + if err := rs.offsetCollector.Collect(rs.numBuf[:], nil); err != nil { return err } - var keyIdx [8]byte - binary.BigEndian.PutUint64(keyIdx[:], rs.keysAdded) - if err := rs.bucketCollector.Collect(bucketKey[:], keyIdx[:]); err != nil { + binary.BigEndian.PutUint64(rs.numBuf[:], rs.keysAdded) + if err := rs.bucketCollector.Collect(rs.bucketKeyBuf[:], rs.numBuf[:]); err != nil { return err } } else { - if err := rs.bucketCollector.Collect(bucketKey[:], offsetVal[:]); err != nil { + if err := rs.bucketCollector.Collect(rs.bucketKeyBuf[:], rs.numBuf[:]); err != nil { return err } } @@ -495,9 +491,7 @@ func (rs *RecSplit) loadFuncOffset(k, _ []byte, _ etl.CurrentTableReader, _ etl. // Build has to be called after all the keys have been added, and it initiates the process // of building the perfect hash function and writing index into a file func (rs *RecSplit) Build() error { - _, fileName := filepath.Split(rs.indexFile) - tmpIdxFilePath := filepath.Join(rs.tmpDir, fileName) - common.MustExist(rs.tmpDir) + tmpIdxFilePath := rs.indexFile + ".tmp" if rs.built { return fmt.Errorf("already built") @@ -622,8 +616,6 @@ func (rs *RecSplit) Build() error { _ = rs.indexW.Flush() _ = rs.indexF.Sync() _ = rs.indexF.Close() - dir, _ := filepath.Split(rs.indexFile) - common.MustExist(dir) if err := os.Rename(tmpIdxFilePath, rs.indexFile); err != nil { return err }