This commit is contained in:
alex.sharov 2022-08-22 10:33:14 +07:00
parent db7322ef87
commit 36778a2db3
2 changed files with 121 additions and 0 deletions

View File

@ -501,6 +501,46 @@ func (h *History) prune(step uint64, txFrom, txTo uint64) error {
return nil
}
func (h *History) iterateInDB(step uint64, txFrom, txTo uint64, f func(txNum uint64, k, v []byte) error) error {
historyKeysCursor, err := h.tx.RwCursorDupSort(h.indexKeysTable)
if err != nil {
return fmt.Errorf("create %s history cursor: %w", h.filenameBase, err)
}
defer historyKeysCursor.Close()
var txKey [8]byte
binary.BigEndian.PutUint64(txKey[:], txFrom)
var k, v []byte
idxC, err := h.tx.RwCursorDupSort(h.indexTable)
if err != nil {
return err
}
defer idxC.Close()
valsC, err := h.tx.RwCursor(h.historyValsTable)
if err != nil {
return err
}
defer valsC.Close()
for k, v, err = historyKeysCursor.Seek(txKey[:]); err == nil && k != nil; k, v, err = historyKeysCursor.Next() {
txNum := binary.BigEndian.Uint64(k)
if txNum >= txTo {
break
}
key, txnNumBytes := v[:len(v)-8], v[len(v)-8:]
{
_, vv, err := valsC.SeekExact(txnNumBytes)
if err != nil {
return err
}
if err := f(txNum, key, vv); err != nil {
return err
}
}
}
if err != nil {
return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err)
}
return nil
}
func (h *History) pruneF(step uint64, txFrom, txTo uint64, f func(txNum uint64, k, v []byte) error) error {
historyKeysCursor, err := h.tx.RwCursorDupSort(h.indexKeysTable)
if err != nil {
@ -560,6 +600,7 @@ type HistoryContext struct {
}
func (h *History) MakeContext() *HistoryContext {
fmt.Printf("a: %d-%d\n", h.InvertedIndex.files.Len(), h.files.Len())
var hc = HistoryContext{h: h}
hc.indexFiles = btree.NewG[*ctxItem](32, ctxItemLess)
h.InvertedIndex.files.Ascend(func(item *filesItem) bool {

View File

@ -285,3 +285,83 @@ func (hc *HistoryContext) iterateHistoryBeforeTxNum(fromKey, toKey []byte, txNum
hi.advance()
return &hi
}
func (hc *HistoryContext) Iterate(txNumFrom, txNumTo uint64, f func(txNum uint64, k, v []byte) error) {
fmt.Printf("alex11: %d-%d\n", txNumFrom, txNumTo)
fmt.Printf("alex12: %d\n", hc.historyFiles.Len())
hc.historyFiles.Ascend(func(item *ctxItem) bool {
if item.endTxNum < txNumFrom {
return true
}
if item.startTxNum > txNumTo {
return false
}
for item.getter.HasNext() {
key, offset := item.getter.NextUncompressed()
val, offset2 := item.getter.NextUncompressed()
fmt.Printf("from file!!: %x, %x\n", key, val)
if err := f(0, key, val); err != nil {
panic(err)
}
_, _ = offset2, offset
}
return true
})
if err := hc.h.iterateInDB(1024, txNumFrom, txNumTo, f); err != nil {
panic(err)
}
//hc.indexFiles.Ascend(func(item *ctxItem) bool {
// if item.endTxNum < txNumFrom {
// return true
// }
// if item.startTxNum > txNumTo {
// return false
// }
//
// g := item.getter
// g.Reset(0)
// for g.HasNext() {
// key, offset := g.NextUncompressed()
// val, offset := g.NextUncompressed()
// fmt.Printf("alex: %x, %x\n", key, val)
//
// if !bytes.Equal(hi.key, key) {
// ef, _ := eliasfano32.ReadEliasFano(val)
// if n, ok := ef.Search(txNumFrom); ok {
// hi.key = key
// var txKey [8]byte
// binary.BigEndian.PutUint64(txKey[:], n)
// var historyItem *ctxItem
// var ok bool
// var search ctxItem
// search.startTxNum = top.startTxNum
// search.endTxNum = top.endTxNum
// if historyItem, ok = hi.hc.historyFiles.Get(&search); !ok {
// panic(fmt.Errorf("no %s file found for [%x]", hi.hc.h.filenameBase, hi.key))
// }
// offset := historyItem.reader.Lookup2(txKey[:], hi.key)
// g := historyItem.getter
// g.Reset(offset)
// if hi.compressVals {
// hi.val, _ = g.Next(nil)
// } else {
// hi.val, _ = g.NextUncompressed()
// }
// hi.hasNext = true
// return
// }
// }
//
// //if fromKey == nil || bytes.Compare(key, fromKey) > 0 {
// // heap.Push(&hi.h, &ReconItem{g: g, key: key, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum, startOffset: offset, lastOffset: offset})
// // break
// //} else {
// // g.SkipUncompressed()
// //}
// }
// hi.total += uint64(item.getter.Size())
// return true
//})
}