E3: parallel exec, apply on roTx (#5879)

Alex Sharov authored 2022-10-28 08:47:45 +07:00, committed by GitHub
parent 5a321d6240
commit 8d1ed547b0
22 changed files with 132 additions and 478 deletions

View File

@@ -12,6 +12,7 @@ import (
 	"github.com/c2h5oh/datasize"
 	common2 "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
 	"github.com/ledgerwatch/erigon/cmd/utils"
 	"github.com/ledgerwatch/erigon/common"
@@ -27,7 +28,6 @@ import (
 	"github.com/ledgerwatch/erigon/eth/integrity"
 	"github.com/ledgerwatch/erigon/eth/stagedsync"
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/node/nodecfg"
 	"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
 	"github.com/ledgerwatch/erigon/params"

View File

@@ -8,12 +8,12 @@ import (
 	"github.com/RoaringBitmap/roaring"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/core/rawdb"
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/eth/filters"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/ethdb/cbor"
 	"github.com/ledgerwatch/erigon/rpc"
 	"github.com/ledgerwatch/erigon/turbo/rpchelper"

View File

@@ -7,13 +7,12 @@ import (
 	"fmt"
 	"math/big"
 
+	"github.com/RoaringBitmap/roaring"
 	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/holiman/uint256"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	libstate "github.com/ledgerwatch/erigon-lib/state"
+	"github.com/ledgerwatch/log/v3"
-	"github.com/RoaringBitmap/roaring"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/common/hexutil"
@@ -24,12 +23,12 @@ import (
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/core/vm"
 	"github.com/ledgerwatch/erigon/eth/filters"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/ethdb/cbor"
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/rpc"
 	"github.com/ledgerwatch/erigon/turbo/rpchelper"
 	"github.com/ledgerwatch/erigon/turbo/transactions"
-	"github.com/ledgerwatch/log/v3"
 )
func (api *BaseAPI) getReceipts(ctx context.Context, tx kv.Tx, chainConfig *params.ChainConfig, block *types.Block, senders []common.Address) (types.Receipts, error) {

View File

@@ -9,6 +9,7 @@ import (
 	"github.com/RoaringBitmap/roaring/roaring64"
 	jsoniter "github.com/json-iterator/go"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/hexutil"
 	"github.com/ledgerwatch/erigon/consensus/ethash"
@@ -18,7 +19,6 @@ import (
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/core/vm"
 	"github.com/ledgerwatch/erigon/ethdb"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/rpc"
 	"github.com/ledgerwatch/erigon/turbo/rpchelper"

View File

@@ -5,11 +5,11 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon-lib/kv/mdbx"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/changeset"
 	"github.com/ledgerwatch/erigon/common/dbutils"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 )
func CheckIndex(ctx context.Context, chaindata string, changeSetBucket string, indexBucket string) error {

View File

@@ -8,13 +8,13 @@ import (
 	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/holiman/uint256"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/changeset"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/common/math"
 	"github.com/ledgerwatch/erigon/core/types/accounts"
 	"github.com/ledgerwatch/erigon/ethdb"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/turbo/trie"
 )

View File

@@ -8,12 +8,12 @@ import (
 	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/changeset"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/core/types/accounts"
 	"github.com/ledgerwatch/erigon/ethdb"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 )
func GetAsOf(tx kv.Tx, indexC kv.Cursor, changesC kv.CursorDupSort, storage bool, key []byte, timestamp uint64) ([]byte, error) {

View File

@@ -11,6 +11,7 @@ import (
 	"github.com/davecgh/go-spew/spew"
 	"github.com/holiman/uint256"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon-lib/kv/memdb"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/changeset"
@@ -18,7 +19,6 @@ import (
 	"github.com/ledgerwatch/erigon/common/math"
 	"github.com/ledgerwatch/erigon/core/types/accounts"
 	"github.com/ledgerwatch/erigon/crypto"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/turbo/trie"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

View File

@@ -196,13 +196,11 @@ func (rs *State22) RegisterSender(txTask *TxTask) bool {
 }
 
 func (rs *State22) CommitTxNum(sender *common.Address, txNum uint64) uint64 {
-	rs.queueLock.Lock()
-	defer rs.queueLock.Unlock()
 	rs.triggerLock.Lock()
 	defer rs.triggerLock.Unlock()
 	count := uint64(0)
 	if triggered, ok := rs.triggers[txNum]; ok {
-		heap.Push(&rs.queue, triggered)
+		rs.queuePush(triggered)
 		rs.receiveWork.Signal()
 		count++
 		delete(rs.triggers, txNum)
@@ -213,10 +211,22 @@ func (rs *State22) CommitTxNum(sender *common.Address, txNum uint64) uint64 {
 			delete(rs.senderTxNums, *sender)
 		}
 	}
-	rs.txsDone++
+	rs.txDoneIncrement()
 	return count
 }
+
+func (rs *State22) queuePush(t *TxTask) {
+	rs.queueLock.Lock()
+	heap.Push(&rs.queue, t)
+	rs.queueLock.Unlock()
+}
+
+func (rs *State22) txDoneIncrement() {
+	rs.lock.Lock()
+	rs.txsDone++
+	rs.lock.Unlock()
+}
+
 func (rs *State22) AddWork(txTask *TxTask) {
 	txTask.BalanceIncreaseSet = nil
 	txTask.ReadLists = nil
@@ -234,9 +244,7 @@ func (rs *State22) AddWork(txTask *TxTask) {
 		txTask.StoragePrevs = nil
 		txTask.CodePrevs = nil
 	*/
-	rs.queueLock.Lock()
-	heap.Push(&rs.queue, txTask)
-	rs.queueLock.Unlock()
+	rs.queuePush(txTask)
 	rs.receiveWork.Signal()
 }
@@ -425,9 +433,6 @@ func (rs *State22) Apply(roTx kv.Tx, txTask *TxTask, agg *libstate.Aggregator22)
 			}
 		}
 	}
-	if err := agg.FinishTx(); err != nil {
-		return err
-	}
 	if txTask.WriteLists != nil {
 		for table, list := range txTask.WriteLists {
 			for i, key := range list.Keys {
@@ -535,14 +540,16 @@ func (rs *State22) Unwind(ctx context.Context, tx kv.RwTx, txUnwindTo uint64, ag
 func (rs *State22) DoneCount() uint64 {
 	rs.lock.RLock()
-	defer rs.lock.RUnlock()
-	return rs.txsDone
+	r := rs.txsDone
+	rs.lock.RUnlock()
+	return r
 }
 
 func (rs *State22) SizeEstimate() uint64 {
 	rs.lock.RLock()
-	defer rs.lock.RUnlock()
-	return rs.sizeEstimate
+	r := rs.sizeEstimate
+	rs.lock.RUnlock()
+	return r
 }
 
 func (rs *State22) ReadsValid(readLists map[string]*KvList) bool {
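The change above is mostly lock hygiene: CommitTxNum used to hold queueLock for its entire body, and the hot getters paid for a defer on every call. A minimal, self-contained sketch of the resulting pattern (the names here are illustrative, not Erigon's):

package example

import (
	"container/heap"
	"sync"
)

type intHeap []int

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

type workQueue struct {
	queueLock sync.Mutex // guards queue only
	queue     intHeap
	lock      sync.RWMutex // guards counters only
	done      uint64
}

// push holds queueLock only around the heap operation, so a caller may
// already hold an unrelated lock (as CommitTxNum holds triggerLock above)
// without creating nested lock scopes.
func (w *workQueue) push(v int) {
	w.queueLock.Lock()
	heap.Push(&w.queue, v)
	w.queueLock.Unlock()
}

// doneCount skips defer on the hot read path; with a single return there
// is no early exit to protect, and the explicit unlock is slightly cheaper.
func (w *workQueue) doneCount() uint64 {
	w.lock.RLock()
	r := w.done
	w.lock.RUnlock()
	return r
}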

View File

@@ -50,7 +50,7 @@ type Progress struct {
 	commitThreshold uint64
 }
 
-func (p *Progress) Log(logPrefix string, rs *state.State22, rwsLen int, queueSize, count, inputBlockNum, outputBlockNum, repeatCount uint64, resultsSize uint64, resultCh chan *state.TxTask) {
+func (p *Progress) Log(logPrefix string, rs *state.State22, rwsLen int, queueSize, count, inputBlockNum, outputBlockNum, repeatCount uint64, resultsSize uint64, resultCh chan *state.TxTask, idxStepsAmountInDB float64) {
 	var m runtime.MemStats
 	common.ReadMemStats(&m)
 	sizeEstimate := rs.SizeEstimate()
@@ -73,6 +73,7 @@ func (p *Progress) Log(logPrefix string, rs *state.State22, rwsLen int, queueSiz
 		"resultsSize", common.ByteCount(resultsSize),
 		"repeatRatio", fmt.Sprintf("%.2f%%", repeatRatio),
 		"buffer", fmt.Sprintf("%s/%s", common.ByteCount(sizeEstimate), common.ByteCount(p.commitThreshold)),
+		"idxStepsInDB", fmt.Sprintf("%.2f", idxStepsAmountInDB),
 		"alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys),
 	)
 	//var txNums []string
@@ -117,7 +118,7 @@ func Exec3(ctx context.Context,
 	var resultsSize = atomic2.NewInt64(0)
 	var lock sync.RWMutex
 	var rws state.TxTaskQueue
-	var rwsLock sync.Mutex
+	var rwsLock sync.RWMutex
 
 	if execStage.BlockNumber > 0 {
 		stageProgress = execStage.BlockNumber
@@ -175,30 +176,37 @@
 	commitThreshold := batchSize.Bytes() * 4
 	resultsThreshold := int64(batchSize.Bytes() * 4)
 	progress := NewProgress(block, commitThreshold)
-	logEvery := time.NewTicker(logInterval)
+	logEvery := time.NewTicker(10 * time.Second)
 	defer logEvery.Stop()
+	pruneEvery := time.NewTicker(time.Second)
+	defer pruneEvery.Stop()
 	rwsReceiveCond := sync.NewCond(&rwsLock)
 	heap.Init(&rws)
 	agg.SetTxNum(inputTxNum)
 	if parallel {
-		// Go-routine gathering results from the workers
-		go func() {
-			tx, err := chainDb.BeginRw(ctx)
+		if err := chainDb.Update(ctx, func(tx kv.RwTx) error {
+			agg.SetTx(tx)
+			if err = agg.Prune(ctx, agg.EndTxNumMinimax()); err != nil { // prune part of retired data, before commit
+				panic(err)
+			}
+			return nil
+		}); err != nil {
+			return err
+		}
+		applyLoop := func(ctx context.Context) {
+			tx, err := chainDb.BeginRo(ctx)
 			if err != nil {
 				panic(err)
 			}
 			defer tx.Rollback()
-			agg.SetTx(tx)
-			defer rs.Finish()
-			defer agg.StartWrites().FinishWrites()
-			doPrune := 0
-			_ = doPrune
 			for outputTxNum.Load() < maxTxNum.Load() {
 				select {
 				case <-ctx.Done():
 					return
 				case txTask := <-resultCh:
 					//fmt.Printf("Saved %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
 					func() {
 						rwsLock.Lock()
 						defer rwsLock.Unlock()
@@ -207,24 +215,39 @@
 						processResultQueue(&rws, outputTxNum, rs, agg, tx, triggerCount, outputBlockNum, repeatCount, resultsSize)
 						rwsReceiveCond.Signal()
 					}()
 				}
 			}
 		}
-		/*
-			doPrune++
-			// if nothing to do, then spend some time for pruning
-			if doPrune%100 == 0 && len(resultCh) == 0 {
-				if err = agg.Prune(100); err != nil { // prune part of retired data, before commit
-					panic(err)
-				}
-			}
-		*/
+		// Go-routine gathering results from the workers
+		go func() {
+			tx, err := chainDb.BeginRw(ctx)
+			if err != nil {
+				panic(err)
+			}
+			defer tx.Rollback()
+			defer rs.Finish()
+			agg.SetTx(tx)
+			defer agg.StartWrites().FinishWrites()
+			applyCtx, cancelApplyCtx := context.WithCancel(ctx)
+			defer cancelApplyCtx()
+			go applyLoop(applyCtx)
 			for outputTxNum.Load() < maxTxNum.Load() {
 				select {
 				case <-logEvery.C:
-					progress.Log(execStage.LogPrefix(), rs, rws.Len(), uint64(queueSize), rs.DoneCount(), inputBlockNum.Load(), outputBlockNum.Load(), repeatCount.Load(), uint64(resultsSize.Load()), resultCh)
-					sizeEstimate := rs.SizeEstimate()
+					rwsLock.RLock()
+					rwsLen := rws.Len()
+					rwsLock.RUnlock()
+					progress.Log(execStage.LogPrefix(), rs, rwsLen, uint64(queueSize), rs.DoneCount(), inputBlockNum.Load(), outputBlockNum.Load(), repeatCount.Load(), uint64(resultsSize.Load()), resultCh, idxStepsInDB(tx))
 					//prevTriggerCount = triggerCount
-					if sizeEstimate < commitThreshold {
+					if rs.SizeEstimate() < commitThreshold {
 						break
 					}
+					cancelApplyCtx()
 					commitStart := time.Now()
 					log.Info("Committing...")
 					err := func() error {
@@ -278,12 +301,18 @@
 						return err
 					}
 					//TODO: can't commit - because we are in the middle of the block. Need to make sure we always process whole blocks.
-					if err = agg.Prune(ethconfig.HistoryV3AggregationStep / 20); err != nil { // prune part of retired data, before commit
-						return err
+					if idxStepsInDB(tx) > 3 {
+						if err = agg.Prune(ctx, ethconfig.HistoryV3AggregationStep/10); err != nil { // prune part of retired data, before commit
+							return err
+						}
 					}
 					if err = tx.Commit(); err != nil {
 						return err
 					}
+					applyCtx, cancelApplyCtx = context.WithCancel(ctx)
+					go applyLoop(applyCtx)
 					if tx, err = chainDb.BeginRw(ctx); err != nil {
 						return err
 					}
@@ -297,6 +326,17 @@
 						panic(err)
 					}
 					log.Info("Committed", "time", time.Since(commitStart))
+				case <-pruneEvery.C:
+					log.Debug("can prune", "can", agg.CanPrune(tx))
+					if agg.CanPrune(tx) {
+						t := time.Now()
+						for time.Since(t) < time.Second {
+							log.Debug("do prune")
+							if err = agg.Prune(ctx, 1_000); err != nil { // prune part of retired data, before commit
+								panic(err)
+							}
+						}
+					}
 				}
 			}
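This is the core of the commit: results are now applied on a read-only transaction (applyLoop over chainDb.BeginRo), while the single read-write transaction is reserved for commit and prune. Before each commit the apply loop is cancelled so its roTx does not pin the pre-commit MVCC snapshot, and it is restarted on a fresh snapshot right after. A stripped-down sketch of that life cycle; all types and helpers here are placeholders, not Erigon APIs:

package example

import (
	"context"
	"time"
)

// runParallel shows the shape of the loop above: results are applied on a
// read-only tx inside applyLoop, and the writer cancels/restarts that loop
// around each commit so no reader pins the pre-commit snapshot.
func runParallel(ctx context.Context, results <-chan int,
	beginRo func(context.Context) (rollback func(), err error), commit func() error) error {
	applyLoop := func(ctx context.Context) {
		rollback, err := beginRo(ctx) // read-only tx: never blocks the writer
		if err != nil {
			panic(err)
		}
		defer rollback()
		for {
			select {
			case <-ctx.Done(): // writer wants to commit: release this snapshot
				return
			case r := <-results:
				_ = r // apply r against the read-only tx here
			}
		}
	}

	applyCtx, cancelApplyCtx := context.WithCancel(ctx)
	defer cancelApplyCtx()
	go applyLoop(applyCtx)

	commitEvery := time.NewTicker(10 * time.Second) // stand-in for the commitThreshold check
	defer commitEvery.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-commitEvery.C:
			cancelApplyCtx() // stop the apply loop before committing
			if err := commit(); err != nil {
				return err
			}
			applyCtx, cancelApplyCtx = context.WithCancel(ctx)
			go applyLoop(applyCtx) // resume on a fresh read-only snapshot
		}
	}
}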
@@ -321,6 +361,11 @@
 		defer agg.StartWrites().FinishWrites()
 	}
 
+	if block < blockReader.(WithSnapshots).Snapshots().BlocksAvailable() {
+		agg.KeepInDB(0)
+	}
+	defer agg.KeepInDB(ethconfig.HistoryV3AggregationStep)
+
 	var b *types.Block
 	var blockNum uint64
loop:
@@ -388,6 +433,7 @@
 				if err := rs.Apply(reconWorkers[0].Tx(), txTask, agg); err != nil {
 					panic(fmt.Errorf("State22.Apply: %w", err))
 				}
+				outputTxNum.Inc()
 				outputBlockNum.Store(txTask.BlockNum)
 				//fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
@@ -417,7 +463,7 @@
 			}
 			if !useExternalTx {
 				applyTx.CollectMetrics()
-				if err = agg.Prune(ethconfig.HistoryV3AggregationStep / 10); err != nil {
+				if err = agg.Prune(ctx, ethconfig.HistoryV3AggregationStep/10); err != nil {
 					return err
 				}
 				if err := applyTx.Commit(); err != nil {
@@ -437,7 +483,7 @@
 		select {
 		case <-logEvery.C:
 			if !parallel {
-				progress.Log(execStage.LogPrefix(), rs, rws.Len(), uint64(queueSize), count, inputBlockNum.Load(), outputBlockNum.Load(), repeatCount.Load(), uint64(resultsSize.Load()), resultCh)
+				progress.Log(execStage.LogPrefix(), rs, rws.Len(), uint64(queueSize), count, inputBlockNum.Load(), outputBlockNum.Load(), repeatCount.Load(), uint64(resultsSize.Load()), resultCh, idxStepsInDB(applyTx))
 			}
 		case <-interruptCh:
 			log.Info(fmt.Sprintf("interrupted, please wait for cleanup, next run will start with block %d", blockNum))
@@ -445,6 +491,10 @@
 			break loop
 		default:
 		}
+
+		if err := agg.BuildFilesInBackground(chainDb); err != nil {
+			return err
+		}
 	}
 	if parallel {
 		wg.Wait()
@@ -1072,3 +1122,15 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
 	}
 	return nil
 }
+
+func idxStepsInDB(tx kv.Tx) float64 {
+	fst, _ := kv.FirstKey(tx, kv.TracesToKeys)
+	lst, _ := kv.LastKey(tx, kv.TracesToKeys)
+	if len(fst) > 0 && len(lst) > 0 {
+		fstTxNum := binary.BigEndian.Uint64(fst)
+		lstTxNum := binary.BigEndian.Uint64(lst)
+		return float64(lstTxNum-fstTxNum) / float64(ethconfig.HistoryV3AggregationStep)
+	}
+	return 0
+}
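For intuition on the new idxStepsInDB gauge: keys in kv.TracesToKeys begin with a big-endian txNum, so the first and last keys bound the span of history still sitting in the DB, expressed in aggregation steps. A hedged worked example; the step constant is an assumption (erigon-lib's HistoryV3AggregationStep was 3_125_000 txs around this commit):

package example

func idxStepsExample() float64 {
	const aggregationStep = 3_125_000      // assumed value of ethconfig.HistoryV3AggregationStep
	fstTxNum := uint64(50_000_000)         // decoded from the first kv.TracesToKeys key
	lstTxNum := uint64(62_500_000)         // decoded from the last kv.TracesToKeys key
	return float64(lstTxNum-fstTxNum) / float64(aggregationStep) // 4.0, so the commit path above (> 3) prunes
}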

View File

@@ -14,9 +14,9 @@ import (
 	"github.com/ledgerwatch/erigon-lib/common/length"
 	"github.com/ledgerwatch/erigon-lib/etl"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/common/math"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/ethdb/prune"
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/log/v3"

View File

@@ -7,10 +7,10 @@ import (
 	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon-lib/kv/memdb"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )

View File

@@ -15,10 +15,10 @@ import (
 	"github.com/ledgerwatch/erigon-lib/common/length"
 	"github.com/ledgerwatch/erigon-lib/etl"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon/common/changeset"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/ethdb"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/ethdb/prune"
 	"github.com/ledgerwatch/log/v3"
 	"golang.org/x/exp/slices"

View File

@@ -13,13 +13,13 @@ import (
 	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/ledgerwatch/erigon-lib/common/length"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	kv2 "github.com/ledgerwatch/erigon-lib/kv/memdb"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/changeset"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/common/math"
 	"github.com/ledgerwatch/erigon/crypto"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/ethdb/prune"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

View File

@@ -13,9 +13,9 @@ import (
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/etl"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/core/types"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/ethdb/cbor"
 	"github.com/ledgerwatch/erigon/ethdb/prune"
 	"github.com/ledgerwatch/log/v3"

View File

@@ -8,11 +8,11 @@ import (
 	"github.com/ledgerwatch/erigon-lib/common/length"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon-lib/kv/memdb"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/core/rawdb"
 	"github.com/ledgerwatch/erigon/core/types"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
 	"github.com/ledgerwatch/erigon/ethdb/prune"
 	"github.com/stretchr/testify/require"

View File

@@ -1,359 +0,0 @@
package bitmapdb
import (
"bytes"
"encoding/binary"
"sort"
"sync"
"github.com/RoaringBitmap/roaring"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/c2h5oh/datasize"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/ethdb"
)
var roaringPool = sync.Pool{
New: func() any {
return roaring.New()
},
}
func NewBitmap() *roaring.Bitmap {
a := roaringPool.Get().(*roaring.Bitmap)
a.Clear()
return a
}
func ReturnToPool(a *roaring.Bitmap) {
roaringPool.Put(a)
}
var roaring64Pool = sync.Pool{
New: func() any {
return roaring64.New()
},
}
func NewBitmap64() *roaring64.Bitmap {
a := roaring64Pool.Get().(*roaring64.Bitmap)
a.Clear()
return a
}
func ReturnToPool64(a *roaring64.Bitmap) {
roaring64Pool.Put(a)
}
const ChunkLimit = uint64(1950 * datasize.B) // threshold beyond which MDBX overflow pages appear: 4096 / 2 - (keySize + 8)
// CutLeft - cuts a chunk of roughly `sizeLimit` serialized bytes from the left of `bm`,
// removing that part from `bm`.
// Returns nil on zero cardinality.
func CutLeft(bm *roaring.Bitmap, sizeLimit uint64) *roaring.Bitmap {
if bm.GetCardinality() == 0 {
return nil
}
sz := bm.GetSerializedSizeInBytes()
if sz <= sizeLimit {
lft := roaring.New()
lft.AddRange(uint64(bm.Minimum()), uint64(bm.Maximum())+1)
lft.And(bm)
lft.RunOptimize()
bm.Clear()
return lft
}
from := uint64(bm.Minimum())
minMax := bm.Maximum() - bm.Minimum()
to := sort.Search(int(minMax), func(i int) bool { // can be optimized to avoid "too small steps", but let's leave it for readability
lft := roaring.New() // bitmap.Clear() is intentionally not used here, because it makes the serialized size of the bitmap bigger
lft.AddRange(from, from+uint64(i)+1)
lft.And(bm)
lft.RunOptimize()
return lft.GetSerializedSizeInBytes() > sizeLimit
})
lft := roaring.New()
lft.AddRange(from, from+uint64(to)) // no +1: sort.Search returns the first element above the threshold, but we need the last one below it
lft.And(bm)
bm.RemoveRange(from, from+uint64(to))
lft.RunOptimize()
return lft
}
func WalkChunks(bm *roaring.Bitmap, sizeLimit uint64, f func(chunk *roaring.Bitmap, isLast bool) error) error {
for bm.GetCardinality() > 0 {
if err := f(CutLeft(bm, sizeLimit), bm.GetCardinality() == 0); err != nil {
return err
}
}
return nil
}
func WalkChunkWithKeys(k []byte, m *roaring.Bitmap, sizeLimit uint64, f func(chunkKey []byte, chunk *roaring.Bitmap) error) error {
return WalkChunks(m, sizeLimit, func(chunk *roaring.Bitmap, isLast bool) error {
chunkKey := make([]byte, len(k)+4)
copy(chunkKey, k)
if isLast {
binary.BigEndian.PutUint32(chunkKey[len(k):], ^uint32(0))
} else {
binary.BigEndian.PutUint32(chunkKey[len(k):], chunk.Maximum())
}
return f(chunkKey, chunk)
})
}
// TruncateRange - gets the existing bitmap in the db and calls RemoveRange on it.
// Starts from the hot shard, stops when a shard no longer overlaps [from, to).
// !Important: [from, to)
func TruncateRange(db kv.RwTx, bucket string, key []byte, to uint32) error {
chunkKey := make([]byte, len(key)+4)
copy(chunkKey, key)
binary.BigEndian.PutUint32(chunkKey[len(chunkKey)-4:], to)
bm, err := Get(db, bucket, key, to, math.MaxUint32)
if err != nil {
return err
}
if bm.GetCardinality() > 0 && to <= bm.Maximum() {
bm.RemoveRange(uint64(to), uint64(bm.Maximum())+1)
}
c, err := db.Cursor(bucket)
if err != nil {
return err
}
defer c.Close()
if err := ethdb.Walk(c, chunkKey, 0, func(k, v []byte) (bool, error) {
if !bytes.HasPrefix(k, key) {
return false, nil
}
if err := db.Delete(bucket, k); err != nil {
return false, err
}
return true, nil
}); err != nil {
return err
}
buf := bytes.NewBuffer(nil)
return WalkChunkWithKeys(key, bm, ChunkLimit, func(chunkKey []byte, chunk *roaring.Bitmap) error {
buf.Reset()
if _, err := chunk.WriteTo(buf); err != nil {
return err
}
return db.Put(bucket, chunkKey, libcommon.Copy(buf.Bytes()))
})
}
// Get - reads as many chunks as needed to satisfy the [from, to] condition
// and joins all chunks into one bitmap with the Or operator
func Get(db kv.Tx, bucket string, key []byte, from, to uint32) (*roaring.Bitmap, error) {
var chunks []*roaring.Bitmap
fromKey := make([]byte, len(key)+4)
copy(fromKey, key)
binary.BigEndian.PutUint32(fromKey[len(fromKey)-4:], from)
c, err := db.Cursor(bucket)
if err != nil {
return nil, err
}
defer c.Close()
for k, v, err := c.Seek(fromKey); k != nil; k, v, err = c.Next() {
if err != nil {
return nil, err
}
if !bytes.HasPrefix(k, key) {
break
}
bm := NewBitmap()
defer ReturnToPool(bm)
if _, err := bm.ReadFrom(bytes.NewReader(v)); err != nil {
return nil, err
}
chunks = append(chunks, bm)
if binary.BigEndian.Uint32(k[len(k)-4:]) >= to {
break
}
}
if len(chunks) == 0 {
return roaring.New(), nil
}
return roaring.FastOr(chunks...), nil
}
// SeekInBitmap - returns the smallest value in the bitmap which is >= n
//
//nolint:deadcode
func SeekInBitmap(m *roaring.Bitmap, n uint32) (found uint32, ok bool) {
i := m.Iterator()
i.AdvanceIfNeeded(n)
ok = i.HasNext()
if ok {
found = i.Next()
}
return found, ok
}
// CutLeft64 - cuts a chunk of roughly `sizeLimit` serialized bytes from the left of `bm`,
// removing that part from `bm`.
// Returns nil on zero cardinality.
func CutLeft64(bm *roaring64.Bitmap, sizeLimit uint64) *roaring64.Bitmap {
if bm.GetCardinality() == 0 {
return nil
}
sz := bm.GetSerializedSizeInBytes()
if sz <= sizeLimit {
lft := roaring64.New()
lft.AddRange(bm.Minimum(), bm.Maximum()+1)
lft.And(bm)
lft.RunOptimize()
bm.Clear()
return lft
}
from := bm.Minimum()
minMax := bm.Maximum() - bm.Minimum()
to := sort.Search(int(minMax), func(i int) bool { // can be optimized to avoid "too small steps", but let's leave it for readability
lft := roaring64.New() // bitmap.Clear() is intentionally not used here, because it makes the serialized size of the bitmap bigger
lft.AddRange(from, from+uint64(i)+1)
lft.And(bm)
lft.RunOptimize()
return lft.GetSerializedSizeInBytes() > sizeLimit
})
lft := roaring64.New()
lft.AddRange(from, from+uint64(to)) // no +1: sort.Search returns the first element above the threshold, but we need the last one below it
lft.And(bm)
bm.RemoveRange(from, from+uint64(to))
lft.RunOptimize()
return lft
}
func WalkChunks64(bm *roaring64.Bitmap, sizeLimit uint64, f func(chunk *roaring64.Bitmap, isLast bool) error) error {
for bm.GetCardinality() > 0 {
if err := f(CutLeft64(bm, sizeLimit), bm.GetCardinality() == 0); err != nil {
return err
}
}
return nil
}
func WalkChunkWithKeys64(k []byte, m *roaring64.Bitmap, sizeLimit uint64, f func(chunkKey []byte, chunk *roaring64.Bitmap) error) error {
return WalkChunks64(m, sizeLimit, func(chunk *roaring64.Bitmap, isLast bool) error {
chunkKey := make([]byte, len(k)+8)
copy(chunkKey, k)
if isLast {
binary.BigEndian.PutUint64(chunkKey[len(k):], ^uint64(0))
} else {
binary.BigEndian.PutUint64(chunkKey[len(k):], chunk.Maximum())
}
return f(chunkKey, chunk)
})
}
// TruncateRange64 - gets the existing bitmap in the db and calls RemoveRange on it.
// Starts from the hot shard, stops when a shard no longer overlaps [from, to).
// !Important: [from, to)
func TruncateRange64(db kv.RwTx, bucket string, key []byte, to uint64) error {
chunkKey := make([]byte, len(key)+8)
copy(chunkKey, key)
binary.BigEndian.PutUint64(chunkKey[len(chunkKey)-8:], to)
bm, err := Get64(db, bucket, key, to, math.MaxUint64)
if err != nil {
return err
}
if bm.GetCardinality() > 0 && to <= bm.Maximum() {
bm.RemoveRange(to, bm.Maximum()+1)
}
c, err := db.Cursor(bucket)
if err != nil {
return err
}
defer c.Close()
cDel, err := db.RwCursor(bucket)
if err != nil {
return err
}
defer cDel.Close()
if err := ethdb.Walk(c, chunkKey, 0, func(k, v []byte) (bool, error) {
if !bytes.HasPrefix(k, key) {
return false, nil
}
if err := cDel.Delete(k); err != nil {
return false, err
}
return true, nil
}); err != nil {
return err
}
buf := bytes.NewBuffer(nil)
return WalkChunkWithKeys64(key, bm, ChunkLimit, func(chunkKey []byte, chunk *roaring64.Bitmap) error {
buf.Reset()
if _, err := chunk.WriteTo(buf); err != nil {
return err
}
return db.Put(bucket, chunkKey, libcommon.Copy(buf.Bytes()))
})
}
// Get64 - reads as many chunks as needed to satisfy the [from, to] condition
// and joins all chunks into one bitmap with the Or operator
func Get64(db kv.Tx, bucket string, key []byte, from, to uint64) (*roaring64.Bitmap, error) {
var chunks []*roaring64.Bitmap
fromKey := make([]byte, len(key)+8)
copy(fromKey, key)
binary.BigEndian.PutUint64(fromKey[len(fromKey)-8:], from)
c, err := db.Cursor(bucket)
if err != nil {
return nil, err
}
defer c.Close()
for k, v, err := c.Seek(fromKey); k != nil; k, v, err = c.Next() {
if err != nil {
return nil, err
}
if !bytes.HasPrefix(k, key) {
break
}
bm := NewBitmap64()
defer ReturnToPool64(bm)
_, err := bm.ReadFrom(bytes.NewReader(v))
if err != nil {
return nil, err
}
chunks = append(chunks, bm)
if binary.BigEndian.Uint64(k[len(k)-8:]) >= to {
break
}
}
if len(chunks) == 0 {
return roaring64.New(), nil
}
return roaring64.FastOr(chunks...), nil
}
// SeekInBitmap64 - returns the smallest value in the bitmap which is >= n
func SeekInBitmap64(m *roaring64.Bitmap, n uint64) (found uint64, ok bool) {
if m.IsEmpty() {
return 0, false
}
if n == 0 {
return m.Minimum(), true
}
searchRank := m.Rank(n - 1)
if searchRank >= m.GetCardinality() {
return 0, false
}
found, _ = m.Select(searchRank)
return found, true
}
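The Rank/Select pair above is a tidy way to seek: Rank(n-1) counts the values <= n-1, so whenever that count is below the cardinality, it is exactly the 0-based index of the smallest value >= n. A quick illustrative check:

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring/roaring64"
)

func main() {
	m := roaring64.BitmapOf(3, 7, 12)
	// Seek the smallest value >= 5: Rank(5-1) counts how many values are
	// <= 4 (here 1, just {3}); since 1 < cardinality, Select(1) is the answer.
	searchRank := m.Rank(4)
	v, _ := m.Select(searchRank)
	fmt.Println(v) // 7
}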

View File

@@ -1,55 +0,0 @@
package bitmapdb_test
import (
"testing"
"github.com/RoaringBitmap/roaring"
"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
"github.com/stretchr/testify/require"
)
func TestCutLeft(t *testing.T) {
bm := roaring.New()
for j := 0; j < 10_000; j += 20 {
bm.AddRange(uint64(j), uint64(j+10))
}
N := uint64(1024)
for bm.GetCardinality() > 0 {
lft := bitmapdb.CutLeft(bm, N)
lftSz := lft.GetSerializedSizeInBytes()
if bm.GetCardinality() > 0 {
require.True(t, lftSz > N-256 && lftSz < N+256)
} else {
require.True(t, lft.GetSerializedSizeInBytes() > 0)
require.True(t, lftSz < N+256)
}
}
bm = roaring.New()
for j := 0; j < 10_000; j += 20 {
bm.AddRange(uint64(j), uint64(j+10))
}
N = uint64(2048)
for bm.GetCardinality() > 0 {
lft := bitmapdb.CutLeft(bm, N)
lftSz := lft.GetSerializedSizeInBytes()
if bm.GetCardinality() > 0 {
require.True(t, lftSz > N-256 && lftSz < N+256)
} else {
require.True(t, lft.GetSerializedSizeInBytes() > 0)
require.True(t, lftSz < N+256)
}
}
bm = roaring.New()
bm.Add(1)
lft := bitmapdb.CutLeft(bm, N)
require.True(t, lft.GetSerializedSizeInBytes() > 0)
require.True(t, lft.GetCardinality() == 1)
require.True(t, bm.GetCardinality() == 0)
bm = roaring.New()
lft = bitmapdb.CutLeft(bm, N)
require.True(t, lft == nil)
require.True(t, bm.GetCardinality() == 0)
}

go.mod (2 changes)

View File

@@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
 go 1.18
 
 require (
-	github.com/ledgerwatch/erigon-lib v0.0.0-20221026081821-45647a5e4231
+	github.com/ledgerwatch/erigon-lib v0.0.0-20221027083835-ba838a7a07ee
 	github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221025023844-6e716b9e651c
 	github.com/ledgerwatch/log/v3 v3.6.0
 	github.com/ledgerwatch/secp256k1 v1.0.0

go.sum (4 changes)

View File

@@ -556,8 +556,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
 github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
-github.com/ledgerwatch/erigon-lib v0.0.0-20221026081821-45647a5e4231 h1:ten0LCo3boEK5rQO+RKrFQWqtqT3TrK9t/55rfhEkLo=
-github.com/ledgerwatch/erigon-lib v0.0.0-20221026081821-45647a5e4231/go.mod h1:IP/aYFvEPlHJ1THh03i+qbHFil+N3enYhTEPVQLFJBw=
+github.com/ledgerwatch/erigon-lib v0.0.0-20221027083835-ba838a7a07ee h1:0yTuf5M6RFqB8mCXmde6AVLruY2wfwcYxYcrXJyfC7M=
+github.com/ledgerwatch/erigon-lib v0.0.0-20221027083835-ba838a7a07ee/go.mod h1:IP/aYFvEPlHJ1THh03i+qbHFil+N3enYhTEPVQLFJBw=
 github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221025023844-6e716b9e651c h1:OZkwKxpAnFLXMtDX3VFHSClkdbTlvBCXm6jjRxWxMjg=
 github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221025023844-6e716b9e651c/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
 github.com/ledgerwatch/log/v3 v3.6.0 h1:JBUSK1epPyutUrz7KYDTcJtQLEHnehECRpKbM1ugy5M=
View File

@@ -340,7 +340,7 @@ func OpenDatabase(config *nodecfg.Config, logger log.Logger, label kv.Label) (kv
 		opts = opts.GrowthStep(16 * datasize.MB)
 	}
 	if debug.WriteMap() {
-		opts = opts.WriteMap()
+		opts = opts.WriteMap().WriteMergeThreshold(1 * 8192)
 	}
 	if debug.MdbxReadAhead() {
 		opts = opts.Flags(func(u uint) uint {
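The MDBX options builder chains tuning knobs, and this commit extends the WriteMap branch with a write-merge threshold. A minimal sketch of the surrounding builder usage, assembled only from calls visible in this diff plus assumed boilerplate (Path/Open and the writeMap flag parameter are illustrative):

package example

import (
	"github.com/c2h5oh/datasize"
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon-lib/kv/mdbx"
	"github.com/ledgerwatch/log/v3"
)

func openChainDB(logger log.Logger, dbPath string, writeMap bool) (kv.RwDB, error) {
	opts := mdbx.NewMDBX(logger).Path(dbPath)
	opts = opts.GrowthStep(16 * datasize.MB)
	if writeMap {
		// WriteMap makes MDBX write through a writable memory map;
		// the WriteMergeThreshold tuning is what this commit adds.
		opts = opts.WriteMap().WriteMergeThreshold(1 * 8192)
	}
	return opts.Open()
}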

View File

@@ -27,7 +27,7 @@ import (
 	"github.com/holiman/uint256"
 	"github.com/ledgerwatch/erigon-lib/kv"
-	"github.com/ledgerwatch/erigon/ethdb/bitmapdb"
+	"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
 	"github.com/ledgerwatch/erigon/ethdb/olddb"
 	"github.com/ledgerwatch/erigon/ethdb/prune"
 	"github.com/stretchr/testify/assert"