2019-05-27 13:51:49 +00:00
|
|
|
package ethdb
|
|
|
|
|
|
|
|
import (
|
2019-12-20 12:25:40 +00:00
|
|
|
"bytes"
|
|
|
|
"fmt"
|
|
|
|
"github.com/ledgerwatch/turbo-geth/common/debug"
|
2019-11-21 18:38:00 +00:00
|
|
|
"sort"
|
2019-11-11 19:28:27 +00:00
|
|
|
"sync"
|
2019-12-20 12:25:40 +00:00
|
|
|
"sync/atomic"
|
2019-11-11 19:28:27 +00:00
|
|
|
|
2019-05-27 13:51:49 +00:00
|
|
|
"github.com/ledgerwatch/turbo-geth/common"
|
|
|
|
"github.com/ledgerwatch/turbo-geth/common/dbutils"
|
2019-10-31 10:59:00 +00:00
|
|
|
"github.com/ledgerwatch/turbo-geth/log"
|
2019-05-27 13:51:49 +00:00
|
|
|
)
|
|
|
|
|
2019-11-21 18:38:00 +00:00
|
|
|
type puts map[string]putsBucket //map[bucket]putsBucket
|
|
|
|
|
|
|
|
func newPuts() puts {
|
|
|
|
return make(puts)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p puts) Set(bucket, key, value []byte) {
|
|
|
|
var bucketPuts putsBucket
|
|
|
|
var ok bool
|
|
|
|
if bucketPuts, ok = p[string(bucket)]; !ok {
|
|
|
|
bucketPuts = make(putsBucket)
|
|
|
|
p[string(bucket)] = bucketPuts
|
|
|
|
}
|
|
|
|
bucketPuts[string(key)] = value
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p puts) Delete(bucket, key []byte) {
|
|
|
|
p.Set(bucket, key, nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p puts) SetStr(bucket string, key, value []byte) {
|
|
|
|
var bucketPuts putsBucket
|
|
|
|
var ok bool
|
|
|
|
if bucketPuts, ok = p[bucket]; !ok {
|
|
|
|
bucketPuts = make(putsBucket)
|
|
|
|
p[bucket] = bucketPuts
|
|
|
|
}
|
|
|
|
bucketPuts[string(key)] = value
|
2019-10-30 17:33:01 +00:00
|
|
|
}
|
|
|
|
|
2019-11-21 18:38:00 +00:00
|
|
|
func (p puts) DeleteStr(bucket string, key []byte) {
|
|
|
|
p.SetStr(bucket, key, nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p puts) Size() int {
|
|
|
|
var size int
|
|
|
|
for _, put := range p {
|
|
|
|
size += len(put)
|
|
|
|
}
|
|
|
|
return size
|
|
|
|
}
|
|
|
|
|
|
|
|
type putsBucket map[string][]byte //map[key]value
|
|
|
|
|
|
|
|
func (pb putsBucket) Get(key []byte) ([]byte, bool) {
|
|
|
|
value, ok := pb[string(key)]
|
|
|
|
if !ok {
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
|
|
|
|
if value == nil {
|
|
|
|
return nil, true
|
|
|
|
}
|
|
|
|
|
|
|
|
v := make([]byte, len(value))
|
|
|
|
copy(v, value)
|
|
|
|
|
|
|
|
return v, true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (pb putsBucket) GetStr(key string) ([]byte, bool) {
|
|
|
|
value, ok := pb[key]
|
|
|
|
if !ok {
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
|
|
|
|
if value == nil {
|
|
|
|
return nil, true
|
|
|
|
}
|
|
|
|
|
|
|
|
v := make([]byte, len(value))
|
|
|
|
copy(v, value)
|
|
|
|
|
|
|
|
return v, true
|
2019-10-30 17:33:01 +00:00
|
|
|
}
|
|
|
|
|
2019-05-27 13:51:49 +00:00
|
|
|
type mutation struct {
|
2019-12-20 12:25:40 +00:00
|
|
|
puts puts // Map buckets to map[key]value
|
|
|
|
//map[timestamp]map[hBucket]listOfChangedKeys
|
|
|
|
changeSetByBlock map[uint64]map[string]*dbutils.ChangeSet
|
2019-11-07 15:51:25 +00:00
|
|
|
mu sync.RWMutex
|
|
|
|
db Database
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) getMem(bucket, key []byte) ([]byte, bool) {
|
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
2019-12-20 12:25:40 +00:00
|
|
|
|
2019-05-27 13:51:49 +00:00
|
|
|
if t, ok := m.puts[string(bucket)]; ok {
|
2019-11-21 18:38:00 +00:00
|
|
|
return t.Get(key)
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
|
|
|
|
// Can only be called from the worker thread
|
|
|
|
func (m *mutation) Get(bucket, key []byte) ([]byte, error) {
|
|
|
|
if value, ok := m.getMem(bucket, key); ok {
|
|
|
|
if value == nil {
|
|
|
|
return nil, ErrKeyNotFound
|
|
|
|
}
|
|
|
|
return value, nil
|
|
|
|
}
|
|
|
|
if m.db != nil {
|
|
|
|
return m.db.Get(bucket, key)
|
|
|
|
}
|
|
|
|
return nil, ErrKeyNotFound
|
|
|
|
}
|
|
|
|
|
2019-12-20 12:25:40 +00:00
|
|
|
func (m *mutation) getChangeSetByBlockNoLock(bucket []byte, timestamp uint64) (*dbutils.ChangeSet, error) {
|
|
|
|
bucketMap, ok := m.changeSetByBlock[timestamp]
|
|
|
|
if !ok {
|
|
|
|
return nil, ErrKeyNotFound
|
|
|
|
}
|
|
|
|
changeSet, ok := bucketMap[string(bucket)]
|
|
|
|
if !ok {
|
|
|
|
return nil, ErrKeyNotFound
|
|
|
|
}
|
|
|
|
|
|
|
|
return changeSet, nil
|
|
|
|
}
|
|
|
|
|
2019-05-27 13:51:49 +00:00
|
|
|
func (m *mutation) GetS(hBucket, key []byte, timestamp uint64) ([]byte, error) {
|
2019-12-20 12:25:40 +00:00
|
|
|
if !debug.IsThinHistory() || bytes.Equal(hBucket, dbutils.StorageHistoryBucket) {
|
|
|
|
composite, _ := dbutils.CompositeKeySuffix(key, timestamp)
|
|
|
|
return m.Get(hBucket, composite)
|
|
|
|
}
|
|
|
|
|
|
|
|
m.mu.RLock()
|
|
|
|
chs, err := m.getChangeSetByBlockNoLock(hBucket, timestamp)
|
|
|
|
m.mu.Unlock()
|
|
|
|
if err != nil {
|
|
|
|
if m.db != nil {
|
|
|
|
return m.db.GetS(hBucket, key, timestamp)
|
|
|
|
}
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return chs.FindLast(key)
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) getNoLock(bucket, key []byte) ([]byte, error) {
|
|
|
|
if t, ok := m.puts[string(bucket)]; ok {
|
2019-11-21 18:38:00 +00:00
|
|
|
value, ok := t.Get(key)
|
|
|
|
if ok && value == nil {
|
|
|
|
return nil, ErrKeyNotFound
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
2019-11-21 18:38:00 +00:00
|
|
|
return value, nil
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
if m.db != nil {
|
|
|
|
return m.db.Get(bucket, key)
|
|
|
|
}
|
|
|
|
return nil, ErrKeyNotFound
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) hasMem(bucket, key []byte) bool {
|
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
if t, ok := m.puts[string(bucket)]; ok {
|
2019-11-21 18:38:00 +00:00
|
|
|
_, ok = t.Get(key)
|
|
|
|
return ok
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) Has(bucket, key []byte) (bool, error) {
|
|
|
|
if m.hasMem(bucket, key) {
|
|
|
|
return true, nil
|
|
|
|
}
|
|
|
|
if m.db != nil {
|
|
|
|
return m.db.Has(bucket, key)
|
|
|
|
}
|
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
|
2019-11-27 13:15:25 +00:00
|
|
|
func (m *mutation) DiskSize() int64 {
|
2019-05-27 13:51:49 +00:00
|
|
|
if m.db == nil {
|
|
|
|
return 0
|
|
|
|
}
|
2019-11-27 13:15:25 +00:00
|
|
|
return m.db.DiskSize()
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) Put(bucket, key []byte, value []byte) error {
|
|
|
|
bb := make([]byte, len(bucket))
|
|
|
|
copy(bb, bucket)
|
|
|
|
k := make([]byte, len(key))
|
|
|
|
copy(k, key)
|
|
|
|
v := make([]byte, len(value))
|
|
|
|
copy(v, value)
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2019-11-21 18:38:00 +00:00
|
|
|
|
2019-12-20 12:25:40 +00:00
|
|
|
m.puts.Set(bb, key, value)
|
2019-05-27 13:51:49 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Assumes that bucket, key, and value won't be modified
|
2019-10-31 10:59:00 +00:00
|
|
|
func (m *mutation) PutS(hBucket, key, value []byte, timestamp uint64, noHistory bool) error {
|
2019-05-27 13:51:49 +00:00
|
|
|
//fmt.Printf("PutS bucket %x key %x value %x timestamp %d\n", bucket, key, value, timestamp)
|
2019-12-20 12:25:40 +00:00
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
|
|
|
hBucketStr := string(hBucket)
|
2019-11-07 15:51:25 +00:00
|
|
|
changesByBucket, ok := m.changeSetByBlock[timestamp]
|
2019-05-27 13:51:49 +00:00
|
|
|
if !ok {
|
2019-12-20 12:25:40 +00:00
|
|
|
changesByBucket = make(map[string]*dbutils.ChangeSet)
|
2019-11-07 15:51:25 +00:00
|
|
|
m.changeSetByBlock[timestamp] = changesByBucket
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
2019-12-20 12:25:40 +00:00
|
|
|
changeSet, ok := changesByBucket[hBucketStr]
|
2019-05-27 13:51:49 +00:00
|
|
|
if !ok {
|
2019-12-20 12:25:40 +00:00
|
|
|
changeSet = dbutils.NewChangeSet()
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
2019-12-20 12:25:40 +00:00
|
|
|
|
|
|
|
err := changeSet.Add(key, value)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
changesByBucket[hBucketStr] = changeSet
|
2019-10-31 10:59:00 +00:00
|
|
|
if noHistory {
|
|
|
|
return nil
|
|
|
|
}
|
2019-12-20 12:25:40 +00:00
|
|
|
if !debug.IsThinHistory() || bytes.Equal(hBucket, dbutils.StorageHistoryBucket) {
|
|
|
|
composite, _ := dbutils.CompositeKeySuffix(key, timestamp)
|
|
|
|
m.puts.Set(hBucket, composite, value)
|
|
|
|
}
|
|
|
|
|
2019-05-27 13:51:49 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) MultiPut(tuples ...[]byte) (uint64, error) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
l := len(tuples)
|
|
|
|
for i := 0; i < l; i += 3 {
|
2019-11-21 18:38:00 +00:00
|
|
|
m.puts.Set(tuples[i], tuples[i+1], tuples[i+2])
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) BatchSize() int {
|
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
2019-11-21 18:38:00 +00:00
|
|
|
return m.puts.Size()
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
|
2019-11-21 15:12:38 +00:00
|
|
|
// IdealBatchSize defines the size of the data batches should ideally add in one write.
|
|
|
|
func (m *mutation) IdealBatchSize() int {
|
|
|
|
return m.db.IdealBatchSize()
|
|
|
|
}
|
|
|
|
|
2019-05-27 13:51:49 +00:00
|
|
|
func (m *mutation) GetAsOf(bucket, hBucket, key []byte, timestamp uint64) ([]byte, error) {
|
2019-12-20 12:25:40 +00:00
|
|
|
m.panicOnEmptyDB()
|
|
|
|
return m.db.GetAsOf(bucket, hBucket, key, timestamp)
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
|
2019-10-30 17:33:01 +00:00
|
|
|
// WARNING: Merged mem/DB walk is not implemented
|
2019-05-27 13:51:49 +00:00
|
|
|
func (m *mutation) Walk(bucket, startkey []byte, fixedbits uint, walker func([]byte, []byte) (bool, error)) error {
|
2019-12-20 12:25:40 +00:00
|
|
|
m.panicOnEmptyDB()
|
2019-05-27 13:51:49 +00:00
|
|
|
return m.db.Walk(bucket, startkey, fixedbits, walker)
|
|
|
|
}
|
|
|
|
|
2019-10-30 17:33:01 +00:00
|
|
|
// WARNING: Merged mem/DB walk is not implemented
|
2019-11-21 15:12:38 +00:00
|
|
|
func (m *mutation) MultiWalk(bucket []byte, startkeys [][]byte, fixedbits []uint, walker func(int, []byte, []byte) error) error {
|
2019-12-20 12:25:40 +00:00
|
|
|
m.panicOnEmptyDB()
|
2019-05-27 13:51:49 +00:00
|
|
|
return m.db.MultiWalk(bucket, startkeys, fixedbits, walker)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) WalkAsOf(bucket, hBucket, startkey []byte, fixedbits uint, timestamp uint64, walker func([]byte, []byte) (bool, error)) error {
|
2019-12-20 12:25:40 +00:00
|
|
|
m.panicOnEmptyDB()
|
2019-05-27 13:51:49 +00:00
|
|
|
return m.db.WalkAsOf(bucket, hBucket, startkey, fixedbits, timestamp, walker)
|
|
|
|
}
|
|
|
|
|
2019-11-21 15:12:38 +00:00
|
|
|
func (m *mutation) MultiWalkAsOf(bucket, hBucket []byte, startkeys [][]byte, fixedbits []uint, timestamp uint64, walker func(int, []byte, []byte) error) error {
|
2019-12-20 12:25:40 +00:00
|
|
|
m.panicOnEmptyDB()
|
2019-05-27 13:51:49 +00:00
|
|
|
return m.db.MultiWalkAsOf(bucket, hBucket, startkeys, fixedbits, timestamp, walker)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) RewindData(timestampSrc, timestampDst uint64, df func(hBucket, key, value []byte) error) error {
|
2019-12-10 05:37:18 +00:00
|
|
|
return RewindData(m, timestampSrc, timestampDst, df)
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) Delete(bucket, key []byte) error {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2019-11-21 18:38:00 +00:00
|
|
|
m.puts.Delete(bucket, key)
|
2019-05-27 13:51:49 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-11-07 15:51:25 +00:00
|
|
|
// Deletes all keys with specified suffix(blockNum) from all the buckets
|
2019-05-27 13:51:49 +00:00
|
|
|
func (m *mutation) DeleteTimestamp(timestamp uint64) error {
|
2019-11-07 15:51:25 +00:00
|
|
|
encodedTS := dbutils.EncodeTimestamp(timestamp)
|
2019-05-27 13:51:49 +00:00
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2019-12-20 12:25:40 +00:00
|
|
|
|
2019-11-07 15:51:25 +00:00
|
|
|
err := m.Walk(dbutils.ChangeSetBucket, encodedTS, uint(8*len(encodedTS)), func(k, v []byte) (bool, error) {
|
|
|
|
// k = encodedTS + hBucket
|
2019-12-20 12:25:40 +00:00
|
|
|
hBucket := k[len(encodedTS):]
|
|
|
|
hBucketStr := string(hBucket)
|
|
|
|
changedAccounts, err := dbutils.DecodeChangeSet(v)
|
2019-10-31 10:59:00 +00:00
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
2019-12-20 12:25:40 +00:00
|
|
|
if !debug.IsThinHistory() || bytes.Equal(hBucket, dbutils.StorageHistoryBucket) {
|
|
|
|
innerErr := changedAccounts.Walk(func(kk, _ []byte) error {
|
|
|
|
composite, _ := dbutils.CompositeKeySuffix(kk, timestamp)
|
|
|
|
m.puts.DeleteStr(hBucketStr, composite)
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
|
|
|
|
if innerErr != nil {
|
|
|
|
return false, innerErr
|
|
|
|
}
|
|
|
|
m.puts.DeleteStr(string(dbutils.ChangeSetBucket), k)
|
|
|
|
} else {
|
|
|
|
innerErr := changedAccounts.Walk(func(kk, _ []byte) error {
|
|
|
|
acc, err := m.getNoLock(dbutils.AccountsHistoryBucket, kk)
|
|
|
|
if err != nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
v, isEmpty, removeErr := RemoveFromIndex(acc, timestamp)
|
|
|
|
if removeErr != nil {
|
|
|
|
return removeErr
|
|
|
|
}
|
|
|
|
if isEmpty {
|
|
|
|
m.puts.DeleteStr(hBucketStr, kk)
|
|
|
|
} else {
|
|
|
|
m.puts.SetStr(hBucketStr, kk, v)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if innerErr != nil {
|
|
|
|
return false, innerErr
|
|
|
|
}
|
|
|
|
m.puts.DeleteStr(string(dbutils.ChangeSetBucket), k)
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
return true, nil
|
|
|
|
})
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) Commit() (uint64, error) {
|
|
|
|
if m.db == nil {
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2019-11-07 15:51:25 +00:00
|
|
|
if len(m.changeSetByBlock) > 0 {
|
2019-11-21 18:38:00 +00:00
|
|
|
changeSetStr := string(dbutils.ChangeSetBucket)
|
2019-11-07 15:51:25 +00:00
|
|
|
for timestamp, changesByBucket := range m.changeSetByBlock {
|
|
|
|
encodedTS := dbutils.EncodeTimestamp(timestamp)
|
2019-12-20 12:25:40 +00:00
|
|
|
for hBucketStr, changes := range changesByBucket {
|
|
|
|
hBucket := []byte(hBucketStr)
|
2019-11-07 15:51:25 +00:00
|
|
|
changeSetKey := dbutils.CompositeChangeSetKey(encodedTS, hBucket)
|
|
|
|
dat, err := m.getNoLock(dbutils.ChangeSetBucket, changeSetKey)
|
2019-05-27 13:51:49 +00:00
|
|
|
if err != nil && err != ErrKeyNotFound {
|
|
|
|
return 0, err
|
|
|
|
}
|
2019-10-31 10:59:00 +00:00
|
|
|
|
2019-12-20 12:25:40 +00:00
|
|
|
changeSet, err := dbutils.DecodeChangeSet(dat)
|
2019-10-31 10:59:00 +00:00
|
|
|
if err != nil {
|
2019-12-20 12:25:40 +00:00
|
|
|
log.Error("DecodeChangeSet changeSet error on commit", "err", err)
|
2019-10-31 10:59:00 +00:00
|
|
|
}
|
|
|
|
|
2019-12-20 12:25:40 +00:00
|
|
|
if err = changeSet.MultiAdd(changes.Changes); err != nil {
|
2019-12-03 11:13:12 +00:00
|
|
|
return 0, err
|
|
|
|
}
|
2019-12-20 12:25:40 +00:00
|
|
|
changedRLP, err := changeSet.Encode()
|
2019-10-31 10:59:00 +00:00
|
|
|
if err != nil {
|
2019-12-20 12:25:40 +00:00
|
|
|
log.Error("EncodeChangeSet changeSet error on commit", "err", err)
|
|
|
|
}
|
|
|
|
if debug.IsThinHistory() {
|
|
|
|
changedKeys := changeSet.ChangedKeys()
|
|
|
|
for k := range changedKeys {
|
|
|
|
value, err := m.getNoLock(hBucket, []byte(k))
|
|
|
|
if err == ErrKeyNotFound {
|
|
|
|
if m.db != nil {
|
|
|
|
value, err = m.db.Get(hBucket, []byte(k))
|
|
|
|
if err != nil && err != ErrKeyNotFound {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
v, err := AppendToIndex(value, timestamp)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("mutation, append to index", "err", err, "timestamp", timestamp)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
m.puts.SetStr(hBucketStr, []byte(k), v)
|
|
|
|
}
|
2019-10-31 10:59:00 +00:00
|
|
|
}
|
2019-11-21 18:38:00 +00:00
|
|
|
m.puts.SetStr(changeSetStr, changeSetKey, changedRLP)
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-12-20 12:25:40 +00:00
|
|
|
m.changeSetByBlock = make(map[uint64]map[string]*dbutils.ChangeSet)
|
2019-11-21 18:38:00 +00:00
|
|
|
|
|
|
|
tuples := common.NewTuples(m.puts.Size(), 3, 1)
|
2019-05-27 13:51:49 +00:00
|
|
|
for bucketStr, bt := range m.puts {
|
|
|
|
bucketB := []byte(bucketStr)
|
2019-11-21 18:38:00 +00:00
|
|
|
for key := range bt {
|
|
|
|
value, _ := bt.GetStr(key)
|
|
|
|
if err := tuples.Append(bucketB, []byte(key), value); err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
}
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
2019-11-21 18:38:00 +00:00
|
|
|
sort.Sort(tuples)
|
|
|
|
|
|
|
|
written, err := m.db.MultiPut(tuples.Values...)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
2019-11-21 18:38:00 +00:00
|
|
|
m.puts = make(puts)
|
2019-05-27 13:51:49 +00:00
|
|
|
return written, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) Rollback() {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2019-12-20 12:25:40 +00:00
|
|
|
m.changeSetByBlock = make(map[uint64]map[string]*dbutils.ChangeSet)
|
2019-11-21 18:38:00 +00:00
|
|
|
m.puts = make(puts)
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) Keys() ([][]byte, error) {
|
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
2019-11-21 18:38:00 +00:00
|
|
|
tuples := common.NewTuples(m.puts.Size(), 2, 1)
|
2019-05-27 13:51:49 +00:00
|
|
|
for bucketStr, bt := range m.puts {
|
|
|
|
bucketB := []byte(bucketStr)
|
2019-11-21 18:38:00 +00:00
|
|
|
for key := range bt {
|
|
|
|
if err := tuples.Append(bucketB, []byte(key)); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
2019-11-21 18:38:00 +00:00
|
|
|
sort.Sort(tuples)
|
|
|
|
return tuples.Values, nil
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mutation) Close() {
|
|
|
|
m.Rollback()
|
|
|
|
}
|
|
|
|
|
2019-10-30 17:33:01 +00:00
|
|
|
func (m *mutation) NewBatch() DbWithPendingMutations {
|
2019-05-27 13:51:49 +00:00
|
|
|
mm := &mutation{
|
2019-11-07 15:51:25 +00:00
|
|
|
db: m,
|
2019-11-21 18:38:00 +00:00
|
|
|
puts: newPuts(),
|
2019-12-20 12:25:40 +00:00
|
|
|
changeSetByBlock: make(map[uint64]map[string]*dbutils.ChangeSet),
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
return mm
|
|
|
|
}
|
|
|
|
|
2019-12-20 12:25:40 +00:00
|
|
|
func (m *mutation) panicOnEmptyDB() {
|
|
|
|
if m.db == nil {
|
|
|
|
panic("Not implemented")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-05-27 13:51:49 +00:00
|
|
|
func (m *mutation) MemCopy() Database {
|
2019-12-20 12:25:40 +00:00
|
|
|
m.panicOnEmptyDB()
|
|
|
|
return m.db
|
2019-05-27 13:51:49 +00:00
|
|
|
}
|
|
|
|
|
2019-12-10 13:12:21 +00:00
|
|
|
func (m *mutation) ID() uint64 {
|
|
|
|
return m.db.ID()
|
|
|
|
}
|
|
|
|
|
2019-05-27 13:51:49 +00:00
|
|
|
// [TURBO-GETH] Freezer support (not implemented yet)
|
|
|
|
// Ancients returns an error as we don't have a backing chain freezer.
|
|
|
|
func (m *mutation) Ancients() (uint64, error) {
|
|
|
|
return 0, errNotSupported
|
|
|
|
}
|
|
|
|
|
|
|
|
// TruncateAncients returns an error as we don't have a backing chain freezer.
|
|
|
|
func (m *mutation) TruncateAncients(items uint64) error {
|
|
|
|
return errNotSupported
|
|
|
|
}
|
2019-12-20 12:25:40 +00:00
|
|
|
|
|
|
|
func NewRWDecorator(db Database) *RWCounterDecorator {
|
|
|
|
return &RWCounterDecorator{
|
|
|
|
db,
|
|
|
|
DBCounterStats{},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type RWCounterDecorator struct {
|
|
|
|
Database
|
|
|
|
DBCounterStats
|
|
|
|
}
|
|
|
|
|
|
|
|
type DBCounterStats struct {
|
|
|
|
Put uint64
|
|
|
|
PutS uint64
|
|
|
|
Get uint64
|
|
|
|
GetS uint64
|
|
|
|
GetAsOf uint64
|
|
|
|
Has uint64
|
|
|
|
Walk uint64
|
|
|
|
WalkAsOf uint64
|
|
|
|
MultiWalk uint64
|
|
|
|
MultiWalkAsOf uint64
|
|
|
|
Delete uint64
|
|
|
|
DeleteTimestamp uint64
|
|
|
|
MultiPut uint64
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *RWCounterDecorator) Put(bucket, key, value []byte) error {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.Put, 1)
|
|
|
|
fmt.Println("PUT", string(bucket), key)
|
|
|
|
return d.Database.Put(bucket, key, value)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *RWCounterDecorator) PutS(hBucket, key, value []byte, timestamp uint64, changeSetBucketOnly bool) error {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.PutS, 1)
|
|
|
|
return d.Database.PutS(hBucket, key, value, timestamp, changeSetBucketOnly)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) Get(bucket, key []byte) ([]byte, error) {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.Get, 1)
|
|
|
|
return d.Database.Get(bucket, key)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) GetS(hBucket, key []byte, timestamp uint64) ([]byte, error) {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.GetS, 1)
|
|
|
|
return d.Database.GetS(hBucket, key, timestamp)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) GetAsOf(bucket, hBucket, key []byte, timestamp uint64) ([]byte, error) {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.GetAsOf, 1)
|
|
|
|
return d.Database.GetAsOf(bucket, hBucket, key, timestamp)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) Has(bucket, key []byte) (bool, error) {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.Has, 1)
|
|
|
|
return d.Database.Has(bucket, key)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) Walk(bucket, startkey []byte, fixedbits uint, walker func([]byte, []byte) (bool, error)) error {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.Walk, 1)
|
|
|
|
return d.Database.Walk(bucket, startkey, fixedbits, walker)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) MultiWalk(bucket []byte, startkeys [][]byte, fixedbits []uint, walker func(int, []byte, []byte) error) error {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.MultiWalk, 1)
|
|
|
|
return d.Database.MultiWalk(bucket, startkeys, fixedbits, walker)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) WalkAsOf(bucket, hBucket, startkey []byte, fixedbits uint, timestamp uint64, walker func([]byte, []byte) (bool, error)) error {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.WalkAsOf, 1)
|
|
|
|
return d.Database.WalkAsOf(bucket, hBucket, startkey, fixedbits, timestamp, walker)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) MultiWalkAsOf(bucket, hBucket []byte, startkeys [][]byte, fixedbits []uint, timestamp uint64, walker func(int, []byte, []byte) error) error {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.MultiWalkAsOf, 1)
|
|
|
|
return d.Database.MultiWalkAsOf(bucket, hBucket, startkeys, fixedbits, timestamp, walker)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) Delete(bucket, key []byte) error {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.Delete, 1)
|
|
|
|
return d.Database.Delete(bucket, key)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) DeleteTimestamp(timestamp uint64) error {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.DeleteTimestamp, 1)
|
|
|
|
return d.Database.DeleteTimestamp(timestamp)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) MultiPut(tuples ...[]byte) (uint64, error) {
|
|
|
|
atomic.AddUint64(&d.DBCounterStats.MultiPut, 1)
|
|
|
|
return d.Database.MultiPut(tuples...)
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) MemCopy() Database {
|
|
|
|
return d.Database.MemCopy()
|
|
|
|
}
|
|
|
|
func (d *RWCounterDecorator) NewBatch() DbWithPendingMutations {
|
|
|
|
mm := &mutation{
|
|
|
|
db: d,
|
|
|
|
puts: newPuts(),
|
|
|
|
changeSetByBlock: make(map[uint64]map[string]*dbutils.ChangeSet),
|
|
|
|
}
|
|
|
|
return mm
|
|
|
|
}
|