State snapshot sync (#1417)

* move experiments to new branch&reorganise kv_snapshot

* walk&modify tests

* added delete from snapshot tests

* fmt

* state snapshot debug

* snapshot validation passed. copy state snapshot

* debug

* snapshot cursor.Prev test

* Prev works correct. Added Current check

* add err check

* added walk forward and backward test

* before refactoring

* refactoring

* execution with snapshot debug

* fix

* remove useless test

* before dupcursor implimentation

* tests with prev and delete works

* execution based on state snapshot passed

* remove useless tests

* blocks to 1140000 passed

* clean verifier

* cleanup state generation

* clean verify && seeder

* remove debug code

* tests passed

* fix lint

* save state

* test passed

* fix lint

* add state hash

* fix lint
This commit is contained in:
b00ris 2021-01-02 22:28:37 +03:00 committed by GitHub
parent 75cb938980
commit 8db5790838
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 2621 additions and 551 deletions

View File

@ -0,0 +1,298 @@
package commands
import (
"context"
"fmt"
"github.com/ledgerwatch/lmdb-go/lmdb"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/turbo-geth/cmd/utils"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/turbo/snapshotsync"
"github.com/spf13/cobra"
)
func init() {
withChaindata(cmdSnapshotCheck)
withBlock(cmdSnapshotCheck)
cmdSnapshotCheck.Flags().StringVar(&tmpDBPath, "tmp_db", "", "path to temporary db(for debug)")
withChaindata(dbCopyCmd)
rootCmd.AddCommand(dbCopyCmd)
rootCmd.AddCommand(cmdSnapshotCheck)
}
var tmpDBPath string
var cmdSnapshotCheck = &cobra.Command{
Use: "snapshot_check",
Short: "check execution over state snapshot by block",
Example: "go run cmd/integration/main.go snapshot_check --block 11400000 --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/ --snapshotDir /media/b00ris/nvme/snapshots/ --snapshotMode s --tmp_db /media/b00ris/nvme/tmp/debug",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := utils.RootContext()
//db to provide headers, blocks, senders ...
mainDB, err := ethdb.Open(chaindata, true)
if err != nil {
return err
}
mode, err := snapshotsync.SnapshotModeFromString(snapshotMode)
if err != nil {
panic(err)
}
if !mode.State || len(snapshotDir) == 0 {
return fmt.Errorf("you need state snapshot for it")
}
stateSnapshotPath := filepath.Join(snapshotDir, "state")
stateSnapshot := ethdb.NewLMDB().Path(stateSnapshotPath).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return dbutils.BucketsCfg{
dbutils.PlainStateBucket: dbutils.BucketsConfigs[dbutils.PlainStateBucket],
dbutils.PlainContractCodeBucket: dbutils.BucketsConfigs[dbutils.PlainContractCodeBucket],
dbutils.CodeBucket: dbutils.BucketsConfigs[dbutils.CodeBucket],
}
}).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).MustOpen()
isNew := true
var path string
if len(tmpDBPath) > 0 {
isNew = false
path = tmpDBPath
} else {
path, err = ioutil.TempDir(os.TempDir(), "sndbg")
if err != nil {
return err
}
}
defer func() {
if err == nil {
os.RemoveAll(path)
} else {
log.Info("Temp database", "path", path)
}
}()
tmpDb := ethdb.NewLMDB().Path(path).MustOpen()
kv := ethdb.NewSnapshot2KV().
DB(tmpDb).
SnapshotDB([]string{dbutils.HeaderPrefix, dbutils.BlockBodyPrefix, dbutils.Senders, dbutils.HeadBlockKey, dbutils.HeaderNumberPrefix}, mainDB.KV()).
SnapshotDB([]string{dbutils.PlainStateBucket, dbutils.CodeBucket, dbutils.PlainContractCodeBucket}, stateSnapshot).
MustOpen()
db := ethdb.NewObjectDatabase(kv)
if isNew {
err = ethdb.SetStorageModeIfNotExist(db, ethdb.StorageMode{})
if err != nil {
return err
}
}
if err := snapshotCheck(ctx, db, isNew, os.TempDir()); err != nil {
log.Error("snapshotCheck error", "err", err)
return err
}
return nil
},
}
func snapshotCheck(ctx context.Context, db ethdb.Database, isNew bool, tmpDir string) (err error) {
var snapshotBlock uint64 = 11_000_000
blockNum, _, err := stages.GetStageProgress(db, stages.Execution)
if err != nil {
return err
}
//snapshot or last executed block
if blockNum > snapshotBlock {
snapshotBlock = blockNum
}
//get end of check
var lastBlockHeaderNumber uint64
if block == 0 {
lastBlockHash := rawdb.ReadHeadBlockHash(db)
lastBlockHeader, innerErr := rawdb.ReadHeaderByHash(db, lastBlockHash)
if innerErr != nil {
return innerErr
}
lastBlockHeaderNumber = lastBlockHeader.Number.Uint64()
} else {
lastBlockHeaderNumber = block
}
if lastBlockHeaderNumber <= snapshotBlock {
return fmt.Errorf("incorrect header number last block:%v, snapshotBlock: %v", lastBlockHeaderNumber, snapshotBlock)
}
if isNew {
log.Info("New tmp db. We need to promote hash state.")
tx, innerErr := db.Begin(context.Background(), ethdb.RW)
if innerErr != nil {
return innerErr
}
tt := time.Now()
err = stagedsync.PromoteHashedStateCleanly("", tx, tmpDir, ctx.Done())
log.Info("Promote took", "t", time.Since(tt))
if err != nil {
tx.Rollback()
return fmt.Errorf("promote state err: %w", err)
}
tt = time.Now()
_, err = tx.Commit()
if err != nil {
tx.Rollback()
return fmt.Errorf("commit promote state err: %w", err)
}
log.Info("promote committed", "t", time.Since(tt))
}
if isNew {
log.Info("Regenerate IH")
tx, innerErr := db.Begin(context.Background(), ethdb.RW)
if innerErr != nil {
return innerErr
}
hash, innerErr := rawdb.ReadCanonicalHash(tx, snapshotBlock)
if innerErr != nil {
tx.Rollback()
return innerErr
}
syncHeadHeader := rawdb.ReadHeader(tx, hash, snapshotBlock)
if syncHeadHeader == nil {
tx.Rollback()
return fmt.Errorf("empty header for %v", snapshotBlock)
}
expectedRootHash := syncHeadHeader.Root
tt := time.Now()
err = stagedsync.RegenerateIntermediateHashes("", tx, true, tmpDir, expectedRootHash, ctx.Done())
if err != nil {
tx.Rollback()
return fmt.Errorf("regenerateIntermediateHashes err: %w", err)
}
log.Info("RegenerateIntermediateHashes took", "t", time.Since(tt))
tt = time.Now()
_, err = tx.Commit()
if err != nil {
tx.Rollback()
return err
}
log.Info("Commit", "t", time.Since(tt))
}
cc, bc, st, progress := newSync(ctx.Done(), db, db, nil)
defer bc.Stop()
st.DisableStages(stages.Headers,
stages.BlockHashes,
stages.Bodies,
stages.Senders,
stages.AccountHistoryIndex,
stages.StorageHistoryIndex,
stages.LogIndex,
stages.CallTraces,
stages.TxLookup,
stages.TxPool,
stages.Finish,
)
if isNew {
stage3 := progress(stages.Senders)
err = stage3.DoneAndUpdate(db, lastBlockHeaderNumber)
if err != nil {
return err
}
stage4 := progress(stages.Execution)
err = stage4.DoneAndUpdate(db, snapshotBlock)
if err != nil {
return err
}
stage5 := progress(stages.HashState)
err = stage5.DoneAndUpdate(db, snapshotBlock)
if err != nil {
return err
}
stage6 := progress(stages.IntermediateHashes)
err = stage6.DoneAndUpdate(db, snapshotBlock)
if err != nil {
return err
}
}
ch := ctx.Done()
var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))
tx, err := db.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}
defer tx.Rollback()
for blockNumber := snapshotBlock + 1; blockNumber <= lastBlockHeaderNumber; blockNumber++ {
err = st.SetCurrentStage(stages.Execution)
if err != nil {
return err
}
stage4 := progress(stages.Execution)
stage4.BlockNumber = blockNumber - 1
log.Info("Stage4", "progress", stage4.BlockNumber)
err = stagedsync.SpawnExecuteBlocksStage(stage4, tx,
bc.Config(), cc, bc.GetVMConfig(),
ch,
stagedsync.ExecuteBlockStageParams{
ToBlock: blockNumber, // limit execution to the specified block
WriteReceipts: false,
BatchSize: int(batchSize),
})
if err != nil {
return fmt.Errorf("execution err %w", err)
}
stage5 := progress(stages.HashState)
stage5.BlockNumber = blockNumber - 1
log.Info("Stage5", "progress", stage5.BlockNumber)
err = stagedsync.SpawnHashStateStage(stage5, tx, tmpDir, ch)
if err != nil {
return fmt.Errorf("spawnHashStateStage err %w", err)
}
stage6 := progress(stages.IntermediateHashes)
stage6.BlockNumber = blockNumber - 1
log.Info("Stage6", "progress", stage6.BlockNumber)
if err = stagedsync.SpawnIntermediateHashesStage(stage5, tx, true, tmpDir, ch); err != nil {
log.Error("Error on ih", "err", err, "block", blockNumber)
return fmt.Errorf("spawnIntermediateHashesStage %w", err)
}
log.Info("Done", "progress", blockNumber)
err = tx.CommitAndBegin(context.TODO())
if err != nil {
log.Error("Error on commit", "err", err, "block", blockNumber)
return err
}
}
return nil
}
var dbCopyCmd = &cobra.Command{
Use: "copy_compact",
RunE: func(cmd *cobra.Command, args []string) error {
return copyCompact()
},
}

