diff --git a/ethdb/olddb/miningmutation.go b/ethdb/olddb/memorymutation.go similarity index 63% rename from ethdb/olddb/miningmutation.go rename to ethdb/olddb/memorymutation.go index 22f7cc779..6636b9e9b 100644 --- a/ethdb/olddb/miningmutation.go +++ b/ethdb/olddb/memorymutation.go @@ -24,7 +24,7 @@ import ( "github.com/ledgerwatch/erigon/ethdb" ) -type miningmutation struct { +type memorymutation struct { // Bucket => Key => Value memTx kv.RwTx memDb kv.RwDB @@ -42,13 +42,13 @@ type miningmutation struct { // defer batch.Rollback() // ... some calculations on `batch` // batch.Commit() -func NewMiningBatch(tx kv.Tx) *miningmutation { +func NewMemoryBatch(tx kv.Tx) *memorymutation { tmpDB := mdbx.NewMDBX(log.New()).InMem().MustOpen() memTx, err := tmpDB.BeginRw(context.Background()) if err != nil { panic(err) } - return &miningmutation{ + return &memorymutation{ db: tx, memDb: tmpDB, memTx: memTx, @@ -62,19 +62,19 @@ func NewMiningBatch(tx kv.Tx) *miningmutation { } } -func (m *miningmutation) RwKV() kv.RwDB { +func (m *memorymutation) RwKV() kv.RwDB { if casted, ok := m.db.(ethdb.HasRwKV); ok { return casted.RwKV() } return nil } -func (m *miningmutation) isTableCleared(table string) bool { +func (m *memorymutation) isTableCleared(table string) bool { _, ok := m.clearedTables[table] return ok } -func (m *miningmutation) isEntryDeleted(table string, key []byte) bool { +func (m *memorymutation) isEntryDeleted(table string, key []byte) bool { _, ok := m.deletedEntries[table] if !ok { return ok @@ -84,7 +84,7 @@ func (m *miningmutation) isEntryDeleted(table string, key []byte) bool { } // getMem Retrieve database entry from memory (hashed storage will be left out for now because it is the only non auto-DupSorted table) -func (m *miningmutation) getMem(table string, key []byte) ([]byte, bool) { +func (m *memorymutation) getMem(table string, key []byte) ([]byte, bool) { val, err := m.memTx.GetOne(table, key) if err != nil { panic(err) @@ -92,10 +92,10 @@ func (m *miningmutation) getMem(table string, key []byte) ([]byte, bool) { return val, val != nil } -func (m *miningmutation) DBSize() (uint64, error) { panic("not implemented") } -func (m *miningmutation) PageSize() uint64 { panic("not implemented") } +func (m *memorymutation) DBSize() (uint64, error) { panic("not implemented") } +func (m *memorymutation) PageSize() uint64 { panic("not implemented") } -func (m *miningmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { +func (m *memorymutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { v, ok := m.getMem(kv.Sequence, []byte(bucket)) if !ok && m.db != nil { v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) @@ -118,7 +118,7 @@ func (m *miningmutation) IncrementSequence(bucket string, amount uint64) (res ui return currentV, nil } -func (m *miningmutation) ReadSequence(bucket string) (res uint64, err error) { +func (m *memorymutation) ReadSequence(bucket string) (res uint64, err error) { v, ok := m.getMem(kv.Sequence, []byte(bucket)) if !ok && m.db != nil { v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) @@ -135,7 +135,7 @@ func (m *miningmutation) ReadSequence(bucket string) (res uint64, err error) { } // Can only be called from the worker thread -func (m *miningmutation) GetOne(table string, key []byte) ([]byte, error) { +func (m *memorymutation) GetOne(table string, key []byte) ([]byte, error) { if value, ok := m.getMem(table, key); ok { if value == nil { return nil, nil @@ -154,7 +154,7 @@ func (m *miningmutation) GetOne(table string, key []byte) ([]byte, error) { } // Can only be called from the worker thread -func (m *miningmutation) Get(table string, key []byte) ([]byte, error) { +func (m *memorymutation) Get(table string, key []byte) ([]byte, error) { value, err := m.GetOne(table, key) if err != nil { return nil, err @@ -167,12 +167,12 @@ func (m *miningmutation) Get(table string, key []byte) ([]byte, error) { return value, nil } -func (m *miningmutation) Last(table string) ([]byte, []byte, error) { - panic("not implemented. (miningmutation.Last)") +func (m *memorymutation) Last(table string) ([]byte, []byte, error) { + panic("not implemented. (memorymutation.Last)") } // Has return whether a key is present in a certain table. -func (m *miningmutation) Has(table string, key []byte) (bool, error) { +func (m *memorymutation) Has(table string, key []byte) (bool, error) { if _, ok := m.getMem(table, key); ok { return ok, nil } @@ -183,38 +183,38 @@ func (m *miningmutation) Has(table string, key []byte) (bool, error) { } // Put insert a new entry in the database, if it is hashed storage it will add it to a slice instead of a map. -func (m *miningmutation) Put(table string, key []byte, value []byte) error { +func (m *memorymutation) Put(table string, key []byte, value []byte) error { return m.memTx.Put(table, key, value) } -func (m *miningmutation) Append(table string, key []byte, value []byte) error { +func (m *memorymutation) Append(table string, key []byte, value []byte) error { return m.Put(table, key, value) } -func (m *miningmutation) AppendDup(table string, key []byte, value []byte) error { +func (m *memorymutation) AppendDup(table string, key []byte, value []byte) error { return m.Put(table, key, value) } -func (m *miningmutation) BatchSize() int { +func (m *memorymutation) BatchSize() int { return 0 } -func (m *miningmutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { +func (m *memorymutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { m.panicOnEmptyDB() return m.db.ForEach(bucket, fromPrefix, walker) } -func (m *miningmutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { +func (m *memorymutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { m.panicOnEmptyDB() return m.db.ForPrefix(bucket, prefix, walker) } -func (m *miningmutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { +func (m *memorymutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { m.panicOnEmptyDB() return m.db.ForAmount(bucket, prefix, amount, walker) } -func (m *miningmutation) Delete(table string, k, v []byte) error { +func (m *memorymutation) Delete(table string, k, v []byte) error { if _, ok := m.deletedEntries[table]; !ok { m.deletedEntries[table] = make(map[string]struct{}) } @@ -222,70 +222,116 @@ func (m *miningmutation) Delete(table string, k, v []byte) error { return m.memTx.Delete(table, k, v) } -func (m *miningmutation) Commit() error { +func (m *memorymutation) Commit() error { return nil } -func (m *miningmutation) Rollback() { +func (m *memorymutation) Rollback() { m.memTx.Rollback() m.memDb.Close() return } -func (m *miningmutation) Close() { +func (m *memorymutation) Close() { m.Rollback() } -func (m *miningmutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { +func (m *memorymutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { panic("mutation can't start transaction, because doesn't own it") } -func (m *miningmutation) panicOnEmptyDB() { +func (m *memorymutation) panicOnEmptyDB() { if m.db == nil { panic("Not implemented") } } -func (m *miningmutation) SetRwKV(kv kv.RwDB) { +func (m *memorymutation) SetRwKV(kv kv.RwDB) { m.db.(ethdb.HasRwKV).SetRwKV(kv) } -func (m *miningmutation) BucketSize(bucket string) (uint64, error) { +func (m *memorymutation) BucketSize(bucket string) (uint64, error) { return 0, nil } -func (m *miningmutation) DropBucket(bucket string) error { +func (m *memorymutation) DropBucket(bucket string) error { panic("Not implemented") } -func (m *miningmutation) ExistsBucket(bucket string) (bool, error) { +func (m *memorymutation) ExistsBucket(bucket string) (bool, error) { panic("Not implemented") } -func (m *miningmutation) ListBuckets() ([]string, error) { +func (m *memorymutation) ListBuckets() ([]string, error) { panic("Not implemented") } -func (m *miningmutation) ClearBucket(bucket string) error { +func (m *memorymutation) ClearBucket(bucket string) error { m.clearedTables[bucket] = struct{}{} return m.memTx.ClearBucket(bucket) } -func (m *miningmutation) isBucketCleared(bucket string) bool { +func (m *memorymutation) isBucketCleared(bucket string) bool { _, ok := m.clearedTables[bucket] return ok } -func (m *miningmutation) CollectMetrics() { +func (m *memorymutation) CollectMetrics() { } -func (m *miningmutation) CreateBucket(bucket string) error { +func (m *memorymutation) CreateBucket(bucket string) error { panic("Not implemented") } +func (m *memorymutation) Flush(tx kv.RwTx) error { + // Obtain buckets touched. + buckets, err := m.memTx.ListBuckets() + if err != nil { + return err + } + // Iterate over each bucket and apply changes accordingly. + for _, bucket := range buckets { + if _, ok := m.dupsortTables[bucket]; ok && bucket != kv.HashedStorage { + cbucket, err := m.memTx.CursorDupSort(bucket) + if err != nil { + return err + } + defer cbucket.Close() + dbCursor, err := tx.RwCursorDupSort(bucket) + if err != nil { + return err + } + defer dbCursor.Close() + for k, v, err := cbucket.First(); k != nil; k, v, err = cbucket.Next() { + if err != nil { + return err + } + if err := dbCursor.AppendDup(k, v); err != nil { + return err + } + } + } else { + cbucket, err := m.memTx.Cursor(bucket) + if err != nil { + return err + } + defer cbucket.Close() + for k, v, err := cbucket.First(); k != nil; k, v, err = cbucket.Next() { + if err != nil { + return err + } + if err := tx.Put(bucket, k, v); err != nil { + return err + } + } + } + } + return nil +} + // Cursor creates a new cursor (the real fun begins here) -func (m *miningmutation) makeCursor(bucket string) (kv.RwCursorDupSort, error) { - c := &miningmutationcursor{} +func (m *memorymutation) makeCursor(bucket string) (kv.RwCursorDupSort, error) { + c := &memorymutationcursor{} // We can filter duplicates in dup sorted table c.table = bucket @@ -309,26 +355,26 @@ func (m *miningmutation) makeCursor(bucket string) (kv.RwCursorDupSort, error) { } // Cursor creates a new cursor (the real fun begins here) -func (m *miningmutation) RwCursorDupSort(bucket string) (kv.RwCursorDupSort, error) { +func (m *memorymutation) RwCursorDupSort(bucket string) (kv.RwCursorDupSort, error) { return m.makeCursor(bucket) } // Cursor creates a new cursor (the real fun begins here) -func (m *miningmutation) RwCursor(bucket string) (kv.RwCursor, error) { +func (m *memorymutation) RwCursor(bucket string) (kv.RwCursor, error) { return m.makeCursor(bucket) } // Cursor creates a new cursor (the real fun begins here) -func (m *miningmutation) CursorDupSort(bucket string) (kv.CursorDupSort, error) { +func (m *memorymutation) CursorDupSort(bucket string) (kv.CursorDupSort, error) { return m.makeCursor(bucket) } // Cursor creates a new cursor (the real fun begins here) -func (m *miningmutation) Cursor(bucket string) (kv.Cursor, error) { +func (m *memorymutation) Cursor(bucket string) (kv.Cursor, error) { return m.makeCursor(bucket) } // ViewID creates a new cursor (the real fun begins here) -func (m *miningmutation) ViewID() uint64 { +func (m *memorymutation) ViewID() uint64 { panic("ViewID Not implemented") } diff --git a/ethdb/olddb/miningmutation_test.go b/ethdb/olddb/memorymutation_test.go similarity index 80% rename from ethdb/olddb/miningmutation_test.go rename to ethdb/olddb/memorymutation_test.go index 2f046f038..3f96479a2 100644 --- a/ethdb/olddb/miningmutation_test.go +++ b/ethdb/olddb/memorymutation_test.go @@ -35,7 +35,7 @@ func TestLastMiningDB(t *testing.T) { initializeDB(rwTx) - batch := NewMiningBatch(rwTx) + batch := NewMemoryBatch(rwTx) batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4")) batch.Put(kv.HashedAccounts, []byte("BCAA"), []byte("value5")) @@ -60,7 +60,7 @@ func TestLastMiningMem(t *testing.T) { initializeDB(rwTx) - batch := NewMiningBatch(rwTx) + batch := NewMemoryBatch(rwTx) batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4")) batch.Put(kv.HashedAccounts, []byte("DCAA"), []byte("value5")) @@ -84,7 +84,7 @@ func TestDeleteMining(t *testing.T) { require.NoError(t, err) initializeDB(rwTx) - batch := NewMiningBatch(rwTx) + batch := NewMemoryBatch(rwTx) batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4")) batch.Put(kv.HashedAccounts, []byte("DCAA"), []byte("value5")) batch.Put(kv.HashedAccounts, []byte("FCAA"), []byte("value5")) @@ -105,3 +105,24 @@ func TestDeleteMining(t *testing.T) { require.Equal(t, key, []byte(nil)) require.Equal(t, value, []byte(nil)) } + +func TestFlush(t *testing.T) { + rwTx, err := memdb.New().BeginRw(context.Background()) + require.NoError(t, err) + + initializeDB(rwTx) + batch := NewMemoryBatch(rwTx) + batch.Put(kv.HashedAccounts, []byte("BAAA"), []byte("value4")) + batch.Put(kv.HashedAccounts, []byte("AAAA"), []byte("value5")) + batch.Put(kv.HashedAccounts, []byte("FCAA"), []byte("value5")) + + require.NoError(t, batch.Flush(rwTx)) + + value, err := rwTx.GetOne(kv.HashedAccounts, []byte("BAAA")) + require.NoError(t, err) + require.Equal(t, value, []byte("value4")) + + value, err = rwTx.GetOne(kv.HashedAccounts, []byte("AAAA")) + require.NoError(t, err) + require.Equal(t, value, []byte("value5")) +} diff --git a/ethdb/olddb/miningmutationcursor.go b/ethdb/olddb/memorymutationcursor.go similarity index 83% rename from ethdb/olddb/miningmutationcursor.go rename to ethdb/olddb/memorymutationcursor.go index ed5765e1b..9c0f3aa94 100644 --- a/ethdb/olddb/miningmutationcursor.go +++ b/ethdb/olddb/memorymutationcursor.go @@ -27,7 +27,7 @@ type cursorentry struct { } // cursor -type miningmutationcursor struct { +type memorymutationcursor struct { // we can keep one cursor type if we store 2 of each kind. cursor kv.Cursor dupCursor kv.CursorDupSort @@ -43,12 +43,12 @@ type miningmutationcursor struct { currentDbEntry cursorentry currentMemEntry cursorentry // we keep the mining mutation so that we can insert new elements in db - mutation *miningmutation + mutation *memorymutation table string } // First move cursor to first position and return key and value accordingly. -func (m *miningmutationcursor) First() ([]byte, []byte, error) { +func (m *memorymutationcursor) First() ([]byte, []byte, error) { memKey, memValue, err := m.memCursor.First() if err != nil { return nil, nil, err @@ -68,7 +68,7 @@ func (m *miningmutationcursor) First() ([]byte, []byte, error) { return m.goForward(memKey, memValue, dbKey, dbValue) } -func (m *miningmutationcursor) getNextOnDb(dup bool) (key []byte, value []byte, err error) { +func (m *memorymutationcursor) getNextOnDb(dup bool) (key []byte, value []byte, err error) { if dup { key, value, err = m.dupCursor.NextDup() if err != nil { @@ -97,7 +97,7 @@ func (m *miningmutationcursor) getNextOnDb(dup bool) (key []byte, value []byte, return } -func (m *miningmutationcursor) convertAutoDupsort(key []byte, value []byte) []byte { +func (m *memorymutationcursor) convertAutoDupsort(key []byte, value []byte) []byte { // The only dupsorted table we are interested is HashedStorage if m.table != kv.HashedStorage { return key @@ -106,11 +106,11 @@ func (m *miningmutationcursor) convertAutoDupsort(key []byte, value []byte) []by } // Current return the current key and values the cursor is on. -func (m *miningmutationcursor) Current() ([]byte, []byte, error) { +func (m *memorymutationcursor) Current() ([]byte, []byte, error) { return common.CopyBytes(m.currentPair.key), common.CopyBytes(m.currentPair.value), nil } -func (m *miningmutationcursor) skipIntersection(memKey, memValue, dbKey, dbValue []byte) (newDbKey []byte, newDbValue []byte, err error) { +func (m *memorymutationcursor) skipIntersection(memKey, memValue, dbKey, dbValue []byte) (newDbKey []byte, newDbValue []byte, err error) { newDbKey = dbKey newDbValue = dbValue // Check for duplicates @@ -132,7 +132,7 @@ func (m *miningmutationcursor) skipIntersection(memKey, memValue, dbKey, dbValue return } -func (m *miningmutationcursor) goForward(memKey, memValue, dbKey, dbValue []byte) ([]byte, []byte, error) { +func (m *memorymutationcursor) goForward(memKey, memValue, dbKey, dbValue []byte) ([]byte, []byte, error) { var err error if memValue == nil && dbValue == nil { return nil, nil, nil @@ -167,7 +167,7 @@ func (m *miningmutationcursor) goForward(memKey, memValue, dbKey, dbValue []byte } // Next returns the next element of the mutation. -func (m *miningmutationcursor) Next() ([]byte, []byte, error) { +func (m *memorymutationcursor) Next() ([]byte, []byte, error) { if m.isPrevFromDb { k, v, err := m.getNextOnDb(false) if err != nil { @@ -185,7 +185,7 @@ func (m *miningmutationcursor) Next() ([]byte, []byte, error) { } // NextDup returns the next element of the mutation. -func (m *miningmutationcursor) NextDup() ([]byte, []byte, error) { +func (m *memorymutationcursor) NextDup() ([]byte, []byte, error) { if m.isPrevFromDb { k, v, err := m.getNextOnDb(true) @@ -204,7 +204,7 @@ func (m *miningmutationcursor) NextDup() ([]byte, []byte, error) { } // Seek move pointer to a key at a certain position. -func (m *miningmutationcursor) Seek(seek []byte) ([]byte, []byte, error) { +func (m *memorymutationcursor) Seek(seek []byte) ([]byte, []byte, error) { dbKey, dbValue, err := m.cursor.Seek(seek) if err != nil { return nil, nil, err @@ -226,7 +226,7 @@ func (m *miningmutationcursor) Seek(seek []byte) ([]byte, []byte, error) { } // Seek move pointer to a key at a certain position. -func (m *miningmutationcursor) SeekExact(seek []byte) ([]byte, []byte, error) { +func (m *memorymutationcursor) SeekExact(seek []byte) ([]byte, []byte, error) { memKey, memValue, err := m.memCursor.SeekExact(seek) if err != nil { return nil, nil, err @@ -257,37 +257,37 @@ func (m *miningmutationcursor) SeekExact(seek []byte) ([]byte, []byte, error) { return nil, nil, nil } -func (m *miningmutationcursor) Put(k, v []byte) error { +func (m *memorymutationcursor) Put(k, v []byte) error { return m.mutation.Put(m.table, common.CopyBytes(k), common.CopyBytes(v)) } -func (m *miningmutationcursor) Append(k []byte, v []byte) error { +func (m *memorymutationcursor) Append(k []byte, v []byte) error { return m.mutation.Put(m.table, common.CopyBytes(k), common.CopyBytes(v)) } -func (m *miningmutationcursor) AppendDup(k []byte, v []byte) error { +func (m *memorymutationcursor) AppendDup(k []byte, v []byte) error { return m.memDupCursor.AppendDup(common.CopyBytes(k), common.CopyBytes(v)) } -func (m *miningmutationcursor) PutNoDupData(key, value []byte) error { +func (m *memorymutationcursor) PutNoDupData(key, value []byte) error { panic("DeleteCurrentDuplicates Not implemented") } -func (m *miningmutationcursor) Delete(k, v []byte) error { +func (m *memorymutationcursor) Delete(k, v []byte) error { return m.mutation.Delete(m.table, k, v) } -func (m *miningmutationcursor) DeleteCurrent() error { +func (m *memorymutationcursor) DeleteCurrent() error { panic("DeleteCurrent Not implemented") } -func (m *miningmutationcursor) DeleteCurrentDuplicates() error { +func (m *memorymutationcursor) DeleteCurrentDuplicates() error { panic("DeleteCurrentDuplicates Not implemented") } // Seek move pointer to a key at a certain position. -func (m *miningmutationcursor) SeekBothRange(key, value []byte) ([]byte, error) { +func (m *memorymutationcursor) SeekBothRange(key, value []byte) ([]byte, error) { if value == nil { _, v, err := m.SeekExact(key) return v, err @@ -313,7 +313,7 @@ func (m *miningmutationcursor) SeekBothRange(key, value []byte) ([]byte, error) return retValue, err } -func (m *miningmutationcursor) Last() ([]byte, []byte, error) { +func (m *memorymutationcursor) Last() ([]byte, []byte, error) { // TODO(Giulio2002): make fixes. memKey, memValue, err := m.memCursor.Last() if err != nil { @@ -373,11 +373,11 @@ func (m *miningmutationcursor) Last() ([]byte, []byte, error) { return dbKey, dbValue, nil } -func (m *miningmutationcursor) Prev() ([]byte, []byte, error) { +func (m *memorymutationcursor) Prev() ([]byte, []byte, error) { panic("Prev is not implemented!") } -func (m *miningmutationcursor) Close() { +func (m *memorymutationcursor) Close() { if m.cursor != nil { m.cursor.Close() } @@ -387,26 +387,26 @@ func (m *miningmutationcursor) Close() { return } -func (m *miningmutationcursor) Count() (uint64, error) { +func (m *memorymutationcursor) Count() (uint64, error) { panic("Not implemented") } -func (m *miningmutationcursor) FirstDup() ([]byte, error) { +func (m *memorymutationcursor) FirstDup() ([]byte, error) { panic("Not implemented") } -func (m *miningmutationcursor) NextNoDup() ([]byte, []byte, error) { +func (m *memorymutationcursor) NextNoDup() ([]byte, []byte, error) { panic("Not implemented") } -func (m *miningmutationcursor) LastDup() ([]byte, error) { +func (m *memorymutationcursor) LastDup() ([]byte, error) { panic("Not implemented") } -func (m *miningmutationcursor) CountDuplicates() (uint64, error) { +func (m *memorymutationcursor) CountDuplicates() (uint64, error) { panic("Not implemented") } -func (m *miningmutationcursor) SeekBothExact(key, value []byte) ([]byte, []byte, error) { +func (m *memorymutationcursor) SeekBothExact(key, value []byte) ([]byte, []byte, error) { panic("SeekBothExact Not implemented") } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index dacd8a2a2..50d442baf 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -225,7 +225,7 @@ func MiningStep(ctx context.Context, kv kv.RwDB, mining *stagedsync.Sync) (err e } defer tx.Rollback() - miningBatch := olddb.NewMiningBatch(tx) + miningBatch := olddb.NewMemoryBatch(tx) defer miningBatch.Rollback() if err = mining.Run(nil, miningBatch, false); err != nil {