erigon-pulse/ethdb/bitmapdb/dbutils.go
2020-10-06 07:36:34 +01:00

252 lines
6.1 KiB
Go

package bitmapdb
import (
"bytes"
"encoding/binary"
"github.com/RoaringBitmap/roaring"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
// ShardLimit is the serialized-size threshold used by writeBitmapSharded: a
// bitmap whose serialized form is at most this size is stored as a single
// shard, while a larger one is split into shards that are each closed once
// their serialized size reaches this limit.
const ShardLimit = 3 * datasize.KB
// AppendMergeByOr merges delta into the bitmap stored under key by Or-ing it
// with the existing last shard and writing the result back.
//
// The data is kept sharded because some bitmaps grow beyond 1Mb, while each
// batch of newly processed blocks appends only a few values to ~300 of them.
// Rewriting whole large values every time would cause very large writes
// (LMDB does copy-on-write).
//
// Shard keys are key followed by a 4-byte big-endian suffix. The last shard
// always uses the suffix ^uint32(0), and only that shard is read and merged
// here. The merged bitmap is written back through writeBitmapSharded. That
// function splits it into several shards when its serialized size exceeds
// ShardLimit. Each of those shards is suffixed with its maximum value, except
// the last one. When key has no last shard yet, delta is written on its own.
func AppendMergeByOr(c ethdb.Cursor, key []byte, delta *roaring.Bitmap) error {
	lastShardKey := make([]byte, len(key)+4)
	copy(lastShardKey, key)
	binary.BigEndian.PutUint32(lastShardKey[len(key):], ^uint32(0))

	currentLastV, err := c.SeekExact(lastShardKey)
	if err != nil {
		return err
	}
	if currentLastV == nil { // no existing shards, then just create one
		return writeBitmapSharded(c, key, delta)
	}

	// FromBuffer does not copy: the bitmap keeps pointing into its input, and
	// roaring.Or may share those containers with its result. With LMDB, values
	// returned by the database are only valid until the next update in the
	// transaction. writeBitmapSharded can issue several writes while it is
	// still reading the merged bitmap, so decode from a private copy.
	last := roaring.New()
	if _, err = last.FromBuffer(common.CopyBytes(currentLastV)); err != nil {
		return err
	}
	return writeBitmapSharded(c, key, roaring.Or(delta, last))
}
// writeBitmapSharded writes delta under key. It splits delta into several
// shards when its serialized size exceeds ShardLimit.
//
// Every shard is stored under key followed by a 4-byte big-endian suffix. For
// all shards but the last, the suffix is the shard's maximum value; the last
// shard always gets ^uint32(0). Values are written with c.Put, so shards that
// already exist under the same keys are overwritten. An empty delta writes
// nothing, which matches the "don't store empty bitmaps" rule followed by
// TruncateRange.
//
// delta itself is left untouched, because the splitting loop drains a private
// clone.
func writeBitmapSharded(c ethdb.Cursor, key []byte, delta *roaring.Bitmap) error {
	if delta.GetCardinality() == 0 {
		return nil
	}

	shardKey := make([]byte, len(key)+4)
	copy(shardKey, key)

	// put serializes bm and stores it under key+suffix.
	put := func(suffix uint32, bm *roaring.Bitmap) error {
		buf := bytes.NewBuffer(make([]byte, 0, bm.GetSerializedSizeInBytes()))
		if _, err := bm.WriteTo(buf); err != nil {
			return err
		}
		binary.BigEndian.PutUint32(shardKey[len(key):], suffix)
		return c.Put(common.CopyBytes(shardKey), buf.Bytes())
	}

	sz := delta.GetSerializedSizeInBytes()
	if sz <= uint64(ShardLimit) {
		return put(^uint32(0), delta)
	}

	shardsAmount := uint32(sz / uint64(ShardLimit))
	if shardsAmount == 0 {
		shardsAmount = 1
	}
	// Walk the value range in small steps (1/16 of an average shard's span), so
	// that a shard is closed soon after it reaches ShardLimit.
	step := (delta.Maximum() - delta.Minimum()) / shardsAmount / 16
	if step == 0 {
		step = 1
	}

	// The loop consumes its input with RemoveRange. Work on a clone so the
	// caller's bitmap is not emptied as a side effect.
	rest := delta.Clone()
	shard, tmp := roaring.New(), roaring.New() // shard is what gets written; tmp holds the current slice of rest
	for rest.GetCardinality() > 0 {
		from := uint64(rest.Minimum())
		to := from + uint64(step)
		tmp.Clear()
		tmp.AddRange(from, to)
		tmp.And(rest)
		shard.Or(tmp)
		shard.RunOptimize()
		rest.RemoveRange(from, to)
		if rest.GetCardinality() == 0 {
			break // whatever is left in shard becomes the last shard below
		}
		if shard.GetSerializedSizeInBytes() >= uint64(ShardLimit) {
			if err := put(shard.Maximum(), shard); err != nil {
				return err
			}
			shard.Clear()
		}
	}
	// Defensive: every iteration adds at least rest.Minimum() to shard, so it
	// is not expected to be empty here.
	if shard.GetCardinality() == 0 {
		return nil
	}
	return put(^uint32(0), shard)
}
// TruncateRange removes the values in [from, to) from the sharded bitmap
// stored under key in bucket.
//
// Shard keys are key followed by a 4-byte big-endian suffix, which is ^uint32(0)
// for the last shard. The function seeks to key+from and walks forward over the
// shards of key, applying RemoveRange to each one. A shard that becomes empty
// is deleted rather than stored. The walk stops after a shard whose values
// span the whole range, after the ^uint32(0) shard, or when the keys no longer
// start with key.
//
// Afterwards, if the last remaining shard of key does not carry the ^uint32(0)
// suffix, its value is moved under that suffix. AppendMergeByOr only looks
// at the ^uint32(0) shard.
//
// !Important: the range is half-open, [from, to).
func TruncateRange(tx ethdb.Tx, bucket string, key []byte, from, to uint64) error {
	shardKey := make([]byte, len(key)+4)
	copy(shardKey, key)
	binary.BigEndian.PutUint32(shardKey[len(shardKey)-4:], uint32(from))

	c := tx.Cursor(bucket)
	defer c.Close()
	cForDelete := tx.Cursor(bucket) // use dedicated cursor for delete operation, but in near future will change to ETL
	defer cForDelete.Close()

	for k, v, err := c.Seek(shardKey); ; k, v, err = c.Next() {
		// Check err before k: a failed Seek/Next may come with a nil key, and
		// testing k first would silently drop the error.
		if err != nil {
			return err
		}
		if k == nil || !bytes.HasPrefix(k, key) {
			break
		}
		bm := roaring.New()
		if _, err = bm.FromBuffer(v); err != nil {
			return err
		}
		isLastShard := binary.BigEndian.Uint32(k[len(k)-4:]) == ^uint32(0)
		// Minimum/Maximum panic on an empty bitmap, so guard against an empty
		// stored shard.
		coversRange := bm.GetCardinality() > 0 && uint64(bm.Minimum()) <= from && uint64(bm.Maximum()) >= to
		noReasonToCheckNextShard := coversRange || isLastShard
		bm.RemoveRange(from, to)
		if bm.GetCardinality() == 0 { // don't store empty bitmaps
			if err = cForDelete.Delete(k); err != nil {
				return err
			}
			if noReasonToCheckNextShard {
				break
			}
			continue
		}
		bm.RunOptimize()
		newV := bytes.NewBuffer(make([]byte, 0, bm.GetSerializedSizeInBytes()))
		if _, err = bm.WriteTo(newV); err != nil {
			return err
		}
		if err = c.Put(common.CopyBytes(k), newV.Bytes()); err != nil {
			return err
		}
		if noReasonToCheckNextShard {
			break
		}
	}

	// Rename the last shard, in case the shard carrying ^uint32(0) was deleted.
	k, v, err := c.Current()
	if err != nil {
		return err
	}
	// NOTE(review): relies on Current yielding a nil key when the cursor's
	// item was deleted through cForDelete (or the walk ran off the end); in
	// that case step back one record. Confirm against the cursor implementation.
	if k == nil {
		if k, v, err = c.Prev(); err != nil {
			return err
		}
	}
	if k == nil || !bytes.HasPrefix(k, key) { // no shard of key is left (or bucket empty)
		return nil
	}
	if binary.BigEndian.Uint32(k[len(k)-4:]) == ^uint32(0) { // already the last shard; nothing to rename
		return nil
	}
	copyV := common.CopyBytes(v)
	if err = cForDelete.Delete(k); err != nil {
		return err
	}
	binary.BigEndian.PutUint32(shardKey[len(shardKey)-4:], ^uint32(0))
	return c.Put(common.CopyBytes(shardKey), copyV)
}
// Get returns the union (Or) of the shards of key that may contain values in
// [from, to].
//
// Shard keys are key followed by a 4-byte big-endian suffix, and Get treats
// that suffix as an upper bound of the shard. It seeks to key+from and reads
// forward. Reading stops at the first shard whose suffix is >= to, or when the
// keys no longer start with key. Whole shards are unioned without filtering,
// so the result may also contain values outside [from, to]. A key with no
// matching shards yields an empty bitmap.
func Get(c ethdb.Cursor, key []byte, from, to uint32) (*roaring.Bitmap, error) {
	fromKey := make([]byte, len(key)+4)
	copy(fromKey, key)
	binary.BigEndian.PutUint32(fromKey[len(key):], from)

	var shards []*roaring.Bitmap
	for k, v, err := c.Seek(fromKey); ; k, v, err = c.Next() {
		// Check err before k, so a failed Seek/Next is not mistaken for the end
		// of the data.
		if err != nil {
			return nil, err
		}
		if k == nil || !bytes.HasPrefix(k, key) {
			break
		}
		// FromBuffer does not copy its input, and the returned bitmap may
		// outlive the transaction that owns v, so decode from a private copy.
		bm := roaring.New()
		if _, err = bm.FromBuffer(common.CopyBytes(v)); err != nil {
			return nil, err
		}
		shards = append(shards, bm)
		if binary.BigEndian.Uint32(k[len(k)-4:]) >= to {
			break
		}
	}
	if len(shards) == 0 {
		return roaring.New(), nil
	}
	return roaring.FastOr(shards...), nil
}