[erigon2] Forward change files (#384)

* Start

* Forward change files

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
ledgerwatch 2022-03-20 16:14:26 +00:00 committed by GitHub
parent 71751426bc
commit 7939c56571
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -22,6 +22,7 @@ import (
"container/heap"
"context"
"encoding/binary"
"errors"
"fmt"
"hash"
"io"
@ -183,14 +184,15 @@ type ChangeFile struct {
step uint64
namebase string
path string
pathTx string
file *os.File
fileTx *os.File
w *bufio.Writer
wTx *bufio.Writer
r *bufio.Reader
numBuf [8]byte
rTx *bufio.Reader
sizeCounter uint64
txPos int64 // Position of the last block iterated upon
txNum uint64
txSize uint64
txNum uint64 // Currently read transaction number
txRemaining uint64 // Remaining number of bytes to read in the current transaction
words []byte // Words pending for the next block record, in the same slice
wordOffsets []int // Offsets of words in the `words` slice
@ -212,6 +214,18 @@ func (cf *ChangeFile) closeFile() error {
}
cf.file = nil
}
if cf.wTx != nil {
if err := cf.wTx.Flush(); err != nil {
return err
}
cf.wTx = nil
}
if cf.fileTx != nil {
if err := cf.fileTx.Close(); err != nil {
return err
}
cf.fileTx = nil
}
return nil
}
@ -224,33 +238,43 @@ func (cf *ChangeFile) openFile(blockNum uint64, write bool) error {
endBlock := startBlock + cf.step - 1
if cf.w == nil {
cf.path = filepath.Join(cf.dir, fmt.Sprintf("%s.%d-%d.chg", cf.namebase, startBlock, endBlock))
cf.pathTx = filepath.Join(cf.dir, fmt.Sprintf("%s.%d-%d.ctx", cf.namebase, startBlock, endBlock))
var err error
if write {
if cf.file, err = os.OpenFile(cf.path, os.O_RDWR|os.O_CREATE, 0755); err != nil {
return err
}
if cf.fileTx, err = os.OpenFile(cf.pathTx, os.O_RDWR|os.O_CREATE, 0755); err != nil {
return err
}
} else {
if cf.file, err = os.Open(cf.path); err != nil {
return err
}
}
if cf.txPos, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil {
return err
if cf.fileTx, err = os.Open(cf.pathTx); err != nil {
return err
}
}
if write {
cf.w = bufio.NewWriter(cf.file)
cf.wTx = bufio.NewWriter(cf.fileTx)
}
cf.r = bufio.NewReader(cf.file)
cf.rTx = bufio.NewReader(cf.fileTx)
}
return nil
}
func (cf *ChangeFile) rewind() error {
var err error
if cf.txPos, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil {
if _, err = cf.file.Seek(0, 0); err != nil {
return err
}
cf.r = bufio.NewReader(cf.file)
if _, err = cf.fileTx.Seek(0, 0); err != nil {
return err
}
cf.rTx = bufio.NewReader(cf.fileTx)
return nil
}
@ -260,12 +284,13 @@ func (cf *ChangeFile) add(word []byte) {
}
func (cf *ChangeFile) finish(txNum uint64) error {
var numBuf [10]byte
// Write out words
lastOffset := 0
for _, offset := range cf.wordOffsets {
word := cf.words[lastOffset:offset]
n := binary.PutUvarint(cf.numBuf[:], uint64(len(word)))
if _, err := cf.w.Write(cf.numBuf[:n]); err != nil {
n := binary.PutUvarint(numBuf[:], uint64(len(word)))
if _, err := cf.w.Write(numBuf[:n]); err != nil {
return err
}
if len(word) > 0 {
@ -278,13 +303,12 @@ func (cf *ChangeFile) finish(txNum uint64) error {
}
cf.words = cf.words[:0]
cf.wordOffsets = cf.wordOffsets[:0]
// Write out tx number and then size of changes in this block
binary.BigEndian.PutUint64(cf.numBuf[:], txNum)
if _, err := cf.w.Write(cf.numBuf[:]); err != nil {
n := binary.PutUvarint(numBuf[:], txNum)
if _, err := cf.wTx.Write(numBuf[:n]); err != nil {
return err
}
binary.BigEndian.PutUint64(cf.numBuf[:], cf.sizeCounter)
if _, err := cf.w.Write(cf.numBuf[:]); err != nil {
n = binary.PutUvarint(numBuf[:], cf.sizeCounter)
if _, err := cf.wTx.Write(numBuf[:n]); err != nil {
return err
}
cf.sizeCounter = 0
@ -293,30 +317,17 @@ func (cf *ChangeFile) finish(txNum uint64) error {
// prevTx positions the reader to the beginning
// of the transaction
func (cf *ChangeFile) prevTx() (bool, error) {
if cf.txPos == 0 {
return false, nil
}
// Move back 16 bytes to read tx number and tx size
pos, err := cf.file.Seek(cf.txPos-16, 0 /* relative to the beginning */)
if err != nil {
func (cf *ChangeFile) nextTx() (bool, error) {
var err error
if cf.txNum, err = binary.ReadUvarint(cf.rTx); err != nil {
if errors.Is(err, io.EOF) {
return false, nil
}
return false, err
}
cf.r.Reset(cf.file)
if _, err = io.ReadFull(cf.r, cf.numBuf[:8]); err != nil {
if cf.txRemaining, err = binary.ReadUvarint(cf.rTx); err != nil {
return false, err
}
cf.txNum = binary.BigEndian.Uint64(cf.numBuf[:])
if _, err = io.ReadFull(cf.r, cf.numBuf[:8]); err != nil {
return false, err
}
cf.txSize = binary.BigEndian.Uint64(cf.numBuf[:])
cf.txRemaining = cf.txSize
cf.txPos, err = cf.file.Seek(pos-int64(cf.txSize), 0)
if err != nil {
return false, err
}
cf.r.Reset(cf.file)
return true, nil
}
@ -338,13 +349,20 @@ func (cf *ChangeFile) nextWord(wordBuf []byte) ([]byte, bool, error) {
if _, err = io.ReadFull(cf.r, buf[len(wordBuf):]); err != nil {
return wordBuf, false, fmt.Errorf("read word (%d %d): %w", ws, len(buf[len(wordBuf):]), err)
}
n := binary.PutUvarint(cf.numBuf[:], ws)
var numBuf [10]byte
n := binary.PutUvarint(numBuf[:], ws)
cf.txRemaining -= uint64(n) + ws
return buf, true, nil
}
func (cf *ChangeFile) deleteFile() error {
return os.Remove(cf.path)
if err := os.Remove(cf.path); err != nil {
return err
}
if err := os.Remove(cf.pathTx); err != nil {
return err
}
return nil
}
type Changes struct {
@ -442,18 +460,18 @@ func (c *Changes) finish(txNum uint64) error {
return nil
}
func (c *Changes) prevTx() (bool, uint64, error) {
bkeys, err := c.keys.prevTx()
func (c *Changes) nextTx() (bool, uint64, error) {
bkeys, err := c.keys.nextTx()
if err != nil {
return false, 0, err
}
var bbefore, bafter bool
if c.beforeOn {
if bbefore, err = c.before.prevTx(); err != nil {
if bbefore, err = c.before.nextTx(); err != nil {
return false, 0, err
}
}
if bafter, err = c.after.prevTx(); err != nil {
if bafter, err = c.after.nextTx(); err != nil {
return false, 0, err
}
if c.beforeOn && bkeys != bbefore {
@ -682,7 +700,7 @@ func (c *Changes) produceChangeSets(blockFrom, blockTo uint64, historyType, bitm
return nil, nil, nil, nil, fmt.Errorf("produceChangeSets rewind: %w", err)
}
var txKey = make([]byte, 8, 60)
for b, txNum, e = c.prevTx(); b && e == nil; b, txNum, e = c.prevTx() {
for b, txNum, e = c.nextTx(); b && e == nil; b, txNum, e = c.nextTx() {
binary.BigEndian.PutUint64(txKey[:8], txNum)
for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) {
totalRecords++
@ -787,9 +805,8 @@ func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int, commitments b
var key, before, after []byte
var ai AggregateItem
var prefix []byte
// Note that the following loop iterates over transactions backwards, therefore it does not replace entries in the B-tree,
// but instead just updates their "change count" and the first byte of the value (insertion vs update flag)
for b, _, e = c.prevTx(); b && e == nil; b, _, e = c.prevTx() {
// Note that the following loop iterates over transactions forwards, therefore it replace entries in the B-tree
for b, _, e = c.nextTx(); b && e == nil; b, _, e = c.nextTx() {
// Within each transaction, keys are unique, but they can appear in any order
for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) {
if prefixLen > 0 && !bytes.Equal(prefix, key[:prefixLen]) {
@ -807,12 +824,13 @@ func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int, commitments b
if commitments {
var err error
var mergedVal []byte
if mergedVal, err = commitment.MergeBranches(after, item.v, nil); err != nil {
if mergedVal, err = commitment.MergeBranches(item.v, after, nil); err != nil {
return fmt.Errorf("merge branches: %w", err)
}
//fmt.Printf("aggregateToBtree prefix [%x], [%x]+[%x]=>[%x]\n", commitment.CompactToHex(key), after, item.v, mergedVal)
item.v = mergedVal
}
item.v = common.Copy(after)
item.count++
}
}