mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-03 09:37:38 +00:00
Added Flush function to memory mutation (#4439)
This commit is contained in:
parent
eb497372ae
commit
1f36d76e09
@ -24,7 +24,7 @@ import (
|
|||||||
"github.com/ledgerwatch/erigon/ethdb"
|
"github.com/ledgerwatch/erigon/ethdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type miningmutation struct {
|
type memorymutation struct {
|
||||||
// Bucket => Key => Value
|
// Bucket => Key => Value
|
||||||
memTx kv.RwTx
|
memTx kv.RwTx
|
||||||
memDb kv.RwDB
|
memDb kv.RwDB
|
||||||
@ -42,13 +42,13 @@ type miningmutation struct {
|
|||||||
// defer batch.Rollback()
|
// defer batch.Rollback()
|
||||||
// ... some calculations on `batch`
|
// ... some calculations on `batch`
|
||||||
// batch.Commit()
|
// batch.Commit()
|
||||||
func NewMiningBatch(tx kv.Tx) *miningmutation {
|
func NewMemoryBatch(tx kv.Tx) *memorymutation {
|
||||||
tmpDB := mdbx.NewMDBX(log.New()).InMem().MustOpen()
|
tmpDB := mdbx.NewMDBX(log.New()).InMem().MustOpen()
|
||||||
memTx, err := tmpDB.BeginRw(context.Background())
|
memTx, err := tmpDB.BeginRw(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
return &miningmutation{
|
return &memorymutation{
|
||||||
db: tx,
|
db: tx,
|
||||||
memDb: tmpDB,
|
memDb: tmpDB,
|
||||||
memTx: memTx,
|
memTx: memTx,
|
||||||
@ -62,19 +62,19 @@ func NewMiningBatch(tx kv.Tx) *miningmutation {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) RwKV() kv.RwDB {
|
func (m *memorymutation) RwKV() kv.RwDB {
|
||||||
if casted, ok := m.db.(ethdb.HasRwKV); ok {
|
if casted, ok := m.db.(ethdb.HasRwKV); ok {
|
||||||
return casted.RwKV()
|
return casted.RwKV()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) isTableCleared(table string) bool {
|
func (m *memorymutation) isTableCleared(table string) bool {
|
||||||
_, ok := m.clearedTables[table]
|
_, ok := m.clearedTables[table]
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) isEntryDeleted(table string, key []byte) bool {
|
func (m *memorymutation) isEntryDeleted(table string, key []byte) bool {
|
||||||
_, ok := m.deletedEntries[table]
|
_, ok := m.deletedEntries[table]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ok
|
return ok
|
||||||
@ -84,7 +84,7 @@ func (m *miningmutation) isEntryDeleted(table string, key []byte) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getMem Retrieve database entry from memory (hashed storage will be left out for now because it is the only non auto-DupSorted table)
|
// getMem Retrieve database entry from memory (hashed storage will be left out for now because it is the only non auto-DupSorted table)
|
||||||
func (m *miningmutation) getMem(table string, key []byte) ([]byte, bool) {
|
func (m *memorymutation) getMem(table string, key []byte) ([]byte, bool) {
|
||||||
val, err := m.memTx.GetOne(table, key)
|
val, err := m.memTx.GetOne(table, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -92,10 +92,10 @@ func (m *miningmutation) getMem(table string, key []byte) ([]byte, bool) {
|
|||||||
return val, val != nil
|
return val, val != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) DBSize() (uint64, error) { panic("not implemented") }
|
func (m *memorymutation) DBSize() (uint64, error) { panic("not implemented") }
|
||||||
func (m *miningmutation) PageSize() uint64 { panic("not implemented") }
|
func (m *memorymutation) PageSize() uint64 { panic("not implemented") }
|
||||||
|
|
||||||
func (m *miningmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) {
|
func (m *memorymutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) {
|
||||||
v, ok := m.getMem(kv.Sequence, []byte(bucket))
|
v, ok := m.getMem(kv.Sequence, []byte(bucket))
|
||||||
if !ok && m.db != nil {
|
if !ok && m.db != nil {
|
||||||
v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
|
v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
|
||||||
@ -118,7 +118,7 @@ func (m *miningmutation) IncrementSequence(bucket string, amount uint64) (res ui
|
|||||||
return currentV, nil
|
return currentV, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) ReadSequence(bucket string) (res uint64, err error) {
|
func (m *memorymutation) ReadSequence(bucket string) (res uint64, err error) {
|
||||||
v, ok := m.getMem(kv.Sequence, []byte(bucket))
|
v, ok := m.getMem(kv.Sequence, []byte(bucket))
|
||||||
if !ok && m.db != nil {
|
if !ok && m.db != nil {
|
||||||
v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
|
v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
|
||||||
@ -135,7 +135,7 @@ func (m *miningmutation) ReadSequence(bucket string) (res uint64, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Can only be called from the worker thread
|
// Can only be called from the worker thread
|
||||||
func (m *miningmutation) GetOne(table string, key []byte) ([]byte, error) {
|
func (m *memorymutation) GetOne(table string, key []byte) ([]byte, error) {
|
||||||
if value, ok := m.getMem(table, key); ok {
|
if value, ok := m.getMem(table, key); ok {
|
||||||
if value == nil {
|
if value == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@ -154,7 +154,7 @@ func (m *miningmutation) GetOne(table string, key []byte) ([]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Can only be called from the worker thread
|
// Can only be called from the worker thread
|
||||||
func (m *miningmutation) Get(table string, key []byte) ([]byte, error) {
|
func (m *memorymutation) Get(table string, key []byte) ([]byte, error) {
|
||||||
value, err := m.GetOne(table, key)
|
value, err := m.GetOne(table, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -167,12 +167,12 @@ func (m *miningmutation) Get(table string, key []byte) ([]byte, error) {
|
|||||||
return value, nil
|
return value, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) Last(table string) ([]byte, []byte, error) {
|
func (m *memorymutation) Last(table string) ([]byte, []byte, error) {
|
||||||
panic("not implemented. (miningmutation.Last)")
|
panic("not implemented. (memorymutation.Last)")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Has return whether a key is present in a certain table.
|
// Has return whether a key is present in a certain table.
|
||||||
func (m *miningmutation) Has(table string, key []byte) (bool, error) {
|
func (m *memorymutation) Has(table string, key []byte) (bool, error) {
|
||||||
if _, ok := m.getMem(table, key); ok {
|
if _, ok := m.getMem(table, key); ok {
|
||||||
return ok, nil
|
return ok, nil
|
||||||
}
|
}
|
||||||
@ -183,38 +183,38 @@ func (m *miningmutation) Has(table string, key []byte) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Put insert a new entry in the database, if it is hashed storage it will add it to a slice instead of a map.
|
// Put insert a new entry in the database, if it is hashed storage it will add it to a slice instead of a map.
|
||||||
func (m *miningmutation) Put(table string, key []byte, value []byte) error {
|
func (m *memorymutation) Put(table string, key []byte, value []byte) error {
|
||||||
return m.memTx.Put(table, key, value)
|
return m.memTx.Put(table, key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) Append(table string, key []byte, value []byte) error {
|
func (m *memorymutation) Append(table string, key []byte, value []byte) error {
|
||||||
return m.Put(table, key, value)
|
return m.Put(table, key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) AppendDup(table string, key []byte, value []byte) error {
|
func (m *memorymutation) AppendDup(table string, key []byte, value []byte) error {
|
||||||
return m.Put(table, key, value)
|
return m.Put(table, key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) BatchSize() int {
|
func (m *memorymutation) BatchSize() int {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
|
func (m *memorymutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
|
||||||
m.panicOnEmptyDB()
|
m.panicOnEmptyDB()
|
||||||
return m.db.ForEach(bucket, fromPrefix, walker)
|
return m.db.ForEach(bucket, fromPrefix, walker)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
|
func (m *memorymutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
|
||||||
m.panicOnEmptyDB()
|
m.panicOnEmptyDB()
|
||||||
return m.db.ForPrefix(bucket, prefix, walker)
|
return m.db.ForPrefix(bucket, prefix, walker)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error {
|
func (m *memorymutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error {
|
||||||
m.panicOnEmptyDB()
|
m.panicOnEmptyDB()
|
||||||
return m.db.ForAmount(bucket, prefix, amount, walker)
|
return m.db.ForAmount(bucket, prefix, amount, walker)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) Delete(table string, k, v []byte) error {
|
func (m *memorymutation) Delete(table string, k, v []byte) error {
|
||||||
if _, ok := m.deletedEntries[table]; !ok {
|
if _, ok := m.deletedEntries[table]; !ok {
|
||||||
m.deletedEntries[table] = make(map[string]struct{})
|
m.deletedEntries[table] = make(map[string]struct{})
|
||||||
}
|
}
|
||||||
@ -222,70 +222,116 @@ func (m *miningmutation) Delete(table string, k, v []byte) error {
|
|||||||
return m.memTx.Delete(table, k, v)
|
return m.memTx.Delete(table, k, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) Commit() error {
|
func (m *memorymutation) Commit() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) Rollback() {
|
func (m *memorymutation) Rollback() {
|
||||||
m.memTx.Rollback()
|
m.memTx.Rollback()
|
||||||
m.memDb.Close()
|
m.memDb.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) Close() {
|
func (m *memorymutation) Close() {
|
||||||
m.Rollback()
|
m.Rollback()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) {
|
func (m *memorymutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) {
|
||||||
panic("mutation can't start transaction, because doesn't own it")
|
panic("mutation can't start transaction, because doesn't own it")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) panicOnEmptyDB() {
|
func (m *memorymutation) panicOnEmptyDB() {
|
||||||
if m.db == nil {
|
if m.db == nil {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) SetRwKV(kv kv.RwDB) {
|
func (m *memorymutation) SetRwKV(kv kv.RwDB) {
|
||||||
m.db.(ethdb.HasRwKV).SetRwKV(kv)
|
m.db.(ethdb.HasRwKV).SetRwKV(kv)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) BucketSize(bucket string) (uint64, error) {
|
func (m *memorymutation) BucketSize(bucket string) (uint64, error) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) DropBucket(bucket string) error {
|
func (m *memorymutation) DropBucket(bucket string) error {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) ExistsBucket(bucket string) (bool, error) {
|
func (m *memorymutation) ExistsBucket(bucket string) (bool, error) {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) ListBuckets() ([]string, error) {
|
func (m *memorymutation) ListBuckets() ([]string, error) {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) ClearBucket(bucket string) error {
|
func (m *memorymutation) ClearBucket(bucket string) error {
|
||||||
m.clearedTables[bucket] = struct{}{}
|
m.clearedTables[bucket] = struct{}{}
|
||||||
return m.memTx.ClearBucket(bucket)
|
return m.memTx.ClearBucket(bucket)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) isBucketCleared(bucket string) bool {
|
func (m *memorymutation) isBucketCleared(bucket string) bool {
|
||||||
_, ok := m.clearedTables[bucket]
|
_, ok := m.clearedTables[bucket]
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) CollectMetrics() {
|
func (m *memorymutation) CollectMetrics() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutation) CreateBucket(bucket string) error {
|
func (m *memorymutation) CreateBucket(bucket string) error {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *memorymutation) Flush(tx kv.RwTx) error {
|
||||||
|
// Obtain buckets touched.
|
||||||
|
buckets, err := m.memTx.ListBuckets()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Iterate over each bucket and apply changes accordingly.
|
||||||
|
for _, bucket := range buckets {
|
||||||
|
if _, ok := m.dupsortTables[bucket]; ok && bucket != kv.HashedStorage {
|
||||||
|
cbucket, err := m.memTx.CursorDupSort(bucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer cbucket.Close()
|
||||||
|
dbCursor, err := tx.RwCursorDupSort(bucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer dbCursor.Close()
|
||||||
|
for k, v, err := cbucket.First(); k != nil; k, v, err = cbucket.Next() {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := dbCursor.AppendDup(k, v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cbucket, err := m.memTx.Cursor(bucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer cbucket.Close()
|
||||||
|
for k, v, err := cbucket.First(); k != nil; k, v, err = cbucket.Next() {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := tx.Put(bucket, k, v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Cursor creates a new cursor (the real fun begins here)
|
// Cursor creates a new cursor (the real fun begins here)
|
||||||
func (m *miningmutation) makeCursor(bucket string) (kv.RwCursorDupSort, error) {
|
func (m *memorymutation) makeCursor(bucket string) (kv.RwCursorDupSort, error) {
|
||||||
c := &miningmutationcursor{}
|
c := &memorymutationcursor{}
|
||||||
// We can filter duplicates in dup sorted table
|
// We can filter duplicates in dup sorted table
|
||||||
c.table = bucket
|
c.table = bucket
|
||||||
|
|
||||||
@ -309,26 +355,26 @@ func (m *miningmutation) makeCursor(bucket string) (kv.RwCursorDupSort, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Cursor creates a new cursor (the real fun begins here)
|
// Cursor creates a new cursor (the real fun begins here)
|
||||||
func (m *miningmutation) RwCursorDupSort(bucket string) (kv.RwCursorDupSort, error) {
|
func (m *memorymutation) RwCursorDupSort(bucket string) (kv.RwCursorDupSort, error) {
|
||||||
return m.makeCursor(bucket)
|
return m.makeCursor(bucket)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cursor creates a new cursor (the real fun begins here)
|
// Cursor creates a new cursor (the real fun begins here)
|
||||||
func (m *miningmutation) RwCursor(bucket string) (kv.RwCursor, error) {
|
func (m *memorymutation) RwCursor(bucket string) (kv.RwCursor, error) {
|
||||||
return m.makeCursor(bucket)
|
return m.makeCursor(bucket)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cursor creates a new cursor (the real fun begins here)
|
// Cursor creates a new cursor (the real fun begins here)
|
||||||
func (m *miningmutation) CursorDupSort(bucket string) (kv.CursorDupSort, error) {
|
func (m *memorymutation) CursorDupSort(bucket string) (kv.CursorDupSort, error) {
|
||||||
return m.makeCursor(bucket)
|
return m.makeCursor(bucket)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cursor creates a new cursor (the real fun begins here)
|
// Cursor creates a new cursor (the real fun begins here)
|
||||||
func (m *miningmutation) Cursor(bucket string) (kv.Cursor, error) {
|
func (m *memorymutation) Cursor(bucket string) (kv.Cursor, error) {
|
||||||
return m.makeCursor(bucket)
|
return m.makeCursor(bucket)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ViewID creates a new cursor (the real fun begins here)
|
// ViewID creates a new cursor (the real fun begins here)
|
||||||
func (m *miningmutation) ViewID() uint64 {
|
func (m *memorymutation) ViewID() uint64 {
|
||||||
panic("ViewID Not implemented")
|
panic("ViewID Not implemented")
|
||||||
}
|
}
|
@ -35,7 +35,7 @@ func TestLastMiningDB(t *testing.T) {
|
|||||||
|
|
||||||
initializeDB(rwTx)
|
initializeDB(rwTx)
|
||||||
|
|
||||||
batch := NewMiningBatch(rwTx)
|
batch := NewMemoryBatch(rwTx)
|
||||||
batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4"))
|
batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4"))
|
||||||
batch.Put(kv.HashedAccounts, []byte("BCAA"), []byte("value5"))
|
batch.Put(kv.HashedAccounts, []byte("BCAA"), []byte("value5"))
|
||||||
|
|
||||||
@ -60,7 +60,7 @@ func TestLastMiningMem(t *testing.T) {
|
|||||||
|
|
||||||
initializeDB(rwTx)
|
initializeDB(rwTx)
|
||||||
|
|
||||||
batch := NewMiningBatch(rwTx)
|
batch := NewMemoryBatch(rwTx)
|
||||||
batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4"))
|
batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4"))
|
||||||
batch.Put(kv.HashedAccounts, []byte("DCAA"), []byte("value5"))
|
batch.Put(kv.HashedAccounts, []byte("DCAA"), []byte("value5"))
|
||||||
|
|
||||||
@ -84,7 +84,7 @@ func TestDeleteMining(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
initializeDB(rwTx)
|
initializeDB(rwTx)
|
||||||
batch := NewMiningBatch(rwTx)
|
batch := NewMemoryBatch(rwTx)
|
||||||
batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4"))
|
batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4"))
|
||||||
batch.Put(kv.HashedAccounts, []byte("DCAA"), []byte("value5"))
|
batch.Put(kv.HashedAccounts, []byte("DCAA"), []byte("value5"))
|
||||||
batch.Put(kv.HashedAccounts, []byte("FCAA"), []byte("value5"))
|
batch.Put(kv.HashedAccounts, []byte("FCAA"), []byte("value5"))
|
||||||
@ -105,3 +105,24 @@ func TestDeleteMining(t *testing.T) {
|
|||||||
require.Equal(t, key, []byte(nil))
|
require.Equal(t, key, []byte(nil))
|
||||||
require.Equal(t, value, []byte(nil))
|
require.Equal(t, value, []byte(nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFlush(t *testing.T) {
|
||||||
|
rwTx, err := memdb.New().BeginRw(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
initializeDB(rwTx)
|
||||||
|
batch := NewMemoryBatch(rwTx)
|
||||||
|
batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4"))
|
||||||
|
batch.Put(kv.HashedAccounts, []byte("AAAA"), []byte("value5"))
|
||||||
|
batch.Put(kv.HashedAccounts, []byte("FCAA"), []byte("value5"))
|
||||||
|
|
||||||
|
require.NoError(t, batch.Flush(rwTx))
|
||||||
|
|
||||||
|
value, err := rwTx.GetOne(kv.HashedAccounts, []byte("BAAA"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, value, []byte("value4"))
|
||||||
|
|
||||||
|
value, err = rwTx.GetOne(kv.HashedAccounts, []byte("AAAA"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, value, []byte("value5"))
|
||||||
|
}
|
@ -27,7 +27,7 @@ type cursorentry struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// cursor
|
// cursor
|
||||||
type miningmutationcursor struct {
|
type memorymutationcursor struct {
|
||||||
// we can keep one cursor type if we store 2 of each kind.
|
// we can keep one cursor type if we store 2 of each kind.
|
||||||
cursor kv.Cursor
|
cursor kv.Cursor
|
||||||
dupCursor kv.CursorDupSort
|
dupCursor kv.CursorDupSort
|
||||||
@ -43,12 +43,12 @@ type miningmutationcursor struct {
|
|||||||
currentDbEntry cursorentry
|
currentDbEntry cursorentry
|
||||||
currentMemEntry cursorentry
|
currentMemEntry cursorentry
|
||||||
// we keep the mining mutation so that we can insert new elements in db
|
// we keep the mining mutation so that we can insert new elements in db
|
||||||
mutation *miningmutation
|
mutation *memorymutation
|
||||||
table string
|
table string
|
||||||
}
|
}
|
||||||
|
|
||||||
// First move cursor to first position and return key and value accordingly.
|
// First move cursor to first position and return key and value accordingly.
|
||||||
func (m *miningmutationcursor) First() ([]byte, []byte, error) {
|
func (m *memorymutationcursor) First() ([]byte, []byte, error) {
|
||||||
memKey, memValue, err := m.memCursor.First()
|
memKey, memValue, err := m.memCursor.First()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@ -68,7 +68,7 @@ func (m *miningmutationcursor) First() ([]byte, []byte, error) {
|
|||||||
return m.goForward(memKey, memValue, dbKey, dbValue)
|
return m.goForward(memKey, memValue, dbKey, dbValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) getNextOnDb(dup bool) (key []byte, value []byte, err error) {
|
func (m *memorymutationcursor) getNextOnDb(dup bool) (key []byte, value []byte, err error) {
|
||||||
if dup {
|
if dup {
|
||||||
key, value, err = m.dupCursor.NextDup()
|
key, value, err = m.dupCursor.NextDup()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -97,7 +97,7 @@ func (m *miningmutationcursor) getNextOnDb(dup bool) (key []byte, value []byte,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) convertAutoDupsort(key []byte, value []byte) []byte {
|
func (m *memorymutationcursor) convertAutoDupsort(key []byte, value []byte) []byte {
|
||||||
// The only dupsorted table we are interested is HashedStorage
|
// The only dupsorted table we are interested is HashedStorage
|
||||||
if m.table != kv.HashedStorage {
|
if m.table != kv.HashedStorage {
|
||||||
return key
|
return key
|
||||||
@ -106,11 +106,11 @@ func (m *miningmutationcursor) convertAutoDupsort(key []byte, value []byte) []by
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Current return the current key and values the cursor is on.
|
// Current return the current key and values the cursor is on.
|
||||||
func (m *miningmutationcursor) Current() ([]byte, []byte, error) {
|
func (m *memorymutationcursor) Current() ([]byte, []byte, error) {
|
||||||
return common.CopyBytes(m.currentPair.key), common.CopyBytes(m.currentPair.value), nil
|
return common.CopyBytes(m.currentPair.key), common.CopyBytes(m.currentPair.value), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) skipIntersection(memKey, memValue, dbKey, dbValue []byte) (newDbKey []byte, newDbValue []byte, err error) {
|
func (m *memorymutationcursor) skipIntersection(memKey, memValue, dbKey, dbValue []byte) (newDbKey []byte, newDbValue []byte, err error) {
|
||||||
newDbKey = dbKey
|
newDbKey = dbKey
|
||||||
newDbValue = dbValue
|
newDbValue = dbValue
|
||||||
// Check for duplicates
|
// Check for duplicates
|
||||||
@ -132,7 +132,7 @@ func (m *miningmutationcursor) skipIntersection(memKey, memValue, dbKey, dbValue
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) goForward(memKey, memValue, dbKey, dbValue []byte) ([]byte, []byte, error) {
|
func (m *memorymutationcursor) goForward(memKey, memValue, dbKey, dbValue []byte) ([]byte, []byte, error) {
|
||||||
var err error
|
var err error
|
||||||
if memValue == nil && dbValue == nil {
|
if memValue == nil && dbValue == nil {
|
||||||
return nil, nil, nil
|
return nil, nil, nil
|
||||||
@ -167,7 +167,7 @@ func (m *miningmutationcursor) goForward(memKey, memValue, dbKey, dbValue []byte
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Next returns the next element of the mutation.
|
// Next returns the next element of the mutation.
|
||||||
func (m *miningmutationcursor) Next() ([]byte, []byte, error) {
|
func (m *memorymutationcursor) Next() ([]byte, []byte, error) {
|
||||||
if m.isPrevFromDb {
|
if m.isPrevFromDb {
|
||||||
k, v, err := m.getNextOnDb(false)
|
k, v, err := m.getNextOnDb(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -185,7 +185,7 @@ func (m *miningmutationcursor) Next() ([]byte, []byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NextDup returns the next element of the mutation.
|
// NextDup returns the next element of the mutation.
|
||||||
func (m *miningmutationcursor) NextDup() ([]byte, []byte, error) {
|
func (m *memorymutationcursor) NextDup() ([]byte, []byte, error) {
|
||||||
if m.isPrevFromDb {
|
if m.isPrevFromDb {
|
||||||
k, v, err := m.getNextOnDb(true)
|
k, v, err := m.getNextOnDb(true)
|
||||||
|
|
||||||
@ -204,7 +204,7 @@ func (m *miningmutationcursor) NextDup() ([]byte, []byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Seek move pointer to a key at a certain position.
|
// Seek move pointer to a key at a certain position.
|
||||||
func (m *miningmutationcursor) Seek(seek []byte) ([]byte, []byte, error) {
|
func (m *memorymutationcursor) Seek(seek []byte) ([]byte, []byte, error) {
|
||||||
dbKey, dbValue, err := m.cursor.Seek(seek)
|
dbKey, dbValue, err := m.cursor.Seek(seek)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@ -226,7 +226,7 @@ func (m *miningmutationcursor) Seek(seek []byte) ([]byte, []byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Seek move pointer to a key at a certain position.
|
// Seek move pointer to a key at a certain position.
|
||||||
func (m *miningmutationcursor) SeekExact(seek []byte) ([]byte, []byte, error) {
|
func (m *memorymutationcursor) SeekExact(seek []byte) ([]byte, []byte, error) {
|
||||||
memKey, memValue, err := m.memCursor.SeekExact(seek)
|
memKey, memValue, err := m.memCursor.SeekExact(seek)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@ -257,37 +257,37 @@ func (m *miningmutationcursor) SeekExact(seek []byte) ([]byte, []byte, error) {
|
|||||||
return nil, nil, nil
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) Put(k, v []byte) error {
|
func (m *memorymutationcursor) Put(k, v []byte) error {
|
||||||
return m.mutation.Put(m.table, common.CopyBytes(k), common.CopyBytes(v))
|
return m.mutation.Put(m.table, common.CopyBytes(k), common.CopyBytes(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) Append(k []byte, v []byte) error {
|
func (m *memorymutationcursor) Append(k []byte, v []byte) error {
|
||||||
return m.mutation.Put(m.table, common.CopyBytes(k), common.CopyBytes(v))
|
return m.mutation.Put(m.table, common.CopyBytes(k), common.CopyBytes(v))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) AppendDup(k []byte, v []byte) error {
|
func (m *memorymutationcursor) AppendDup(k []byte, v []byte) error {
|
||||||
return m.memDupCursor.AppendDup(common.CopyBytes(k), common.CopyBytes(v))
|
return m.memDupCursor.AppendDup(common.CopyBytes(k), common.CopyBytes(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) PutNoDupData(key, value []byte) error {
|
func (m *memorymutationcursor) PutNoDupData(key, value []byte) error {
|
||||||
panic("DeleteCurrentDuplicates Not implemented")
|
panic("DeleteCurrentDuplicates Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) Delete(k, v []byte) error {
|
func (m *memorymutationcursor) Delete(k, v []byte) error {
|
||||||
return m.mutation.Delete(m.table, k, v)
|
return m.mutation.Delete(m.table, k, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) DeleteCurrent() error {
|
func (m *memorymutationcursor) DeleteCurrent() error {
|
||||||
panic("DeleteCurrent Not implemented")
|
panic("DeleteCurrent Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) DeleteCurrentDuplicates() error {
|
func (m *memorymutationcursor) DeleteCurrentDuplicates() error {
|
||||||
panic("DeleteCurrentDuplicates Not implemented")
|
panic("DeleteCurrentDuplicates Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek move pointer to a key at a certain position.
|
// Seek move pointer to a key at a certain position.
|
||||||
func (m *miningmutationcursor) SeekBothRange(key, value []byte) ([]byte, error) {
|
func (m *memorymutationcursor) SeekBothRange(key, value []byte) ([]byte, error) {
|
||||||
if value == nil {
|
if value == nil {
|
||||||
_, v, err := m.SeekExact(key)
|
_, v, err := m.SeekExact(key)
|
||||||
return v, err
|
return v, err
|
||||||
@ -313,7 +313,7 @@ func (m *miningmutationcursor) SeekBothRange(key, value []byte) ([]byte, error)
|
|||||||
return retValue, err
|
return retValue, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) Last() ([]byte, []byte, error) {
|
func (m *memorymutationcursor) Last() ([]byte, []byte, error) {
|
||||||
// TODO(Giulio2002): make fixes.
|
// TODO(Giulio2002): make fixes.
|
||||||
memKey, memValue, err := m.memCursor.Last()
|
memKey, memValue, err := m.memCursor.Last()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -373,11 +373,11 @@ func (m *miningmutationcursor) Last() ([]byte, []byte, error) {
|
|||||||
return dbKey, dbValue, nil
|
return dbKey, dbValue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) Prev() ([]byte, []byte, error) {
|
func (m *memorymutationcursor) Prev() ([]byte, []byte, error) {
|
||||||
panic("Prev is not implemented!")
|
panic("Prev is not implemented!")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) Close() {
|
func (m *memorymutationcursor) Close() {
|
||||||
if m.cursor != nil {
|
if m.cursor != nil {
|
||||||
m.cursor.Close()
|
m.cursor.Close()
|
||||||
}
|
}
|
||||||
@ -387,26 +387,26 @@ func (m *miningmutationcursor) Close() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) Count() (uint64, error) {
|
func (m *memorymutationcursor) Count() (uint64, error) {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) FirstDup() ([]byte, error) {
|
func (m *memorymutationcursor) FirstDup() ([]byte, error) {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) NextNoDup() ([]byte, []byte, error) {
|
func (m *memorymutationcursor) NextNoDup() ([]byte, []byte, error) {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) LastDup() ([]byte, error) {
|
func (m *memorymutationcursor) LastDup() ([]byte, error) {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) CountDuplicates() (uint64, error) {
|
func (m *memorymutationcursor) CountDuplicates() (uint64, error) {
|
||||||
panic("Not implemented")
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *miningmutationcursor) SeekBothExact(key, value []byte) ([]byte, []byte, error) {
|
func (m *memorymutationcursor) SeekBothExact(key, value []byte) ([]byte, []byte, error) {
|
||||||
panic("SeekBothExact Not implemented")
|
panic("SeekBothExact Not implemented")
|
||||||
}
|
}
|
@ -225,7 +225,7 @@ func MiningStep(ctx context.Context, kv kv.RwDB, mining *stagedsync.Sync) (err e
|
|||||||
}
|
}
|
||||||
defer tx.Rollback()
|
defer tx.Rollback()
|
||||||
|
|
||||||
miningBatch := olddb.NewMiningBatch(tx)
|
miningBatch := olddb.NewMemoryBatch(tx)
|
||||||
defer miningBatch.Rollback()
|
defer miningBatch.Rollback()
|
||||||
|
|
||||||
if err = mining.Run(nil, miningBatch, false); err != nil {
|
if err = mining.Run(nil, miningBatch, false); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user