From 069343d5a8e5a0b91ea7f7c57a2578a8a8421ab0 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 6 Nov 2019 11:24:48 +0100 Subject: [PATCH] BadgerDatabase DeleteTimestamp --- ethdb/badger_db.go | 71 +++++++++++++++++++++++++++++++++--------- ethdb/bolt_db.go | 7 ++--- ethdb/database_test.go | 6 ++-- ethdb/interface.go | 16 +++++----- 4 files changed, 70 insertions(+), 30 deletions(-) diff --git a/ethdb/badger_db.go b/ethdb/badger_db.go index 334235359..864b87eab 100644 --- a/ethdb/badger_db.go +++ b/ethdb/badger_db.go @@ -67,18 +67,16 @@ func bucketKey(bucket, key []byte) []byte { // Delete removes a single entry. func (db *BadgerDatabase) Delete(bucket, key []byte) error { - err := db.db.Update(func(txn *badger.Txn) error { + return db.db.Update(func(txn *badger.Txn) error { return txn.Delete(bucketKey(bucket, key)) }) - return err } // Put inserts or updates a single entry. func (db *BadgerDatabase) Put(bucket, key []byte, value []byte) error { - err := db.db.Update(func(txn *badger.Txn) error { + return db.db.Update(func(txn *badger.Txn) error { return txn.Set(bucketKey(bucket, key), value) }) - return err } // Get returns a single value. @@ -98,15 +96,16 @@ func (db *BadgerDatabase) Get(bucket, key []byte) ([]byte, error) { // PutS adds a new entry to the historical buckets: // hBucket (unless changeSetBucketOnly) and ChangeSet. func (db *BadgerDatabase) PutS(hBucket, key, value []byte, timestamp uint64, changeSetBucketOnly bool) error { - composite, suffix := dbutils.CompositeKeySuffix(key, timestamp) - suffixkey := make([]byte, len(suffix)+len(hBucket)) - copy(suffixkey, suffix) - copy(suffixkey[len(suffix):], hBucket) - + composite, encodedStamp := dbutils.CompositeKeySuffix(key, timestamp) hKey := bucketKey(hBucket, composite) + + suffixkey := make([]byte, len(encodedStamp)+len(hBucket)) + copy(suffixkey, encodedStamp) + copy(suffixkey[len(encodedStamp):], hBucket) + changeSetKey := bucketKey(dbutils.ChangeSetBucket, suffixkey) - err := db.db.Update(func(tx *badger.Txn) error { + return db.db.Update(func(tx *badger.Txn) error { if !changeSetBucketOnly { if err := tx.Set(hKey, value); err != nil { return err @@ -121,10 +120,11 @@ func (db *BadgerDatabase) PutS(hBucket, key, value []byte, timestamp uint64, cha var sh dbutils.ChangeSet if err == nil { err = changeSetItem.Value(func(val []byte) error { - sh, err = dbutils.Decode(val) - if err != nil { - log.Error("PutS Decode suffix err", "err", err) - return err + var err2 error + sh, err2 = dbutils.Decode(val) + if err2 != nil { + log.Error("PutS Decode suffix err", "err", err2) + return err2 } return nil }) @@ -142,7 +142,48 @@ func (db *BadgerDatabase) PutS(hBucket, key, value []byte, timestamp uint64, cha return tx.Set(changeSetKey, dat) }) - return err +} + +// DeleteTimestamp removes data for a given timestamp from all historical buckets (incl. ChangeSet). +func (db *BadgerDatabase) DeleteTimestamp(timestamp uint64) error { + encodedStamp := dbutils.EncodeTimestamp(timestamp) + prefix := bucketKey(dbutils.ChangeSetBucket, encodedStamp) + return db.db.Update(func(tx *badger.Txn) error { + var keys [][]byte + it := tx.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + k := item.Key() + + var changedAccounts dbutils.ChangeSet + err := item.Value(func(v []byte) error { + var err2 error + changedAccounts, err2 = dbutils.Decode(v) + return err2 + }) + if err != nil { + return err + } + + bucket := k[len(encodedStamp):] + err = changedAccounts.Walk(func(kk, _ []byte) error { + kk = append(kk, encodedStamp...) + return tx.Delete(bucketKey(bucket, kk)) + }) + if err != nil { + return err + } + + keys = append(keys, k) + } + for _, k := range keys { + if err := tx.Delete(bucketKey(dbutils.ChangeSetBucket, k)); err != nil { + return err + } + } + return nil + }) } // TODO [Andrew] implement the full Database interface diff --git a/ethdb/bolt_db.go b/ethdb/bolt_db.go index d016be238..79134516a 100644 --- a/ethdb/bolt_db.go +++ b/ethdb/bolt_db.go @@ -545,7 +545,7 @@ func (db *BoltDatabase) Delete(bucket, key []byte) error { return err } -// Deletes all keys with specified suffix from all the buckets +// DeleteTimestamp removes data for a given timestamp from all historical buckets (incl. ChangeSet). func (db *BoltDatabase) DeleteTimestamp(timestamp uint64) error { suffix := dbutils.EncodeTimestamp(timestamp) err := db.db.Update(func(tx *bolt.Tx) error { @@ -566,10 +566,7 @@ func (db *BoltDatabase) DeleteTimestamp(timestamp uint64) error { } err = changedAccounts.Walk(func(kk, _ []byte) error { kk = append(kk, suffix...) - if err := hb.Delete(kk); err != nil { - return err - } - return nil + return hb.Delete(kk) }) if err != nil { return err diff --git a/ethdb/database_test.go b/ethdb/database_test.go index 4e311fb3b..8208b9cd6 100644 --- a/ethdb/database_test.go +++ b/ethdb/database_test.go @@ -80,7 +80,7 @@ func TestBadgerDB_PutGet(t *testing.T) { testPutGet(db, t) } -func testPutGet(db SimpleDatabase, t *testing.T) { +func testPutGet(db MinDatabase, t *testing.T) { t.Parallel() for _, k := range testValues { @@ -184,7 +184,7 @@ func TestBadgerDB_ParallelPutGet(t *testing.T) { defer remove() testParallelPutGet(db) } -func testParallelPutGet(db SimpleDatabase) { +func testParallelPutGet(db MinDatabase) { const n = 8 var pending sync.WaitGroup @@ -209,7 +209,7 @@ func testParallelPutGet(db SimpleDatabase) { panic("get failed: " + err.Error()) } if !bytes.Equal(data, []byte("v"+key)) { - panic(fmt.Sprintf("get failed, got %q expected %q", []byte(data), []byte("v"+key))) + panic(fmt.Sprintf("get failed, got %q expected %q", data, []byte("v"+key))) } }(strconv.Itoa(i)) } diff --git a/ethdb/interface.go b/ethdb/interface.go index 228f1469d..46c4e3cce 100644 --- a/ethdb/interface.go +++ b/ethdb/interface.go @@ -27,9 +27,8 @@ type Putter interface { // PutS adds a new entry to the historical buckets: // hBucket (unless changeSetBucketOnly) and ChangeSet. + // timestamp == block number PutS(hBucket, key, value []byte, timestamp uint64, changeSetBucketOnly bool) error - - DeleteTimestamp(timestamp uint64) error } // Getter wraps the database read operations. @@ -49,6 +48,10 @@ type Getter interface { type Deleter interface { // Delete removes a single entry. Delete(bucket, key []byte) error + + // DeleteTimestamp removes data for a given timestamp from all historical buckets (incl. ChangeSet). + // timestamp == block number + DeleteTimestamp(timestamp uint64) error } // Database wraps all database operations. All methods are safe for concurrent use. @@ -69,12 +72,11 @@ type Database interface { TruncateAncients(items uint64) error } -// SimpleDatabase is a minimalistic version of the Database interface. -// TODO [Andrew] remove the interface. -type SimpleDatabase interface { - Deleter - Put(bucket, key, value []byte) error +// MinDatabase is a minimalistic version of the Database interface. +type MinDatabase interface { Get(bucket, key []byte) ([]byte, error) + Put(bucket, key, value []byte) error + Delete(bucket, key []byte) error } // DbWithPendingMutations is an extended version of the Database,