mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-28 14:47:16 +00:00
domain: files generic btree
This commit is contained in:
parent
5821ae7bbc
commit
ebea2863c1
@ -57,6 +57,13 @@ type filesItem struct {
|
||||
readerMerge *recsplit.IndexReader // index reader for the background merge thread
|
||||
}
|
||||
|
||||
func filesItemLess(i, j *filesItem) bool {
|
||||
if i.endTxNum == j.endTxNum {
|
||||
return i.startTxNum > j.startTxNum
|
||||
}
|
||||
return i.endTxNum < j.endTxNum
|
||||
}
|
||||
|
||||
func (i *filesItem) Less(than btree.Item) bool {
|
||||
if i.endTxNum == than.(*filesItem).endTxNum {
|
||||
return i.startTxNum > than.(*filesItem).startTxNum
|
||||
@ -123,8 +130,8 @@ type Domain struct {
|
||||
indexTable string // Needs to be table with DupSort
|
||||
tx kv.RwTx
|
||||
txNum uint64
|
||||
files [NumberOfTypes]*btree.BTree // Static files pertaining to this domain, items are of type `filesItem`
|
||||
prefixLen int // Number of bytes in the keys that can be used for prefix iteration
|
||||
files [NumberOfTypes]*btree.BTreeG[*filesItem] // Static files pertaining to this domain, items are of type `filesItem`
|
||||
prefixLen int // Number of bytes in the keys that can be used for prefix iteration
|
||||
compressVals bool
|
||||
stats DomainStats
|
||||
}
|
||||
@ -160,7 +167,7 @@ func NewDomain(
|
||||
compressVals: compressVals,
|
||||
}
|
||||
for fType := FileType(0); fType < NumberOfTypes; fType++ {
|
||||
d.files[fType] = btree.New(32)
|
||||
d.files[fType] = btree.NewG[*filesItem](32, filesItemLess)
|
||||
}
|
||||
d.scanStateFiles(files)
|
||||
for fType := FileType(0); fType < NumberOfTypes; fType++ {
|
||||
@ -212,8 +219,7 @@ func (d *Domain) scanStateFiles(files []fs.DirEntry) {
|
||||
}
|
||||
var item = &filesItem{startTxNum: startTxNum * d.aggregationStep, endTxNum: endTxNum * d.aggregationStep}
|
||||
var foundI *filesItem
|
||||
d.files[fType].AscendGreaterOrEqual(&filesItem{startTxNum: endTxNum * d.aggregationStep, endTxNum: endTxNum * d.aggregationStep}, func(i btree.Item) bool {
|
||||
it := i.(*filesItem)
|
||||
d.files[fType].AscendGreaterOrEqual(&filesItem{startTxNum: endTxNum * d.aggregationStep, endTxNum: endTxNum * d.aggregationStep}, func(it *filesItem) bool {
|
||||
if it.endTxNum == endTxNum {
|
||||
foundI = it
|
||||
}
|
||||
@ -229,8 +235,7 @@ func (d *Domain) scanStateFiles(files []fs.DirEntry) {
|
||||
func (d *Domain) openFiles(fType FileType) error {
|
||||
var err error
|
||||
var totalKeys uint64
|
||||
d.files[fType].Ascend(func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[fType].Ascend(func(item *filesItem) bool {
|
||||
datPath := filepath.Join(d.dir, fmt.Sprintf("%s-%s.%d-%d.dat", d.filenameBase, fType.String(), item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep))
|
||||
if item.decompressor, err = compress.NewDecompressor(datPath); err != nil {
|
||||
return false
|
||||
@ -253,8 +258,7 @@ func (d *Domain) openFiles(fType FileType) error {
|
||||
}
|
||||
|
||||
func (d *Domain) closeFiles(fType FileType) {
|
||||
d.files[fType].Ascend(func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[fType].Ascend(func(item *filesItem) bool {
|
||||
if item.decompressor != nil {
|
||||
item.decompressor.Close()
|
||||
}
|
||||
@ -473,8 +477,7 @@ func (d *Domain) IteratePrefix(prefix []byte, it func(k, v []byte)) error {
|
||||
}
|
||||
heap.Push(&cp, &CursorItem{t: DB_CURSOR, key: common.Copy(k), val: common.Copy(v), c: keysCursor, endTxNum: txNum})
|
||||
}
|
||||
d.files[Values].Ascend(func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[Values].Ascend(func(item *filesItem) bool {
|
||||
if item.index.Empty() {
|
||||
return true
|
||||
}
|
||||
@ -988,8 +991,7 @@ func (d *Domain) prune(step uint64, txFrom, txTo uint64) error {
|
||||
func (d *Domain) readFromFiles(fType FileType, filekey []byte) ([]byte, bool) {
|
||||
var val []byte
|
||||
var found bool
|
||||
d.files[fType].Descend(func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[fType].Descend(func(item *filesItem) bool {
|
||||
if item.index.Empty() {
|
||||
return true
|
||||
}
|
||||
@ -1021,12 +1023,11 @@ func (d *Domain) historyBeforeTxNum(key []byte, txNum uint64, roTx kv.Tx) ([]byt
|
||||
var found bool
|
||||
var anyItem bool // Whether any filesItem has been looked at in the loop below
|
||||
var topState *filesItem
|
||||
d.files[Values].AscendGreaterOrEqual(&search, func(i btree.Item) bool {
|
||||
topState = i.(*filesItem)
|
||||
d.files[Values].AscendGreaterOrEqual(&search, func(i *filesItem) bool {
|
||||
topState = i
|
||||
return false
|
||||
})
|
||||
d.files[EfHistory].AscendGreaterOrEqual(&search, func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[EfHistory].AscendGreaterOrEqual(&search, func(item *filesItem) bool {
|
||||
anyItem = true
|
||||
offset := item.indexReader.Lookup(key)
|
||||
g := item.getter
|
||||
@ -1053,8 +1054,7 @@ func (d *Domain) historyBeforeTxNum(key []byte, txNum uint64, roTx kv.Tx) ([]byt
|
||||
if anyItem {
|
||||
// If there were no changes but there were history files, the value can be obtained from value files
|
||||
var val []byte
|
||||
d.files[Values].DescendLessOrEqual(topState, func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[Values].DescendLessOrEqual(topState, func(item *filesItem) bool {
|
||||
if item.index.Empty() {
|
||||
return true
|
||||
}
|
||||
@ -1118,9 +1118,8 @@ func (d *Domain) historyBeforeTxNum(key []byte, txNum uint64, roTx kv.Tx) ([]byt
|
||||
var historyItem *filesItem
|
||||
search.startTxNum = foundStartTxNum
|
||||
search.endTxNum = foundEndTxNum
|
||||
if i := d.files[History].Get(&search); i != nil {
|
||||
historyItem = i.(*filesItem)
|
||||
} else {
|
||||
historyItem, ok := d.files[History].Get(&search)
|
||||
if !ok || historyItem == nil {
|
||||
return nil, false, fmt.Errorf("no %s file found for [%x]", d.filenameBase, key)
|
||||
}
|
||||
offset := historyItem.indexReader.Lookup2(txKey[:], key)
|
||||
|
@ -36,9 +36,12 @@ func (d *Domain) endTxNumMinimax() uint64 {
|
||||
var minimax uint64
|
||||
for fType := FileType(0); fType < NumberOfTypes; fType++ {
|
||||
if d.files[fType].Len() > 0 {
|
||||
endTxNum := d.files[fType].Max().(*filesItem).endTxNum
|
||||
if minimax == 0 || endTxNum < minimax {
|
||||
minimax = endTxNum
|
||||
max, ok := d.files[fType].Max()
|
||||
if ok {
|
||||
endTxNum := max.endTxNum
|
||||
if minimax == 0 || endTxNum < minimax {
|
||||
minimax = endTxNum
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -73,8 +76,7 @@ func (r DomainRanges) any() bool {
|
||||
// That is why only Values type is inspected
|
||||
func (d *Domain) findMergeRange(maxEndTxNum, maxSpan uint64) DomainRanges {
|
||||
var r DomainRanges
|
||||
d.files[Values].Ascend(func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[Values].Ascend(func(item *filesItem) bool {
|
||||
if item.endTxNum > maxEndTxNum {
|
||||
return false
|
||||
}
|
||||
@ -91,8 +93,7 @@ func (d *Domain) findMergeRange(maxEndTxNum, maxSpan uint64) DomainRanges {
|
||||
}
|
||||
return true
|
||||
})
|
||||
d.files[History].Ascend(func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[History].Ascend(func(item *filesItem) bool {
|
||||
if item.endTxNum > maxEndTxNum {
|
||||
return false
|
||||
}
|
||||
@ -164,8 +165,7 @@ func (d *Domain) staticFilesInRange(r DomainRanges) ([][NumberOfTypes]*filesItem
|
||||
}
|
||||
startJ = 0
|
||||
j := 0
|
||||
d.files[fType].Ascend(func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[fType].Ascend(func(item *filesItem) bool {
|
||||
if item.startTxNum < startTxNum {
|
||||
startJ++
|
||||
return true
|
||||
|
@ -41,8 +41,7 @@ func (d *Domain) MakeContext() *DomainContext {
|
||||
for fType := FileType(0); fType < NumberOfTypes; fType++ {
|
||||
bt := btree.New(32)
|
||||
dc.files[fType] = bt
|
||||
d.files[fType].Ascend(func(i btree.Item) bool {
|
||||
item := i.(*filesItem)
|
||||
d.files[fType].Ascend(func(item *filesItem) bool {
|
||||
bt.ReplaceOrInsert(&filesItem{
|
||||
startTxNum: item.startTxNum,
|
||||
endTxNum: item.endTxNum,
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -49,6 +48,7 @@ import (
|
||||
"github.com/ledgerwatch/erigon-lib/types"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -2041,7 +2041,7 @@ func (p *PendingPool) EnforceWorstInvariants() {
|
||||
heap.Init(p.worst)
|
||||
}
|
||||
func (p *PendingPool) EnforceBestInvariants() {
|
||||
sort.Sort(p.best)
|
||||
slices.SortFunc(p.best.ms, func(i, j *metaTx) bool { return i.better(j, p.best.pendingBaseFee) })
|
||||
}
|
||||
|
||||
func (p *PendingPool) Best() *metaTx { //nolint
|
||||
|
Loading…
Reference in New Issue
Block a user