erigon-pulse/migrations/dupsort_state.go
Alex Sharov 62fe81e4be
IH stage speedup and lmdb custom comparators support (#1080)
* etl.Loader - allow use of custom comparator

* log timing

* try now

* try now

* more performance

* etl.Loader - allow use of custom comparator

* working version

* simplify IH cursor

* clean

* squash

* squash

* squash

* squash

* squash

* squash

* squash

* clean

* add only unwind support

* squash

* squash

* clean

* fix test

* clean

* clean

* clean
2020-09-10 13:35:58 +01:00

122 lines
3.6 KiB
Go

package migrations
import (
"fmt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/trie"
)
// dupSortHashState moves the contents of dbutils.CurrentStateBucketOld1 into
// dbutils.CurrentStateBucket and then drops the old bucket.
//
// If the old bucket does not exist there is nothing to copy and the migration
// only invokes OnLoadCommit(db, nil, true).
var dupSortHashState = Migration{
	Name: "dupsort_hash_state",
	Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
		// Panics if db does not implement ethdb.BucketsMigrator.
		migrator := db.(ethdb.BucketsMigrator)

		found, err := migrator.BucketExists(dbutils.CurrentStateBucketOld1)
		if err != nil {
			return err
		}
		if !found {
			return OnLoadCommit(db, nil, true)
		}

		// Start from an empty destination bucket before copying.
		if err = migrator.ClearBuckets(dbutils.CurrentStateBucket); err != nil {
			return err
		}

		// Every source record is forwarded unchanged: the source key is passed
		// both as the original key and as the emitted key.
		passThrough := func(key []byte, value []byte, emit etl.ExtractNextFunc) error {
			return emit(key, key, value)
		}

		err = etl.Transform(
			db,
			dbutils.CurrentStateBucketOld1,
			dbutils.CurrentStateBucket,
			datadir,
			passThrough,
			etl.IdentityLoadFunc,
			etl.TransformArgs{OnLoadCommit: OnLoadCommit},
		)
		if err != nil {
			return err
		}

		return migrator.DropBuckets(dbutils.CurrentStateBucketOld1)
	},
}
// dupSortPlainState moves the contents of dbutils.PlainStateBucketOld1 into
// dbutils.PlainStateBucket and then drops the old bucket.
//
// If the old bucket does not exist there is nothing to copy and the migration
// only invokes OnLoadCommit(db, nil, true).
var dupSortPlainState = Migration{
	Name: "dupsort_plain_state",
	Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
		// Panics if db does not implement ethdb.BucketsMigrator.
		buckets := db.(ethdb.BucketsMigrator)

		oldExists, err := buckets.BucketExists(dbutils.PlainStateBucketOld1)
		switch {
		case err != nil:
			return err
		case !oldExists:
			return OnLoadCommit(db, nil, true)
		}

		// The destination bucket is emptied before the copy.
		if err = buckets.ClearBuckets(dbutils.PlainStateBucket); err != nil {
			return err
		}

		transformArgs := etl.TransformArgs{OnLoadCommit: OnLoadCommit}
		err = etl.Transform(
			db,
			dbutils.PlainStateBucketOld1,
			dbutils.PlainStateBucket,
			datadir,
			// Records are copied verbatim: key and value are emitted as read.
			func(srcKey []byte, srcVal []byte, forward etl.ExtractNextFunc) error {
				return forward(srcKey, srcKey, srcVal)
			},
			etl.IdentityLoadFunc,
			transformArgs,
		)
		if err != nil {
			return err
		}

		return buckets.DropBuckets(dbutils.PlainStateBucketOld1)
	},
}
// dupSortIH rebuilds dbutils.IntermediateTrieHashBucket from scratch.
//
// Steps:
//  1. clear IntermediateTrieHashBucket;
//  2. run a FlatDBTrieLoader over CurrentStateBucket, collecting every
//     (keyHex, hash) pair it reports into an ETL collector that sorts with
//     the bucket's own comparator;
//  3. load the collected pairs into IntermediateTrieHashBucket;
//  4. drop IntermediateTrieHashBucketOld1;
//  5. invoke OnLoadCommit(db, nil, true).
var dupSortIH = Migration{
	Name: "dupsort_intermediate_trie_hashes",
	Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
		if err := db.(ethdb.BucketsMigrator).ClearBuckets(dbutils.IntermediateTrieHashBucket); err != nil {
			return err
		}

		// The same comparator is used for sorting the in-memory buffer and for
		// loading, so both agree with the bucket's key ordering.
		buf := etl.NewSortableBuffer(etl.BufferOptimalSize)
		comparator := db.(ethdb.HasTx).Tx().Comparator(dbutils.IntermediateTrieHashBucket)
		buf.SetComparator(comparator)
		collector := etl.NewCollector(datadir, buf)

		// hashCollector receives each intermediate hash produced by the loader.
		// Keys longer than trie.IHDupKeyLen are split: the first IHDupKeyLen
		// bytes become the key, the remaining key bytes are prepended to the
		// hash to form the value. Empty keys are skipped.
		hashCollector := func(keyHex []byte, hash []byte) error {
			if len(keyHex) == 0 {
				return nil
			}
			if len(keyHex) > trie.IHDupKeyLen {
				// Full slice expression caps the capacity at len(keyHex), so
				// append must allocate instead of writing hash into spare
				// capacity of the caller-owned keyHex backing array.
				suffix := keyHex[trie.IHDupKeyLen:len(keyHex):len(keyHex)]
				return collector.Collect(keyHex[:trie.IHDupKeyLen], append(suffix, hash...))
			}
			return collector.Collect(keyHex, hash)
		}

		loader := trie.NewFlatDBTrieLoader(dbutils.CurrentStateBucket, dbutils.IntermediateTrieHashBucket)
		if err := loader.Reset(trie.NewRetainList(0), hashCollector /* HashCollector */, false); err != nil {
			return err
		}
		// The computed root is not needed here; the call is made only for the
		// hashCollector callbacks it triggers.
		if _, err := loader.CalcTrieRoot(db, nil); err != nil {
			return err
		}

		if err := collector.Load(db, dbutils.IntermediateTrieHashBucket, etl.IdentityLoadFunc, etl.TransformArgs{
			Comparator: comparator,
		}); err != nil {
			return fmt.Errorf("gen ih stage: fail load data to bucket: %w", err)
		}

		// The new bucket has been regenerated above, so the old bucket's
		// contents are not copied; it is simply dropped.
		if err := db.(ethdb.BucketsMigrator).DropBuckets(dbutils.IntermediateTrieHashBucketOld1); err != nil {
			return err
		}
		return OnLoadCommit(db, nil, true)
	},
}