mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-12 14:00:05 +00:00
57777e7c60
* Add kv.tx.bucket.Clear() and db.ClearBuckets() methods * Add kv.tx.bucket.Clear() and db.ClearBuckets() methods * choose db based on file suffix * implement db.id method * implement db.id method * use ethdb.NewDatabase method * use ethb.MustOpen method * cleanup * support TEST_DB env flag * create db path automatically needed * bolt - don't change prefix on happy path
307 lines
8.6 KiB
Go
307 lines
8.6 KiB
Go
package core
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"runtime"
|
|
"time"
|
|
|
|
"github.com/ledgerwatch/turbo-geth/common"
|
|
"github.com/ledgerwatch/turbo-geth/common/changeset"
|
|
"github.com/ledgerwatch/turbo-geth/common/dbutils"
|
|
"github.com/ledgerwatch/turbo-geth/common/etl"
|
|
"github.com/ledgerwatch/turbo-geth/ethdb"
|
|
"github.com/ledgerwatch/turbo-geth/log"
|
|
)
|
|
|
|
func NewIndexGenerator(db ethdb.Database, quitCh chan struct{}) *IndexGenerator {
|
|
return &IndexGenerator{
|
|
db: db,
|
|
ChangeSetBufSize: 256 * 1024 * 1024,
|
|
TempDir: os.TempDir(),
|
|
quitCh: quitCh,
|
|
}
|
|
}
|
|
|
|
type IndexGenerator struct {
|
|
db ethdb.Database
|
|
ChangeSetBufSize int
|
|
TempDir string
|
|
quitCh chan struct{}
|
|
}
|
|
|
|
var CSMapper = map[string]struct {
|
|
IndexBucket []byte
|
|
WalkerAdapter func(v []byte) changeset.Walker
|
|
KeySize int
|
|
Template string
|
|
New func() *changeset.ChangeSet
|
|
Encode func(*changeset.ChangeSet) ([]byte, error)
|
|
}{
|
|
string(dbutils.AccountChangeSetBucket): {
|
|
IndexBucket: dbutils.AccountsHistoryBucket,
|
|
WalkerAdapter: func(v []byte) changeset.Walker {
|
|
return changeset.AccountChangeSetBytes(v)
|
|
},
|
|
KeySize: common.HashLength,
|
|
Template: "acc-ind-",
|
|
New: changeset.NewAccountChangeSet,
|
|
Encode: changeset.EncodeAccounts,
|
|
},
|
|
string(dbutils.StorageChangeSetBucket): {
|
|
IndexBucket: dbutils.StorageHistoryBucket,
|
|
WalkerAdapter: func(v []byte) changeset.Walker {
|
|
return changeset.StorageChangeSetBytes(v)
|
|
},
|
|
KeySize: common.HashLength*2 + common.IncarnationLength,
|
|
Template: "st-ind-",
|
|
New: changeset.NewStorageChangeSet,
|
|
Encode: changeset.EncodeStorage,
|
|
},
|
|
string(dbutils.PlainAccountChangeSetBucket): {
|
|
IndexBucket: dbutils.AccountsHistoryBucket,
|
|
WalkerAdapter: func(v []byte) changeset.Walker {
|
|
return changeset.AccountChangeSetPlainBytes(v)
|
|
},
|
|
KeySize: common.AddressLength,
|
|
Template: "acc-ind-",
|
|
New: changeset.NewAccountChangeSetPlain,
|
|
Encode: changeset.EncodeAccountsPlain,
|
|
},
|
|
string(dbutils.PlainStorageChangeSetBucket): {
|
|
IndexBucket: dbutils.StorageHistoryBucket,
|
|
WalkerAdapter: func(v []byte) changeset.Walker {
|
|
return changeset.StorageChangeSetPlainBytes(v)
|
|
},
|
|
KeySize: common.AddressLength + common.IncarnationLength + common.HashLength,
|
|
Template: "st-ind-",
|
|
New: changeset.NewStorageChangeSetPlain,
|
|
Encode: changeset.EncodeStoragePlain,
|
|
},
|
|
}
|
|
|
|
func (ig *IndexGenerator) GenerateIndex(startBlock, endBlock uint64, changeSetBucket []byte) error {
|
|
v, ok := CSMapper[string(changeSetBucket)]
|
|
if !ok {
|
|
return errors.New("unknown bucket type")
|
|
}
|
|
log.Info("Index generation started", "from", startBlock, "to", endBlock, "csbucket", string(changeSetBucket))
|
|
if endBlock < startBlock && endBlock != 0 {
|
|
return fmt.Errorf("generateIndex %s: endBlock %d smaller than startBlock %d", changeSetBucket, endBlock, startBlock)
|
|
}
|
|
var (
|
|
endBlockKey []byte
|
|
chunks [][]byte
|
|
)
|
|
if endBlock != 0 {
|
|
endBlockKey = dbutils.EncodeTimestamp(endBlock)
|
|
chunks = calculateIndexChunks(startBlock, endBlock, runtime.NumCPU()/2+1)
|
|
}
|
|
t := time.Now()
|
|
err := etl.Transform(ig.db, changeSetBucket,
|
|
v.IndexBucket,
|
|
os.TempDir(),
|
|
getExtractFunc(v.WalkerAdapter),
|
|
loadFunc,
|
|
etl.TransformArgs{
|
|
ExtractStartKey: dbutils.EncodeTimestamp(startBlock),
|
|
ExtractEndKey: endBlockKey,
|
|
FixedBits: 0,
|
|
Chunks: chunks,
|
|
BufferType: etl.SortableAppendBuffer,
|
|
BufferSize: ig.ChangeSetBufSize,
|
|
Quit: ig.quitCh,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info("Index generation successfully finished", "csbucket", string(changeSetBucket), "it took", time.Since(t))
|
|
return nil
|
|
}
|
|
|
|
func (ig *IndexGenerator) Truncate(timestampTo uint64, changeSetBucket []byte) error {
|
|
vv, ok := CSMapper[string(changeSetBucket)]
|
|
if !ok {
|
|
return errors.New("unknown bucket type")
|
|
}
|
|
|
|
currentKey := dbutils.EncodeTimestamp(timestampTo)
|
|
keys := make(map[string]struct{})
|
|
err := ig.db.Walk(changeSetBucket, currentKey, 0, func(k, v []byte) (b bool, e error) {
|
|
if err := common.Stopped(ig.quitCh); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
currentKey = common.CopyBytes(k)
|
|
err := vv.WalkerAdapter(v).Walk(func(kk []byte, _ []byte) error {
|
|
keys[string(kk)] = struct{}{}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
historyEffects := make(map[string][]byte)
|
|
keySize := vv.KeySize
|
|
if bytes.Equal(dbutils.StorageChangeSetBucket, changeSetBucket) || bytes.Equal(dbutils.PlainStorageChangeSetBucket, changeSetBucket) {
|
|
keySize -= 8
|
|
}
|
|
|
|
var startKey = make([]byte, keySize+8)
|
|
|
|
for key := range keys {
|
|
key := common.CopyBytes([]byte(key))
|
|
copy(startKey[:keySize], dbutils.CompositeKeyWithoutIncarnation(key))
|
|
|
|
binary.BigEndian.PutUint64(startKey[keySize:], timestampTo)
|
|
if err := ig.db.Walk(vv.IndexBucket, startKey, 8*keySize, func(k, v []byte) (bool, error) {
|
|
timestamp := binary.BigEndian.Uint64(k[keySize:]) // the last timestamp in the chunk
|
|
kStr := string(common.CopyBytes(k))
|
|
if timestamp > timestampTo {
|
|
historyEffects[kStr] = nil
|
|
// truncate the chunk
|
|
index := dbutils.WrapHistoryIndex(v)
|
|
index = index.TruncateGreater(timestampTo)
|
|
if len(index) > 8 { // If the chunk is empty after truncation, it gets simply deleted
|
|
// Truncated chunk becomes "the last chunk" with the timestamp 0xffff....ffff
|
|
lastK := make([]byte, len(k))
|
|
copy(lastK, k[:keySize])
|
|
binary.BigEndian.PutUint64(lastK[keySize:], ^uint64(0))
|
|
historyEffects[string(lastK)] = common.CopyBytes(index)
|
|
}
|
|
}
|
|
return true, nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for key, value := range historyEffects {
|
|
if value == nil {
|
|
if err := ig.db.Delete(vv.IndexBucket, []byte(key)); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if err := ig.db.Put(vv.IndexBucket, []byte(key), value); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ig *IndexGenerator) DropIndex(bucket []byte) error {
|
|
//todo add truncate to all db
|
|
if bolt, ok := ig.db.(*ethdb.BoltDatabase); ok {
|
|
log.Warn("Remove bucket", "bucket", string(bucket))
|
|
err := bolt.ClearBuckets(bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
return errors.New("imposible to drop")
|
|
}
|
|
|
|
func loadFunc(k []byte, value []byte, state etl.State, next etl.LoadNextFunc) error {
|
|
if len(value)%9 != 0 {
|
|
log.Error("Value must be a multiple of 9", "ln", len(value), "k", common.Bytes2Hex(k))
|
|
return errors.New("incorrect value")
|
|
}
|
|
|
|
currentChunkKey := dbutils.IndexChunkKey(k, ^uint64(0))
|
|
indexBytes, err1 := state.Get(currentChunkKey)
|
|
if err1 != nil && !errors.Is(err1, ethdb.ErrKeyNotFound) {
|
|
return fmt.Errorf("find chunk failed: %w", err1)
|
|
}
|
|
|
|
currentIndex := dbutils.WrapHistoryIndex(indexBytes)
|
|
|
|
for i := 0; i < len(value); i += 9 {
|
|
b := binary.BigEndian.Uint64(value[i:])
|
|
vzero := value[i+8] == 1
|
|
blockNr := b
|
|
if err1 != nil && err1 != ethdb.ErrKeyNotFound {
|
|
return fmt.Errorf("find chunk failed: %w", err1)
|
|
}
|
|
|
|
if dbutils.CheckNewIndexChunk(currentIndex, blockNr) {
|
|
// Chunk overflow, need to write the "old" current chunk under its key derived from the last element
|
|
indexKey, err3 := currentIndex.Key(k)
|
|
if err3 != nil {
|
|
return err3
|
|
}
|
|
// Flush the old chunk
|
|
if err4 := next(indexKey, currentIndex); err4 != nil {
|
|
return err4
|
|
}
|
|
// Start a new chunk
|
|
currentIndex = dbutils.NewHistoryIndex()
|
|
}
|
|
currentIndex = currentIndex.Append(blockNr, vzero)
|
|
}
|
|
err := next(currentChunkKey, currentIndex)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getExtractFunc(bytes2walker func([]byte) changeset.Walker) etl.ExtractFunc { //nolint
|
|
return func(dbKey, dbValue []byte, next etl.ExtractNextFunc) error {
|
|
bufferMap := make(map[string][][]byte)
|
|
blockNum, _ := dbutils.DecodeTimestamp(dbKey)
|
|
err := bytes2walker(dbValue).Walk(func(changesetKey, changesetValue []byte) error {
|
|
sKey := string(changesetKey)
|
|
list := bufferMap[sKey]
|
|
b := blockNum
|
|
v := make([]byte, 9)
|
|
binary.BigEndian.PutUint64(v, b)
|
|
if len(changesetValue) == 0 {
|
|
v[8] = 1
|
|
}
|
|
list = append(list, v)
|
|
|
|
bufferMap[sKey] = list
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for k, v := range bufferMap {
|
|
for i := range v {
|
|
err = next(dbKey, []byte(k), v[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func calculateIndexChunks(startBlock, endBlock uint64, numOfChunks int) [][]byte {
|
|
if endBlock < startBlock+1000000 || numOfChunks < 2 {
|
|
return nil
|
|
}
|
|
|
|
chunkSize := (endBlock - startBlock) / uint64(numOfChunks)
|
|
var chunks = make([][]byte, numOfChunks-1)
|
|
for i := uint64(1); i < uint64(numOfChunks); i++ {
|
|
chunks[i-1] = dbutils.EncodeTimestamp(i*chunkSize + 1)
|
|
}
|
|
return chunks
|
|
}
|