From f8060aa75d92f6325e6a1a09a26d927b09fc1102 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 7 Sep 2022 14:40:39 +0700 Subject: [PATCH] erigon22: HistoryIterator1 v1 (#626) --- state/history.go | 202 ++++++++++++++++++++++++++++++++++++++++ state/history_test.go | 107 +++++++++++++++++++++ state/inverted_index.go | 4 +- 3 files changed, 311 insertions(+), 2 deletions(-) diff --git a/state/history.go b/state/history.go index c041cc31a..3b6dfd6db 100644 --- a/state/history.go +++ b/state/history.go @@ -18,6 +18,7 @@ package state import ( "bytes" + "container/heap" "context" "encoding/binary" "fmt" @@ -809,3 +810,204 @@ func (hc *HistoryContext) getNoStateFromDB(key []byte, txNum uint64, tx kv.Tx) ( } return nil, false, nil } + +func (hc *HistoryContext) IterateChanged(startTxNum, endTxNum uint64, roTx kv.Tx) *HistoryIterator1 { + var hi HistoryIterator1 + hi.hasNextInDb = true + hi.roTx = roTx + hi.indexTable = hc.h.indexTable + hi.idxKeysTable = hc.h.indexKeysTable + hi.valsTable = hc.h.historyValsTable + heap.Init(&hi.h) + hc.indexFiles.Ascend(func(item *ctxItem) bool { + g := item.getter + g.Reset(0) + for g.HasNext() { + key, offset := g.NextUncompressed() + heap.Push(&hi.h, &ReconItem{g: g, key: key, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum, startOffset: offset, lastOffset: offset}) + hi.hasNextInFiles = true + } + hi.total += uint64(item.getter.Size()) + return true + }) + hi.hc = hc + hi.compressVals = hc.h.compressVals + hi.startTxNum = startTxNum + hi.endTxNum = endTxNum + binary.BigEndian.PutUint64(hi.startTxKey[:], startTxNum) + hi.advanceInDb() + hi.advanceInFiles() + hi.advance() + return &hi +} + +type HistoryIterator1 struct { + hc *HistoryContext + compressVals bool + total uint64 + + hasNextInFiles bool + hasNextInDb bool + startTxKey [8]byte + startTxNum, endTxNum uint64 + roTx kv.Tx + idxCursor, txNum2kCursor kv.CursorDupSort + indexTable, idxKeysTable, valsTable string + h ReconHeap + + key, nextKey, nextVal, nextFileKey, nextFileVal, nextDbKey, nextDbVal []byte +} + +func (hi *HistoryIterator1) Close() { + if hi.idxCursor != nil { + hi.idxCursor.Close() + } + if hi.txNum2kCursor != nil { + hi.txNum2kCursor.Close() + } +} + +func (hi *HistoryIterator1) advanceInFiles() { + for hi.h.Len() > 0 { + top := heap.Pop(&hi.h).(*ReconItem) + key := top.key + val, _ := top.g.NextUncompressed() + if top.g.HasNext() { + top.key, _ = top.g.NextUncompressed() + heap.Push(&hi.h, top) + } + //fmt.Printf("a: %x, %x\n", key, hi.key) + if !bytes.Equal(key, hi.key) { + ef, _ := eliasfano32.ReadEliasFano(val) + if n, ok := ef.Search(hi.startTxNum); ok { + hi.key = key + var txKey [8]byte + binary.BigEndian.PutUint64(txKey[:], n) + var historyItem *ctxItem + var ok bool + var search ctxItem + search.startTxNum = top.startTxNum + search.endTxNum = top.endTxNum + if historyItem, ok = hi.hc.historyFiles.Get(&search); !ok { + panic(fmt.Errorf("no %s file found for [%x]", hi.hc.h.filenameBase, hi.key)) + } + offset := historyItem.reader.Lookup2(txKey[:], hi.key) + g := historyItem.getter + g.Reset(offset) + if hi.compressVals { + hi.nextFileVal, _ = g.Next(nil) + } else { + hi.nextFileVal, _ = g.NextUncompressed() + } + + hi.nextFileKey = key + return + } + + } + } + hi.hasNextInFiles = false +} + +func (hi *HistoryIterator1) advanceInDb() { + var k []byte + var err error + if hi.idxCursor == nil { + if hi.idxCursor, err = hi.roTx.CursorDupSort(hi.indexTable); err != nil { + // TODO pass error properly around + panic(err) + } + if hi.txNum2kCursor, err = hi.roTx.CursorDupSort(hi.idxKeysTable); err != nil { + panic(err) + } + + if k, _, err = hi.idxCursor.First(); err != nil { + // TODO pass error properly around + panic(err) + } + } else { + if k, _, err = hi.idxCursor.NextNoDup(); err != nil { + panic(err) + } + } + for ; k != nil; k, _, err = hi.idxCursor.NextNoDup() { + if err != nil { + panic(err) + } + foundTxNumVal, err := hi.idxCursor.SeekBothRange(k, hi.startTxKey[:]) + if err != nil { + panic(err) + } + if foundTxNumVal == nil { + continue + } + txNum := binary.BigEndian.Uint64(foundTxNumVal) + if txNum >= hi.endTxNum { + continue + } + hi.nextDbKey = append(hi.nextDbKey[:0], k...) + vn, err := hi.txNum2kCursor.SeekBothRange(foundTxNumVal, k) + if err != nil { + panic(err) + } + valNum := binary.BigEndian.Uint64(vn[len(vn)-8:]) + if valNum == 0 { + // This is special valNum == 0, which is empty value + hi.nextDbVal = hi.nextDbVal[:0] + return + } + v, err := hi.roTx.GetOne(hi.valsTable, vn[len(vn)-8:]) + if err != nil { + panic(err) + } + hi.nextDbVal = append(hi.nextDbVal[:0], v...) + return + } + hi.idxCursor.Close() + hi.idxCursor = nil + hi.hasNextInDb = false +} + +func (hi *HistoryIterator1) advance() { + if hi.hasNextInFiles { + if hi.hasNextInDb { + c := bytes.Compare(hi.nextFileKey, hi.nextDbKey) + if c < 0 { + hi.nextKey = append(hi.nextKey[:0], hi.nextFileKey...) + hi.nextVal = append(hi.nextVal[:0], hi.nextFileVal...) + hi.advanceInFiles() + } else if c > 0 { + hi.nextKey = append(hi.nextKey[:0], hi.nextDbKey...) + hi.nextVal = append(hi.nextVal[:0], hi.nextDbVal...) + hi.advanceInDb() + } else { + hi.nextKey = append(hi.nextKey[:0], hi.nextFileKey...) + hi.nextVal = append(hi.nextVal[:0], hi.nextFileVal...) + hi.advanceInDb() + hi.advanceInFiles() + } + } else { + hi.nextKey = append(hi.nextKey[:0], hi.nextFileKey...) + hi.nextVal = append(hi.nextVal[:0], hi.nextFileVal...) + hi.advanceInFiles() + } + } else if hi.hasNextInDb { + hi.nextKey = append(hi.nextKey[:0], hi.nextDbKey...) + hi.nextVal = append(hi.nextVal[:0], hi.nextDbVal...) + hi.advanceInDb() + } else { + hi.nextKey = nil + hi.nextVal = nil + } +} + +func (hi *HistoryIterator1) HasNext() bool { + return hi.hasNextInFiles || hi.hasNextInDb || hi.nextKey != nil +} + +func (hi *HistoryIterator1) Next(keyBuf, valBuf []byte) ([]byte, []byte) { + k := append(keyBuf, hi.nextKey...) + v := append(valBuf, hi.nextVal...) + hi.advance() + return k, v +} diff --git a/state/history_test.go b/state/history_test.go index 4ca645c87..7913cce2a 100644 --- a/state/history_test.go +++ b/state/history_test.go @@ -248,6 +248,8 @@ func filledHistory(t *testing.T) (string, kv.RwDB, *History, uint64) { var v [8]byte binary.BigEndian.PutUint64(k[:], keyNum) binary.BigEndian.PutUint64(v[:], valNum) + k[0] = 1 //mark key to simplify debug + v[0] = 255 //mark value to simplify debug err = h.AddPrevValue(k[:], nil, prevVal[keyNum]) require.NoError(t, err) prevVal[keyNum] = v[:] @@ -280,6 +282,7 @@ func checkHistoryHistory(t *testing.T, db kv.RwDB, h *History, txs uint64) { //fmt.Printf("label=%s\n", label) binary.BigEndian.PutUint64(k[:], keyNum) binary.BigEndian.PutUint64(v[:], valNum) + k[0], v[0] = 0x01, 0xff val, ok, err := hc.GetNoState(k[:], txNum+1) //require.Equal(t, ok, txNum < 976) if ok { @@ -404,3 +407,107 @@ func TestHistoryScanFiles(t *testing.T) { // Check the history checkHistoryHistory(t, db, h, txs) } + +func TestIterateChanged(t *testing.T) { + _, db, h, _ := filledHistory(t) + defer db.Close() + defer func() { + h.Close() + }() + var err error + var tx kv.RwTx + defer func() { + if tx != nil { + tx.Rollback() + } + }() + + roTx, err := db.BeginRo(context.Background()) + require.NoError(t, err) + defer func() { + roTx.Rollback() + }() + var keys, vals []string + ic := h.MakeContext() + ic.SetTx(tx) + it := ic.IterateChanged(2, 20, roTx) + defer func() { + it.Close() + }() + for it.HasNext() { + k, v := it.Next(nil, nil) + keys = append(keys, fmt.Sprintf("%x", k)) + vals = append(vals, fmt.Sprintf("%x", v)) + } + it.Close() + require.Equal(t, []string{ + "0100000000000001", + "0100000000000002", + "0100000000000003", + "0100000000000004", + "0100000000000005", + "0100000000000006", + "0100000000000007", + "0100000000000008", + "0100000000000009", + "010000000000000a", + "010000000000000b", + "010000000000000c", + "010000000000000d", + "010000000000000e", + "010000000000000f", + "0100000000000010", + "0100000000000011", + "0100000000000012", + "0100000000000013"}, keys) + require.Equal(t, []string{ + "ff00000000000001", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + ""}, vals) + it = ic.IterateChanged(995, 1000, roTx) + keys, vals = keys[:0], vals[:0] + for it.HasNext() { + k, v := it.Next(nil, nil) + keys = append(keys, fmt.Sprintf("%x", k)) + vals = append(vals, fmt.Sprintf("%x", v)) + } + it.Close() + require.Equal(t, []string{ + "0100000000000001", + "0100000000000002", + "0100000000000003", + "0100000000000004", + "0100000000000005", + "0100000000000006", + "0100000000000009", + "010000000000000c", + "010000000000001b", + }, keys) + + require.Equal(t, []string{ + "ff000000000003e2", + "ff000000000001f1", + "ff0000000000014b", + "ff000000000000f8", + "ff000000000000c6", + "ff000000000000a5", + "ff0000000000006e", + "ff00000000000052", + "ff00000000000024"}, vals) +} diff --git a/state/inverted_index.go b/state/inverted_index.go index 35d8622f0..b7b4cd76e 100644 --- a/state/inverted_index.go +++ b/state/inverted_index.go @@ -40,8 +40,8 @@ import ( ) type InvertedIndex struct { - indexKeysTable string // txnNum_u64 -> key - indexTable string // indexKey_userDefined -> txnNum_u64 , Needs to be table with DupSort + indexKeysTable string // txnNum_u64 -> key (k+auto_increment) + indexTable string // k -> txnNum_u64 , Needs to be table with DupSort dir string // Directory where static files are created aggregationStep uint64