View File

@ -702,7 +702,7 @@ func SetSnapshotKV(db ethdb.Database, snapshotDir, snapshotMode string) error {
}
snapshotKV := db.(ethdb.HasKV).KV()
snapshotKV, err = snapshotsync.WrapBySnapshots(snapshotKV, snapshotDir, mode)
snapshotKV, err = snapshotsync.WrapBySnapshotsFromDir(snapshotKV, snapshotDir, mode)
if err != nil {
return err
}

View File

@ -102,7 +102,7 @@ func OpenDB(cfg Flags) (ethdb.KV, ethdb.Backend, error) {
if innerErr != nil {
return nil, nil, fmt.Errorf("can't process snapshot-mode err:%w", innerErr)
}
kv, innerErr := snapshotsync.WrapBySnapshots(db, cfg.SnapshotDir, mode)
kv, innerErr := snapshotsync.WrapBySnapshotsFromDir(db, cfg.SnapshotDir, mode)
if innerErr != nil {
return nil, nil, fmt.Errorf("can't wrap by snapshots err:%w", innerErr)
}

View File

@ -0,0 +1,164 @@
package commands
import (
"context"
"fmt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/turbo/snapshotsync"
"github.com/spf13/cobra"
"os"
"time"
)
func init() {
withChaindata(copyFromStateSnapshotCmd)
withSnapshotFile(copyFromStateSnapshotCmd)
withSnapshotData(copyFromStateSnapshotCmd)
withBlock(copyFromStateSnapshotCmd)
rootCmd.AddCommand(copyFromStateSnapshotCmd)
}
//go run cmd/snapshots/generator/main.go state_copy --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/ &> /media/b00ris/nvme/copy.log
var copyFromStateSnapshotCmd = &cobra.Command{
Use: "state_copy",
Short: "Copy from state snapshot",
Example: "go run cmd/snapshots/generator/main.go state_copy --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/",
RunE: func(cmd *cobra.Command, args []string) error {
return CopyFromState(cmd.Context(), chaindata, snapshotFile, block, snapshotDir, snapshotMode)
},
}
func CopyFromState(ctx context.Context, dbpath string, snapshotPath string, block uint64, snapshotDir, snapshotMode string) error {
db, err := ethdb.Open(dbpath, true)
if err != nil {
return err
}
kv := db.KV()
if snapshotDir != "" {
var mode snapshotsync.SnapshotMode
mode, err = snapshotsync.SnapshotModeFromString(snapshotMode)
if err != nil {
return err
}
kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode)
if err != nil {
return err
}
}
db.SetKV(kv)
err = os.RemoveAll(snapshotPath)
if err != nil {
return err
}
snkv := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return dbutils.BucketsCfg{
dbutils.PlainStateBucket: dbutils.BucketsConfigs[dbutils.PlainStateBucket],
dbutils.PlainContractCodeBucket: dbutils.BucketsConfigs[dbutils.PlainContractCodeBucket],
dbutils.CodeBucket: dbutils.BucketsConfigs[dbutils.CodeBucket],
}
}).Path(snapshotPath).MustOpen()
log.Info("Create snapshot db", "path", snapshotPath)
sndb := ethdb.NewObjectDatabase(snkv).NewBatch()
tt := time.Now()
tt2 := time.Now()
max := 10000000
i := 0
err = db.Walk(dbutils.PlainStateBucket, []byte{}, 0, func(k, v []byte) (bool, error) {
innerErr := sndb.Put(dbutils.PlainStateBucket, k, v)
if innerErr != nil {
return false, fmt.Errorf("put state err: %w", innerErr)
}
i++
if i > max {
i = 0
innerErr = sndb.CommitAndBegin(ctx)
if innerErr != nil {
return false, fmt.Errorf("commit state err: %w", innerErr)
}
log.Info("Commit state", "batch", time.Since(tt2), "all", time.Since(tt))
tt2 = time.Now()
}
return true, nil
})
if err != nil {
return err
}
err = sndb.CommitAndBegin(ctx)
if err != nil {
return err
}
log.Info("Copy plain state end", "t", time.Since(tt))
tt = time.Now()
tt2 = time.Now()
err = db.Walk(dbutils.PlainContractCodeBucket, []byte{}, 0, func(k, v []byte) (bool, error) {
innerErr := sndb.Put(dbutils.PlainContractCodeBucket, k, v)
if innerErr != nil {
return false, fmt.Errorf("put contract code err: %w", innerErr)
}
i++
if i > max {
i = 0
innerErr = sndb.CommitAndBegin(ctx)
if innerErr != nil {
return false, fmt.Errorf("commit contract code err: %w", innerErr)
}
log.Info("Commit contract code", "batch", time.Since(tt2), "all", time.Since(tt))
tt2 = time.Now()
}
return true, nil
})
if err != nil {
return err
}
log.Info("Copy contract code end", "t", time.Since(tt))
_, err = sndb.Commit()
if err != nil {
return err
}
tt = time.Now()
tt2 = time.Now()
err = db.Walk(dbutils.CodeBucket, []byte{}, 0, func(k, v []byte) (bool, error) {
innerErr := sndb.Put(dbutils.CodeBucket, k, v)
if innerErr != nil {
return false, fmt.Errorf("put code err: %w", innerErr)
}
i++
if i > max {
i = 0
innerErr = sndb.CommitAndBegin(ctx)
if innerErr != nil {
return false, fmt.Errorf("commit code err: %w", innerErr)
}
log.Info("Commit code", "batch", time.Since(tt2), "all", time.Since(tt))
tt2 = time.Now()
}
return true, nil
})
if err != nil {
return err
}
log.Info("Copy code end", "t", time.Since(tt))
_, err = sndb.Commit()
if err != nil {
return err
}
sndb.Close()
db.Close()
tt = time.Now()
defer func() {
log.Info("Verify end", "t", time.Since(tt))
}()
return VerifyStateSnapshot(ctx, dbpath, snapshotPath, block)
}

