diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index a7e99732f..64900a73c 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -22,6 +22,7 @@ import ( "container/heap" "context" "encoding/binary" + "errors" "fmt" "hash" "io" @@ -183,14 +184,15 @@ type ChangeFile struct { step uint64 namebase string path string + pathTx string file *os.File + fileTx *os.File w *bufio.Writer + wTx *bufio.Writer r *bufio.Reader - numBuf [8]byte + rTx *bufio.Reader sizeCounter uint64 - txPos int64 // Position of the last block iterated upon - txNum uint64 - txSize uint64 + txNum uint64 // Currently read transaction number txRemaining uint64 // Remaining number of bytes to read in the current transaction words []byte // Words pending for the next block record, in the same slice wordOffsets []int // Offsets of words in the `words` slice @@ -212,6 +214,18 @@ func (cf *ChangeFile) closeFile() error { } cf.file = nil } + if cf.wTx != nil { + if err := cf.wTx.Flush(); err != nil { + return err + } + cf.wTx = nil + } + if cf.fileTx != nil { + if err := cf.fileTx.Close(); err != nil { + return err + } + cf.fileTx = nil + } return nil } @@ -224,33 +238,43 @@ func (cf *ChangeFile) openFile(blockNum uint64, write bool) error { endBlock := startBlock + cf.step - 1 if cf.w == nil { cf.path = filepath.Join(cf.dir, fmt.Sprintf("%s.%d-%d.chg", cf.namebase, startBlock, endBlock)) + cf.pathTx = filepath.Join(cf.dir, fmt.Sprintf("%s.%d-%d.ctx", cf.namebase, startBlock, endBlock)) var err error if write { if cf.file, err = os.OpenFile(cf.path, os.O_RDWR|os.O_CREATE, 0755); err != nil { return err } + if cf.fileTx, err = os.OpenFile(cf.pathTx, os.O_RDWR|os.O_CREATE, 0755); err != nil { + return err + } } else { if cf.file, err = os.Open(cf.path); err != nil { return err } - } - if cf.txPos, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil { - return err + if cf.fileTx, err = os.Open(cf.pathTx); err != nil { + return err + } } if write { cf.w = bufio.NewWriter(cf.file) + cf.wTx = bufio.NewWriter(cf.fileTx) } cf.r = bufio.NewReader(cf.file) + cf.rTx = bufio.NewReader(cf.fileTx) } return nil } func (cf *ChangeFile) rewind() error { var err error - if cf.txPos, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil { + if _, err = cf.file.Seek(0, 0); err != nil { return err } cf.r = bufio.NewReader(cf.file) + if _, err = cf.fileTx.Seek(0, 0); err != nil { + return err + } + cf.rTx = bufio.NewReader(cf.fileTx) return nil } @@ -260,12 +284,13 @@ func (cf *ChangeFile) add(word []byte) { } func (cf *ChangeFile) finish(txNum uint64) error { + var numBuf [10]byte // Write out words lastOffset := 0 for _, offset := range cf.wordOffsets { word := cf.words[lastOffset:offset] - n := binary.PutUvarint(cf.numBuf[:], uint64(len(word))) - if _, err := cf.w.Write(cf.numBuf[:n]); err != nil { + n := binary.PutUvarint(numBuf[:], uint64(len(word))) + if _, err := cf.w.Write(numBuf[:n]); err != nil { return err } if len(word) > 0 { @@ -278,13 +303,12 @@ func (cf *ChangeFile) finish(txNum uint64) error { } cf.words = cf.words[:0] cf.wordOffsets = cf.wordOffsets[:0] - // Write out tx number and then size of changes in this block - binary.BigEndian.PutUint64(cf.numBuf[:], txNum) - if _, err := cf.w.Write(cf.numBuf[:]); err != nil { + n := binary.PutUvarint(numBuf[:], txNum) + if _, err := cf.wTx.Write(numBuf[:n]); err != nil { return err } - binary.BigEndian.PutUint64(cf.numBuf[:], cf.sizeCounter) - if _, err := cf.w.Write(cf.numBuf[:]); err != nil { + n = binary.PutUvarint(numBuf[:], cf.sizeCounter) + if _, err := cf.wTx.Write(numBuf[:n]); err != nil { return err } cf.sizeCounter = 0 @@ -293,30 +317,17 @@ func (cf *ChangeFile) finish(txNum uint64) error { // prevTx positions the reader to the beginning // of the transaction -func (cf *ChangeFile) prevTx() (bool, error) { - if cf.txPos == 0 { - return false, nil - } - // Move back 16 bytes to read tx number and tx size - pos, err := cf.file.Seek(cf.txPos-16, 0 /* relative to the beginning */) - if err != nil { +func (cf *ChangeFile) nextTx() (bool, error) { + var err error + if cf.txNum, err = binary.ReadUvarint(cf.rTx); err != nil { + if errors.Is(err, io.EOF) { + return false, nil + } return false, err } - cf.r.Reset(cf.file) - if _, err = io.ReadFull(cf.r, cf.numBuf[:8]); err != nil { + if cf.txRemaining, err = binary.ReadUvarint(cf.rTx); err != nil { return false, err } - cf.txNum = binary.BigEndian.Uint64(cf.numBuf[:]) - if _, err = io.ReadFull(cf.r, cf.numBuf[:8]); err != nil { - return false, err - } - cf.txSize = binary.BigEndian.Uint64(cf.numBuf[:]) - cf.txRemaining = cf.txSize - cf.txPos, err = cf.file.Seek(pos-int64(cf.txSize), 0) - if err != nil { - return false, err - } - cf.r.Reset(cf.file) return true, nil } @@ -338,13 +349,20 @@ func (cf *ChangeFile) nextWord(wordBuf []byte) ([]byte, bool, error) { if _, err = io.ReadFull(cf.r, buf[len(wordBuf):]); err != nil { return wordBuf, false, fmt.Errorf("read word (%d %d): %w", ws, len(buf[len(wordBuf):]), err) } - n := binary.PutUvarint(cf.numBuf[:], ws) + var numBuf [10]byte + n := binary.PutUvarint(numBuf[:], ws) cf.txRemaining -= uint64(n) + ws return buf, true, nil } func (cf *ChangeFile) deleteFile() error { - return os.Remove(cf.path) + if err := os.Remove(cf.path); err != nil { + return err + } + if err := os.Remove(cf.pathTx); err != nil { + return err + } + return nil } type Changes struct { @@ -442,18 +460,18 @@ func (c *Changes) finish(txNum uint64) error { return nil } -func (c *Changes) prevTx() (bool, uint64, error) { - bkeys, err := c.keys.prevTx() +func (c *Changes) nextTx() (bool, uint64, error) { + bkeys, err := c.keys.nextTx() if err != nil { return false, 0, err } var bbefore, bafter bool if c.beforeOn { - if bbefore, err = c.before.prevTx(); err != nil { + if bbefore, err = c.before.nextTx(); err != nil { return false, 0, err } } - if bafter, err = c.after.prevTx(); err != nil { + if bafter, err = c.after.nextTx(); err != nil { return false, 0, err } if c.beforeOn && bkeys != bbefore { @@ -682,7 +700,7 @@ func (c *Changes) produceChangeSets(blockFrom, blockTo uint64, historyType, bitm return nil, nil, nil, nil, fmt.Errorf("produceChangeSets rewind: %w", err) } var txKey = make([]byte, 8, 60) - for b, txNum, e = c.prevTx(); b && e == nil; b, txNum, e = c.prevTx() { + for b, txNum, e = c.nextTx(); b && e == nil; b, txNum, e = c.nextTx() { binary.BigEndian.PutUint64(txKey[:8], txNum) for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) { totalRecords++ @@ -787,9 +805,8 @@ func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int, commitments b var key, before, after []byte var ai AggregateItem var prefix []byte - // Note that the following loop iterates over transactions backwards, therefore it does not replace entries in the B-tree, - // but instead just updates their "change count" and the first byte of the value (insertion vs update flag) - for b, _, e = c.prevTx(); b && e == nil; b, _, e = c.prevTx() { + // Note that the following loop iterates over transactions forwards, therefore it replace entries in the B-tree + for b, _, e = c.nextTx(); b && e == nil; b, _, e = c.nextTx() { // Within each transaction, keys are unique, but they can appear in any order for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) { if prefixLen > 0 && !bytes.Equal(prefix, key[:prefixLen]) { @@ -807,12 +824,13 @@ func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int, commitments b if commitments { var err error var mergedVal []byte - if mergedVal, err = commitment.MergeBranches(after, item.v, nil); err != nil { + if mergedVal, err = commitment.MergeBranches(item.v, after, nil); err != nil { return fmt.Errorf("merge branches: %w", err) } //fmt.Printf("aggregateToBtree prefix [%x], [%x]+[%x]=>[%x]\n", commitment.CompactToHex(key), after, item.v, mergedVal) item.v = mergedVal } + item.v = common.Copy(after) item.count++ } }