diff --git a/kv/tables.go b/kv/tables.go
index c1cd1c4fe..6e2f4dad7 100644
--- a/kv/tables.go
+++ b/kv/tables.go
@@ -562,9 +562,13 @@ var ChaindataTablesCfg = TableCfg{
 	CodeKeys:        {Flags: DupSort},
 	CodeHistoryKeys: {Flags: DupSort},
 	CodeIdx:         {Flags: DupSort},
+	LogAddressKeys:  {Flags: DupSort},
 	LogAddressIdx:   {Flags: DupSort},
+	LogTopicsKeys:   {Flags: DupSort},
 	LogTopicsIdx:    {Flags: DupSort},
+	TracesFromKeys:  {Flags: DupSort},
 	TracesFromIdx:   {Flags: DupSort},
+	TracesToKeys:    {Flags: DupSort},
 	TracesToIdx:     {Flags: DupSort},
 }
diff --git a/state/aggregator.go b/state/aggregator.go
index b8a4068b2..21f8772ab 100644
--- a/state/aggregator.go
+++ b/state/aggregator.go
@@ -662,6 +662,10 @@ func (a *Aggregator) ReadAccountCodeSizeBeforeTxNum(addr []byte, txNum uint64, r
 	return len(code), nil
 }
 
+func (a *Aggregator) ReadyToFinishTx() bool {
+	return (a.txNum+1)%a.aggregationStep == 0
+}
+
 func (a *Aggregator) FinishTx() error {
 	if (a.txNum+1)%a.aggregationStep != 0 {
 		return nil
diff --git a/state/domain.go b/state/domain.go
index c2ead72c2..eecbe04a2 100644
--- a/state/domain.go
+++ b/state/domain.go
@@ -375,11 +375,13 @@ const (
 // CursorItem is the item in the priority queue used to do merge iteration
 // over storage of a given account
 type CursorItem struct {
-	t        CursorType // Whether this item represents state file or DB record, or tree
-	endTxNum uint64
-	key, val []byte
-	dg       *compress.Getter
-	c        kv.CursorDupSort
+	t             CursorType // Whether this item represents state file or DB record, or tree
+	endTxNum      uint64
+	key, val      []byte
+	dg            *compress.Getter
+	c             kv.CursorDupSort
+	keyCompressed bool
+	valCompressed bool
 }
 
 type CursorHeap []*CursorItem
diff --git a/state/merge.go b/state/merge.go
index aeacc0a77..18e15edd5 100644
--- a/state/merge.go
+++ b/state/merge.go
@@ -251,6 +251,7 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
 		}
 	}()
 	for fType := FileType(0); fType < NumberOfTypes; fType++ {
+		compressVal := fType != EfHistory
 		var startTxNum, endTxNum uint64
 		if fType == Values {
 			if !r.values {
@@ -265,7 +266,6 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
 			startTxNum = r.historyStartTxNum
 			endTxNum = r.historyEndTxNum
 		}
-		valCompressed := fType != EfHistory
 		removeVals := fType == History && (endTxNum-startTxNum) == maxSpan
 		tmpPath := filepath.Join(d.dir, fmt.Sprintf("%s-%s.%d-%d.tmp", d.filenameBase, fType.String(), startTxNum/d.aggregationStep, endTxNum/d.aggregationStep))
 		datPath := filepath.Join(d.dir, fmt.Sprintf("%s-%s.%d-%d.dat", d.filenameBase, fType.String(), startTxNum/d.aggregationStep, endTxNum/d.aggregationStep))
@@ -290,7 +290,15 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
 			if g.HasNext() {
 				key, _ := g.Next(nil)
 				val, _ := g.Next(nil)
-				heap.Push(&cp, &CursorItem{t: FILE_CURSOR, dg: g, key: key, val: val, endTxNum: item.endTxNum})
+				heap.Push(&cp, &CursorItem{
+					t:             FILE_CURSOR,
+					dg:            g,
+					key:           key,
+					val:           val,
+					endTxNum:      item.endTxNum,
+					keyCompressed: item.endTxNum-item.startTxNum > d.aggregationStep,
+					valCompressed: fType != EfHistory && item.endTxNum-item.startTxNum > d.aggregationStep,
+				})
 			}
 		}
 		count := 0
@@ -315,8 +323,12 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
 				mergedOnce = true
 			}
 			if ci1.dg.HasNext() {
-				ci1.key, _ = ci1.dg.Next(ci1.key[:0])
-				if valCompressed {
+				if ci1.keyCompressed {
+					ci1.key, _ = ci1.dg.Next(ci1.key[:0])
+				} else {
+					ci1.key, _ = ci1.dg.NextUncompressed()
+				}
+				if ci1.valCompressed {
 					ci1.val, _ = ci1.dg.Next(ci1.val[:0])
 				} else {
 					ci1.val, _ = ci1.dg.NextUncompressed()
@@ -341,7 +353,7 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
 				return outItems, err
 			}
 			count++ // Only counting keys, not values
-			if valCompressed {
+			if compressVal {
 				if err = comp.AddWord(valBuf); err != nil {
 					return outItems, err
 				}
@@ -360,7 +372,7 @@ func (d *Domain) mergeFiles(files [][NumberOfTypes]*filesItem, r DomainRanges, m
 				return outItems, err
 			}
 			count++ // Only counting keys, not values
-			if valCompressed {
+			if compressVal {
 				if err = comp.AddWord(valBuf); err != nil {
 					return outItems, err
 				}
@@ -506,7 +518,14 @@ func (ii *InvertedIndex) mergeFiles(files []*filesItem, startTxNum, endTxNum uin
 			if g.HasNext() {
 				key, _ := g.Next(nil)
 				val, _ := g.Next(nil)
-				heap.Push(&cp, &CursorItem{t: FILE_CURSOR, dg: g, key: key, val: val, endTxNum: item.endTxNum})
+				heap.Push(&cp, &CursorItem{
+					t:             FILE_CURSOR,
+					dg:            g,
+					key:           key,
+					val:           val,
+					endTxNum:      item.endTxNum,
+					keyCompressed: item.endTxNum-item.startTxNum > ii.aggregationStep,
+				})
 			}
 		}
 		count := 0
@@ -531,7 +550,11 @@ func (ii *InvertedIndex) mergeFiles(files []*filesItem, startTxNum, endTxNum uin
 				mergedOnce = true
 			}
 			if ci1.dg.HasNext() {
-				ci1.key, _ = ci1.dg.Next(ci1.key[:0])
+				if ci1.keyCompressed {
+					ci1.key, _ = ci1.dg.Next(ci1.key[:0])
+				} else {
+					ci1.key, _ = ci1.dg.NextUncompressed()
+				}
 				ci1.val, _ = ci1.dg.NextUncompressed()
 				heap.Fix(&cp, 0)
 			} else {
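For context on the merge loops the last hunks touch: below is a minimal, self-contained sketch of the k-way merge pattern that CursorHeap, heap.Push, and heap.Fix implement in this diff. Everything in the sketch (mergeCursor, cursorHeap, the sample keys, the tie-break) is a hypothetical stand-in, not the erigon-lib API; the real code advances a compress.Getter over .dat files and, after this change, branches per cursor on keyCompressed/valCompressed between Next and NextUncompressed instead of using one flag per file type.

```go
package main

import (
	"container/heap"
	"fmt"
)

// mergeCursor plays the role of CursorItem: it carries the current key for
// one source file plus the state needed to fetch the next one.
type mergeCursor struct {
	key      string
	rest     []string // stand-in for the remaining words in a compress.Getter
	endTxNum uint64
}

type cursorHeap []*mergeCursor

func (h cursorHeap) Len() int { return len(h) }
func (h cursorHeap) Less(i, j int) bool {
	if h[i].key != h[j].key {
		return h[i].key < h[j].key
	}
	// One plausible tie-break on equal keys; the real ordering lives in
	// CursorHeap.Less in state/domain.go.
	return h[i].endTxNum > h[j].endTxNum
}
func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *cursorHeap) Push(x any)   { *h = append(*h, x.(*mergeCursor)) }
func (h *cursorHeap) Pop() any {
	old := *h
	it := old[len(old)-1]
	*h = old[:len(old)-1]
	return it
}

func main() {
	cp := cursorHeap{}
	heap.Init(&cp)
	// Two sorted "files" to merge, mirroring the heap.Push calls in mergeFiles.
	for i, keys := range [][]string{{"a", "c", "d"}, {"b", "c"}} {
		heap.Push(&cp, &mergeCursor{key: keys[0], rest: keys[1:], endTxNum: uint64(i)})
	}
	for cp.Len() > 0 {
		ci := cp[0]
		fmt.Println(ci.key) // emit the smallest current key across all cursors
		if len(ci.rest) > 0 {
			// This is where the diff branches on ci1.keyCompressed between
			// dg.Next(...) and dg.NextUncompressed(); the sketch just
			// advances a slice.
			ci.key, ci.rest = ci.rest[0], ci.rest[1:]
			heap.Fix(&cp, 0) // restore heap order in place, as in the diff
		} else {
			heap.Pop(&cp) // this source file is exhausted
		}
	}
}
```

The per-item flags are computed as item.endTxNum-item.startTxNum > aggregationStep, which suggests files spanning exactly one aggregation step are written without key compression; carrying the flag on each cursor lets a single merge read single-step and multi-step source files correctly in the same pass, while the renamed compressVal still controls how the merged output is written.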