View File

@ -45,7 +45,7 @@ func BodySnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock uint
return err
}
kv, err = snapshotsync.WrapBySnapshots(kv, snapshotDir, mode)
kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode)
if err != nil {
return err
}
@ -53,8 +53,8 @@ func BodySnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock uint
snKV := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return dbutils.BucketsCfg{
dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{},
dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{},
dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{},
dbutils.BodiesSnapshotInfoBucket: dbutils.BucketConfigItem{},
}
}).Path(snapshotPath).MustOpen()
@ -94,12 +94,12 @@ func BodySnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock uint
}
}
err = snDB.Put(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotBodyHeadNumber), big.NewInt(0).SetUint64(toBlock).Bytes())
err = snDB.Put(dbutils.BodiesSnapshotInfoBucket, []byte(dbutils.SnapshotBodyHeadNumber), big.NewInt(0).SetUint64(toBlock).Bytes())
if err != nil {
log.Crit("SnapshotBodyHeadNumber error", "err", err)
return err
}
err = snDB.Put(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotBodyHeadHash), hash.Bytes())
err = snDB.Put(dbutils.BodiesSnapshotInfoBucket, []byte(dbutils.SnapshotBodyHeadHash), hash.Bytes())
if err != nil {
log.Crit("SnapshotBodyHeadHash error", "err", err)
return err

View File

@ -54,15 +54,15 @@ func HeaderSnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock ui
return err
}
kv, err = snapshotsync.WrapBySnapshots(kv, snapshotDir, mode)
kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode)
if err != nil {
return err
}
}
snKV := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return dbutils.BucketsCfg{
dbutils.HeaderPrefix: dbutils.BucketConfigItem{},
dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{},
dbutils.HeaderPrefix: dbutils.BucketConfigItem{},
dbutils.HeadersSnapshotInfoBucket: dbutils.BucketConfigItem{},
}
}).Path(snapshotPath).MustOpen()
@ -107,12 +107,12 @@ func HeaderSnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock ui
}
}
err = snDB.Put(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadNumber), big.NewInt(0).SetUint64(toBlock).Bytes())
err = snDB.Put(dbutils.HeadersSnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadNumber), big.NewInt(0).SetUint64(toBlock).Bytes())
if err != nil {
log.Crit("SnapshotHeadersHeadNumber error", "err", err)
return err
}
err = snDB.Put(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadHash), hash.Bytes())
err = snDB.Put(dbutils.HeadersSnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadHash), hash.Bytes())
if err != nil {
log.Crit("SnapshotHeadersHeadHash error", "err", err)
return err

View File

@ -0,0 +1,177 @@
package commands
import (
"context"
"errors"
"fmt"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/core/types/accounts"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/turbo/snapshotsync"
"github.com/ledgerwatch/turbo-geth/turbo/trie"
"github.com/spf13/cobra"
"os"
"time"
)
func init() {
withChaindata(generateStateSnapshotCmd)
withSnapshotFile(generateStateSnapshotCmd)
withSnapshotData(generateStateSnapshotCmd)
withBlock(generateStateSnapshotCmd)
rootCmd.AddCommand(generateStateSnapshotCmd)
}
//go run cmd/snapshots/generator/main.go state_copy --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/ &> /media/b00ris/nvme/copy.log
var generateStateSnapshotCmd = &cobra.Command{
Use: "state",
Short: "Generate state snapshot",
Example: "go run ./cmd/state/main.go stateSnapshot --block 11000000 --chaindata /media/b00ris/nvme/tgstaged/tg/chaindata/ --snapshot /media/b00ris/nvme/snapshots/state",
RunE: func(cmd *cobra.Command, args []string) error {
return GenerateStateSnapshot(cmd.Context(), chaindata, snapshotFile, block, snapshotDir, snapshotMode)
},
}
func GenerateStateSnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock uint64, snapshotDir string, snapshotMode string) error {
if snapshotPath == "" {
return errors.New("empty snapshot path")
}
err := os.RemoveAll(snapshotPath)
if err != nil {
return err
}
kv := ethdb.NewLMDB().Path(dbPath).MustOpen()
if snapshotDir != "" {
var mode snapshotsync.SnapshotMode
mode, err = snapshotsync.SnapshotModeFromString(snapshotMode)
if err != nil {
return err
}
kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode)
if err != nil {
return err
}
}
snkv := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return dbutils.BucketsCfg{
dbutils.PlainStateBucket: dbutils.BucketConfigItem{},
dbutils.PlainContractCodeBucket: dbutils.BucketConfigItem{},
dbutils.CodeBucket: dbutils.BucketConfigItem{},
dbutils.StateSnapshotInfoBucket: dbutils.BucketConfigItem{},
}
}).Path(snapshotPath).MustOpen()
sndb := ethdb.NewObjectDatabase(snkv)
mt := sndb.NewBatch()
tx, err := kv.Begin(context.Background(), nil, ethdb.RO)
if err != nil {
return err
}
tx2, err := kv.Begin(context.Background(), nil, ethdb.RO)
if err != nil {
return err
}
defer tx.Rollback()
i := 0
t := time.Now()
tt := time.Now()
err = state.WalkAsOfAccounts(tx, common.Address{}, toBlock+1, func(k []byte, v []byte) (bool, error) {
i++
if i%100000 == 0 {
fmt.Println(i, common.Bytes2Hex(k), "batch", time.Since(tt))
tt = time.Now()
select {
case <-ctx.Done():
return false, errors.New("interrupted")
default:
}
}
if len(k) != 20 {
return true, nil
}
var acc accounts.Account
if err = acc.DecodeForStorage(v); err != nil {
return false, fmt.Errorf("decoding %x for %x: %v", v, k, err)
}
if acc.Incarnation > 0 {
storagePrefix := dbutils.PlainGenerateStoragePrefix(k, acc.Incarnation)
if acc.IsEmptyRoot() {
t := trie.New(common.Hash{})
j := 0
innerErr := state.WalkAsOfStorage(tx2, common.BytesToAddress(k), acc.Incarnation, common.Hash{}, toBlock+1, func(k1, k2 []byte, vv []byte) (bool, error) {
j++
innerErr1 := mt.Put(dbutils.PlainStateBucket, dbutils.PlainGenerateCompositeStorageKey(k1, acc.Incarnation, k2), common.CopyBytes(vv))
if innerErr1 != nil {
return false, innerErr1
}
h, _ := common.HashData(k1)
t.Update(h.Bytes(), common.CopyBytes(vv))
return true, nil
})
if innerErr != nil {
return false, innerErr
}
acc.Root = t.Hash()
}
if acc.IsEmptyCodeHash() {
codeHash, err1 := tx2.GetOne(dbutils.PlainContractCodeBucket, storagePrefix)
if err1 != nil && errors.Is(err1, ethdb.ErrKeyNotFound) {
return false, fmt.Errorf("getting code hash for %x: %v", k, err1)
}
if len(codeHash) > 0 {
code, err1 := tx2.GetOne(dbutils.CodeBucket, codeHash)
if err1 != nil {
return false, err1
}
if err1 = mt.Put(dbutils.CodeBucket, codeHash, code); err1 != nil {
return false, err1
}
if err1 = mt.Put(dbutils.PlainContractCodeBucket, storagePrefix, codeHash); err1 != nil {
return false, err1
}
}
}
}
newAcc := make([]byte, acc.EncodingLengthForStorage())
acc.EncodeForStorage(newAcc)
innerErr := mt.Put(dbutils.PlainStateBucket, common.CopyBytes(k), newAcc)
if innerErr != nil {
return false, innerErr
}
if mt.BatchSize() >= mt.IdealBatchSize() {
ttt := time.Now()
innerErr = mt.CommitAndBegin(context.Background())
if innerErr != nil {
return false, innerErr
}
fmt.Println("Committed", time.Since(ttt))
}
return true, nil
})
if err != nil {
return err
}
_, err = mt.Commit()
if err != nil {
return err
}
fmt.Println("took", time.Since(t))
return VerifyStateSnapshot(ctx, dbPath, snapshotFile, block)
}

