// Source: erigon-pulse/ethdb/mutation.go (431 lines, 9.8 KiB, Go; raw view with blame history).

package ethdb
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
2020-11-01 08:28:11 +00:00
"time"
"unsafe"
"github.com/c2h5oh/datasize"
"github.com/google/btree"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
2020-11-01 08:28:11 +00:00
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/metrics"
)
2020-07-21 08:33:03 +00:00
// Commit timers registered with the metrics package under the names
// "db/commit/big_batch" and "db/commit/small_batch".
// NOTE(review): neither timer is referenced in the visible part of this
// file — confirm they are still updated somewhere else in the package.
var (
dbCommitBigBatchTimer = metrics.NewRegisteredTimer("db/commit/big_batch", nil)
dbCommitSmallBatchTimer = metrics.NewRegisteredTimer("db/commit/small_batch", nil)
)
// mutation is an in-memory write batch layered over an optional backing
// Database. Writes are buffered in puts and pushed to db by Commit; reads
// consult puts first and fall back to db.
type mutation struct {
// puts holds the pending writes as *MutationItem values.
puts *btree.BTree
// mu is taken by every method that touches puts, searchItem or size.
mu sync.RWMutex
// searchItem is a scratch lookup key reused by getMem and hasMem.
searchItem MutationItem
// size is a running size estimate maintained by Put and MultiPut and reset
// by Commit and Rollback.
size int
// db is the backing store; several methods tolerate it being nil.
db Database
}
// MutationItem is one buffered write: a key/value pair destined for table.
// A nil value marks a pending delete (see mutation.Delete).
type MutationItem struct {
table string
key []byte
value []byte
}
// Less implements btree.Item. Items are ordered by table name first and by
// key bytes second, so entries belonging to one table are adjacent in the
// tree. It panics if than is not a *MutationItem.
func (mi *MutationItem) Less(than btree.Item) bool {
	other := than.(*MutationItem)
	switch byTable := strings.Compare(mi.table, other.table); {
	case byTable < 0:
		return true
	case byTable > 0:
		return false
	default:
		// Same table: fall back to the byte-wise key order.
		return bytes.Compare(mi.key, other.key) < 0
	}
}
// KV returns the key-value store of the backing database when that database
// implements HasKV, and nil otherwise.
func (m *mutation) KV() KV {
	withKV, ok := m.db.(HasKV)
	if !ok {
		return nil
	}
	return withKV.KV()
}
// getMem looks up (table, key) among the pending in-memory writes only.
//
// It returns the buffered value and true when an entry exists; the value is
// nil when the entry is a pending delete. It returns (nil, false) when
// nothing is buffered for the key.
//
// The lookup reuses m.searchItem as a scratch key. Because that mutates
// shared state, the exclusive lock is taken: under a read lock two concurrent
// lookups could overwrite each other's search key and return the wrong entry.
func (m *mutation) getMem(table string, key []byte) ([]byte, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.searchItem.table = table
	m.searchItem.key = key
	found := m.puts.Get(&m.searchItem)
	if found == nil {
		return nil, false
	}
	return found.(*MutationItem).value, true
}
// IncrementSequence advances the counter stored under bucket in the
// dbutils.Sequence table by amount and returns the value the counter had
// before the increment.
//
// The current value is obtained through ReadSequence, so pending in-memory
// writes take precedence over the backing database and a missing counter
// counts as 0. The new value is buffered through Put as an 8-byte big-endian
// integer. The read and the write are two separately locked steps.
func (m *mutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) {
	current, err := m.ReadSequence(bucket)
	if err != nil {
		return 0, err
	}

	next := make([]byte, 8)
	binary.BigEndian.PutUint64(next, current+amount)
	if err = m.Put(dbutils.Sequence, []byte(bucket), next); err != nil {
		return 0, err
	}
	return current, nil
}
// ReadSequence returns the current value of the counter stored under bucket
// in the dbutils.Sequence table.
//
// Pending in-memory writes are consulted first; only when nothing is buffered
// and a backing database exists is the database read. A missing or empty
// value yields 0. A non-empty value shorter than 8 bytes cannot be decoded as
// a big-endian uint64 and is reported as an error instead of panicking.
func (m *mutation) ReadSequence(bucket string) (res uint64, err error) {
	v, ok := m.getMem(dbutils.Sequence, []byte(bucket))
	if !ok && m.db != nil {
		v, err = m.db.Get(dbutils.Sequence, []byte(bucket))
		if err != nil && !errors.Is(err, ErrKeyNotFound) {
			return 0, err
		}
	}

	switch {
	case len(v) == 0:
		return 0, nil
	case len(v) < 8:
		return 0, fmt.Errorf("sequence %q: stored value is %d bytes, want 8", bucket, len(v))
	}
	return binary.BigEndian.Uint64(v), nil
}
// Get returns the value stored for key in table. It can only be called from
// the worker thread.
//
// A buffered entry wins over the backing database; a buffered nil value is
// reported as ErrKeyNotFound. Without a buffered entry the call is forwarded
// to the backing database, or answered with ErrKeyNotFound when there is none.
func (m *mutation) Get(table string, key []byte) ([]byte, error) {
	value, buffered := m.getMem(table, key)
	switch {
	case buffered && value == nil:
		return nil, ErrKeyNotFound
	case buffered:
		return value, nil
	case m.db == nil:
		return nil, ErrKeyNotFound
	}
	return m.db.Get(table, key)
}
func (m *mutation) Last(table string) ([]byte, []byte, error) {
return m.db.Last(table)
2020-08-12 03:49:52 +00:00
}
// hasMem reports whether an entry for (table, key) is buffered in memory,
// regardless of whether that entry carries a value or a nil delete marker.
//
// The lookup reuses m.searchItem as a scratch key. Because that mutates
// shared state, the exclusive lock is taken: under a read lock two concurrent
// lookups could overwrite each other's search key.
func (m *mutation) hasMem(table string, key []byte) bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.searchItem.table = table
	m.searchItem.key = key
	return m.puts.Has(&m.searchItem)
}
// Has reports whether key exists in table, taking pending writes into
// account.
//
// A buffered entry decides the answer on its own: it counts as present only
// when its value is non-nil, so a key deleted in this batch is reported as
// absent, matching what Get returns for the same key. Without a buffered
// entry the question is forwarded to the backing database, or answered with
// false when there is none.
func (m *mutation) Has(table string, key []byte) (bool, error) {
	if value, buffered := m.getMem(table, key); buffered {
		return value != nil, nil
	}
	if m.db != nil {
		return m.db.Has(table, key)
	}
	return false, nil
}
func (m *mutation) DiskSize(ctx context.Context) (common.StorageSize, error) {
if m.db == nil {
return 0, nil
}
2020-07-29 04:31:46 +00:00
sz, err := m.db.(HasStats).DiskSize(ctx)
if err != nil {
return 0, err
}
2020-07-29 04:31:46 +00:00
return common.StorageSize(sz), nil
}
// Put buffers value for key in table, replacing any entry already buffered
// for the same (table, key). The key and value slices are stored as passed,
// without copying. It always returns nil.
//
// The batch size estimate grows by the new entry and shrinks by the entry it
// replaced, if any.
// NOTE(review): unsafe.Sizeof is applied to the *MutationItem pointer, so the
// fixed per-entry cost counted is one pointer rather than the struct — confirm
// this is intended.
func (m *mutation) Put(table string, key []byte, value []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	item := &MutationItem{table: table, key: key, value: value}
	replaced := m.puts.ReplaceOrInsert(item)
	m.size += int(unsafe.Sizeof(item)) + len(key) + len(value)
	if replaced == nil {
		return nil
	}

	prev := replaced.(*MutationItem)
	m.size -= int(unsafe.Sizeof(prev)) + len(prev.key) + len(prev.value)
	return nil
}
// Append buffers the write exactly like Put; no append-specific handling is
// applied at this layer.
func (m *mutation) Append(table string, key []byte, value []byte) error {
return m.Put(table, key, value)
}
// AppendDup buffers the write exactly like Put; no duplicate-key handling is
// applied at this layer, so an existing entry for the same key is replaced.
func (m *mutation) AppendDup(table string, key []byte, value []byte) error {
return m.Put(table, key, value)
}
// MultiPut buffers several writes under a single lock acquisition.
//
// tuples is a flat list of (table, key, value) triples: element i is the
// table name, i+1 the key and i+2 the value. A length that is not a multiple
// of three is rejected with an error before anything is buffered (it used to
// panic with an index out of range after buffering the complete triples).
//
// Size accounting follows Put. The returned count is always 0.
func (m *mutation) MultiPut(tuples ...[]byte) (uint64, error) {
	if len(tuples)%3 != 0 {
		return 0, fmt.Errorf("multi put: got %d elements, want a multiple of 3 (table, key, value)", len(tuples))
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	for i := 0; i < len(tuples); i += 3 {
		item := &MutationItem{table: string(tuples[i]), key: tuples[i+1], value: tuples[i+2]}
		replaced := m.puts.ReplaceOrInsert(item)
		m.size += int(unsafe.Sizeof(item)) + len(item.key) + len(item.value)
		if replaced != nil {
			prev := replaced.(*MutationItem)
			m.size -= int(unsafe.Sizeof(prev)) + len(prev.key) + len(prev.value)
		}
	}
	return 0, nil
}
// BatchSize returns the current size estimate of the buffered writes, as
// maintained by Put and MultiPut.
func (m *mutation) BatchSize() int {
	m.mu.RLock()
	size := m.size
	m.mu.RUnlock()
	return size
}
// IdealBatchSize returns the amount of data a batch should ideally accumulate
// before being written out in one go: 512 * datasize.MB.
func (m *mutation) IdealBatchSize() int {
return int(512 * datasize.MB)
}
// Walk forwards the walk to the backing database and returns its result.
//
// WARNING: a merged in-memory/DB walk is not implemented, so writes that are
// still buffered in this mutation are not visited. Walk panics when the
// mutation has no backing database.
func (m *mutation) Walk(table string, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error {
m.panicOnEmptyDB()
return m.db.Walk(table, startkey, fixedbits, walker)
}
func (m *mutation) Delete(table string, k, v []byte) error {
if v != nil {
ChangeSets dupsort (#1342) * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * squash * squash * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * history_early_stop * history_early_stop * vmConfig with ReadOnly false * auto_increment * auto_increment * rebase master Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-11-16 12:08:28 +00:00
return m.db.Delete(table, k, v) // TODO: mutation to support DupSort deletes
}
//m.puts.Delete(table, k)
return m.Put(table, k, nil)
}
bitmap indices for logs (#1124) * save progress * try now * don't create bloom inside rlpDecode * don't create bloom inside ApplyTransaction * clean * clean * clean * clean * clean * clean * clean * clean * rename method * print timings * print timings * print timings * sort before flush * fix err lint * clean * move tests to transactions * compressed version * up bound * up bound * more tests * more tests * more tests * more tests * better removal * clean * better performance of get/put methods * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * optimize rpcdaemon * fix test * fix rpcdaemon * fix test * simplify * simplify * fix nil pointer * clean * revert some changes * add some logs * clean * try without optimize * clean * clean * clean * clean * try * move log_index to own stage * move log_index to own stage * integration add log_index stage * integration add log_index stage * clean * clean * print timing * remove duplicates at unwind * extract truncateBitmaps func * try detect * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * add blackList of topics * clean * clean * clean * clean * clean * clean * clean * clean * sharding 1 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 3 * sharded 3 * sharded 3 * speedup things by putCurrent and putReserve * clean * optimize trim * clean * remove blacklist * add more info to err * ? 
* clean * clean * clean * clean * clean * working version * switch to cgo version of roaring bitmaps * clean * clean * clean * clean * more docs * clean * clean * fix logs bloom field * Fix debug_getModifiedAccountsByNumber * Try to fix crash * fix problem with "absent block" * fix problem with "absent block" * remove optimize method call * remove roaring iterator * fix problem with rebuild indicess * remove debug prints * tests for eth_getLogs involving topics * add tests for new stage, speparate topics into 2 buckets * version up * remove debug logs * remove debug logs * remove bloom filter implementation * Optimisation * Optimisatin not required, make rpctest lenient to geth errors * Lenient to geth failures Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-09-28 17:18:36 +00:00
// CommitAndBegin commits the buffered writes via Commit and returns its
// error. The ctx argument is not used, and nothing is begun explicitly here.
func (m *mutation) CommitAndBegin(ctx context.Context) error {
_, err := m.Commit()
return err
}
bitmap indices for logs (#1124) * save progress * try now * don't create bloom inside rlpDecode * don't create bloom inside ApplyTransaction * clean * clean * clean * clean * clean * clean * clean * clean * rename method * print timings * print timings * print timings * sort before flush * fix err lint * clean * move tests to transactions * compressed version * up bound * up bound * more tests * more tests * more tests * more tests * better removal * clean * better performance of get/put methods * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * optimize rpcdaemon * fix test * fix rpcdaemon * fix test * simplify * simplify * fix nil pointer * clean * revert some changes * add some logs * clean * try without optimize * clean * clean * clean * clean * try * move log_index to own stage * move log_index to own stage * integration add log_index stage * integration add log_index stage * clean * clean * print timing * remove duplicates at unwind * extract truncateBitmaps func * try detect * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * clean * add blackList of topics * clean * clean * clean * clean * clean * clean * clean * clean * sharding 1 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 2 * sharded 3 * sharded 3 * sharded 3 * speedup things by putCurrent and putReserve * clean * optimize trim * clean * remove blacklist * add more info to err * ? 
* clean * clean * clean * clean * clean * working version * switch to cgo version of roaring bitmaps * clean * clean * clean * clean * more docs * clean * clean * fix logs bloom field * Fix debug_getModifiedAccountsByNumber * Try to fix crash * fix problem with "absent block" * fix problem with "absent block" * remove optimize method call * remove roaring iterator * fix problem with rebuild indicess * remove debug prints * tests for eth_getLogs involving topics * add tests for new stage, speparate topics into 2 buckets * version up * remove debug logs * remove debug logs * remove bloom filter implementation * Optimisation * Optimisatin not required, make rpctest lenient to geth errors * Lenient to geth failures Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-09-28 17:18:36 +00:00
// RollbackAndBegin discards the buffered writes via Rollback and always
// returns nil. The ctx argument is not used.
func (m *mutation) RollbackAndBegin(ctx context.Context) error {
m.Rollback()
return nil
}
func (m *mutation) doCommit(tx Tx) error {
var prevTable string
var c Cursor
var innerErr error
var isEndOfBucket bool
2020-11-01 08:28:11 +00:00
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
count := 0
total := float64(m.puts.Len())
m.puts.Ascend(func(i btree.Item) bool {
mi := i.(*MutationItem)
if mi.table != prevTable {
if c != nil {
c.Close()
}
c = tx.Cursor(mi.table)
prevTable = mi.table
firstKey, _, err := c.Seek(mi.key)
if err != nil {
innerErr = err
return false
}
isEndOfBucket = firstKey == nil
}
if isEndOfBucket {
if len(mi.value) > 0 {
if err := c.Append(mi.key, mi.value); err != nil {
innerErr = err
return false
}
}
} else if len(mi.value) == 0 {
if err := c.Delete(mi.key, nil); err != nil {
innerErr = err
return false
}
} else {
if err := c.Put(mi.key, mi.value); err != nil {
innerErr = err
return false
}
}
2020-11-01 08:28:11 +00:00
count++
select {
default:
case <-logEvery.C:
progress := fmt.Sprintf("%.1fM/%.1fM", float64(count)/1_000_000, total/1_000_000)
log.Info("Write to db", "progress", progress, "current table", mi.table)
}
return true
})
return innerErr
}
// Commit writes all buffered entries to the backing database and, on success,
// empties the batch and resets its size estimate.
//
// Without a backing database it does nothing. When the database implements
// HasTx the entries are written into its current transaction; otherwise they
// are written inside an Update call on the database's KV, which panics if the
// database does not implement HasKV. On error the buffered entries are kept.
// The returned count is always 0.
func (m *mutation) Commit() (uint64, error) {
	if m.db == nil {
		return 0, nil
	}
	m.mu.Lock()
	defer m.mu.Unlock()

	var err error
	if withTx, ok := m.db.(HasTx); ok {
		err = m.doCommit(withTx.Tx())
	} else {
		err = m.db.(HasKV).KV().Update(context.Background(), func(tx Tx) error {
			return m.doCommit(tx)
		})
	}
	if err != nil {
		return 0, err
	}

	m.puts.Clear(false /* addNodesToFreelist */)
	m.size = 0
	return 0, nil
}
// Rollback discards every buffered write and resets the size estimate to 0.
// The backing database is not touched.
func (m *mutation) Rollback() {
m.mu.Lock()
defer m.mu.Unlock()
m.puts.Clear(false /* addNodesToFreelist */)
m.size = 0
}
// Keys collects the table name and key of every buffered entry, visiting the
// entries in (table, key) order, and returns the Values of the tuple
// collection they were appended to. If an append fails, iteration stops and
// that error is returned together with what was collected so far.
func (m *mutation) Keys() ([][]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	collected := common.NewTuples(m.puts.Len(), 2, 1)
	var appendErr error
	m.puts.Ascend(func(item btree.Item) bool {
		entry := item.(*MutationItem)
		if err := collected.Append([]byte(entry.table), entry.key); err != nil {
			appendErr = err
		}
		return appendErr == nil
	})
	return collected.Values, appendErr
}
// Close discards all buffered writes by calling Rollback. The backing
// database is not closed here.
func (m *mutation) Close() {
m.Rollback()
}
// NewBatch returns a fresh, empty mutation whose backing database is m
// itself, so committing the new batch writes into m rather than into m's own
// backing database.
func (m *mutation) NewBatch() DbWithPendingMutations {
	return &mutation{
		puts: btree.New(32),
		db:   m,
	}
}
// Begin forwards to the backing database's Begin and returns its result.
// It panics when the mutation has no backing database.
func (m *mutation) Begin(ctx context.Context, flags TxFlags) (DbWithPendingMutations, error) {
return m.db.Begin(ctx, flags)
}
// panicOnEmptyDB panics with "Not implemented" when the mutation has no
// backing database. Methods that cannot work without one call it first.
func (m *mutation) panicOnEmptyDB() {
if m.db == nil {
panic("Not implemented")
}
}
// MemCopy returns the backing database and panics when there is none.
// NOTE(review): despite the name, no copy is made here — the backing database
// itself is returned and buffered writes are not included; confirm callers
// expect that.
func (m *mutation) MemCopy() Database {
m.panicOnEmptyDB()
return m.db
}
2020-10-28 03:18:10 +00:00
// SetKV forwards kv to the backing database's SetKV. The type assertion is
// unchecked, so this panics when the backing database is nil or does not
// implement HasKV.
func (m *mutation) SetKV(kv KV) {
m.db.(HasKV).SetKV(kv)
}
// [TURBO-GETH] Freezer support (not implemented yet)

// Ancients always returns 0 and errNotSupported, as there is no backing chain
// freezer.
func (m *mutation) Ancients() (uint64, error) {
return 0, errNotSupported
}
// TruncateAncients always returns errNotSupported, as there is no backing
// chain freezer. The items argument is ignored.
func (m *mutation) TruncateAncients(items uint64) error {
return errNotSupported
}
// NewRWDecorator wraps db in an RWCounterDecorator whose counters all start
// at zero.
func NewRWDecorator(db Database) *RWCounterDecorator {
	return &RWCounterDecorator{
		Database:       db,
		DBCounterStats: DBCounterStats{},
	}
}
// RWCounterDecorator wraps a Database and counts calls to the methods it
// overrides in the embedded DBCounterStats. Methods it does not override are
// served directly by the embedded Database and are not counted.
type RWCounterDecorator struct {
Database
DBCounterStats
}
// DBCounterStats holds per-operation call counters. RWCounterDecorator
// updates Put, Get, Has, Walk, Delete and MultiPut with atomic adds.
// NOTE(review): GetS, GetAsOf, WalkAsOf and MultiWalkAsOf are not incremented
// anywhere in the visible part of this file — confirm whether they are still
// in use.
type DBCounterStats struct {
Put uint64
Get uint64
GetS uint64
GetAsOf uint64
Has uint64
Walk uint64
WalkAsOf uint64
MultiWalkAsOf uint64
Delete uint64
MultiPut uint64
}
2020-08-10 23:55:32 +00:00
// Put atomically increments the Put counter and forwards the call to the
// wrapped Database.
func (d *RWCounterDecorator) Put(bucket string, key, value []byte) error {
atomic.AddUint64(&d.DBCounterStats.Put, 1)
return d.Database.Put(bucket, key, value)
}
2020-08-10 23:55:32 +00:00
// Get atomically increments the Get counter and forwards the call to the
// wrapped Database.
func (d *RWCounterDecorator) Get(bucket string, key []byte) ([]byte, error) {
atomic.AddUint64(&d.DBCounterStats.Get, 1)
return d.Database.Get(bucket, key)
}
2020-01-07 10:41:33 +00:00
2020-08-10 23:55:32 +00:00
// Has atomically increments the Has counter and forwards the call to the
// wrapped Database.
func (d *RWCounterDecorator) Has(bucket string, key []byte) (bool, error) {
atomic.AddUint64(&d.DBCounterStats.Has, 1)
return d.Database.Has(bucket, key)
}
2020-08-10 23:55:32 +00:00
// Walk atomically increments the Walk counter and forwards the call to the
// wrapped Database. The counter counts Walk calls, not visited entries.
func (d *RWCounterDecorator) Walk(bucket string, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error {
atomic.AddUint64(&d.DBCounterStats.Walk, 1)
return d.Database.Walk(bucket, startkey, fixedbits, walker)
}
// Delete atomically increments the Delete counter and forwards the call to
// the wrapped Database.
func (d *RWCounterDecorator) Delete(bucket string, k, v []byte) error {
atomic.AddUint64(&d.DBCounterStats.Delete, 1)
return d.Database.Delete(bucket, k, v)
}
// MultiPut atomically increments the MultiPut counter once per call,
// whatever the number of tuples, and forwards the call to the wrapped
// Database.
func (d *RWCounterDecorator) MultiPut(tuples ...[]byte) (uint64, error) {
atomic.AddUint64(&d.DBCounterStats.MultiPut, 1)
return d.Database.MultiPut(tuples...)
}
// NewBatch returns a fresh, empty mutation backed by the decorator itself, so
// reads and writes the batch forwards to its backing database go through the
// decorator's counting methods.
func (d *RWCounterDecorator) NewBatch() DbWithPendingMutations {
	return &mutation{
		puts: btree.New(32),
		db:   d,
	}
}