mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-24 12:37:16 +00:00
248 lines
6.1 KiB
Go
248 lines
6.1 KiB
Go
package bitmapdb
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"github.com/RoaringBitmap/roaring"
|
|
"github.com/c2h5oh/datasize"
|
|
"github.com/ledgerwatch/turbo-geth/common"
|
|
"github.com/ledgerwatch/turbo-geth/ethdb"
|
|
)
|
|
|
|
const ShardLimit = 3 * datasize.KB
|
|
|
|
// AppendMergeByOr - appending delta to existing data in db, merge by Or
|
|
// Method maintains sharding - because some bitmaps are >1Mb and when new incoming blocks process it
|
|
// updates ~300 of bitmaps - by append small amount new values. It cause much big writes (LMDB does copy-on-write).
|
|
//
|
|
// if last existing shard size merge it with delta
|
|
// if serialized size of delta > ShardLimit - break down to multiple shards
|
|
// shard number - it's biggest value in bitmap
|
|
func AppendMergeByOr(c ethdb.Cursor, key []byte, delta *roaring.Bitmap) error {
|
|
lastShardKey := make([]byte, len(key)+4)
|
|
copy(lastShardKey, key)
|
|
binary.BigEndian.PutUint32(lastShardKey[len(lastShardKey)-4:], ^uint32(0))
|
|
|
|
currentLastV, seekErr := c.SeekExact(lastShardKey)
|
|
if seekErr != nil {
|
|
return seekErr
|
|
}
|
|
|
|
if currentLastV == nil { // no existing shards, then just create one
|
|
err := writeBitmapSharded(c, key, delta)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
last := roaring.New()
|
|
_, err := last.FromBuffer(currentLastV)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
delta = roaring.Or(delta, last)
|
|
|
|
err = writeBitmapSharded(c, key, delta)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// writeBitmapSharded - write bitmap to db, perform sharding if delta > ShardLimit
|
|
func writeBitmapSharded(c ethdb.Cursor, key []byte, delta *roaring.Bitmap) error {
|
|
shardKey := make([]byte, len(key)+4)
|
|
copy(shardKey, key)
|
|
sz := int(delta.GetSerializedSizeInBytes())
|
|
if sz <= int(ShardLimit) {
|
|
newV := bytes.NewBuffer(make([]byte, 0, delta.GetSerializedSizeInBytes()))
|
|
_, err := delta.WriteTo(newV)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
binary.BigEndian.PutUint32(shardKey[len(shardKey)-4:], ^uint32(0))
|
|
err = c.Put(common.CopyBytes(shardKey), newV.Bytes())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
shardsAmount := uint32(sz / int(ShardLimit))
|
|
if shardsAmount == 0 {
|
|
shardsAmount = 1
|
|
}
|
|
step := (delta.Maximum() - delta.Minimum()) / shardsAmount
|
|
step = step / 16
|
|
shard, tmp := roaring.New(), roaring.New() // shard will write to db, tmp will use to add data to shard
|
|
for delta.GetCardinality() > 0 {
|
|
from := uint64(delta.Minimum())
|
|
to := from + uint64(step)
|
|
tmp.Clear()
|
|
tmp.AddRange(from, to)
|
|
tmp.And(delta)
|
|
shard.Or(tmp)
|
|
shard.RunOptimize()
|
|
delta.RemoveRange(from, to)
|
|
if delta.GetCardinality() == 0 {
|
|
break
|
|
}
|
|
if shard.GetSerializedSizeInBytes() >= uint64(ShardLimit) {
|
|
newV := bytes.NewBuffer(make([]byte, 0, shard.GetSerializedSizeInBytes()))
|
|
_, err := shard.WriteTo(newV)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
binary.BigEndian.PutUint32(shardKey[len(shardKey)-4:], shard.Maximum())
|
|
|
|
err = c.Put(common.CopyBytes(shardKey), newV.Bytes())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
shard.Clear()
|
|
}
|
|
}
|
|
|
|
if shard.GetSerializedSizeInBytes() > 0 {
|
|
newV := bytes.NewBuffer(make([]byte, 0, shard.GetSerializedSizeInBytes()))
|
|
_, err := shard.WriteTo(newV)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
binary.BigEndian.PutUint32(shardKey[len(shardKey)-4:], ^uint32(0))
|
|
err = c.Put(common.CopyBytes(shardKey), newV.Bytes())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// TruncateRange - gets existing bitmap in db and call RemoveRange operator on it.
|
|
// starts from hot shard, stops when shard not overlap with [from-to)
|
|
// !Important: [from, to)
|
|
func TruncateRange(tx ethdb.Tx, bucket string, key []byte, from, to uint64) error {
|
|
shardKey := make([]byte, len(key)+4)
|
|
copy(shardKey, key)
|
|
binary.BigEndian.PutUint32(shardKey[len(shardKey)-4:], uint32(from))
|
|
c := tx.Cursor(bucket)
|
|
defer c.Close()
|
|
cForDelete := tx.Cursor(bucket) // use dedicated cursor for delete operation, but in near future will change to ETL
|
|
defer cForDelete.Close()
|
|
|
|
for k, v, err := c.Seek(shardKey); k != nil; k, v, err = c.Next() {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !bytes.HasPrefix(k, key) {
|
|
break
|
|
}
|
|
|
|
bm := roaring.New()
|
|
_, err := bm.FromBuffer(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
noReasonToCheckNextShard := (uint64(bm.Minimum()) <= from && uint64(bm.Maximum()) >= to) || binary.BigEndian.Uint32(k[len(k)-4:]) == ^uint32(0)
|
|
|
|
bm.RemoveRange(from, to)
|
|
if bm.GetCardinality() == 0 { // don't store empty bitmaps
|
|
err = cForDelete.Delete(k)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if noReasonToCheckNextShard {
|
|
break
|
|
}
|
|
continue
|
|
}
|
|
|
|
bm.RunOptimize()
|
|
newV := bytes.NewBuffer(make([]byte, 0, bm.GetSerializedSizeInBytes()))
|
|
_, err = bm.WriteTo(newV)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = c.Put(common.CopyBytes(k), newV.Bytes())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if noReasonToCheckNextShard {
|
|
break
|
|
}
|
|
}
|
|
|
|
// rename last shard
|
|
k, v, err := c.Current()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if k == nil { // if last shard was deleted, do 1 step back
|
|
k, v, err = c.Prev()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if binary.BigEndian.Uint32(k[len(k)-4:]) == ^uint32(0) { // nothing to return
|
|
return nil
|
|
}
|
|
if !bytes.HasPrefix(k, key) {
|
|
return nil
|
|
}
|
|
|
|
copyV := common.CopyBytes(v)
|
|
err = cForDelete.Delete(k)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
binary.BigEndian.PutUint32(shardKey[len(shardKey)-4:], ^uint32(0))
|
|
err = c.Put(shardKey, copyV)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Get - reading as much shards as needed to satisfy [from, to] condition
|
|
// join all shards to 1 bitmap by Or operator
|
|
func Get(c ethdb.Cursor, key []byte, from, to uint32) (*roaring.Bitmap, error) {
|
|
var shards []*roaring.Bitmap
|
|
|
|
fromKey := make([]byte, len(key)+4)
|
|
copy(fromKey, key)
|
|
binary.BigEndian.PutUint32(fromKey[len(fromKey)-4:], from)
|
|
for k, v, err := c.Seek(fromKey); k != nil; k, v, err = c.Next() {
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !bytes.HasPrefix(k, key) {
|
|
break
|
|
}
|
|
|
|
bm := roaring.New()
|
|
_, err := bm.FromBuffer(v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
shards = append(shards, bm)
|
|
|
|
if binary.BigEndian.Uint32(k[len(k)-4:]) >= to {
|
|
break
|
|
}
|
|
}
|
|
|
|
if len(shards) == 0 {
|
|
return roaring.New(), nil
|
|
}
|
|
return roaring.FastOr(shards...), nil
|
|
}
|