View File

@ -0,0 +1,100 @@
package commands
import (
"context"
"fmt"
"github.com/ledgerwatch/lmdb-go/lmdb"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/turbo/snapshotsync"
"github.com/spf13/cobra"
"io/ioutil"
"os"
"time"
)
func init() {
withChaindata(verifyStateSnapshotCmd)
withSnapshotFile(verifyStateSnapshotCmd)
withBlock(verifyStateSnapshotCmd)
rootCmd.AddCommand(verifyStateSnapshotCmd)
}
//
var verifyStateSnapshotCmd = &cobra.Command{
Use: "verify_state",
Short: "Verify state snapshot",
Example: "go run cmd/snapshots/generator/main.go verify_state --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state/ --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/ ",
RunE: func(cmd *cobra.Command, args []string) error {
return VerifyStateSnapshot(cmd.Context(), chaindata, snapshotFile, block)
},
}
func VerifyStateSnapshot(ctx context.Context, dbPath, snapshotPath string, block uint64) error {
db, err := ethdb.Open(dbPath, true)
if err != nil {
return fmt.Errorf("open err: %w", err)
}
kv := db.KV()
if snapshotDir != "" {
var mode snapshotsync.SnapshotMode
mode, err = snapshotsync.SnapshotModeFromString(snapshotMode)
if err != nil {
return err
}
kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode)
if err != nil {
return err
}
}
db.SetKV(kv)
snkv := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return dbutils.BucketsCfg{
dbutils.PlainStateBucket: dbutils.BucketsConfigs[dbutils.PlainStateBucket],
dbutils.PlainContractCodeBucket: dbutils.BucketsConfigs[dbutils.PlainContractCodeBucket],
dbutils.CodeBucket: dbutils.BucketsConfigs[dbutils.CodeBucket],
}
}).Path(snapshotPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).MustOpen()
tmpPath, err := ioutil.TempDir(os.TempDir(), "vrf*")
if err != nil {
return err
}
tmpDB := ethdb.NewLMDB().Path(tmpPath).MustOpen()
defer os.RemoveAll(tmpPath)
defer tmpDB.Close()
snkv = ethdb.NewSnapshot2KV().SnapshotDB([]string{dbutils.PlainStateBucket, dbutils.PlainContractCodeBucket, dbutils.CodeBucket}, snkv).DB(tmpDB).MustOpen()
sndb := ethdb.NewObjectDatabase(snkv)
tx, err := sndb.Begin(context.Background(), ethdb.RW)
if err != nil {
return err
}
defer tx.Rollback()
hash, err := rawdb.ReadCanonicalHash(db, block)
if err != nil {
return err
}
syncHeadHeader := rawdb.ReadHeader(db, hash, block)
if syncHeadHeader == nil {
return fmt.Errorf("empty header")
}
expectedRootHash := syncHeadHeader.Root
tt := time.Now()
err = stagedsync.PromoteHashedStateCleanly("", tx, os.TempDir(), ctx.Done())
fmt.Println("Promote took", time.Since(tt))
if err != nil {
return fmt.Errorf("promote state err: %w", err)
}
err = stagedsync.RegenerateIntermediateHashes("", tx, true, os.TempDir(), expectedRootHash, ctx.Done())
if err != nil {
return fmt.Errorf("regenerateIntermediateHashes err: %w", err)
}
return nil
}

