BadgerDatabase DeleteTimestamp

This commit is contained in:
andrew 2019-11-06 11:24:48 +01:00
parent 8209822cb4
commit 069343d5a8
4 changed files with 70 additions and 30 deletions

View File

@ -67,18 +67,16 @@ func bucketKey(bucket, key []byte) []byte {
// Delete removes a single entry.
func (db *BadgerDatabase) Delete(bucket, key []byte) error {
err := db.db.Update(func(txn *badger.Txn) error {
return db.db.Update(func(txn *badger.Txn) error {
return txn.Delete(bucketKey(bucket, key))
})
return err
}
// Put inserts or updates a single entry.
func (db *BadgerDatabase) Put(bucket, key []byte, value []byte) error {
err := db.db.Update(func(txn *badger.Txn) error {
return db.db.Update(func(txn *badger.Txn) error {
return txn.Set(bucketKey(bucket, key), value)
})
return err
}
// Get returns a single value.
@ -98,15 +96,16 @@ func (db *BadgerDatabase) Get(bucket, key []byte) ([]byte, error) {
// PutS adds a new entry to the historical buckets:
// hBucket (unless changeSetBucketOnly) and ChangeSet.
func (db *BadgerDatabase) PutS(hBucket, key, value []byte, timestamp uint64, changeSetBucketOnly bool) error {
composite, suffix := dbutils.CompositeKeySuffix(key, timestamp)
suffixkey := make([]byte, len(suffix)+len(hBucket))
copy(suffixkey, suffix)
copy(suffixkey[len(suffix):], hBucket)
composite, encodedStamp := dbutils.CompositeKeySuffix(key, timestamp)
hKey := bucketKey(hBucket, composite)
suffixkey := make([]byte, len(encodedStamp)+len(hBucket))
copy(suffixkey, encodedStamp)
copy(suffixkey[len(encodedStamp):], hBucket)
changeSetKey := bucketKey(dbutils.ChangeSetBucket, suffixkey)
err := db.db.Update(func(tx *badger.Txn) error {
return db.db.Update(func(tx *badger.Txn) error {
if !changeSetBucketOnly {
if err := tx.Set(hKey, value); err != nil {
return err
@ -121,10 +120,11 @@ func (db *BadgerDatabase) PutS(hBucket, key, value []byte, timestamp uint64, cha
var sh dbutils.ChangeSet
if err == nil {
err = changeSetItem.Value(func(val []byte) error {
sh, err = dbutils.Decode(val)
if err != nil {
log.Error("PutS Decode suffix err", "err", err)
return err
var err2 error
sh, err2 = dbutils.Decode(val)
if err2 != nil {
log.Error("PutS Decode suffix err", "err", err2)
return err2
}
return nil
})
@ -142,7 +142,48 @@ func (db *BadgerDatabase) PutS(hBucket, key, value []byte, timestamp uint64, cha
return tx.Set(changeSetKey, dat)
})
return err
}
// DeleteTimestamp removes data for a given timestamp from all historical buckets (incl. ChangeSet).
func (db *BadgerDatabase) DeleteTimestamp(timestamp uint64) error {
encodedStamp := dbutils.EncodeTimestamp(timestamp)
prefix := bucketKey(dbutils.ChangeSetBucket, encodedStamp)
return db.db.Update(func(tx *badger.Txn) error {
var keys [][]byte
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
k := item.Key()
var changedAccounts dbutils.ChangeSet
err := item.Value(func(v []byte) error {
var err2 error
changedAccounts, err2 = dbutils.Decode(v)
return err2
})
if err != nil {
return err
}
bucket := k[len(encodedStamp):]
err = changedAccounts.Walk(func(kk, _ []byte) error {
kk = append(kk, encodedStamp...)
return tx.Delete(bucketKey(bucket, kk))
})
if err != nil {
return err
}
keys = append(keys, k)
}
for _, k := range keys {
if err := tx.Delete(bucketKey(dbutils.ChangeSetBucket, k)); err != nil {
return err
}
}
return nil
})
}
// TODO [Andrew] implement the full Database interface

View File

@ -545,7 +545,7 @@ func (db *BoltDatabase) Delete(bucket, key []byte) error {
return err
}
// Deletes all keys with specified suffix from all the buckets
// DeleteTimestamp removes data for a given timestamp from all historical buckets (incl. ChangeSet).
func (db *BoltDatabase) DeleteTimestamp(timestamp uint64) error {
suffix := dbutils.EncodeTimestamp(timestamp)
err := db.db.Update(func(tx *bolt.Tx) error {
@ -566,10 +566,7 @@ func (db *BoltDatabase) DeleteTimestamp(timestamp uint64) error {
}
err = changedAccounts.Walk(func(kk, _ []byte) error {
kk = append(kk, suffix...)
if err := hb.Delete(kk); err != nil {
return err
}
return nil
return hb.Delete(kk)
})
if err != nil {
return err

View File

@ -80,7 +80,7 @@ func TestBadgerDB_PutGet(t *testing.T) {
testPutGet(db, t)
}
func testPutGet(db SimpleDatabase, t *testing.T) {
func testPutGet(db MinDatabase, t *testing.T) {
t.Parallel()
for _, k := range testValues {
@ -184,7 +184,7 @@ func TestBadgerDB_ParallelPutGet(t *testing.T) {
defer remove()
testParallelPutGet(db)
}
func testParallelPutGet(db SimpleDatabase) {
func testParallelPutGet(db MinDatabase) {
const n = 8
var pending sync.WaitGroup
@ -209,7 +209,7 @@ func testParallelPutGet(db SimpleDatabase) {
panic("get failed: " + err.Error())
}
if !bytes.Equal(data, []byte("v"+key)) {
panic(fmt.Sprintf("get failed, got %q expected %q", []byte(data), []byte("v"+key)))
panic(fmt.Sprintf("get failed, got %q expected %q", data, []byte("v"+key)))
}
}(strconv.Itoa(i))
}

View File

@ -27,9 +27,8 @@ type Putter interface {
// PutS adds a new entry to the historical buckets:
// hBucket (unless changeSetBucketOnly) and ChangeSet.
// timestamp == block number
PutS(hBucket, key, value []byte, timestamp uint64, changeSetBucketOnly bool) error
DeleteTimestamp(timestamp uint64) error
}
// Getter wraps the database read operations.
@ -49,6 +48,10 @@ type Getter interface {
type Deleter interface {
// Delete removes a single entry.
Delete(bucket, key []byte) error
// DeleteTimestamp removes data for a given timestamp from all historical buckets (incl. ChangeSet).
// timestamp == block number
DeleteTimestamp(timestamp uint64) error
}
// Database wraps all database operations. All methods are safe for concurrent use.
@ -69,12 +72,11 @@ type Database interface {
TruncateAncients(items uint64) error
}
// SimpleDatabase is a minimalistic version of the Database interface.
// TODO [Andrew] remove the interface.
type SimpleDatabase interface {
Deleter
Put(bucket, key, value []byte) error
// MinDatabase is a minimalistic version of the Database interface.
type MinDatabase interface {
Get(bucket, key []byte) ([]byte, error)
Put(bucket, key, value []byte) error
Delete(bucket, key []byte) error
}
// DbWithPendingMutations is an extended version of the Database,