mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-29 07:07:16 +00:00
[erigon2.2] Fixes for inverted indices and domains for the prototype (#489)
* Better control of compress/uncompressed * Add new function * more careful pruning * Printf * Printf * Fix DupSort * Remove copying in prune Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
parent
dba877998b
commit
e2c6ef0058
@ -562,9 +562,13 @@ var ChaindataTablesCfg = TableCfg{
|
||||
CodeKeys: {Flags: DupSort},
|
||||
CodeHistoryKeys: {Flags: DupSort},
|
||||
CodeIdx: {Flags: DupSort},
|
||||
LogAddressKeys: {Flags: DupSort},
|
||||
LogAddressIdx: {Flags: DupSort},
|
||||
LogTopicsKeys: {Flags: DupSort},
|
||||
LogTopicsIdx: {Flags: DupSort},
|
||||
TracesFromKeys: {Flags: DupSort},
|
||||
TracesFromIdx: {Flags: DupSort},
|
||||
TracesToKeys: {Flags: DupSort},
|
||||
TracesToIdx: {Flags: DupSort},
|
||||
}
|
||||
|
||||
|
@ -662,6 +662,10 @@ func (a *Aggregator) ReadAccountCodeSizeBeforeTxNum(addr []byte, txNum uint64, r
|
||||
return len(code), nil
|
||||
}
|
||||
|
||||
func (a *Aggregator) ReadyToFinishTx() bool {
|
||||
return (a.txNum+1)%a.aggregationStep == 0
|
||||
}
|
||||
|
||||
func (a *Aggregator) FinishTx() error {
|
||||
if (a.txNum+1)%a.aggregationStep != 0 {
|
||||
return nil
|
||||
|
@ -375,11 +375,13 @@ const (
|
||||
// CursorItem is the item in the priority queue used to do merge interation
|
||||
// over storage of a given account
|
||||
type CursorItem struct {
|
||||
t CursorType // Whether this item represents state file or DB record, or tree
|
||||
endTxNum uint64
|
||||
key, val []byte
|
||||
dg *compress.Getter
|
||||
c kv.CursorDupSort
|
||||
t CursorType // Whether this item represents state file or DB record, or tree
|
||||
endTxNum uint64
|
||||
key, val []byte
|
||||
dg *compress.Getter
|
||||
c kv.CursorDupSort
|
||||
keyCompressed bool
|
||||
valCompressed bool
|
||||
}
|
||||
|
||||
type CursorHeap []*CursorItem
|
||||
|
@ -251,6 +251,7 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
|
||||
}
|
||||
}()
|
||||
for fType := FileType(0); fType < NumberOfTypes; fType++ {
|
||||
compressVal := fType != EfHistory
|
||||
var startTxNum, endTxNum uint64
|
||||
if fType == Values {
|
||||
if !r.values {
|
||||
@ -265,7 +266,6 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
|
||||
startTxNum = r.historyStartTxNum
|
||||
endTxNum = r.historyEndTxNum
|
||||
}
|
||||
valCompressed := fType != EfHistory
|
||||
removeVals := fType == History && (endTxNum-startTxNum) == maxSpan
|
||||
tmpPath := filepath.Join(d.dir, fmt.Sprintf("%s-%s.%d-%d.tmp", d.filenameBase, fType.String(), startTxNum/d.aggregationStep, endTxNum/d.aggregationStep))
|
||||
datPath := filepath.Join(d.dir, fmt.Sprintf("%s-%s.%d-%d.dat", d.filenameBase, fType.String(), startTxNum/d.aggregationStep, endTxNum/d.aggregationStep))
|
||||
@ -290,7 +290,15 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
|
||||
if g.HasNext() {
|
||||
key, _ := g.Next(nil)
|
||||
val, _ := g.Next(nil)
|
||||
heap.Push(&cp, &CursorItem{t: FILE_CURSOR, dg: g, key: key, val: val, endTxNum: item.endTxNum})
|
||||
heap.Push(&cp, &CursorItem{
|
||||
t: FILE_CURSOR,
|
||||
dg: g,
|
||||
key: key,
|
||||
val: val,
|
||||
endTxNum: item.endTxNum,
|
||||
keyCompressed: item.endTxNum-item.startTxNum > d.aggregationStep,
|
||||
valCompressed: fType != EfHistory && item.endTxNum-item.startTxNum > d.aggregationStep,
|
||||
})
|
||||
}
|
||||
}
|
||||
count := 0
|
||||
@ -315,8 +323,12 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
|
||||
mergedOnce = true
|
||||
}
|
||||
if ci1.dg.HasNext() {
|
||||
ci1.key, _ = ci1.dg.Next(ci1.key[:0])
|
||||
if valCompressed {
|
||||
if ci1.keyCompressed {
|
||||
ci1.key, _ = ci1.dg.Next(ci1.key[:0])
|
||||
} else {
|
||||
ci1.key, _ = ci1.dg.NextUncompressed()
|
||||
}
|
||||
if ci1.valCompressed {
|
||||
ci1.val, _ = ci1.dg.Next(ci1.val[:0])
|
||||
} else {
|
||||
ci1.val, _ = ci1.dg.NextUncompressed()
|
||||
@ -341,7 +353,7 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
|
||||
return outItems, err
|
||||
}
|
||||
count++ // Only counting keys, not values
|
||||
if valCompressed {
|
||||
if compressVal {
|
||||
if err = comp.AddWord(valBuf); err != nil {
|
||||
return outItems, err
|
||||
}
|
||||
@ -360,7 +372,7 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
|
||||
return outItems, err
|
||||
}
|
||||
count++ // Only counting keys, not values
|
||||
if valCompressed {
|
||||
if compressVal {
|
||||
if err = comp.AddWord(valBuf); err != nil {
|
||||
return outItems, err
|
||||
}
|
||||
@ -506,7 +518,14 @@ func (ii *InvertedIndex) mergeFiles(files []*filesItem, startTxNum, endTxNum uin
|
||||
if g.HasNext() {
|
||||
key, _ := g.Next(nil)
|
||||
val, _ := g.Next(nil)
|
||||
heap.Push(&cp, &CursorItem{t: FILE_CURSOR, dg: g, key: key, val: val, endTxNum: item.endTxNum})
|
||||
heap.Push(&cp, &CursorItem{
|
||||
t: FILE_CURSOR,
|
||||
dg: g,
|
||||
key: key,
|
||||
val: val,
|
||||
endTxNum: item.endTxNum,
|
||||
keyCompressed: item.endTxNum-item.startTxNum > ii.aggregationStep,
|
||||
})
|
||||
}
|
||||
}
|
||||
count := 0
|
||||
@ -531,7 +550,11 @@ func (ii *InvertedIndex) mergeFiles(files []*filesItem, startTxNum, endTxNum uin
|
||||
mergedOnce = true
|
||||
}
|
||||
if ci1.dg.HasNext() {
|
||||
ci1.key, _ = ci1.dg.Next(ci1.key[:0])
|
||||
if ci1.keyCompressed {
|
||||
ci1.key, _ = ci1.dg.Next(ci1.key[:0])
|
||||
} else {
|
||||
ci1.key, _ = ci1.dg.NextUncompressed()
|
||||
}
|
||||
ci1.val, _ = ci1.dg.NextUncompressed()
|
||||
heap.Fix(&cp, 0)
|
||||
} else {
|
||||
|
Loading…
Reference in New Issue
Block a user