View File

@ -54,6 +54,7 @@ var rootCmd = &cobra.Command{
debug.Exit()
},
Args: cobra.ExactArgs(1),
ArgAliases: []string{"snapshots dir"},
RunE: func(cmd *cobra.Command, args []string) error {
return Seed(cmd.Context(), args[0])
},

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"time"
@ -20,13 +19,7 @@ import (
func Seed(ctx context.Context, datadir string) error {
datadir = filepath.Dir(datadir)
ctx, cancel := context.WithCancel(ctx)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
cancel()
}()
defer cancel()
cfg := trnt.DefaultTorrentConfig()
cfg.NoDHT = false
@ -39,7 +32,7 @@ func Seed(ctx context.Context, datadir string) error {
pathes := []string{
cfg.DataDir + "/headers",
cfg.DataDir + "/bodies",
//cfg.DataDir + "/state",
cfg.DataDir + "/state",
//cfg.DataDir+"/receipts",
}

View File

@ -16,7 +16,7 @@ func HeadersSnapshot(snapshotPath string) error {
snKV := ethdb.NewLMDB().Path(snapshotPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return dbutils.BucketsCfg{
dbutils.HeaderPrefix: dbutils.BucketConfigItem{},
dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{},
dbutils.HeadersSnapshotInfoBucket: dbutils.BucketConfigItem{},
}
}).MustOpen()
var prevHeader *types.Header

View File

@ -141,11 +141,14 @@ func TrimRightZeroes(s []byte) []byte {
func KeyCmp(key1, key2 []byte) (int, bool) {
switch {
case key1 == nil && key2 == nil:
//both keys are empty
case len(key1) == 0 && len(key2) == 0:
return 0, true
case key1 == nil && key2 != nil:
// key1 is empty
case len(key1) == 0 && len(key2) != 0:
return 1, false
case key1 != nil && key2 == nil:
// key2 is empty
case len(key1) != 0 && len(key2) == 0:
return -1, false
default:
return bytes.Compare(key1, key2), false

View File

@ -100,8 +100,11 @@ var (
IntermediateTrieHashBucketOld1 = "iTh"
// DatabaseInfoBucket is used to store information about data layout.
DatabaseInfoBucket = "DBINFO"
SnapshotInfoBucket = "SNINFO"
DatabaseInfoBucket = "DBINFO"
SnapshotInfoBucket = "SNINFO"
HeadersSnapshotInfoBucket = "hSNINFO"
BodiesSnapshotInfoBucket = "bSNINFO"
StateSnapshotInfoBucket = "sSNINFO"
// databaseVerisionKey tracks the current database version.
DatabaseVerisionKey = "DatabaseVersion"
@ -240,6 +243,9 @@ var Buckets = []string{
LogTopicIndex,
LogAddressIndex,
SnapshotInfoBucket,
HeadersSnapshotInfoBucket,
BodiesSnapshotInfoBucket,
StateSnapshotInfoBucket,
CallFromIndex,
CallToIndex,
Log,

View File

@ -232,12 +232,12 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
}
snapshotKV := chainDb.KV()
snapshotKV, innerErr = snapshotsync.WrapBySnapshots2(snapshotKV, downloadedSnapshots)
snapshotKV, innerErr = snapshotsync.WrapBySnapshotsFromDownloader(snapshotKV, downloadedSnapshots)
if innerErr != nil {
return nil, innerErr
}
chainDb.SetKV(snapshotKV)
innerErr = snapshotsync.PostProcessing(chainDb, config.SnapshotMode)
innerErr = snapshotsync.PostProcessing(chainDb, config.SnapshotMode, downloadedSnapshots)
if innerErr != nil {
return nil, innerErr
}
@ -251,6 +251,7 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
err = torrentClient.Load(chainDb)
if err != nil {
return nil, err
@ -258,16 +259,20 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
err = torrentClient.AddSnapshotsTorrents(context.Background(), chainDb, config.NetworkID, config.SnapshotMode)
if err == nil {
torrentClient.Download()
snapshotKV := chainDb.KV()
snapshotKV, err = snapshotsync.WrapBySnapshots(snapshotKV, dbPath, config.SnapshotMode)
if err != nil {
return nil, err
mp, innerErr := torrentClient.GetSnapshots(chainDb, config.NetworkID)
if innerErr != nil {
return nil, innerErr
}
snapshotKV, innerErr = snapshotsync.WrapBySnapshotsFromDownloader(snapshotKV, mp)
if innerErr != nil {
return nil, innerErr
}
chainDb.SetKV(snapshotKV)
err = snapshotsync.PostProcessing(chainDb, config.SnapshotMode)
if err != nil {
return nil, err
innerErr = snapshotsync.PostProcessing(chainDb, config.SnapshotMode, mp)
if innerErr != nil {
return nil, innerErr
}
} else {
log.Error("There was an error in snapshot init. Swithing to regular sync", "err", err)

View File

@ -34,7 +34,7 @@ func SpawnHashStateStage(s *StageState, db ethdb.Database, tmpdir string, quit <
log.Info(fmt.Sprintf("[%s] Promoting plain state", logPrefix), "from", s.BlockNumber, "to", to)
if s.BlockNumber == 0 { // Initial hashing of the state is performed at the previous stage
if err := promoteHashedStateCleanly(logPrefix, db, tmpdir, quit); err != nil {
if err := PromoteHashedStateCleanly(logPrefix, db, tmpdir, quit); err != nil {
return fmt.Errorf("[%s] %w", logPrefix, err)
}
} else {
@ -74,7 +74,7 @@ func unwindHashStateStageImpl(logPrefix string, u *UnwindState, s *StageState, s
return nil
}
func promoteHashedStateCleanly(logPrefix string, db ethdb.Database, tmpdir string, quit <-chan struct{}) error {
func PromoteHashedStateCleanly(logPrefix string, db ethdb.Database, tmpdir string, quit <-chan struct{}) error {
err := etl.Transform(
logPrefix,
db,

View File

@ -35,7 +35,7 @@ func TestPromoteHashedStateClearState(t *testing.T) {
generateBlocks(t, 1, 50, hashedWriterGen(tx1), changeCodeWithIncarnations)
generateBlocks(t, 1, 50, plainWriterGen(tx2), changeCodeWithIncarnations)
err = promoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil)
err = PromoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@ -71,7 +71,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
err = tx2.CommitAndBegin(context.Background())
require.NoError(t, err)
err = promoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil)
err = PromoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@ -150,7 +150,7 @@ func TestUnwindHashed(t *testing.T) {
generateBlocks(t, 1, 50, hashedWriterGen(tx1), changeCodeWithIncarnations)
generateBlocks(t, 1, 50, plainWriterGen(tx2), changeCodeWithIncarnations)
err = promoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil)
err = PromoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}

View File

@ -56,7 +56,7 @@ func SpawnIntermediateHashesStage(s *StageState, db ethdb.Database, checkRoot bo
logPrefix := s.state.LogPrefix()
log.Info(fmt.Sprintf("[%s] Generating intermediate hashes", logPrefix), "from", s.BlockNumber, "to", to)
if s.BlockNumber == 0 {
if err := regenerateIntermediateHashes(logPrefix, tx, checkRoot, tmpdir, expectedRootHash, quit); err != nil {
if err := RegenerateIntermediateHashes(logPrefix, tx, checkRoot, tmpdir, expectedRootHash, quit); err != nil {
return err
}
} else {
@ -78,7 +78,7 @@ func SpawnIntermediateHashesStage(s *StageState, db ethdb.Database, checkRoot bo
return nil
}
func regenerateIntermediateHashes(logPrefix string, db ethdb.Database, checkRoot bool, tmpdir string, expectedRootHash common.Hash, quit <-chan struct{}) error {
func RegenerateIntermediateHashes(logPrefix string, db ethdb.Database, checkRoot bool, tmpdir string, expectedRootHash common.Hash, quit <-chan struct{}) error {
log.Info(fmt.Sprintf("[%s] Regeneration intermediate hashes started", logPrefix))
// Clear IH bucket
c := db.(ethdb.HasTx).Tx().Cursor(dbutils.IntermediateTrieHashBucket)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -13,9 +13,9 @@ const (
SnapshotBlock = 11_000_000
LmdbFilename = "data.mdb"
HeadersSnapshotHash = "7727174de470b7fe0bb3e36d35e85cc48853d470" //11кk block 1mb chunk
BlocksSnapshotHash = "0546b881c50de9984dd8865d0f18cc5153e4c21b" //11кk block 1mb chunk
StateSnapshotHash = ""
HeadersSnapshotHash = "460da4ffbc2b77f6662a8a7c15e21f4c5981656d" //11кk block 1mb chunk
BlocksSnapshotHash = "6353d013d614f1f8145d71e1479de9b4361d273f" //11кk block 1mb chunk
StateSnapshotHash = "fed1ef2b4d2cd8ea32eda24559b4d7eedaeb1b78"
ReceiptsSnapshotHash = ""
SnapshotInfoHashPrefix = "ih"
@ -27,6 +27,7 @@ var (
params.MainnetChainConfig.ChainID.Uint64(): {
snapshotsync.SnapshotType_headers: metainfo.NewHashFromHex(HeadersSnapshotHash),
snapshotsync.SnapshotType_bodies: metainfo.NewHashFromHex(BlocksSnapshotHash),
snapshotsync.SnapshotType_state: metainfo.NewHashFromHex(StateSnapshotHash),
},
}
ErrInvalidSnapshot = errors.New("this snapshot for this chainID not supported ")

View File

@ -6,6 +6,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"path/filepath"
"time"
lg "github.com/anacrolix/log"
@ -218,6 +219,53 @@ func (cli *Client) Download() {
}
}
func (cli *Client) GetSnapshots(db ethdb.Database, networkID uint64) (map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo, error) {
mp := make(map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo)
networkIDBytes := make([]byte, 8)
binary.BigEndian.PutUint64(networkIDBytes, networkID)
err := db.Walk(dbutils.SnapshotInfoBucket, append(networkIDBytes, []byte(SnapshotInfoHashPrefix)...), 8*8+16, func(k, v []byte) (bool, error) {
var hash metainfo.Hash
if len(v) != metainfo.HashSize {
return true, nil
}
copy(hash[:], v)
t, ok := cli.Cli.Torrent(hash)
if !ok {
return true, nil
}
var gotInfo bool
readiness := int32(0)
select {
case <-t.GotInfo():
gotInfo = true
readiness = int32(100 * (float64(t.BytesCompleted()) / float64(t.Info().TotalLength())))
default:
}
_, tpStr := ParseInfoHashKey(k)
tp, ok := snapshotsync.SnapshotType_value[tpStr]
if !ok {
return false, fmt.Errorf("incorrect type: %v", tpStr)
}
val := &snapshotsync.SnapshotsInfo{
Type: snapshotsync.SnapshotType(tp),
GotInfoByte: gotInfo,
Readiness: readiness,
SnapshotBlock: SnapshotBlock,
Dbpath: filepath.Join(cli.snapshotsDir, t.Files()[0].Path()),
}
mp[snapshotsync.SnapshotType(tp)] = val
return true, nil
})
if err != nil {
return nil, err
}
return mp, nil
}
func getTorrentSpec(db ethdb.Database, snapshotName string, networkID uint64) ([]byte, []byte, error) {
var infohash, infobytes []byte
var err error

View File

@ -2,14 +2,8 @@ package bittorrent
import (
"context"
"encoding/binary"
"errors"
"fmt"
"path/filepath"
"github.com/anacrolix/torrent/metainfo"
"github.com/golang/protobuf/ptypes/empty"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/turbo/snapshotsync"
)
@ -52,51 +46,12 @@ func (S *SNDownloaderServer) Load() error {
func (S *SNDownloaderServer) Snapshots(ctx context.Context, request *snapshotsync.SnapshotsRequest) (*snapshotsync.SnapshotsInfoReply, error) {
reply := snapshotsync.SnapshotsInfoReply{}
err := S.WalkThroughTorrents(request.NetworkId, func(k, v []byte) (bool, error) {
var hash metainfo.Hash
if len(v) != metainfo.HashSize {
return true, nil
}
copy(hash[:], v)
t, ok := S.t.Cli.Torrent(hash)
if !ok {
return true, nil
}
var gotInfo bool
readiness := int32(0)
select {
case <-t.GotInfo():
gotInfo = true
readiness = int32(100 * (float64(t.BytesCompleted()) / float64(t.Info().TotalLength())))
default:
}
_, tpStr := ParseInfoHashKey(k)
tp, ok := snapshotsync.SnapshotType_value[tpStr]
if !ok {
return false, fmt.Errorf("incorrect type: %v", tpStr)
}
val := &snapshotsync.SnapshotsInfo{
Type: snapshotsync.SnapshotType(tp),
GotInfoByte: gotInfo,
Readiness: readiness,
SnapshotBlock: SnapshotBlock,
Dbpath: filepath.Join(S.t.snapshotsDir, t.Files()[0].Path()),
}
reply.Info = append(reply.Info, val)
return true, nil
})
resp, err := S.t.GetSnapshots(S.db, request.NetworkId)
if err != nil {
return nil, err
}
for i := range resp {
reply.Info = append(reply.Info, resp[i])
}
return &reply, nil
}
func (S *SNDownloaderServer) WalkThroughTorrents(networkID uint64, f func(k, v []byte) (bool, error)) error {
networkIDBytes := make([]byte, 8)
binary.BigEndian.PutUint64(networkIDBytes, networkID)
return S.db.Walk(dbutils.SnapshotInfoBucket, append(networkIDBytes, []byte(SnapshotInfoHashPrefix)...), 8*8+16, f)
}

View File

@ -24,19 +24,27 @@ var (
HeaderCanonical = stages.SyncStage("snapshot_canonical")
)
func PostProcessing(db ethdb.Database, mode SnapshotMode) error {
func PostProcessing(db ethdb.Database, mode SnapshotMode, downloadedSnapshots map[SnapshotType]*SnapshotsInfo) error {
if mode.Headers {
err := GenerateHeaderIndexes(context.Background(), db)
if err != nil {
return err
}
}
if mode.Bodies {
err := PostProcessBodies(db)
if err != nil {
return err
}
}
if mode.State {
err := PostProcessState(db, downloadedSnapshots[SnapshotType_state])
if err != nil {
return err
}
}
return nil
}
@ -67,6 +75,23 @@ func PostProcessBodies(db ethdb.Database) error {
return nil
}
func PostProcessState(db ethdb.GetterPutter, info *SnapshotsInfo) error {
v, _, err := stages.GetStageProgress(db, stages.Execution)
if err != nil {
return err
}
if v > 0 {
return nil
}
err = stages.SaveStageProgress(db, stages.Execution, info.SnapshotBlock, nil)
if err != nil {
return err
}
return nil
}
func GenerateHeaderIndexes(ctx context.Context, db ethdb.Database) error {
var hash common.Hash
var number uint64
@ -78,12 +103,12 @@ func GenerateHeaderIndexes(ctx context.Context, db ethdb.Database) error {
if v == 0 {
log.Info("Generate headers hash to number index")
headHashBytes, innerErr := db.Get(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadHash))
headHashBytes, innerErr := db.Get(dbutils.HeadersSnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadHash))
if innerErr != nil {
return innerErr
}
headNumberBytes, innerErr := db.Get(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadNumber))
headNumberBytes, innerErr := db.Get(dbutils.HeadersSnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadNumber))
if innerErr != nil {
return innerErr
}

View File

@ -30,7 +30,7 @@ func TestHeadersGenerateIndex(t *testing.T) {
panic(innerErr)
}
}
c := tx.Cursor(dbutils.SnapshotInfoBucket)
c := tx.Cursor(dbutils.HeadersSnapshotInfoBucket)
innerErr := c.Put([]byte(dbutils.SnapshotHeadersHeadHash), headers[len(headers)-1].Hash().Bytes())
if innerErr != nil {
return innerErr
@ -55,7 +55,7 @@ func TestHeadersGenerateIndex(t *testing.T) {
}
snKV := ethdb.NewLMDB().Path(snPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).WithBucketsConfig(ethdb.DefaultBucketConfigs).MustOpen()
snKV = ethdb.NewSnapshotKV().For(dbutils.HeaderPrefix).For(dbutils.SnapshotInfoBucket).SnapshotDB(snKV).DB(db).MustOpen()
snKV = ethdb.NewSnapshot2KV().SnapshotDB([]string{dbutils.HeadersSnapshotInfoBucket, dbutils.HeaderPrefix}, snKV).DB(db).MustOpen()
err = GenerateHeaderIndexes(context.Background(), ethdb.NewObjectDatabase(snKV))
if err != nil {
t.Fatal(err)

View File

@ -10,18 +10,31 @@ import (
var (
bucketConfigs = map[SnapshotType]dbutils.BucketsCfg{
SnapshotType_bodies: {
dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{},
dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{},
dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{},
dbutils.BodiesSnapshotInfoBucket: dbutils.BucketConfigItem{},
},
SnapshotType_headers: {
dbutils.HeaderPrefix: dbutils.BucketConfigItem{},
dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{},
dbutils.HeaderPrefix: dbutils.BucketConfigItem{},
dbutils.HeadersSnapshotInfoBucket: dbutils.BucketConfigItem{},
},
SnapshotType_state: {
dbutils.PlainStateBucket: dbutils.BucketConfigItem{
Flags: dbutils.DupSort,
AutoDupSortKeysConversion: true,
DupFromLen: 60,
DupToLen: 28,
},
dbutils.PlainContractCodeBucket: dbutils.BucketConfigItem{},
dbutils.CodeBucket: dbutils.BucketConfigItem{},
dbutils.StateSnapshotInfoBucket: dbutils.BucketConfigItem{},
},
}
)
func WrapBySnapshots(kv ethdb.KV, snapshotDir string, mode SnapshotMode) (ethdb.KV, error) {
func WrapBySnapshotsFromDir(kv ethdb.KV, snapshotDir string, mode SnapshotMode) (ethdb.KV, error) {
log.Info("Wrap db to snapshots", "dir", snapshotDir, "mode", mode.ToString())
snkv := ethdb.NewSnapshot2KV().DB(kv)
if mode.Bodies {
snapshotKV, err := ethdb.NewLMDB().Flags(func(flags uint) uint { return flags | lmdb.Readonly }).Path(snapshotDir + "/bodies").WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return bucketConfigs[SnapshotType_bodies]
@ -30,10 +43,7 @@ func WrapBySnapshots(kv ethdb.KV, snapshotDir string, mode SnapshotMode) (ethdb.
log.Error("Can't open body snapshot", "err", err)
return nil, err
} else { //nolint
kv = ethdb.NewSnapshotKV().SnapshotDB(snapshotKV).
For(dbutils.BlockBodyPrefix).
For(dbutils.SnapshotInfoBucket).
DB(kv).MustOpen()
snkv.SnapshotDB([]string{dbutils.BlockBodyPrefix, dbutils.BodiesSnapshotInfoBucket}, snapshotKV)
}
}
@ -45,33 +55,44 @@ func WrapBySnapshots(kv ethdb.KV, snapshotDir string, mode SnapshotMode) (ethdb.
log.Error("Can't open headers snapshot", "err", err)
return nil, err
} else { //nolint
kv = ethdb.NewSnapshotKV().SnapshotDB(snapshotKV).
For(dbutils.HeaderPrefix).
For(dbutils.SnapshotInfoBucket).
DB(kv).MustOpen()
snkv.SnapshotDB([]string{dbutils.HeaderPrefix, dbutils.HeadersSnapshotInfoBucket}, snapshotKV)
}
}
return kv, nil
if mode.State {
snapshotKV, err := ethdb.NewLMDB().Flags(func(flags uint) uint { return flags | lmdb.Readonly }).Path(snapshotDir + "/headers").WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return bucketConfigs[SnapshotType_headers]
}).Open()
if err != nil {
log.Error("Can't open headers snapshot", "err", err)
return nil, err
} else { //nolint
snkv.SnapshotDB([]string{dbutils.StateSnapshotInfoBucket, dbutils.PlainStateBucket, dbutils.PlainContractCodeBucket, dbutils.CodeBucket}, snapshotKV)
}
}
return snkv.MustOpen(), nil
}
func WrapBySnapshots2(kv ethdb.KV, snapshots map[SnapshotType]*SnapshotsInfo) (ethdb.KV, error) {
func WrapBySnapshotsFromDownloader(kv ethdb.KV, snapshots map[SnapshotType]*SnapshotsInfo) (ethdb.KV, error) {
snKV := ethdb.NewSnapshot2KV().DB(kv)
for k, v := range snapshots {
log.Info("Wrap db by", "snapshot", k.String(), "dir", v.Dbpath)
cfg := bucketConfigs[k]
snapshotKV, err := ethdb.NewLMDB().Flags(func(flags uint) uint { return flags | lmdb.Readonly }).Path(v.Dbpath).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg {
return cfg
}).Open()
if err != nil {
log.Error("Can't open snapshot", "err", err)
return nil, err
} else { //nolint
snKV := ethdb.NewSnapshotKV().SnapshotDB(snapshotKV)
for i := range bucketConfigs[k] {
snKV.For(i)
buckets := make([]string, 0, 1)
for bucket := range bucketConfigs[k] {
buckets = append(buckets, bucket)
}
kv = snKV.DB(kv).MustOpen()
snKV.SnapshotDB(buckets, snapshotKV)
}
}
return kv, nil
return snKV.MustOpen(), nil
}