move NewHashBatch to erigon-lib, remove oddb package (#8408)

This commit is contained in:
Alex Sharov 2023-10-09 15:47:54 +07:00 committed by GitHub
parent afd88edea8
commit 62395c767e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 66 additions and 1048 deletions

View File

@ -577,7 +577,7 @@ func generate6(_ kv.RwDB, tx kv.RwTx) (bool, error) {
}
func dropT(_ kv.RwDB, tx kv.RwTx) (bool, error) {
if err := tx.(kv.BucketMigrator).ClearBucket("t"); err != nil {
if err := tx.ClearBucket("t"); err != nil {
return false, err
}
return true, nil
@ -607,14 +607,14 @@ func generate7(_ kv.RwDB, tx kv.RwTx) (bool, error) {
}
func dropT1(_ kv.RwDB, tx kv.RwTx) (bool, error) {
if err := tx.(kv.BucketMigrator).ClearBucket("t1"); err != nil {
if err := tx.ClearBucket("t1"); err != nil {
return false, err
}
return true, nil
}
func dropT2(_ kv.RwDB, tx kv.RwTx) (bool, error) {
if err := tx.(kv.BucketMigrator).ClearBucket("t2"); err != nil {
if err := tx.ClearBucket("t2"); err != nil {
return false, err
}
return true, nil
@ -624,7 +624,7 @@ func dropT2(_ kv.RwDB, tx kv.RwTx) (bool, error) {
func generate8(_ kv.RwDB, tx kv.RwTx) (bool, error) {
for i := 0; i < 100; i++ {
k := fmt.Sprintf("table_%05d", i)
if err := tx.(kv.BucketMigrator).CreateBucket(k); err != nil {
if err := tx.CreateBucket(k); err != nil {
return false, err
}
}
@ -656,7 +656,7 @@ func generate9(tx kv.RwTx, entries int) error {
func dropAll(_ kv.RwDB, tx kv.RwTx) (bool, error) {
for i := 0; i < 100; i++ {
k := fmt.Sprintf("table_%05d", i)
if err := tx.(kv.BucketMigrator).DropBucket(k); err != nil {
if err := tx.DropBucket(k); err != nil {
return false, err
}
}

View File

@ -23,7 +23,7 @@ import (
"github.com/ledgerwatch/erigon/core/types/accounts"
)
func ReadAccount(db kv.Tx, addr libcommon.Address, acc *accounts.Account) (bool, error) {
func ReadAccount(db kv.Getter, addr libcommon.Address, acc *accounts.Account) (bool, error) {
enc, err := db.GetOne(kv.PlainState, addr[:])
if err != nil {
return false, err

View File

@ -303,8 +303,6 @@ type StatelessReadTx interface {
// Sequence changes become visible outside the current write transaction after it is committed, and discarded on abort.
// Starts from 0.
ReadSequence(table string) (uint64, error)
BucketSize(table string) (uint64, error)
}
type StatelessWriteTx interface {
@ -340,6 +338,16 @@ type StatelessRwTx interface {
StatelessWriteTx
}
// PendingMutations in-memory storage of changes
// Later they can either be flushed to the database or abandon
type PendingMutations interface {
StatelessRwTx
// Flush all in-memory data into `tx`
Flush(ctx context.Context, tx RwTx) error
Close()
BatchSize() int
}
// Tx
// WARNING:
// - Tx is not threadsafe and may only be used in the goroutine that created it
@ -397,6 +405,7 @@ type Tx interface {
// Pointer to the underlying C transaction handle (e.g. *C.MDBX_txn)
CHandle() unsafe.Pointer
BucketSize(table string) (uint64, error)
}
// RwTx

View File

@ -37,10 +37,7 @@ func TestBucketCRUD(t *testing.T) {
normalBucket := kv.ChaindataTables[15]
deprecatedBucket := kv.ChaindataDeprecatedTables[0]
migrator, ok := tx.(kv.BucketMigrator)
if !ok {
return
}
migrator := tx
// check thad buckets have unique DBI's
uniquness := map[kv.DBI]bool{}

View File

@ -16,7 +16,7 @@
//go:build !js
package olddb
package membatch
import (
"bytes"
@ -29,9 +29,6 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -207,42 +204,3 @@ func TestParallelPutGet(t *testing.T) {
}
pending.Wait()
}
var hexEntries = map[string]string{
"6b": "89c6",
"91": "c476",
"a8": "0a514e",
"bb": "7a",
"bd": "fe76",
"c0": "12",
}
var startKey = common.FromHex("a0")
var fixedBits = 3
var keysInRange = [][]byte{common.FromHex("a8"), common.FromHex("bb"), common.FromHex("bd")}
func TestWalk(t *testing.T) {
_, tx := memdb.NewTestTx(t)
for k, v := range hexEntries {
err := tx.Put(testBucket, common.FromHex(k), common.FromHex(v))
if err != nil {
t.Fatalf("put failed: %v", err)
}
}
var gotKeys [][]byte
c, err := tx.Cursor(testBucket)
if err != nil {
panic(err)
}
defer c.Close()
err = ethdb.Walk(c, startKey, fixedBits, func(key, val []byte) (bool, error) {
gotKeys = append(gotKeys, common.CopyBytes(key))
return true, nil
})
assert.NoError(t, err)
assert.Equal(t, keysInRange, gotKeys)
}

View File

@ -1,4 +1,4 @@
package olddb
package membatch
import (
"context"
@ -11,13 +11,11 @@ import (
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/ethdb"
)
type mapmutation struct {
type Mapmutation struct {
puts map[string]map[string][]byte // table -> key -> value ie. blocks -> hash -> blockBod
db kv.RwTx
db kv.Tx
quit <-chan struct{}
clean func()
mu sync.RWMutex
@ -32,10 +30,10 @@ type mapmutation struct {
// Common pattern:
//
// batch := db.NewBatch()
// defer batch.Rollback()
// defer batch.Close()
// ... some calculations on `batch`
// batch.Commit()
func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string, logger log.Logger) *mapmutation {
func NewHashBatch(tx kv.Tx, quit <-chan struct{}, tmpdir string, logger log.Logger) *Mapmutation {
clean := func() {}
if quit == nil {
ch := make(chan struct{})
@ -43,7 +41,7 @@ func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string, logger log.Lo
quit = ch
}
return &mapmutation{
return &Mapmutation{
db: tx,
puts: make(map[string]map[string][]byte),
quit: quit,
@ -53,14 +51,7 @@ func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string, logger log.Lo
}
}
func (m *mapmutation) RwKV() kv.RwDB {
if casted, ok := m.db.(ethdb.HasRwKV); ok {
return casted.RwKV()
}
return nil
}
func (m *mapmutation) getMem(table string, key []byte) ([]byte, bool) {
func (m *Mapmutation) getMem(table string, key []byte) ([]byte, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
if _, ok := m.puts[table]; !ok {
@ -73,7 +64,7 @@ func (m *mapmutation) getMem(table string, key []byte) ([]byte, bool) {
return nil, false
}
func (m *mapmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) {
func (m *Mapmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) {
v, ok := m.getMem(kv.Sequence, []byte(bucket))
if !ok && m.db != nil {
v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
@ -95,7 +86,7 @@ func (m *mapmutation) IncrementSequence(bucket string, amount uint64) (res uint6
return currentV, nil
}
func (m *mapmutation) ReadSequence(bucket string) (res uint64, err error) {
func (m *Mapmutation) ReadSequence(bucket string) (res uint64, err error) {
v, ok := m.getMem(kv.Sequence, []byte(bucket))
if !ok && m.db != nil {
v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
@ -112,7 +103,7 @@ func (m *mapmutation) ReadSequence(bucket string) (res uint64, err error) {
}
// Can only be called from the worker thread
func (m *mapmutation) GetOne(table string, key []byte) ([]byte, error) {
func (m *Mapmutation) GetOne(table string, key []byte) ([]byte, error) {
if value, ok := m.getMem(table, key); ok {
return value, nil
}
@ -127,21 +118,7 @@ func (m *mapmutation) GetOne(table string, key []byte) ([]byte, error) {
return nil, nil
}
// Can only be called from the worker thread
func (m *mapmutation) Get(table string, key []byte) ([]byte, error) {
value, err := m.GetOne(table, key)
if err != nil {
return nil, err
}
if value == nil {
return nil, ethdb.ErrKeyNotFound
}
return value, nil
}
func (m *mapmutation) Last(table string) ([]byte, []byte, error) {
func (m *Mapmutation) Last(table string) ([]byte, []byte, error) {
c, err := m.db.Cursor(table)
if err != nil {
return nil, nil, err
@ -150,7 +127,7 @@ func (m *mapmutation) Last(table string) ([]byte, []byte, error) {
return c.Last()
}
func (m *mapmutation) Has(table string, key []byte) (bool, error) {
func (m *Mapmutation) Has(table string, key []byte) (bool, error) {
if _, ok := m.getMem(table, key); ok {
return ok, nil
}
@ -161,7 +138,7 @@ func (m *mapmutation) Has(table string, key []byte) (bool, error) {
}
// puts a table key with a value and if the table is not found then it appends a table
func (m *mapmutation) Put(table string, k, v []byte) error {
func (m *Mapmutation) Put(table string, k, v []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.puts[table]; !ok {
@ -183,40 +160,40 @@ func (m *mapmutation) Put(table string, k, v []byte) error {
return nil
}
func (m *mapmutation) Append(table string, key []byte, value []byte) error {
func (m *Mapmutation) Append(table string, key []byte, value []byte) error {
return m.Put(table, key, value)
}
func (m *mapmutation) AppendDup(table string, key []byte, value []byte) error {
func (m *Mapmutation) AppendDup(table string, key []byte, value []byte) error {
return m.Put(table, key, value)
}
func (m *mapmutation) BatchSize() int {
func (m *Mapmutation) BatchSize() int {
m.mu.RLock()
defer m.mu.RUnlock()
return m.size
}
func (m *mapmutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
func (m *Mapmutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
m.panicOnEmptyDB()
return m.db.ForEach(bucket, fromPrefix, walker)
}
func (m *mapmutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
func (m *Mapmutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
m.panicOnEmptyDB()
return m.db.ForPrefix(bucket, prefix, walker)
}
func (m *mapmutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error {
func (m *Mapmutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error {
m.panicOnEmptyDB()
return m.db.ForAmount(bucket, prefix, amount, walker)
}
func (m *mapmutation) Delete(table string, k []byte) error {
func (m *Mapmutation) Delete(table string, k []byte) error {
return m.Put(table, k, nil)
}
func (m *mapmutation) doCommit(tx kv.RwTx) error {
func (m *Mapmutation) doCommit(tx kv.RwTx) error {
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
count := 0
@ -235,7 +212,7 @@ func (m *mapmutation) doCommit(tx kv.RwTx) error {
tx.CollectMetrics()
}
}
if err := collector.Load(m.db, table, etl.IdentityLoadFunc, etl.TransformArgs{Quit: m.quit}); err != nil {
if err := collector.Load(tx, table, etl.IdentityLoadFunc, etl.TransformArgs{Quit: m.quit}); err != nil {
return err
}
}
@ -244,13 +221,13 @@ func (m *mapmutation) doCommit(tx kv.RwTx) error {
return nil
}
func (m *mapmutation) Commit() error {
func (m *Mapmutation) Flush(ctx context.Context, tx kv.RwTx) error {
if m.db == nil {
return nil
}
m.mu.Lock()
defer m.mu.Unlock()
if err := m.doCommit(m.db); err != nil {
if err := m.doCommit(tx); err != nil {
return err
}
@ -261,7 +238,7 @@ func (m *mapmutation) Commit() error {
return nil
}
func (m *mapmutation) Rollback() {
func (m *Mapmutation) Close() {
m.mu.Lock()
defer m.mu.Unlock()
m.puts = map[string]map[string][]byte{}
@ -270,25 +247,11 @@ func (m *mapmutation) Rollback() {
m.size = 0
m.clean()
}
func (m *Mapmutation) Commit() error { panic("not db txn, use .Flush method") }
func (m *Mapmutation) Rollback() { panic("not db txn, use .Close method") }
func (m *mapmutation) Close() {
m.Rollback()
}
func (m *mapmutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) {
panic("mutation can't start transaction, because doesn't own it")
}
func (m *mapmutation) panicOnEmptyDB() {
func (m *Mapmutation) panicOnEmptyDB() {
if m.db == nil {
panic("Not implemented")
}
}
func (m *mapmutation) SetRwKV(kv kv.RwDB) {
hasRwKV, ok := m.db.(ethdb.HasRwKV)
if !ok {
log.Warn("Failed to convert mapmutation type to HasRwKV interface")
}
hasRwKV.SetRwKV(kv)
}

View File

@ -41,7 +41,7 @@ type MemoryMutation struct {
// Common pattern:
//
// batch := NewMemoryBatch(db, tmpDir)
// defer batch.Rollback()
// defer batch.Close()
// ... some calculations on `batch`
// batch.Commit()
func NewMemoryBatch(tx kv.Tx, tmpDir string) *MemoryMutation {

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/kv/membatch"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/errgroup"
@ -42,8 +43,6 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
trace_logger "github.com/ledgerwatch/erigon/eth/tracers/logger"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/ethdb/olddb"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
@ -140,7 +139,7 @@ func StageExecuteBlocksCfg(
func executeBlock(
block *types.Block,
tx kv.RwTx,
batch ethdb.Database,
batch kv.StatelessRwTx,
cfg ExecuteBlockCfg,
vmConfig vm.Config, // emit copy, because will modify it
writeChangesets bool,
@ -206,7 +205,7 @@ func executeBlock(
}
func newStateReaderWriter(
batch ethdb.Database,
batch kv.StatelessRwTx,
tx kv.RwTx,
block *types.Block,
writeChangesets bool,
@ -415,12 +414,12 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint
var stoppedErr error
var batch ethdb.DbWithPendingMutations
var batch kv.PendingMutations
// state is stored through ethdb batches
batch = olddb.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger)
batch = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger)
// avoids stacking defers within the loop
defer func() {
batch.Rollback()
batch.Close()
}()
var readAhead chan uint64
@ -508,7 +507,7 @@ Loop:
if shouldUpdateProgress {
logger.Info("Committed State", "gas reached", currentStateGas, "gasTarget", gasState)
currentStateGas = 0
if err = batch.Commit(); err != nil {
if err = batch.Flush(ctx, tx); err != nil {
return err
}
if err = s.Update(tx, stageProgress); err != nil {
@ -525,7 +524,7 @@ Loop:
// TODO: This creates stacked up deferrals
defer tx.Rollback()
}
batch = olddb.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger)
batch = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger)
}
gas = gas + block.GasUsed()
@ -543,7 +542,7 @@ Loop:
if err = s.Update(batch, stageProgress); err != nil {
return err
}
if err = batch.Commit(); err != nil {
if err = batch.Flush(ctx, tx); err != nil {
return fmt.Errorf("batch commit: %w", err)
}
@ -647,7 +646,7 @@ func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, bl
}
func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, currentBlock uint64, prevTx, currentTx uint64, gas uint64,
gasState float64, batch ethdb.DbWithPendingMutations, logger log.Logger) (uint64, uint64, time.Time) {
gasState float64, batch kv.PendingMutations, logger log.Logger) (uint64, uint64, time.Time) {
currentTime := time.Now()
interval := currentTime.Sub(prevTime)
speed := float64(currentBlock-prevBlock) / (float64(interval) / float64(time.Second))

View File

@ -10,6 +10,7 @@ import (
mapset "github.com/deckarep/golang-set/v2"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv/membatch"
"github.com/ledgerwatch/log/v3"
"golang.org/x/net/context"
@ -17,7 +18,6 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/fixedgas"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
types2 "github.com/ledgerwatch/erigon-lib/types"
"github.com/ledgerwatch/erigon/consensus"
@ -115,8 +115,11 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c
} else {
yielded := mapset.NewSet[[32]byte]()
simulationTx := memdb.NewMemoryBatch(tx, cfg.tmpdir)
defer simulationTx.Rollback()
var simulationTx kv.StatelessRwTx
m := membatch.NewHashBatch(tx, quit, cfg.tmpdir, logger)
defer m.Close()
simulationTx = m
executionAt, err := s.ExecutionAt(tx)
if err != nil {
return err
@ -180,7 +183,7 @@ func getNextTransactions(
header *types.Header,
amount uint16,
executionAt uint64,
simulationTx *memdb.MemoryMutation,
simulationTx kv.StatelessRwTx,
alreadyYielded mapset.Set[[32]byte],
logger log.Logger,
) (types.TransactionsStream, int, error) {
@ -237,7 +240,7 @@ func getNextTransactions(
return types.NewTransactionsFixedOrder(txs), count, nil
}
func filterBadTransactions(transactions []types.Transaction, config chain.Config, blockNumber uint64, baseFee *big.Int, simulationTx *memdb.MemoryMutation, logger log.Logger) ([]types.Transaction, error) {
func filterBadTransactions(transactions []types.Transaction, config chain.Config, blockNumber uint64, baseFee *big.Int, simulationTx kv.StatelessRwTx, logger log.Logger) ([]types.Transaction, error) {
initialCnt := len(transactions)
var filtered []types.Transaction
gasBailout := false

View File

@ -17,7 +17,6 @@
package ethdb
import (
"context"
"errors"
"github.com/ledgerwatch/erigon-lib/kv"
@ -28,85 +27,6 @@ import (
// ErrKeyNotFound is returned when key isn't found in the database.
var ErrKeyNotFound = errors.New("db: key not found")
type TxFlags uint
const (
RW TxFlags = 0x00 // default
RO TxFlags = 0x02
)
// DBGetter wraps the database read operations.
type DBGetter interface {
kv.Getter
// Get returns the value for a given key if it's present.
Get(bucket string, key []byte) ([]byte, error)
}
// Database wraps all database operations. All methods are safe for concurrent use.
type Database interface {
DBGetter
kv.Putter
kv.Deleter
kv.Closer
Begin(ctx context.Context, flags TxFlags) (DbWithPendingMutations, error) // starts db transaction
Last(bucket string) ([]byte, []byte, error)
IncrementSequence(bucket string, amount uint64) (uint64, error)
ReadSequence(bucket string) (uint64, error)
RwKV() kv.RwDB
}
// MinDatabase is a minimalistic version of the Database interface.
type MinDatabase interface {
Get(bucket string, key []byte) ([]byte, error)
Put(table string, k, v []byte) error
Delete(table string, k []byte) error
}
// DbWithPendingMutations is an extended version of the Database,
// where all changes are first made in memory.
// Later they can either be committed to the database or rolled back.
type DbWithPendingMutations interface {
Database
// Commit - commits transaction (or flush data into underlying db object in case of `mutation`)
//
// Common pattern:
//
// tx := db.Begin()
// defer tx.Rollback()
// ... some calculations on `tx`
// tx.Commit()
//
Commit() error
Rollback()
BatchSize() int
}
type HasRwKV interface {
RwKV() kv.RwDB
SetRwKV(kv kv.RwDB)
}
type HasTx interface {
Tx() kv.Tx
}
type BucketsMigrator interface {
BucketExists(bucket string) (bool, error) // makes them empty
ClearBuckets(buckets ...string) error // makes them empty
DropBuckets(buckets ...string) error // drops them, use of them after drop will panic
}
func GetOneWrapper(dat []byte, err error) ([]byte, error) {
if err != nil {
return nil, err
}
if dat == nil {
return nil, ErrKeyNotFound
}
return dat, nil
}

View File

@ -1,345 +0,0 @@
package olddb
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"strings"
"sync"
"time"
"unsafe"
"github.com/google/btree"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/log/v3"
)
type mutation struct {
puts *btree.BTree
db kv.RwTx
quit <-chan struct{}
clean func()
searchItem MutationItem
mu sync.RWMutex
size int
}
type MutationItem struct {
table string
key []byte
value []byte
}
// NewBatch - starts in-mem batch
//
// Common pattern:
//
// batch := db.NewBatch()
// defer batch.Rollback()
// ... some calculations on `batch`
// batch.Commit()
func NewBatch(tx kv.RwTx, quit <-chan struct{}) *mutation {
clean := func() {}
if quit == nil {
ch := make(chan struct{})
clean = func() { close(ch) }
quit = ch
}
return &mutation{
db: tx,
puts: btree.New(32),
quit: quit,
clean: clean,
}
}
func (mi *MutationItem) Less(than btree.Item) bool {
i, ok := than.(*MutationItem)
if !ok {
log.Warn("Failed to convert btree.Item to MutationItem pointer")
}
c := strings.Compare(mi.table, i.table)
if c != 0 {
return c < 0
}
return bytes.Compare(mi.key, i.key) < 0
}
func (m *mutation) ReadOnly() bool { return false }
func (m *mutation) RwKV() kv.RwDB {
if casted, ok := m.db.(ethdb.HasRwKV); ok {
return casted.RwKV()
}
return nil
}
func (m *mutation) getMem(table string, key []byte) ([]byte, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
m.searchItem.table = table
m.searchItem.key = key
i := m.puts.Get(&m.searchItem)
if i == nil {
return nil, false
}
return i.(*MutationItem).value, true
}
func (m *mutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) {
v, ok := m.getMem(kv.Sequence, []byte(bucket))
if !ok && m.db != nil {
v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
if err != nil {
return 0, err
}
}
var currentV uint64 = 0
if len(v) > 0 {
currentV = binary.BigEndian.Uint64(v)
}
newVBytes := make([]byte, 8)
binary.BigEndian.PutUint64(newVBytes, currentV+amount)
if err = m.Put(kv.Sequence, []byte(bucket), newVBytes); err != nil {
return 0, err
}
return currentV, nil
}
func (m *mutation) ReadSequence(bucket string) (res uint64, err error) {
v, ok := m.getMem(kv.Sequence, []byte(bucket))
if !ok && m.db != nil {
v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
if err != nil {
return 0, err
}
}
var currentV uint64 = 0
if len(v) > 0 {
currentV = binary.BigEndian.Uint64(v)
}
return currentV, nil
}
// Can only be called from the worker thread
func (m *mutation) GetOne(table string, key []byte) ([]byte, error) {
if value, ok := m.getMem(table, key); ok {
if value == nil {
return nil, nil
}
return value, nil
}
if m.db != nil {
// TODO: simplify when tx can no longer be parent of mutation
value, err := m.db.GetOne(table, key)
if err != nil {
return nil, err
}
return value, nil
}
return nil, nil
}
// Can only be called from the worker thread
func (m *mutation) Get(table string, key []byte) ([]byte, error) {
value, err := m.GetOne(table, key)
if err != nil {
return nil, err
}
if value == nil {
return nil, ethdb.ErrKeyNotFound
}
return value, nil
}
func (m *mutation) Last(table string) ([]byte, []byte, error) {
c, err := m.db.Cursor(table)
if err != nil {
return nil, nil, err
}
defer c.Close()
return c.Last()
}
func (m *mutation) hasMem(table string, key []byte) bool {
m.mu.RLock()
defer m.mu.RUnlock()
m.searchItem.table = table
m.searchItem.key = key
return m.puts.Has(&m.searchItem)
}
func (m *mutation) Has(table string, key []byte) (bool, error) {
if m.hasMem(table, key) {
return true, nil
}
if m.db != nil {
return m.db.Has(table, key)
}
return false, nil
}
func (m *mutation) Put(table string, k, v []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
newMi := &MutationItem{table: table, key: k, value: v}
i := m.puts.ReplaceOrInsert(newMi)
m.size += int(unsafe.Sizeof(newMi)) + len(k) + len(v)
if i != nil {
oldMi := i.(*MutationItem)
m.size -= int(unsafe.Sizeof(oldMi)) + len(oldMi.key) + len(oldMi.value)
}
return nil
}
func (m *mutation) Append(table string, key []byte, value []byte) error {
return m.Put(table, key, value)
}
func (m *mutation) AppendDup(table string, key []byte, value []byte) error {
return m.Put(table, key, value)
}
func (m *mutation) BatchSize() int {
m.mu.RLock()
defer m.mu.RUnlock()
return m.size
}
func (m *mutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
m.panicOnEmptyDB()
return m.db.ForEach(bucket, fromPrefix, walker)
}
func (m *mutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
m.panicOnEmptyDB()
return m.db.ForPrefix(bucket, prefix, walker)
}
func (m *mutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error {
m.panicOnEmptyDB()
return m.db.ForAmount(bucket, prefix, amount, walker)
}
func (m *mutation) Delete(table string, k []byte) error {
//m.puts.Delete(table, k)
return m.Put(table, k, nil)
}
func (m *mutation) doCommit(tx kv.RwTx) error {
var prevTable string
var c kv.RwCursor
var innerErr error
var isEndOfBucket bool
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
count := 0
total := float64(m.puts.Len())
m.puts.Ascend(func(i btree.Item) bool {
mi := i.(*MutationItem)
if mi.table != prevTable {
if c != nil {
c.Close()
}
var err error
c, err = tx.RwCursor(mi.table)
if err != nil {
innerErr = err
return false
}
prevTable = mi.table
firstKey, _, err := c.Seek(mi.key)
if err != nil {
innerErr = err
return false
}
isEndOfBucket = firstKey == nil
}
if isEndOfBucket {
if len(mi.value) > 0 {
if err := c.Append(mi.key, mi.value); err != nil {
innerErr = err
return false
}
}
} else if len(mi.value) == 0 {
if err := c.Delete(mi.key); err != nil {
innerErr = err
return false
}
} else {
if err := c.Put(mi.key, mi.value); err != nil {
innerErr = err
return false
}
}
count++
select {
default:
case <-logEvery.C:
progress := fmt.Sprintf("%.1fM/%.1fM", float64(count)/1_000_000, total/1_000_000)
log.Info("Write to db", "progress", progress, "current table", mi.table)
tx.CollectMetrics()
case <-m.quit:
innerErr = common.ErrStopped
return false
}
return true
})
tx.CollectMetrics()
return innerErr
}
func (m *mutation) Commit() error {
if m.db == nil {
return nil
}
m.mu.Lock()
defer m.mu.Unlock()
if err := m.doCommit(m.db); err != nil {
return err
}
m.puts.Clear(false /* addNodesToFreelist */)
m.size = 0
m.clean()
return nil
}
func (m *mutation) Rollback() {
m.mu.Lock()
defer m.mu.Unlock()
m.puts.Clear(false /* addNodesToFreelist */)
m.size = 0
m.clean()
}
func (m *mutation) Close() {
m.Rollback()
}
func (m *mutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) {
panic("mutation can't start transaction, because doesn't own it")
}
func (m *mutation) panicOnEmptyDB() {
if m.db == nil {
panic("Not implemented")
}
}
func (m *mutation) SetRwKV(kv kv.RwDB) {
m.db.(ethdb.HasRwKV).SetRwKV(kv)
}

View File

@ -1,248 +0,0 @@
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package ethdb defines the interfaces for an Ethereum data store.
package olddb
import (
"context"
"fmt"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/log/v3"
)
// ObjectDatabase - is an object-style interface of DB accessing
type ObjectDatabase struct {
kv kv.RwDB
}
// NewObjectDatabase returns a AbstractDB wrapper.
// Deprecated
func NewObjectDatabase(kv kv.RwDB) *ObjectDatabase {
return &ObjectDatabase{
kv: kv,
}
}
// Put inserts or updates a single entry.
func (db *ObjectDatabase) Put(table string, k, v []byte) error {
err := db.kv.Update(context.Background(), func(tx kv.RwTx) error {
return tx.Put(table, k, v)
})
return err
}
// Append appends a single entry to the end of the bucket.
func (db *ObjectDatabase) Append(bucket string, key []byte, value []byte) error {
err := db.kv.Update(context.Background(), func(tx kv.RwTx) error {
c, err := tx.RwCursor(bucket)
if err != nil {
return err
}
return c.Append(key, value)
})
return err
}
// AppendDup appends a single entry to the end of the bucket.
func (db *ObjectDatabase) AppendDup(bucket string, key []byte, value []byte) error {
err := db.kv.Update(context.Background(), func(tx kv.RwTx) error {
c, err := tx.RwCursorDupSort(bucket)
if err != nil {
return err
}
return c.AppendDup(key, value)
})
return err
}
func (db *ObjectDatabase) Has(bucket string, key []byte) (bool, error) {
var has bool
err := db.kv.View(context.Background(), func(tx kv.Tx) error {
v, err := tx.GetOne(bucket, key)
if err != nil {
return err
}
has = v != nil
return nil
})
return has, err
}
func (db *ObjectDatabase) IncrementSequence(bucket string, amount uint64) (res uint64, err error) {
err = db.kv.Update(context.Background(), func(tx kv.RwTx) error {
res, err = tx.IncrementSequence(bucket, amount)
return err
})
return res, err
}
func (db *ObjectDatabase) ReadSequence(bucket string) (res uint64, err error) {
err = db.kv.View(context.Background(), func(tx kv.Tx) error {
res, err = tx.ReadSequence(bucket)
return err
})
return res, err
}
// Get returns the value for a given key if it's present.
func (db *ObjectDatabase) GetOne(bucket string, key []byte) ([]byte, error) {
var dat []byte
err := db.kv.View(context.Background(), func(tx kv.Tx) error {
v, err := tx.GetOne(bucket, key)
if err != nil {
return err
}
if v != nil {
dat = make([]byte, len(v))
copy(dat, v)
}
return nil
})
return dat, err
}
func (db *ObjectDatabase) Get(bucket string, key []byte) ([]byte, error) {
dat, err := db.GetOne(bucket, key)
return ethdb.GetOneWrapper(dat, err)
}
func (db *ObjectDatabase) Last(bucket string) ([]byte, []byte, error) {
var key, value []byte
if err := db.kv.View(context.Background(), func(tx kv.Tx) error {
c, err := tx.Cursor(bucket)
if err != nil {
return err
}
k, v, err := c.Last()
if err != nil {
return err
}
if k != nil {
key, value = common.CopyBytes(k), common.CopyBytes(v)
}
return nil
}); err != nil {
return nil, nil, err
}
return key, value, nil
}
func (db *ObjectDatabase) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
return db.kv.View(context.Background(), func(tx kv.Tx) error {
return tx.ForEach(bucket, fromPrefix, walker)
})
}
func (db *ObjectDatabase) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) error) error {
return db.kv.View(context.Background(), func(tx kv.Tx) error {
return tx.ForAmount(bucket, fromPrefix, amount, walker)
})
}
func (db *ObjectDatabase) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
return db.kv.View(context.Background(), func(tx kv.Tx) error {
return tx.ForPrefix(bucket, prefix, walker)
})
}
// Delete deletes the key from the queue and database
func (db *ObjectDatabase) Delete(table string, k []byte) error {
// Execute the actual operation
err := db.kv.Update(context.Background(), func(tx kv.RwTx) error {
return tx.Delete(table, k)
})
return err
}
func (db *ObjectDatabase) BucketExists(name string) (bool, error) {
exists := false
if err := db.kv.View(context.Background(), func(tx kv.Tx) (err error) {
migrator, ok := tx.(kv.BucketMigrator)
if !ok {
return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv)
}
exists, err = migrator.ExistsBucket(name)
if err != nil {
return err
}
return nil
}); err != nil {
return false, err
}
return exists, nil
}
func (db *ObjectDatabase) ClearBuckets(buckets ...string) error {
for i := range buckets {
name := buckets[i]
if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error {
migrator, ok := tx.(kv.BucketMigrator)
if !ok {
return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv)
}
if err := migrator.ClearBucket(name); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
return nil
}
func (db *ObjectDatabase) DropBuckets(buckets ...string) error {
for i := range buckets {
name := buckets[i]
log.Info("Dropping bucket", "name", name)
if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error {
migrator, ok := tx.(kv.BucketMigrator)
if !ok {
return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv)
}
if err := migrator.DropBucket(name); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
return nil
}
func (db *ObjectDatabase) Close() {
db.kv.Close()
}
func (db *ObjectDatabase) RwKV() kv.RwDB {
return db.kv
}
func (db *ObjectDatabase) SetRwKV(kv kv.RwDB) {
db.kv = kv
}
func (db *ObjectDatabase) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) {
batch := &TxDb{db: db}
if err := batch.begin(ctx, flags); err != nil {
return batch, err
}
return batch, nil
}

View File

@ -1,238 +0,0 @@
package olddb
import (
"context"
"fmt"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/log/v3"
)
// TxDb - provides Database interface around ethdb.Tx
// It's not thread-safe!
// TxDb not usable after .Commit()/.Rollback() call, but usable after .CommitAndBegin() call
// you can put unlimited amount of data into this class
// Walk and MultiWalk methods - work outside of Tx object yet, will implement it later
// Deprecated
// nolint
type TxDb struct {
db ethdb.Database
tx kv.Tx
cursors map[string]kv.Cursor
txFlags ethdb.TxFlags
len uint64
}
// nolint
func WrapIntoTxDB(tx kv.RwTx) *TxDb {
return &TxDb{tx: tx, cursors: map[string]kv.Cursor{}}
}
func (m *TxDb) Close() {
panic("don't call me")
}
func (m *TxDb) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) {
batch := m
if m.tx != nil {
panic("nested transactions not supported")
}
if err := batch.begin(ctx, flags); err != nil {
return nil, err
}
return batch, nil
}
func (m *TxDb) cursor(bucket string) (kv.Cursor, error) {
c, ok := m.cursors[bucket]
if !ok {
var err error
c, err = m.tx.Cursor(bucket)
if err != nil {
return nil, err
}
m.cursors[bucket] = c
}
return c, nil
}
func (m *TxDb) IncrementSequence(bucket string, amount uint64) (res uint64, err error) {
return m.tx.(kv.RwTx).IncrementSequence(bucket, amount)
}
func (m *TxDb) ReadSequence(bucket string) (res uint64, err error) {
return m.tx.ReadSequence(bucket)
}
func (m *TxDb) Put(table string, k, v []byte) error {
m.len += uint64(len(k) + len(v))
c, err := m.cursor(table)
if err != nil {
return err
}
return c.(kv.RwCursor).Put(k, v)
}
func (m *TxDb) Append(bucket string, key []byte, value []byte) error {
m.len += uint64(len(key) + len(value))
c, err := m.cursor(bucket)
if err != nil {
return err
}
return c.(kv.RwCursor).Append(key, value)
}
func (m *TxDb) AppendDup(bucket string, key []byte, value []byte) error {
m.len += uint64(len(key) + len(value))
c, err := m.cursor(bucket)
if err != nil {
return err
}
return c.(kv.RwCursorDupSort).AppendDup(key, value)
}
func (m *TxDb) Delete(table string, k []byte) error {
m.len += uint64(len(k))
c, err := m.cursor(table)
if err != nil {
return err
}
return c.(kv.RwCursor).Delete(k)
}
func (m *TxDb) begin(ctx context.Context, flags ethdb.TxFlags) error {
db := m.db.(ethdb.HasRwKV).RwKV()
var tx kv.Tx
var err error
if flags&ethdb.RO != 0 {
tx, err = db.BeginRo(ctx)
} else {
tx, err = db.BeginRw(ctx)
}
if err != nil {
return err
}
m.tx = tx
m.cursors = make(map[string]kv.Cursor, 16)
return nil
}
func (m *TxDb) RwKV() kv.RwDB {
panic("not allowed to get KV interface because you will loose transaction, please use .Tx() method")
}
// Last can only be called from the transaction thread
func (m *TxDb) Last(bucket string) ([]byte, []byte, error) {
c, err := m.cursor(bucket)
if err != nil {
return []byte{}, nil, err
}
return c.Last()
}
func (m *TxDb) GetOne(bucket string, key []byte) ([]byte, error) {
c, err := m.cursor(bucket)
if err != nil {
return nil, err
}
_, v, err := c.SeekExact(key)
return v, err
}
func (m *TxDb) Get(bucket string, key []byte) ([]byte, error) {
dat, err := m.GetOne(bucket, key)
return ethdb.GetOneWrapper(dat, err)
}
func (m *TxDb) Has(bucket string, key []byte) (bool, error) {
v, err := m.Get(bucket, key)
if err != nil {
return false, err
}
return v != nil, nil
}
func (m *TxDb) BatchSize() int {
return int(m.len)
}
func (m *TxDb) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
return m.tx.ForEach(bucket, fromPrefix, walker)
}
func (m *TxDb) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
return m.tx.ForPrefix(bucket, prefix, walker)
}
func (m *TxDb) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error {
return m.tx.ForAmount(bucket, prefix, amount, walker)
}
func (m *TxDb) Commit() error {
if m.tx == nil {
return fmt.Errorf("second call .Commit() on same transaction")
}
if err := m.tx.Commit(); err != nil {
return err
}
m.tx = nil
m.cursors = nil
m.len = 0
return nil
}
func (m *TxDb) Rollback() {
if m.tx == nil {
return
}
m.tx.Rollback()
m.cursors = nil
m.tx = nil
m.len = 0
}
func (m *TxDb) Tx() kv.Tx {
return m.tx
}
func (m *TxDb) BucketExists(name string) (bool, error) {
migrator, ok := m.tx.(kv.BucketMigrator)
if !ok {
return false, fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx)
}
return migrator.ExistsBucket(name)
}
func (m *TxDb) ClearBuckets(buckets ...string) error {
for i := range buckets {
name := buckets[i]
migrator, ok := m.tx.(kv.BucketMigrator)
if !ok {
return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx)
}
if err := migrator.ClearBucket(name); err != nil {
return err
}
}
return nil
}
func (m *TxDb) DropBuckets(buckets ...string) error {
for i := range buckets {
name := buckets[i]
log.Info("Dropping bucket", "name", name)
migrator, ok := m.tx.(kv.BucketMigrator)
if !ok {
return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx)
}
if err := migrator.DropBucket(name); err != nil {
return err
}
}
return nil
}