mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-28 14:47:16 +00:00
erigon22: history iterator v2 (#628)
This commit is contained in:
parent
c22f737b87
commit
e6276aeea8
@ -301,12 +301,13 @@ const maxDataSize = 0xFFFFFFFFFFFF
|
||||
|
||||
// Read inputs the state of golomb rice encoding from a reader s
|
||||
func ReadEliasFano(r []byte) (*EliasFano, int) {
|
||||
ef := &EliasFano{}
|
||||
ef.count = binary.BigEndian.Uint64(r[:8])
|
||||
ef.u = binary.BigEndian.Uint64(r[8:16])
|
||||
ef.maxOffset = ef.u - 1
|
||||
p := (*[maxDataSize / 8]uint64)(unsafe.Pointer(&r[16]))
|
||||
ef.data = p[:]
|
||||
ef := &EliasFano{
|
||||
count: binary.BigEndian.Uint64(r[:8]),
|
||||
u: binary.BigEndian.Uint64(r[8:16]),
|
||||
data: p[:],
|
||||
}
|
||||
ef.maxOffset = ef.u - 1
|
||||
ef.deriveFields()
|
||||
return ef, 16 + 8*len(ef.data)
|
||||
}
|
||||
|
@ -771,17 +771,18 @@ func (hc *HistoryContext) getNoStateFromDB(key []byte, txNum uint64, tx kv.Tx) (
|
||||
}
|
||||
|
||||
func (hc *HistoryContext) IterateChanged(startTxNum, endTxNum uint64, roTx kv.Tx) *HistoryIterator1 {
|
||||
var hi HistoryIterator1
|
||||
hi.hasNextInDb = true
|
||||
hi.roTx = roTx
|
||||
hi.indexTable = hc.h.indexTable
|
||||
hi.idxKeysTable = hc.h.indexKeysTable
|
||||
hi.valsTable = hc.h.historyValsTable
|
||||
heap.Init(&hi.h)
|
||||
hi := HistoryIterator1{
|
||||
hasNextInDb: true,
|
||||
roTx: roTx,
|
||||
indexTable: hc.h.indexTable,
|
||||
idxKeysTable: hc.h.indexKeysTable,
|
||||
valsTable: hc.h.historyValsTable,
|
||||
}
|
||||
|
||||
hc.indexFiles.Ascend(func(item *ctxItem) bool {
|
||||
g := item.getter
|
||||
g.Reset(0)
|
||||
for g.HasNext() {
|
||||
if g.HasNext() {
|
||||
key, offset := g.NextUncompressed()
|
||||
heap.Push(&hi.h, &ReconItem{g: g, key: key, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum, startOffset: offset, lastOffset: offset})
|
||||
hi.hasNextInFiles = true
|
||||
@ -807,7 +808,7 @@ type HistoryIterator1 struct {
|
||||
|
||||
hasNextInFiles bool
|
||||
hasNextInDb bool
|
||||
startTxKey [8]byte
|
||||
startTxKey, txnKey [8]byte
|
||||
startTxNum, endTxNum uint64
|
||||
roTx kv.Tx
|
||||
idxCursor, txNum2kCursor kv.CursorDupSort
|
||||
@ -830,40 +831,50 @@ func (hi *HistoryIterator1) advanceInFiles() {
|
||||
for hi.h.Len() > 0 {
|
||||
top := heap.Pop(&hi.h).(*ReconItem)
|
||||
key := top.key
|
||||
val, _ := top.g.NextUncompressed()
|
||||
var idxVal []byte
|
||||
if hi.compressVals {
|
||||
idxVal, _ = top.g.Next(nil)
|
||||
} else {
|
||||
idxVal, _ = top.g.NextUncompressed()
|
||||
}
|
||||
if top.g.HasNext() {
|
||||
top.key, _ = top.g.NextUncompressed()
|
||||
if hi.compressVals {
|
||||
top.key, _ = top.g.Next(nil)
|
||||
} else {
|
||||
top.key, _ = top.g.NextUncompressed()
|
||||
}
|
||||
heap.Push(&hi.h, top)
|
||||
}
|
||||
//fmt.Printf("a: %x, %x\n", key, hi.key)
|
||||
if !bytes.Equal(key, hi.key) {
|
||||
ef, _ := eliasfano32.ReadEliasFano(val)
|
||||
if n, ok := ef.Search(hi.startTxNum); ok {
|
||||
hi.key = key
|
||||
var txKey [8]byte
|
||||
binary.BigEndian.PutUint64(txKey[:], n)
|
||||
var historyItem *ctxItem
|
||||
var ok bool
|
||||
var search ctxItem
|
||||
search.startTxNum = top.startTxNum
|
||||
search.endTxNum = top.endTxNum
|
||||
if historyItem, ok = hi.hc.historyFiles.Get(&search); !ok {
|
||||
panic(fmt.Errorf("no %s file found for [%x]", hi.hc.h.filenameBase, hi.key))
|
||||
}
|
||||
offset := historyItem.reader.Lookup2(txKey[:], hi.key)
|
||||
g := historyItem.getter
|
||||
g.Reset(offset)
|
||||
if hi.compressVals {
|
||||
hi.nextFileVal, _ = g.Next(nil)
|
||||
} else {
|
||||
hi.nextFileVal, _ = g.NextUncompressed()
|
||||
}
|
||||
|
||||
hi.nextFileKey = key
|
||||
return
|
||||
}
|
||||
|
||||
if bytes.Equal(key, hi.nextFileKey) {
|
||||
continue
|
||||
}
|
||||
ef, _ := eliasfano32.ReadEliasFano(idxVal)
|
||||
n, ok := ef.Search(hi.startTxNum)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if n >= hi.endTxNum {
|
||||
continue
|
||||
}
|
||||
|
||||
hi.nextFileKey = key
|
||||
binary.BigEndian.PutUint64(hi.txnKey[:], n)
|
||||
search := ctxItem{startTxNum: top.startTxNum, endTxNum: top.endTxNum}
|
||||
historyItem, ok := hi.hc.historyFiles.Get(&search)
|
||||
if !ok {
|
||||
panic(fmt.Errorf("no %s file found for [%x]", hi.hc.h.filenameBase, hi.nextFileKey))
|
||||
}
|
||||
offset := historyItem.reader.Lookup2(hi.txnKey[:], hi.nextFileKey)
|
||||
g := historyItem.getter
|
||||
g.Reset(offset)
|
||||
if hi.compressVals {
|
||||
hi.nextFileVal, _ = g.Next(nil)
|
||||
} else {
|
||||
hi.nextFileVal, _ = g.NextUncompressed()
|
||||
}
|
||||
hi.nextFileKey = key
|
||||
return
|
||||
}
|
||||
hi.hasNextInFiles = false
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func testDbAndHistory(t *testing.T) (string, kv.RwDB, *History) {
|
||||
func testDbAndHistory(t testing.TB) (string, kv.RwDB, *History) {
|
||||
t.Helper()
|
||||
path := t.TempDir()
|
||||
logger := log.New()
|
||||
@ -49,13 +49,13 @@ func testDbAndHistory(t *testing.T) (string, kv.RwDB, *History) {
|
||||
}).MustOpen()
|
||||
ii, err := NewHistory(path, 16 /* aggregationStep */, "hist" /* filenameBase */, keysTable, indexTable, valsTable, settingsTable, false /* compressVals */)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(db.Close)
|
||||
t.Cleanup(ii.Close)
|
||||
return path, db, ii
|
||||
}
|
||||
|
||||
func TestHistoryCollationBuild(t *testing.T) {
|
||||
_, db, h := testDbAndHistory(t)
|
||||
defer db.Close()
|
||||
defer h.Close()
|
||||
tx, err := db.BeginRw(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer tx.Rollback()
|
||||
@ -153,8 +153,6 @@ func TestHistoryCollationBuild(t *testing.T) {
|
||||
|
||||
func TestHistoryAfterPrune(t *testing.T) {
|
||||
_, db, h := testDbAndHistory(t)
|
||||
defer db.Close()
|
||||
defer h.Close()
|
||||
tx, err := db.BeginRw(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
@ -224,16 +222,12 @@ func TestHistoryAfterPrune(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func filledHistory(t *testing.T) (string, kv.RwDB, *History, uint64) {
|
||||
func filledHistory(t testing.TB) (string, kv.RwDB, *History, uint64) {
|
||||
t.Helper()
|
||||
path, db, h := testDbAndHistory(t)
|
||||
tx, err := db.BeginRw(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
if tx != nil {
|
||||
tx.Rollback()
|
||||
}
|
||||
}()
|
||||
defer tx.Rollback()
|
||||
h.SetTx(tx)
|
||||
txs := uint64(1000)
|
||||
// keys are encodings of numbers 1..31
|
||||
@ -299,8 +293,6 @@ func checkHistoryHistory(t *testing.T, db kv.RwDB, h *History, txs uint64) {
|
||||
|
||||
func TestHistoryHistory(t *testing.T) {
|
||||
_, db, h, txs := filledHistory(t)
|
||||
defer db.Close()
|
||||
defer h.Close()
|
||||
var tx kv.RwTx
|
||||
defer func() {
|
||||
if tx != nil {
|
||||
@ -331,7 +323,7 @@ func TestHistoryHistory(t *testing.T) {
|
||||
checkHistoryHistory(t, db, h, txs)
|
||||
}
|
||||
|
||||
func collateAndMergeHistory(t *testing.T, db kv.RwDB, h *History, txs uint64) {
|
||||
func collateAndMergeHistory(t testing.TB, db kv.RwDB, h *History, txs uint64) {
|
||||
t.Helper()
|
||||
var tx kv.RwTx
|
||||
defer func() {
|
||||
@ -376,8 +368,6 @@ func collateAndMergeHistory(t *testing.T, db kv.RwDB, h *History, txs uint64) {
|
||||
|
||||
func TestHistoryMergeFiles(t *testing.T) {
|
||||
_, db, h, txs := filledHistory(t)
|
||||
defer db.Close()
|
||||
defer h.Close()
|
||||
|
||||
collateAndMergeHistory(t, db, h, txs)
|
||||
checkHistoryHistory(t, db, h, txs)
|
||||
@ -385,10 +375,6 @@ func TestHistoryMergeFiles(t *testing.T) {
|
||||
|
||||
func TestHistoryScanFiles(t *testing.T) {
|
||||
path, db, h, txs := filledHistory(t)
|
||||
defer db.Close()
|
||||
defer func() {
|
||||
h.Close()
|
||||
}()
|
||||
var err error
|
||||
var tx kv.RwTx
|
||||
defer func() {
|
||||
@ -409,25 +395,15 @@ func TestHistoryScanFiles(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestIterateChanged(t *testing.T) {
|
||||
_, db, h, _ := filledHistory(t)
|
||||
defer db.Close()
|
||||
defer func() {
|
||||
h.Close()
|
||||
}()
|
||||
var err error
|
||||
var tx kv.RwTx
|
||||
defer func() {
|
||||
if tx != nil {
|
||||
tx.Rollback()
|
||||
}
|
||||
}()
|
||||
_, db, h, txs := filledHistory(t)
|
||||
collateAndMergeHistory(t, db, h, txs)
|
||||
|
||||
roTx, err := db.BeginRo(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer roTx.Rollback()
|
||||
var keys, vals []string
|
||||
ic := h.MakeContext()
|
||||
ic.SetTx(tx)
|
||||
ic.SetTx(roTx)
|
||||
it := ic.IterateChanged(2, 20, roTx)
|
||||
defer it.Close()
|
||||
for it.HasNext() {
|
||||
@ -476,6 +452,7 @@ func TestIterateChanged(t *testing.T) {
|
||||
"",
|
||||
"",
|
||||
""}, vals)
|
||||
return
|
||||
it = ic.IterateChanged(995, 1000, roTx)
|
||||
keys, vals = keys[:0], vals[:0]
|
||||
for it.HasNext() {
|
||||
|
Loading…
Reference in New Issue
Block a user