diff --git a/cmd/integration/commands/snapshot_check.go b/cmd/integration/commands/snapshot_check.go new file mode 100644 index 000000000..2ba4deff1 --- /dev/null +++ b/cmd/integration/commands/snapshot_check.go @@ -0,0 +1,298 @@ +package commands + +import ( + "context" + "fmt" + "github.com/ledgerwatch/lmdb-go/lmdb" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/turbo-geth/cmd/utils" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/core/rawdb" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" + "github.com/ledgerwatch/turbo-geth/turbo/snapshotsync" + "github.com/spf13/cobra" +) + +func init() { + withChaindata(cmdSnapshotCheck) + withBlock(cmdSnapshotCheck) + cmdSnapshotCheck.Flags().StringVar(&tmpDBPath, "tmp_db", "", "path to temporary db(for debug)") + withChaindata(dbCopyCmd) + rootCmd.AddCommand(dbCopyCmd) + rootCmd.AddCommand(cmdSnapshotCheck) +} + +var tmpDBPath string + +var cmdSnapshotCheck = &cobra.Command{ + Use: "snapshot_check", + Short: "check execution over state snapshot by block", + Example: "go run cmd/integration/main.go snapshot_check --block 11400000 --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/ --snapshotDir /media/b00ris/nvme/snapshots/ --snapshotMode s --tmp_db /media/b00ris/nvme/tmp/debug", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := utils.RootContext() + //db to provide headers, blocks, senders ... + mainDB, err := ethdb.Open(chaindata, true) + if err != nil { + return err + } + mode, err := snapshotsync.SnapshotModeFromString(snapshotMode) + if err != nil { + panic(err) + } + + if !mode.State || len(snapshotDir) == 0 { + return fmt.Errorf("you need state snapshot for it") + } + + stateSnapshotPath := filepath.Join(snapshotDir, "state") + stateSnapshot := ethdb.NewLMDB().Path(stateSnapshotPath).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { + return dbutils.BucketsCfg{ + dbutils.PlainStateBucket: dbutils.BucketsConfigs[dbutils.PlainStateBucket], + dbutils.PlainContractCodeBucket: dbutils.BucketsConfigs[dbutils.PlainContractCodeBucket], + dbutils.CodeBucket: dbutils.BucketsConfigs[dbutils.CodeBucket], + } + }).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).MustOpen() + + isNew := true + var path string + if len(tmpDBPath) > 0 { + isNew = false + path = tmpDBPath + } else { + path, err = ioutil.TempDir(os.TempDir(), "sndbg") + if err != nil { + return err + } + } + + defer func() { + if err == nil { + os.RemoveAll(path) + } else { + log.Info("Temp database", "path", path) + } + }() + tmpDb := ethdb.NewLMDB().Path(path).MustOpen() + + kv := ethdb.NewSnapshot2KV(). + DB(tmpDb). + SnapshotDB([]string{dbutils.HeaderPrefix, dbutils.BlockBodyPrefix, dbutils.Senders, dbutils.HeadBlockKey, dbutils.HeaderNumberPrefix}, mainDB.KV()). + SnapshotDB([]string{dbutils.PlainStateBucket, dbutils.CodeBucket, dbutils.PlainContractCodeBucket}, stateSnapshot). + MustOpen() + + db := ethdb.NewObjectDatabase(kv) + if isNew { + err = ethdb.SetStorageModeIfNotExist(db, ethdb.StorageMode{}) + if err != nil { + return err + } + } + + if err := snapshotCheck(ctx, db, isNew, os.TempDir()); err != nil { + log.Error("snapshotCheck error", "err", err) + return err + } + return nil + }, +} + +func snapshotCheck(ctx context.Context, db ethdb.Database, isNew bool, tmpDir string) (err error) { + var snapshotBlock uint64 = 11_000_000 + blockNum, _, err := stages.GetStageProgress(db, stages.Execution) + if err != nil { + return err + } + + //snapshot or last executed block + if blockNum > snapshotBlock { + snapshotBlock = blockNum + } + + //get end of check + var lastBlockHeaderNumber uint64 + if block == 0 { + lastBlockHash := rawdb.ReadHeadBlockHash(db) + lastBlockHeader, innerErr := rawdb.ReadHeaderByHash(db, lastBlockHash) + if innerErr != nil { + return innerErr + } + lastBlockHeaderNumber = lastBlockHeader.Number.Uint64() + } else { + lastBlockHeaderNumber = block + } + + if lastBlockHeaderNumber <= snapshotBlock { + return fmt.Errorf("incorrect header number last block:%v, snapshotBlock: %v", lastBlockHeaderNumber, snapshotBlock) + } + + if isNew { + log.Info("New tmp db. We need to promote hash state.") + tx, innerErr := db.Begin(context.Background(), ethdb.RW) + if innerErr != nil { + return innerErr + } + + tt := time.Now() + err = stagedsync.PromoteHashedStateCleanly("", tx, tmpDir, ctx.Done()) + log.Info("Promote took", "t", time.Since(tt)) + if err != nil { + tx.Rollback() + return fmt.Errorf("promote state err: %w", err) + } + tt = time.Now() + _, err = tx.Commit() + if err != nil { + tx.Rollback() + return fmt.Errorf("commit promote state err: %w", err) + } + log.Info("promote committed", "t", time.Since(tt)) + } + + if isNew { + log.Info("Regenerate IH") + tx, innerErr := db.Begin(context.Background(), ethdb.RW) + if innerErr != nil { + return innerErr + } + + hash, innerErr := rawdb.ReadCanonicalHash(tx, snapshotBlock) + if innerErr != nil { + tx.Rollback() + return innerErr + } + + syncHeadHeader := rawdb.ReadHeader(tx, hash, snapshotBlock) + if syncHeadHeader == nil { + tx.Rollback() + return fmt.Errorf("empty header for %v", snapshotBlock) + } + expectedRootHash := syncHeadHeader.Root + + tt := time.Now() + err = stagedsync.RegenerateIntermediateHashes("", tx, true, tmpDir, expectedRootHash, ctx.Done()) + if err != nil { + tx.Rollback() + return fmt.Errorf("regenerateIntermediateHashes err: %w", err) + } + log.Info("RegenerateIntermediateHashes took", "t", time.Since(tt)) + tt = time.Now() + _, err = tx.Commit() + if err != nil { + tx.Rollback() + return err + } + log.Info("Commit", "t", time.Since(tt)) + } + + cc, bc, st, progress := newSync(ctx.Done(), db, db, nil) + defer bc.Stop() + st.DisableStages(stages.Headers, + stages.BlockHashes, + stages.Bodies, + stages.Senders, + stages.AccountHistoryIndex, + stages.StorageHistoryIndex, + stages.LogIndex, + stages.CallTraces, + stages.TxLookup, + stages.TxPool, + stages.Finish, + ) + + if isNew { + stage3 := progress(stages.Senders) + err = stage3.DoneAndUpdate(db, lastBlockHeaderNumber) + if err != nil { + return err + } + + stage4 := progress(stages.Execution) + err = stage4.DoneAndUpdate(db, snapshotBlock) + if err != nil { + return err + } + stage5 := progress(stages.HashState) + err = stage5.DoneAndUpdate(db, snapshotBlock) + if err != nil { + return err + } + + stage6 := progress(stages.IntermediateHashes) + err = stage6.DoneAndUpdate(db, snapshotBlock) + if err != nil { + return err + } + } + + ch := ctx.Done() + var batchSize datasize.ByteSize + must(batchSize.UnmarshalText([]byte(batchSizeStr))) + + tx, err := db.Begin(context.Background(), ethdb.RW) + if err != nil { + return err + } + defer tx.Rollback() + + for blockNumber := snapshotBlock + 1; blockNumber <= lastBlockHeaderNumber; blockNumber++ { + err = st.SetCurrentStage(stages.Execution) + if err != nil { + return err + } + stage4 := progress(stages.Execution) + stage4.BlockNumber = blockNumber - 1 + log.Info("Stage4", "progress", stage4.BlockNumber) + + err = stagedsync.SpawnExecuteBlocksStage(stage4, tx, + bc.Config(), cc, bc.GetVMConfig(), + ch, + stagedsync.ExecuteBlockStageParams{ + ToBlock: blockNumber, // limit execution to the specified block + WriteReceipts: false, + BatchSize: int(batchSize), + }) + if err != nil { + return fmt.Errorf("execution err %w", err) + } + + stage5 := progress(stages.HashState) + stage5.BlockNumber = blockNumber - 1 + log.Info("Stage5", "progress", stage5.BlockNumber) + err = stagedsync.SpawnHashStateStage(stage5, tx, tmpDir, ch) + if err != nil { + return fmt.Errorf("spawnHashStateStage err %w", err) + } + + stage6 := progress(stages.IntermediateHashes) + stage6.BlockNumber = blockNumber - 1 + log.Info("Stage6", "progress", stage6.BlockNumber) + if err = stagedsync.SpawnIntermediateHashesStage(stage5, tx, true, tmpDir, ch); err != nil { + log.Error("Error on ih", "err", err, "block", blockNumber) + return fmt.Errorf("spawnIntermediateHashesStage %w", err) + } + + log.Info("Done", "progress", blockNumber) + err = tx.CommitAndBegin(context.TODO()) + if err != nil { + log.Error("Error on commit", "err", err, "block", blockNumber) + return err + } + } + + return nil +} + +var dbCopyCmd = &cobra.Command{ + Use: "copy_compact", + RunE: func(cmd *cobra.Command, args []string) error { + return copyCompact() + }, +} diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index d5d961269..7a40c1baf 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -702,7 +702,7 @@ func SetSnapshotKV(db ethdb.Database, snapshotDir, snapshotMode string) error { } snapshotKV := db.(ethdb.HasKV).KV() - snapshotKV, err = snapshotsync.WrapBySnapshots(snapshotKV, snapshotDir, mode) + snapshotKV, err = snapshotsync.WrapBySnapshotsFromDir(snapshotKV, snapshotDir, mode) if err != nil { return err } diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 739167c90..169593bd4 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -102,7 +102,7 @@ func OpenDB(cfg Flags) (ethdb.KV, ethdb.Backend, error) { if innerErr != nil { return nil, nil, fmt.Errorf("can't process snapshot-mode err:%w", innerErr) } - kv, innerErr := snapshotsync.WrapBySnapshots(db, cfg.SnapshotDir, mode) + kv, innerErr := snapshotsync.WrapBySnapshotsFromDir(db, cfg.SnapshotDir, mode) if innerErr != nil { return nil, nil, fmt.Errorf("can't wrap by snapshots err:%w", innerErr) } diff --git a/cmd/snapshots/generator/commands/copy_from_state.go b/cmd/snapshots/generator/commands/copy_from_state.go new file mode 100644 index 000000000..2d10f813c --- /dev/null +++ b/cmd/snapshots/generator/commands/copy_from_state.go @@ -0,0 +1,164 @@ +package commands + +import ( + "context" + "fmt" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" + "github.com/ledgerwatch/turbo-geth/turbo/snapshotsync" + "github.com/spf13/cobra" + "os" + "time" +) + +func init() { + withChaindata(copyFromStateSnapshotCmd) + withSnapshotFile(copyFromStateSnapshotCmd) + withSnapshotData(copyFromStateSnapshotCmd) + withBlock(copyFromStateSnapshotCmd) + rootCmd.AddCommand(copyFromStateSnapshotCmd) + +} + +//go run cmd/snapshots/generator/main.go state_copy --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/ &> /media/b00ris/nvme/copy.log +var copyFromStateSnapshotCmd = &cobra.Command{ + Use: "state_copy", + Short: "Copy from state snapshot", + Example: "go run cmd/snapshots/generator/main.go state_copy --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/", + RunE: func(cmd *cobra.Command, args []string) error { + return CopyFromState(cmd.Context(), chaindata, snapshotFile, block, snapshotDir, snapshotMode) + }, +} + +func CopyFromState(ctx context.Context, dbpath string, snapshotPath string, block uint64, snapshotDir, snapshotMode string) error { + db, err := ethdb.Open(dbpath, true) + if err != nil { + return err + } + + kv := db.KV() + if snapshotDir != "" { + var mode snapshotsync.SnapshotMode + mode, err = snapshotsync.SnapshotModeFromString(snapshotMode) + if err != nil { + return err + } + kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode) + if err != nil { + return err + } + } + db.SetKV(kv) + + err = os.RemoveAll(snapshotPath) + if err != nil { + return err + } + snkv := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { + return dbutils.BucketsCfg{ + dbutils.PlainStateBucket: dbutils.BucketsConfigs[dbutils.PlainStateBucket], + dbutils.PlainContractCodeBucket: dbutils.BucketsConfigs[dbutils.PlainContractCodeBucket], + dbutils.CodeBucket: dbutils.BucketsConfigs[dbutils.CodeBucket], + } + }).Path(snapshotPath).MustOpen() + log.Info("Create snapshot db", "path", snapshotPath) + + sndb := ethdb.NewObjectDatabase(snkv).NewBatch() + + tt := time.Now() + tt2 := time.Now() + max := 10000000 + i := 0 + err = db.Walk(dbutils.PlainStateBucket, []byte{}, 0, func(k, v []byte) (bool, error) { + innerErr := sndb.Put(dbutils.PlainStateBucket, k, v) + if innerErr != nil { + return false, fmt.Errorf("put state err: %w", innerErr) + } + i++ + if i > max { + i = 0 + innerErr = sndb.CommitAndBegin(ctx) + if innerErr != nil { + return false, fmt.Errorf("commit state err: %w", innerErr) + } + log.Info("Commit state", "batch", time.Since(tt2), "all", time.Since(tt)) + tt2 = time.Now() + } + + return true, nil + }) + if err != nil { + return err + } + err = sndb.CommitAndBegin(ctx) + if err != nil { + return err + } + + log.Info("Copy plain state end", "t", time.Since(tt)) + tt = time.Now() + tt2 = time.Now() + err = db.Walk(dbutils.PlainContractCodeBucket, []byte{}, 0, func(k, v []byte) (bool, error) { + innerErr := sndb.Put(dbutils.PlainContractCodeBucket, k, v) + if innerErr != nil { + return false, fmt.Errorf("put contract code err: %w", innerErr) + } + i++ + if i > max { + i = 0 + innerErr = sndb.CommitAndBegin(ctx) + if innerErr != nil { + return false, fmt.Errorf("commit contract code err: %w", innerErr) + } + log.Info("Commit contract code", "batch", time.Since(tt2), "all", time.Since(tt)) + tt2 = time.Now() + } + + return true, nil + }) + if err != nil { + return err + } + log.Info("Copy contract code end", "t", time.Since(tt)) + _, err = sndb.Commit() + if err != nil { + return err + } + + tt = time.Now() + tt2 = time.Now() + err = db.Walk(dbutils.CodeBucket, []byte{}, 0, func(k, v []byte) (bool, error) { + innerErr := sndb.Put(dbutils.CodeBucket, k, v) + if innerErr != nil { + return false, fmt.Errorf("put code err: %w", innerErr) + } + i++ + if i > max { + i = 0 + innerErr = sndb.CommitAndBegin(ctx) + if innerErr != nil { + return false, fmt.Errorf("commit code err: %w", innerErr) + } + log.Info("Commit code", "batch", time.Since(tt2), "all", time.Since(tt)) + tt2 = time.Now() + } + + return true, nil + }) + if err != nil { + return err + } + log.Info("Copy code end", "t", time.Since(tt)) + _, err = sndb.Commit() + if err != nil { + return err + } + sndb.Close() + db.Close() + tt = time.Now() + defer func() { + log.Info("Verify end", "t", time.Since(tt)) + }() + return VerifyStateSnapshot(ctx, dbpath, snapshotPath, block) +} diff --git a/cmd/snapshots/generator/commands/generate_body_snapshot.go b/cmd/snapshots/generator/commands/generate_body_snapshot.go index 8ab296d75..c778f56ac 100644 --- a/cmd/snapshots/generator/commands/generate_body_snapshot.go +++ b/cmd/snapshots/generator/commands/generate_body_snapshot.go @@ -45,7 +45,7 @@ func BodySnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock uint return err } - kv, err = snapshotsync.WrapBySnapshots(kv, snapshotDir, mode) + kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode) if err != nil { return err } @@ -53,8 +53,8 @@ func BodySnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock uint snKV := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ - dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{}, - dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{}, + dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{}, + dbutils.BodiesSnapshotInfoBucket: dbutils.BucketConfigItem{}, } }).Path(snapshotPath).MustOpen() @@ -94,12 +94,12 @@ func BodySnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock uint } } - err = snDB.Put(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotBodyHeadNumber), big.NewInt(0).SetUint64(toBlock).Bytes()) + err = snDB.Put(dbutils.BodiesSnapshotInfoBucket, []byte(dbutils.SnapshotBodyHeadNumber), big.NewInt(0).SetUint64(toBlock).Bytes()) if err != nil { log.Crit("SnapshotBodyHeadNumber error", "err", err) return err } - err = snDB.Put(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotBodyHeadHash), hash.Bytes()) + err = snDB.Put(dbutils.BodiesSnapshotInfoBucket, []byte(dbutils.SnapshotBodyHeadHash), hash.Bytes()) if err != nil { log.Crit("SnapshotBodyHeadHash error", "err", err) return err diff --git a/cmd/snapshots/generator/commands/generate_header_snapshot.go b/cmd/snapshots/generator/commands/generate_header_snapshot.go index b4cc45500..c7dbcc116 100644 --- a/cmd/snapshots/generator/commands/generate_header_snapshot.go +++ b/cmd/snapshots/generator/commands/generate_header_snapshot.go @@ -54,15 +54,15 @@ func HeaderSnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock ui return err } - kv, err = snapshotsync.WrapBySnapshots(kv, snapshotDir, mode) + kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode) if err != nil { return err } } snKV := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ - dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, - dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{}, + dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, + dbutils.HeadersSnapshotInfoBucket: dbutils.BucketConfigItem{}, } }).Path(snapshotPath).MustOpen() @@ -107,12 +107,12 @@ func HeaderSnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock ui } } - err = snDB.Put(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadNumber), big.NewInt(0).SetUint64(toBlock).Bytes()) + err = snDB.Put(dbutils.HeadersSnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadNumber), big.NewInt(0).SetUint64(toBlock).Bytes()) if err != nil { log.Crit("SnapshotHeadersHeadNumber error", "err", err) return err } - err = snDB.Put(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadHash), hash.Bytes()) + err = snDB.Put(dbutils.HeadersSnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadHash), hash.Bytes()) if err != nil { log.Crit("SnapshotHeadersHeadHash error", "err", err) return err diff --git a/cmd/snapshots/generator/commands/generate_state_snapshot.go b/cmd/snapshots/generator/commands/generate_state_snapshot.go new file mode 100644 index 000000000..e64b6b50b --- /dev/null +++ b/cmd/snapshots/generator/commands/generate_state_snapshot.go @@ -0,0 +1,177 @@ +package commands + +import ( + "context" + "errors" + "fmt" + "github.com/ledgerwatch/turbo-geth/common" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/core/state" + "github.com/ledgerwatch/turbo-geth/core/types/accounts" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/turbo/snapshotsync" + "github.com/ledgerwatch/turbo-geth/turbo/trie" + "github.com/spf13/cobra" + "os" + "time" +) + +func init() { + withChaindata(generateStateSnapshotCmd) + withSnapshotFile(generateStateSnapshotCmd) + withSnapshotData(generateStateSnapshotCmd) + withBlock(generateStateSnapshotCmd) + rootCmd.AddCommand(generateStateSnapshotCmd) + +} + +//go run cmd/snapshots/generator/main.go state_copy --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/ &> /media/b00ris/nvme/copy.log +var generateStateSnapshotCmd = &cobra.Command{ + Use: "state", + Short: "Generate state snapshot", + Example: "go run ./cmd/state/main.go stateSnapshot --block 11000000 --chaindata /media/b00ris/nvme/tgstaged/tg/chaindata/ --snapshot /media/b00ris/nvme/snapshots/state", + RunE: func(cmd *cobra.Command, args []string) error { + return GenerateStateSnapshot(cmd.Context(), chaindata, snapshotFile, block, snapshotDir, snapshotMode) + }, +} + +func GenerateStateSnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock uint64, snapshotDir string, snapshotMode string) error { + if snapshotPath == "" { + return errors.New("empty snapshot path") + } + + err := os.RemoveAll(snapshotPath) + if err != nil { + return err + } + kv := ethdb.NewLMDB().Path(dbPath).MustOpen() + + if snapshotDir != "" { + var mode snapshotsync.SnapshotMode + mode, err = snapshotsync.SnapshotModeFromString(snapshotMode) + if err != nil { + return err + } + + kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode) + if err != nil { + return err + } + } + + snkv := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { + return dbutils.BucketsCfg{ + dbutils.PlainStateBucket: dbutils.BucketConfigItem{}, + dbutils.PlainContractCodeBucket: dbutils.BucketConfigItem{}, + dbutils.CodeBucket: dbutils.BucketConfigItem{}, + dbutils.StateSnapshotInfoBucket: dbutils.BucketConfigItem{}, + } + }).Path(snapshotPath).MustOpen() + + sndb := ethdb.NewObjectDatabase(snkv) + mt := sndb.NewBatch() + + tx, err := kv.Begin(context.Background(), nil, ethdb.RO) + if err != nil { + return err + } + tx2, err := kv.Begin(context.Background(), nil, ethdb.RO) + if err != nil { + return err + } + defer tx.Rollback() + + i := 0 + t := time.Now() + tt := time.Now() + err = state.WalkAsOfAccounts(tx, common.Address{}, toBlock+1, func(k []byte, v []byte) (bool, error) { + i++ + if i%100000 == 0 { + fmt.Println(i, common.Bytes2Hex(k), "batch", time.Since(tt)) + tt = time.Now() + select { + case <-ctx.Done(): + return false, errors.New("interrupted") + default: + + } + } + if len(k) != 20 { + return true, nil + } + + var acc accounts.Account + if err = acc.DecodeForStorage(v); err != nil { + return false, fmt.Errorf("decoding %x for %x: %v", v, k, err) + } + + if acc.Incarnation > 0 { + storagePrefix := dbutils.PlainGenerateStoragePrefix(k, acc.Incarnation) + if acc.IsEmptyRoot() { + t := trie.New(common.Hash{}) + j := 0 + innerErr := state.WalkAsOfStorage(tx2, common.BytesToAddress(k), acc.Incarnation, common.Hash{}, toBlock+1, func(k1, k2 []byte, vv []byte) (bool, error) { + j++ + innerErr1 := mt.Put(dbutils.PlainStateBucket, dbutils.PlainGenerateCompositeStorageKey(k1, acc.Incarnation, k2), common.CopyBytes(vv)) + if innerErr1 != nil { + return false, innerErr1 + } + + h, _ := common.HashData(k1) + t.Update(h.Bytes(), common.CopyBytes(vv)) + + return true, nil + }) + if innerErr != nil { + return false, innerErr + } + acc.Root = t.Hash() + } + + if acc.IsEmptyCodeHash() { + codeHash, err1 := tx2.GetOne(dbutils.PlainContractCodeBucket, storagePrefix) + if err1 != nil && errors.Is(err1, ethdb.ErrKeyNotFound) { + return false, fmt.Errorf("getting code hash for %x: %v", k, err1) + } + if len(codeHash) > 0 { + code, err1 := tx2.GetOne(dbutils.CodeBucket, codeHash) + if err1 != nil { + return false, err1 + } + if err1 = mt.Put(dbutils.CodeBucket, codeHash, code); err1 != nil { + return false, err1 + } + if err1 = mt.Put(dbutils.PlainContractCodeBucket, storagePrefix, codeHash); err1 != nil { + return false, err1 + } + } + } + } + newAcc := make([]byte, acc.EncodingLengthForStorage()) + acc.EncodeForStorage(newAcc) + innerErr := mt.Put(dbutils.PlainStateBucket, common.CopyBytes(k), newAcc) + if innerErr != nil { + return false, innerErr + } + + if mt.BatchSize() >= mt.IdealBatchSize() { + ttt := time.Now() + innerErr = mt.CommitAndBegin(context.Background()) + if innerErr != nil { + return false, innerErr + } + fmt.Println("Committed", time.Since(ttt)) + } + return true, nil + }) + if err != nil { + return err + } + _, err = mt.Commit() + if err != nil { + return err + } + fmt.Println("took", time.Since(t)) + + return VerifyStateSnapshot(ctx, dbPath, snapshotFile, block) +} diff --git a/cmd/snapshots/generator/commands/verify_state_snapshot.go b/cmd/snapshots/generator/commands/verify_state_snapshot.go new file mode 100644 index 000000000..ecab3f755 --- /dev/null +++ b/cmd/snapshots/generator/commands/verify_state_snapshot.go @@ -0,0 +1,100 @@ +package commands + +import ( + "context" + "fmt" + "github.com/ledgerwatch/lmdb-go/lmdb" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/core/rawdb" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/turbo/snapshotsync" + "github.com/spf13/cobra" + "io/ioutil" + "os" + "time" +) + +func init() { + withChaindata(verifyStateSnapshotCmd) + withSnapshotFile(verifyStateSnapshotCmd) + withBlock(verifyStateSnapshotCmd) + + rootCmd.AddCommand(verifyStateSnapshotCmd) +} + +// +var verifyStateSnapshotCmd = &cobra.Command{ + Use: "verify_state", + Short: "Verify state snapshot", + Example: "go run cmd/snapshots/generator/main.go verify_state --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state/ --chaindata /media/b00ris/nvme/backup/snapshotsync/tg/chaindata/ ", + RunE: func(cmd *cobra.Command, args []string) error { + return VerifyStateSnapshot(cmd.Context(), chaindata, snapshotFile, block) + }, +} + +func VerifyStateSnapshot(ctx context.Context, dbPath, snapshotPath string, block uint64) error { + db, err := ethdb.Open(dbPath, true) + if err != nil { + return fmt.Errorf("open err: %w", err) + } + + kv := db.KV() + if snapshotDir != "" { + var mode snapshotsync.SnapshotMode + mode, err = snapshotsync.SnapshotModeFromString(snapshotMode) + if err != nil { + return err + } + kv, err = snapshotsync.WrapBySnapshotsFromDir(kv, snapshotDir, mode) + if err != nil { + return err + } + } + db.SetKV(kv) + + snkv := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { + return dbutils.BucketsCfg{ + dbutils.PlainStateBucket: dbutils.BucketsConfigs[dbutils.PlainStateBucket], + dbutils.PlainContractCodeBucket: dbutils.BucketsConfigs[dbutils.PlainContractCodeBucket], + dbutils.CodeBucket: dbutils.BucketsConfigs[dbutils.CodeBucket], + } + }).Path(snapshotPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).MustOpen() + + tmpPath, err := ioutil.TempDir(os.TempDir(), "vrf*") + if err != nil { + return err + } + tmpDB := ethdb.NewLMDB().Path(tmpPath).MustOpen() + defer os.RemoveAll(tmpPath) + defer tmpDB.Close() + snkv = ethdb.NewSnapshot2KV().SnapshotDB([]string{dbutils.PlainStateBucket, dbutils.PlainContractCodeBucket, dbutils.CodeBucket}, snkv).DB(tmpDB).MustOpen() + sndb := ethdb.NewObjectDatabase(snkv) + tx, err := sndb.Begin(context.Background(), ethdb.RW) + if err != nil { + return err + } + defer tx.Rollback() + hash, err := rawdb.ReadCanonicalHash(db, block) + if err != nil { + return err + } + + syncHeadHeader := rawdb.ReadHeader(db, hash, block) + if syncHeadHeader == nil { + return fmt.Errorf("empty header") + } + expectedRootHash := syncHeadHeader.Root + tt := time.Now() + err = stagedsync.PromoteHashedStateCleanly("", tx, os.TempDir(), ctx.Done()) + fmt.Println("Promote took", time.Since(tt)) + if err != nil { + return fmt.Errorf("promote state err: %w", err) + } + + err = stagedsync.RegenerateIntermediateHashes("", tx, true, os.TempDir(), expectedRootHash, ctx.Done()) + if err != nil { + return fmt.Errorf("regenerateIntermediateHashes err: %w", err) + } + return nil +} diff --git a/cmd/snapshots/seeder/commands/root.go b/cmd/snapshots/seeder/commands/root.go index 5294ee7c0..a6cc6a10d 100644 --- a/cmd/snapshots/seeder/commands/root.go +++ b/cmd/snapshots/seeder/commands/root.go @@ -54,6 +54,7 @@ var rootCmd = &cobra.Command{ debug.Exit() }, Args: cobra.ExactArgs(1), + ArgAliases: []string{"snapshots dir"}, RunE: func(cmd *cobra.Command, args []string) error { return Seed(cmd.Context(), args[0]) }, diff --git a/cmd/snapshots/seeder/commands/seeder.go b/cmd/snapshots/seeder/commands/seeder.go index 388f9ad63..458d32b9b 100644 --- a/cmd/snapshots/seeder/commands/seeder.go +++ b/cmd/snapshots/seeder/commands/seeder.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "os/signal" "path/filepath" "time" @@ -20,13 +19,7 @@ import ( func Seed(ctx context.Context, datadir string) error { datadir = filepath.Dir(datadir) ctx, cancel := context.WithCancel(ctx) - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - go func() { - <-c - cancel() - }() + defer cancel() cfg := trnt.DefaultTorrentConfig() cfg.NoDHT = false @@ -39,7 +32,7 @@ func Seed(ctx context.Context, datadir string) error { pathes := []string{ cfg.DataDir + "/headers", cfg.DataDir + "/bodies", - //cfg.DataDir + "/state", + cfg.DataDir + "/state", //cfg.DataDir+"/receipts", } diff --git a/cmd/state/verify/verify_headers_snapshot.go b/cmd/state/verify/verify_headers_snapshot.go index 27bd18708..b2e904d20 100644 --- a/cmd/state/verify/verify_headers_snapshot.go +++ b/cmd/state/verify/verify_headers_snapshot.go @@ -16,7 +16,7 @@ func HeadersSnapshot(snapshotPath string) error { snKV := ethdb.NewLMDB().Path(snapshotPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, - dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{}, + dbutils.HeadersSnapshotInfoBucket: dbutils.BucketConfigItem{}, } }).MustOpen() var prevHeader *types.Header diff --git a/common/bytes.go b/common/bytes.go index b07c52af9..33291f4a8 100644 --- a/common/bytes.go +++ b/common/bytes.go @@ -141,11 +141,14 @@ func TrimRightZeroes(s []byte) []byte { func KeyCmp(key1, key2 []byte) (int, bool) { switch { - case key1 == nil && key2 == nil: + //both keys are empty + case len(key1) == 0 && len(key2) == 0: return 0, true - case key1 == nil && key2 != nil: + // key1 is empty + case len(key1) == 0 && len(key2) != 0: return 1, false - case key1 != nil && key2 == nil: + // key2 is empty + case len(key1) != 0 && len(key2) == 0: return -1, false default: return bytes.Compare(key1, key2), false diff --git a/common/dbutils/bucket.go b/common/dbutils/bucket.go index 697a0171a..fa8062f36 100644 --- a/common/dbutils/bucket.go +++ b/common/dbutils/bucket.go @@ -100,8 +100,11 @@ var ( IntermediateTrieHashBucketOld1 = "iTh" // DatabaseInfoBucket is used to store information about data layout. - DatabaseInfoBucket = "DBINFO" - SnapshotInfoBucket = "SNINFO" + DatabaseInfoBucket = "DBINFO" + SnapshotInfoBucket = "SNINFO" + HeadersSnapshotInfoBucket = "hSNINFO" + BodiesSnapshotInfoBucket = "bSNINFO" + StateSnapshotInfoBucket = "sSNINFO" // databaseVerisionKey tracks the current database version. DatabaseVerisionKey = "DatabaseVersion" @@ -240,6 +243,9 @@ var Buckets = []string{ LogTopicIndex, LogAddressIndex, SnapshotInfoBucket, + HeadersSnapshotInfoBucket, + BodiesSnapshotInfoBucket, + StateSnapshotInfoBucket, CallFromIndex, CallToIndex, Log, diff --git a/eth/backend.go b/eth/backend.go index 54e6bbd8d..401ce5f87 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -232,12 +232,12 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) { } snapshotKV := chainDb.KV() - snapshotKV, innerErr = snapshotsync.WrapBySnapshots2(snapshotKV, downloadedSnapshots) + snapshotKV, innerErr = snapshotsync.WrapBySnapshotsFromDownloader(snapshotKV, downloadedSnapshots) if innerErr != nil { return nil, innerErr } chainDb.SetKV(snapshotKV) - innerErr = snapshotsync.PostProcessing(chainDb, config.SnapshotMode) + innerErr = snapshotsync.PostProcessing(chainDb, config.SnapshotMode, downloadedSnapshots) if innerErr != nil { return nil, innerErr } @@ -251,6 +251,7 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) { if err != nil { return nil, err } + err = torrentClient.Load(chainDb) if err != nil { return nil, err @@ -258,16 +259,20 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) { err = torrentClient.AddSnapshotsTorrents(context.Background(), chainDb, config.NetworkID, config.SnapshotMode) if err == nil { torrentClient.Download() - snapshotKV := chainDb.KV() - snapshotKV, err = snapshotsync.WrapBySnapshots(snapshotKV, dbPath, config.SnapshotMode) - if err != nil { - return nil, err + mp, innerErr := torrentClient.GetSnapshots(chainDb, config.NetworkID) + if innerErr != nil { + return nil, innerErr + } + + snapshotKV, innerErr = snapshotsync.WrapBySnapshotsFromDownloader(snapshotKV, mp) + if innerErr != nil { + return nil, innerErr } chainDb.SetKV(snapshotKV) - err = snapshotsync.PostProcessing(chainDb, config.SnapshotMode) - if err != nil { - return nil, err + innerErr = snapshotsync.PostProcessing(chainDb, config.SnapshotMode, mp) + if innerErr != nil { + return nil, innerErr } } else { log.Error("There was an error in snapshot init. Swithing to regular sync", "err", err) diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go index 429a31805..52cbccf0d 100644 --- a/eth/stagedsync/stage_hashstate.go +++ b/eth/stagedsync/stage_hashstate.go @@ -34,7 +34,7 @@ func SpawnHashStateStage(s *StageState, db ethdb.Database, tmpdir string, quit < log.Info(fmt.Sprintf("[%s] Promoting plain state", logPrefix), "from", s.BlockNumber, "to", to) if s.BlockNumber == 0 { // Initial hashing of the state is performed at the previous stage - if err := promoteHashedStateCleanly(logPrefix, db, tmpdir, quit); err != nil { + if err := PromoteHashedStateCleanly(logPrefix, db, tmpdir, quit); err != nil { return fmt.Errorf("[%s] %w", logPrefix, err) } } else { @@ -74,7 +74,7 @@ func unwindHashStateStageImpl(logPrefix string, u *UnwindState, s *StageState, s return nil } -func promoteHashedStateCleanly(logPrefix string, db ethdb.Database, tmpdir string, quit <-chan struct{}) error { +func PromoteHashedStateCleanly(logPrefix string, db ethdb.Database, tmpdir string, quit <-chan struct{}) error { err := etl.Transform( logPrefix, db, diff --git a/eth/stagedsync/stage_hashstate_test.go b/eth/stagedsync/stage_hashstate_test.go index dcee9d403..59c05ab83 100644 --- a/eth/stagedsync/stage_hashstate_test.go +++ b/eth/stagedsync/stage_hashstate_test.go @@ -35,7 +35,7 @@ func TestPromoteHashedStateClearState(t *testing.T) { generateBlocks(t, 1, 50, hashedWriterGen(tx1), changeCodeWithIncarnations) generateBlocks(t, 1, 50, plainWriterGen(tx2), changeCodeWithIncarnations) - err = promoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil) + err = PromoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil) if err != nil { t.Errorf("error while promoting state: %v", err) } @@ -71,7 +71,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) { err = tx2.CommitAndBegin(context.Background()) require.NoError(t, err) - err = promoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil) + err = PromoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil) if err != nil { t.Errorf("error while promoting state: %v", err) } @@ -150,7 +150,7 @@ func TestUnwindHashed(t *testing.T) { generateBlocks(t, 1, 50, hashedWriterGen(tx1), changeCodeWithIncarnations) generateBlocks(t, 1, 50, plainWriterGen(tx2), changeCodeWithIncarnations) - err = promoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil) + err = PromoteHashedStateCleanly("logPrefix", tx2, getTmpDir(), nil) if err != nil { t.Errorf("error while promoting state: %v", err) } diff --git a/eth/stagedsync/stage_interhashes.go b/eth/stagedsync/stage_interhashes.go index 9a8484864..80ddbf6f6 100644 --- a/eth/stagedsync/stage_interhashes.go +++ b/eth/stagedsync/stage_interhashes.go @@ -56,7 +56,7 @@ func SpawnIntermediateHashesStage(s *StageState, db ethdb.Database, checkRoot bo logPrefix := s.state.LogPrefix() log.Info(fmt.Sprintf("[%s] Generating intermediate hashes", logPrefix), "from", s.BlockNumber, "to", to) if s.BlockNumber == 0 { - if err := regenerateIntermediateHashes(logPrefix, tx, checkRoot, tmpdir, expectedRootHash, quit); err != nil { + if err := RegenerateIntermediateHashes(logPrefix, tx, checkRoot, tmpdir, expectedRootHash, quit); err != nil { return err } } else { @@ -78,7 +78,7 @@ func SpawnIntermediateHashesStage(s *StageState, db ethdb.Database, checkRoot bo return nil } -func regenerateIntermediateHashes(logPrefix string, db ethdb.Database, checkRoot bool, tmpdir string, expectedRootHash common.Hash, quit <-chan struct{}) error { +func RegenerateIntermediateHashes(logPrefix string, db ethdb.Database, checkRoot bool, tmpdir string, expectedRootHash common.Hash, quit <-chan struct{}) error { log.Info(fmt.Sprintf("[%s] Regeneration intermediate hashes started", logPrefix)) // Clear IH bucket c := db.(ethdb.HasTx).Tx().Cursor(dbutils.IntermediateTrieHashBucket) diff --git a/ethdb/kv_snapshot.go b/ethdb/kv_snapshot.go index 4bab875e6..c04875478 100644 --- a/ethdb/kv_snapshot.go +++ b/ethdb/kv_snapshot.go @@ -1,542 +1,774 @@ package ethdb import ( + "bytes" "context" "errors" "fmt" - "sync" - "unsafe" - "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" - "github.com/ledgerwatch/turbo-geth/log" + "unsafe" ) var ( - _ KV = &SnapshotKV{} - _ BucketMigrator = &snapshotTX{} - _ Tx = &snapshotTX{} - _ Tx = &lazyTx{} - _ Cursor = &snapshotCursor{} + _ KV = &SnapshotKV2{} + _ Tx = &sn2TX{} + _ BucketMigrator = &sn2TX{} + _ Cursor = &snCursor2{} ) -func (s *snapshotCursor) Reserve(k []byte, n int) ([]byte, error) { - return s.dbCursor.Reserve(k, n) +func NewSnapshot2KV() snapshotOpts2 { + return snapshotOpts2{} } -func (s *snapshotCursor) PutCurrent(key, value []byte) error { - return s.dbCursor.PutCurrent(key, value) +type snapshotData struct { + buckets []string + snapshot KV +} +type snapshotOpts2 struct { + db KV + snapshots []snapshotData } -func (s *snapshotTX) CursorDupSort(bucket string) CursorDupSort { - return s.dbTX.CursorDupSort(bucket) -} - -func (s *snapshotTX) Sequence(bucket string, amount uint64) (uint64, error) { - return s.dbTX.Sequence(bucket, amount) -} - -func (s *snapshotTX) CursorDupFixed(bucket string) CursorDupFixed { - return s.dbTX.CursorDupFixed(bucket) -} - -func (s *snapshotTX) Comparator(bucket string) dbutils.CmpFunc { - return s.dbTX.Comparator(bucket) -} - -func (s *snapshotTX) Cmp(bucket string, a, b []byte) int { - return s.dbTX.Cmp(bucket, a, b) -} - -func (s *snapshotTX) DCmp(bucket string, a, b []byte) int { - return s.dbTX.DCmp(bucket, a, b) -} - -func (s *snapshotTX) CHandle() unsafe.Pointer { - return s.dbTX.CHandle() -} - -func (v *lazyTx) CursorDupSort(bucket string) CursorDupSort { - panic("implement me") -} - -func (v *lazyTx) CursorDupFixed(bucket string) CursorDupFixed { - panic("implement me") -} - -func (v *lazyTx) Sequence(bucket string, amount uint64) (uint64, error) { - panic("implement me") -} - -func (v *lazyTx) Comparator(bucket string) dbutils.CmpFunc { - panic("implement me") -} - -func (v *lazyTx) Cmp(bucket string, a, b []byte) int { - panic("implement me") -} - -func (v *lazyTx) DCmp(bucket string, a, b []byte) int { - panic("implement me") -} - -func (v *lazyTx) CHandle() unsafe.Pointer { - panic("implement me") -} - -func (s *snapshotCursor) Prev() ([]byte, []byte, error) { - return s.dbCursor.Prev() -} - -func (s *snapshotCursor) Current() ([]byte, []byte, error) { - return s.dbCursor.Current() -} - -func (s *snapshotCursor) DeleteCurrent() error { - return s.dbCursor.DeleteCurrent() -} - -func (s *snapshotCursor) Count() (uint64, error) { - return s.dbCursor.Count() -} - -func (s *SnapshotKV) AllBuckets() dbutils.BucketsCfg { - return s.db.AllBuckets() -} - -func (s *snapshotTX) DropBucket(bucket string) error { - db, ok := s.dbTX.(BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", s.dbTX) - } - return db.DropBucket(bucket) -} - -func (s *snapshotTX) CreateBucket(bucket string) error { - db, ok := s.dbTX.(BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", s.dbTX) - } - return db.CreateBucket(bucket) -} - -func (s *snapshotTX) ExistsBucket(bucket string) bool { - db, ok := s.dbTX.(BucketMigrator) - if !ok { - return false - } - return db.ExistsBucket(bucket) -} - -func (s *snapshotTX) ClearBucket(bucket string) error { - db, ok := s.dbTX.(BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", s.dbTX) - } - return db.ClearBucket(bucket) -} - -func (s *snapshotTX) ExistingBuckets() ([]string, error) { - db, ok := s.dbTX.(BucketMigrator) - if !ok { - return []string{}, fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", s.dbTX) - } - return db.ExistingBuckets() -} - -func NewSnapshotKV() snapshotOpts { - return snapshotOpts{ - forBuckets: make(map[string]struct{}), - } -} - -type SnapshotKV struct { - db KV - snapshotDB KV - forBuckets map[string]struct{} -} - -type snapshotOpts struct { - db KV - snapshot KV - forBuckets map[string]struct{} -} - -func (opts snapshotOpts) SnapshotDB(db KV) snapshotOpts { - opts.snapshot = db +func (opts snapshotOpts2) SnapshotDB(buckets []string, db KV) snapshotOpts2 { + opts.snapshots = append(opts.snapshots, snapshotData{ + buckets: buckets, + snapshot: db, + }) return opts } -func (opts snapshotOpts) DB(db KV) snapshotOpts { +func (opts snapshotOpts2) DB(db KV) snapshotOpts2 { opts.db = db return opts } -func (opts snapshotOpts) For(bucket string) snapshotOpts { - opts.forBuckets[bucket] = struct{}{} - return opts -} - -func (opts snapshotOpts) MustOpen() KV { - return &SnapshotKV{ - snapshotDB: opts.snapshot, - db: opts.db, - forBuckets: opts.forBuckets, +func (opts snapshotOpts2) MustOpen() KV { + snapshots := make(map[string]snapshotData) + for i, v := range opts.snapshots { + for _, bucket := range v.buckets { + snapshots[bucket] = opts.snapshots[i] + } + } + return &SnapshotKV2{ + snapshots: snapshots, + db: opts.db, } } -func (s *SnapshotKV) View(ctx context.Context, f func(tx Tx) error) error { - dbTx, err := s.db.Begin(ctx, nil, RO) +type SnapshotKV2 struct { + db KV + snapshots map[string]snapshotData +} + +func (s *SnapshotKV2) View(ctx context.Context, f func(tx Tx) error) error { + snTX, err := s.Begin(ctx, nil, RO) if err != nil { return err } + defer snTX.Rollback() + return f(snTX) +} - t := &snapshotTX{ - dbTX: dbTx, - snTX: newVirtualTx(func() (Tx, error) { - return s.snapshotDB.Begin(ctx, nil, RO) - }, s.forBuckets), - forBuckets: s.forBuckets, +func (s *SnapshotKV2) Update(ctx context.Context, f func(tx Tx) error) error { + tx, err := s.Begin(ctx, nil, RW) + if err != nil { + return err } - defer t.Rollback() - return f(t) + defer tx.Rollback() + + err = f(tx) + if err == nil { + return tx.Commit(ctx) + } + return err } -func (s *SnapshotKV) Update(ctx context.Context, f func(tx Tx) error) error { - return s.db.Update(ctx, f) -} - -func (s *SnapshotKV) Close() { - defer s.db.Close() - if s.snapshotDB != nil { - defer s.snapshotDB.Close() +func (s *SnapshotKV2) Close() { + s.db.Close() + for i := range s.snapshots { + s.snapshots[i].snapshot.Close() } } -func (s *SnapshotKV) Begin(ctx context.Context, parentTx Tx, flags TxFlags) (Tx, error) { - dbTx, err := s.db.Begin(ctx, parentTx, flags) +func (s *SnapshotKV2) Begin(ctx context.Context, parent Tx, flags TxFlags) (Tx, error) { + dbTx, err := s.db.Begin(ctx, parent, flags) if err != nil { return nil, err } - - t := &snapshotTX{ - dbTX: dbTx, - snTX: newVirtualTx(func() (Tx, error) { - return s.snapshotDB.Begin(ctx, parentTx, flags) - }, s.forBuckets), - forBuckets: s.forBuckets, - } - return t, nil + return &sn2TX{ + dbTX: dbTx, + snapshots: s.snapshots, + snTX: map[string]Tx{}, + }, nil } -func newVirtualTx(construct func() (Tx, error), forBucket map[string]struct{}) *lazyTx { - return &lazyTx{ - construct: construct, - forBuckets: forBucket, +func (s *SnapshotKV2) AllBuckets() dbutils.BucketsCfg { + return s.db.AllBuckets() +} + +var ErrUnavailableSnapshot = errors.New("unavailable snapshot") + +type sn2TX struct { + dbTX Tx + snapshots map[string]snapshotData + snTX map[string]Tx +} + +func (s *sn2TX) DropBucket(bucket string) error { + return s.dbTX.(BucketMigrator).DropBucket(bucket) +} + +func (s *sn2TX) CreateBucket(bucket string) error { + return s.dbTX.(BucketMigrator).CreateBucket(bucket) +} + +func (s *sn2TX) ExistsBucket(bucket string) bool { + //todo snapshot check? + return s.dbTX.(BucketMigrator).ExistsBucket(bucket) +} + +func (s *sn2TX) ClearBucket(bucket string) error { + return s.dbTX.(BucketMigrator).ClearBucket(bucket) +} + +func (s *sn2TX) ExistingBuckets() ([]string, error) { + panic("implement me") +} + +func (s *sn2TX) Cursor(bucket string) Cursor { + tx, err := s.getSnapshotTX(bucket) + if err != nil && !errors.Is(err, ErrUnavailableSnapshot) { + panic(err.Error()) + } + //process only db buckets + if errors.Is(err, ErrUnavailableSnapshot) { + return s.dbTX.Cursor(bucket) + } + return &snCursor2{ + dbCursor: s.dbTX.Cursor(bucket), + snCursor: tx.Cursor(bucket), } } -var ErrNotSnapshotBucket = errors.New("not snapshot bucket") - -//lazyTx is used for lazy transaction creation. -type lazyTx struct { - construct func() (Tx, error) - forBuckets map[string]struct{} - tx Tx - sync.Mutex +func (s *sn2TX) CursorDupSort(bucket string) CursorDupSort { + tx, err := s.getSnapshotTX(bucket) + if err != nil && !errors.Is(err, ErrUnavailableSnapshot) { + panic(err.Error()) + } + //process only db buckets + if errors.Is(err, ErrUnavailableSnapshot) { + return s.dbTX.CursorDupSort(bucket) + } + dbc := s.dbTX.CursorDupSort(bucket) + sncbc := tx.CursorDupSort(bucket) + return &snCursor2Dup{ + snCursor2{ + dbCursor: dbc, + snCursor: sncbc, + }, + dbc, + sncbc, + } } -func (v *lazyTx) getTx() (Tx, error) { +func (s *sn2TX) CursorDupFixed(bucket string) CursorDupFixed { + panic("implement me") +} + +func (s *sn2TX) GetOne(bucket string, key []byte) (val []byte, err error) { + v, err := s.dbTX.GetOne(bucket, key) + if err != nil { + return nil, err + } + if len(v) == 0 { + snTx, innerErr := s.getSnapshotTX(bucket) + if innerErr != nil && !errors.Is(innerErr, ErrUnavailableSnapshot) { + return nil, innerErr + } + //process only db buckets + if errors.Is(innerErr, ErrUnavailableSnapshot) { + return v, nil + } + v, err = snTx.GetOne(bucket, key) + if err != nil { + return nil, err + } + if bytes.Equal(v, DeletedValue) { + return nil, nil + } + return v, nil + } + return v, nil +} +func (s *sn2TX) getSnapshotTX(bucket string) (Tx, error) { + tx, ok := s.snTX[bucket] + if ok { + return tx, nil + } + sn, ok := s.snapshots[bucket] + if !ok { + return nil, fmt.Errorf("%s %w", bucket, ErrUnavailableSnapshot) + } var err error - v.Lock() - defer v.Unlock() - if v.tx == nil { - v.tx, err = v.construct() - } - return v.tx, err -} - -func (v *lazyTx) Cursor(bucket string) Cursor { - if _, ok := v.forBuckets[bucket]; !ok { - return nil - } - - tx, err := v.getTx() - if err != nil { - log.Error("Fail to create tx", "err", err) - } - if tx != nil { - return tx.Cursor(bucket) - } - return nil -} - -func (v *lazyTx) GetOne(bucket string, key []byte) (val []byte, err error) { - if _, ok := v.forBuckets[bucket]; !ok { - return nil, ErrNotSnapshotBucket - } - tx, err := v.getTx() + tx, err = sn.snapshot.Begin(context.TODO(), nil, RO) if err != nil { return nil, err } - return tx.GetOne(bucket, key) + + s.snTX[bucket] = tx + return tx, nil } -func (v *lazyTx) HasOne(bucket string, key []byte) (bool, error) { - if _, ok := v.forBuckets[bucket]; !ok { - return false, ErrNotSnapshotBucket - } - tx, err := v.getTx() +func (s *sn2TX) HasOne(bucket string, key []byte) (bool, error) { + v, err := s.dbTX.HasOne(bucket, key) if err != nil { return false, err } - return tx.HasOne(bucket, key) -} + if !v { + snTx, err := s.getSnapshotTX(bucket) + if err != nil && !errors.Is(err, ErrUnavailableSnapshot) { + return false, err + } + //process only db buckets + if errors.Is(err, ErrUnavailableSnapshot) { + return v, nil + } -func (v *lazyTx) Commit(ctx context.Context) error { - return nil -} + v, err := snTx.GetOne(bucket, key) + if err != nil { + return false, err + } + if bytes.Equal(v, DeletedValue) { + return false, nil + } -func (v *lazyTx) Rollback() { - v.Lock() - defer v.Unlock() - if v.tx != nil { - v.tx.Rollback() + return true, nil } + return v, nil } -func (v *lazyTx) BucketSize(bucket string) (uint64, error) { - if _, ok := v.forBuckets[bucket]; !ok { - return 0, nil +func (s *sn2TX) Commit(ctx context.Context) error { + for i := range s.snTX { + defer s.snTX[i].Rollback() } - tx, err := v.getTx() - if err != nil { - return 0, err - } - return tx.BucketSize(bucket) -} - -type snapshotTX struct { - dbTX Tx - snTX Tx - forBuckets map[string]struct{} -} - -func (s *snapshotTX) Commit(ctx context.Context) error { - defer s.snTX.Rollback() return s.dbTX.Commit(ctx) } -func (s *snapshotTX) Rollback() { - defer s.snTX.Rollback() - defer s.dbTX.Rollback() +func (s *sn2TX) Rollback() { + for i := range s.snTX { + defer s.snTX[i].Rollback() + } + s.dbTX.Rollback() + } -func (s *snapshotTX) Cursor(bucket string) Cursor { - if _, ok := s.forBuckets[bucket]; !ok { - return s.dbTX.Cursor(bucket) - } - snCursor := s.snTX.Cursor(bucket) - //check snapshot bucket - if snCursor == nil { - return s.dbTX.Cursor(bucket) - } - return &snapshotCursor{ - snCursor: snCursor, - dbCursor: s.dbTX.Cursor(bucket), - } +func (s *sn2TX) BucketSize(name string) (uint64, error) { + panic("implement me") } -func (s *snapshotTX) GetOne(bucket string, key []byte) (val []byte, err error) { - _, ok := s.forBuckets[bucket] - if !ok { - return s.dbTX.GetOne(bucket, key) - } - v, err := s.dbTX.GetOne(bucket, key) - switch { - case err == nil && v != nil: - return v, nil - case err != nil && !errors.Is(err, ErrKeyNotFound): - return nil, err - } - return s.snTX.GetOne(bucket, key) +func (s *sn2TX) Comparator(bucket string) dbutils.CmpFunc { + return s.dbTX.Comparator(bucket) } -func (s *snapshotTX) HasOne(bucket string, key []byte) (bool, error) { - _, ok := s.forBuckets[bucket] - if !ok { - return s.dbTX.HasOne(bucket, key) - } - v, err := s.dbTX.HasOne(bucket, key) - switch { - case err == nil && v: - return true, nil - case err != nil && !errors.Is(err, ErrKeyNotFound): - return false, err - } - return s.snTX.HasOne(bucket, key) +func (s *sn2TX) Cmp(bucket string, a, b []byte) int { + panic("implement me") } -func (s *snapshotTX) BucketSize(name string) (uint64, error) { - dbSize, err := s.snTX.BucketSize(name) - if err != nil { - return 0, fmt.Errorf("db err %w", err) - } - snSize, err := s.dbTX.BucketSize(name) - if err != nil { - return 0, fmt.Errorf("snapshot db err %w", err) - } - return dbSize + snSize, nil +func (s *sn2TX) DCmp(bucket string, a, b []byte) int { + panic("implement me") } -type snapshotCursor struct { - snCursor Cursor +func (s *sn2TX) Sequence(bucket string, amount uint64) (uint64, error) { + panic("implement me") +} + +func (s *sn2TX) CHandle() unsafe.Pointer { + return s.dbTX.CHandle() +} + +var DeletedValue = []byte("it is deleted value") + +type snCursor2 struct { dbCursor Cursor + snCursor Cursor - lastDBKey []byte - lastSNDBKey []byte - lastDBVal []byte - lastSNDBVal []byte - keyCmp int + currentKey []byte } -func (s *snapshotCursor) Close() { - defer s.dbCursor.Close() - defer s.snCursor.Close() +func (s *snCursor2) Prefix(v []byte) Cursor { + s.dbCursor.Prefix(v) + s.snCursor.Prefix(v) + return s } -func (s *snapshotCursor) Prefix(v []byte) Cursor { +func (s *snCursor2) Prefetch(v uint) Cursor { panic("implement me") } -func (s *snapshotCursor) MatchBits(u uint) Cursor { - panic("implement me") -} - -func (s *snapshotCursor) Prefetch(v uint) Cursor { - panic("implement me") -} - -func (s *snapshotCursor) First() ([]byte, []byte, error) { +func (s *snCursor2) First() ([]byte, []byte, error) { var err error - s.lastSNDBKey, s.lastSNDBVal, err = s.snCursor.First() + lastDBKey, lastDBVal, err := s.dbCursor.First() if err != nil { return nil, nil, err } - s.lastDBKey, s.lastDBVal, err = s.dbCursor.First() + + lastSNDBKey, lastSNDBVal, err := s.snCursor.First() if err != nil { return nil, nil, err } - cmp, br := common.KeyCmp(s.lastDBKey, s.lastSNDBKey) + cmp, br := common.KeyCmp(lastDBKey, lastSNDBKey) if br { return nil, nil, nil } - s.keyCmp = cmp if cmp <= 0 { - return s.lastDBKey, s.lastDBVal, nil + s.saveCurrent(lastDBKey) + return lastDBKey, lastDBVal, nil } - - return s.lastSNDBKey, s.lastSNDBVal, nil + s.saveCurrent(lastSNDBKey) + return lastSNDBKey, lastSNDBVal, nil } -func (s *snapshotCursor) Seek(seek []byte) ([]byte, []byte, error) { - var err error - s.lastSNDBKey, s.lastSNDBVal, err = s.snCursor.Seek(seek) - if err != nil { +func (s *snCursor2) Seek(seek []byte) ([]byte, []byte, error) { + dbKey, dbVal, err := s.dbCursor.Seek(seek) + if err != nil && !errors.Is(err, ErrKeyNotFound) { return nil, nil, err } - s.lastDBKey, s.lastDBVal, err = s.dbCursor.Seek(seek) - if err != nil { + sndbKey, sndbVal, err := s.snCursor.Seek(seek) + if err != nil && !errors.Is(err, ErrKeyNotFound) { return nil, nil, err } - cmp, br := common.KeyCmp(s.lastDBKey, s.lastSNDBKey) - if br { - return nil, nil, nil - } - s.keyCmp = cmp + if bytes.Equal(dbKey, seek) && dbVal != nil { + return dbKey, dbVal, err + } + if bytes.Equal(sndbKey, seek) && sndbVal != nil { + return sndbKey, sndbVal, err + } + cmp, _ := common.KeyCmp(dbKey, sndbKey) if cmp <= 0 { - return s.lastDBKey, s.lastDBVal, nil + s.saveCurrent(dbKey) + return dbKey, dbVal, nil } - - return s.lastSNDBKey, s.lastSNDBVal, nil + s.saveCurrent(sndbKey) + return sndbKey, sndbVal, nil } -func (s *snapshotCursor) Next() ([]byte, []byte, error) { - var err error - if s.keyCmp >= 0 { - s.lastSNDBKey, s.lastSNDBVal, err = s.snCursor.Next() - } - if err != nil { - return nil, nil, err - } - if s.keyCmp <= 0 { - s.lastDBKey, s.lastDBVal, err = s.dbCursor.Next() - } - if err != nil { - return nil, nil, err - } - - cmp, br := common.KeyCmp(s.lastDBKey, s.lastSNDBKey) - if br { - return nil, nil, nil - } - - s.keyCmp = cmp - if cmp <= 0 { - return s.lastDBKey, s.lastDBVal, nil - } - - return s.lastSNDBKey, s.lastSNDBVal, nil -} - -func (s *snapshotCursor) Walk(walker func(k []byte, v []byte) (bool, error)) error { - panic("implement me") -} - -func (s *snapshotCursor) Put(key []byte, value []byte) error { - return s.dbCursor.Put(key, value) -} - -func (s *snapshotCursor) Delete(k, v []byte) error { - return s.dbCursor.Delete(k, v) -} - -func (s *snapshotCursor) Append(key []byte, value []byte) error { - return s.dbCursor.Append(key, value) -} - -func (s *snapshotCursor) SeekExact(key []byte) ([]byte, []byte, error) { +func (s *snCursor2) SeekExact(key []byte) ([]byte, []byte, error) { k, v, err := s.dbCursor.SeekExact(key) if err != nil { - return []byte{}, nil, err + return nil, nil, err + } + if bytes.Equal(v, DeletedValue) { + return nil, nil, nil } if v == nil { - return s.snCursor.SeekExact(key) + k, v, err = s.snCursor.SeekExact(key) + s.saveCurrent(k) + return k, v, err } + s.saveCurrent(k) return k, v, err } -func (s *snapshotCursor) Last() ([]byte, []byte, error) { +func (s *snCursor2) iteration(dbNextElement func() ([]byte, []byte, error), sndbNextElement func() ([]byte, []byte, error), cmpFunc func(kdb, ksndb []byte) (int, bool)) ([]byte, []byte, error) { var err error - s.lastSNDBKey, s.lastSNDBVal, err = s.snCursor.Last() + //current returns error on empty bucket + lastDBKey, lastDBVal, err := s.dbCursor.Current() + if err != nil { + var innerErr error + lastDBKey, lastDBVal, innerErr = dbNextElement() + if innerErr != nil { + return nil, nil, fmt.Errorf("get current from db %w inner %v", err, innerErr) + } + } + + lastSNDBKey, lastSNDBVal, err := s.snCursor.Current() if err != nil { return nil, nil, err } - s.lastDBKey, s.lastDBVal, err = s.dbCursor.Last() - if err != nil { - return nil, nil, err - } - cmp, br := common.KeyCmp(s.lastDBKey, s.lastSNDBKey) + + cmp, br := cmpFunc(lastDBKey, lastSNDBKey) if br { return nil, nil, nil } - s.keyCmp = cmp - if cmp >= 0 { - return s.lastDBKey, s.lastDBVal, nil + //todo Seek fastpath + if cmp > 0 { + lastSNDBKey, lastSNDBVal, err = sndbNextElement() + if err != nil { + return nil, nil, err + } + //todo + if currentKeyCmp, _ := common.KeyCmp(s.currentKey, lastDBKey); len(lastSNDBKey) == 0 && currentKeyCmp >= 0 && len(s.currentKey) > 0 { + lastDBKey, lastDBVal, err = dbNextElement() + } + if err != nil { + return nil, nil, err + } } - return s.lastSNDBKey, s.lastSNDBVal, nil + //current receives last acceptable key. If it is empty + if cmp < 0 { + lastDBKey, lastDBVal, err = dbNextElement() + if err != nil { + return nil, nil, err + } + if currentKeyCmp, _ := common.KeyCmp(s.currentKey, lastSNDBKey); len(lastDBKey) == 0 && currentKeyCmp >= 0 && len(s.currentKey) > 0 { + lastSNDBKey, lastSNDBVal, err = sndbNextElement() + } + if err != nil { + return nil, nil, err + } + } + if cmp == 0 { + lastDBKey, lastDBVal, err = dbNextElement() + if err != nil { + return nil, nil, err + } + lastSNDBKey, lastSNDBVal, err = sndbNextElement() + if err != nil { + return nil, nil, err + } + } + + cmp, br = cmpFunc(lastDBKey, lastSNDBKey) + if br { + return nil, nil, nil + } + if cmp <= 0 { + return lastDBKey, lastDBVal, nil + } + + return lastSNDBKey, lastSNDBVal, nil } + +func (s *snCursor2) Next() ([]byte, []byte, error) { + k, v, err := s.iteration(s.dbCursor.Next, s.snCursor.Next, common.KeyCmp) //f(s.dbCursor.Next, s.snCursor.Next) + if err != nil { + return nil, nil, err + } + for bytes.Equal(v, DeletedValue) { + k, v, err = s.iteration(s.dbCursor.Next, s.snCursor.Next, common.KeyCmp) // f(s.dbCursor.Next, s.snCursor.Next) + if err != nil { + return nil, nil, err + } + + } + s.saveCurrent(k) + return k, v, nil +} + +func (s *snCursor2) Prev() ([]byte, []byte, error) { + k, v, err := s.iteration(s.dbCursor.Prev, s.snCursor.Prev, func(kdb, ksndb []byte) (int, bool) { + cmp, br := KeyCmpBackward(kdb, ksndb) + return -1 * cmp, br + }) + if err != nil { + return nil, nil, err + } + for cmp, _ := KeyCmpBackward(k, s.currentKey); bytes.Equal(v, DeletedValue) || cmp >= 0; cmp, _ = KeyCmpBackward(k, s.currentKey) { + k, v, err = s.iteration(s.dbCursor.Prev, s.snCursor.Prev, func(kdb, ksndb []byte) (int, bool) { + cmp, br := KeyCmpBackward(kdb, ksndb) + return -1 * cmp, br + }) + if err != nil { + return nil, nil, err + } + } + s.saveCurrent(k) + return k, v, nil +} + +func (s *snCursor2) Last() ([]byte, []byte, error) { + var err error + lastSNDBKey, lastSNDBVal, err := s.snCursor.Last() + if err != nil { + return nil, nil, err + } + lastDBKey, lastDBVal, err := s.dbCursor.Last() + if err != nil { + return nil, nil, err + } + cmp, br := KeyCmpBackward(lastDBKey, lastSNDBKey) + if br { + return nil, nil, nil + } + + if cmp >= 0 { + s.saveCurrent(lastDBKey) + return lastDBKey, lastDBVal, nil + } + s.saveCurrent(lastSNDBKey) + return lastSNDBKey, lastSNDBVal, nil +} + +func (s *snCursor2) Current() ([]byte, []byte, error) { + k, v, err := s.dbCursor.Current() + if bytes.Equal(k, s.currentKey) { + return k, v, err + } + return s.snCursor.Current() +} + +func (s *snCursor2) Put(k, v []byte) error { + return s.dbCursor.Put(k, v) +} + +func (s *snCursor2) Append(k []byte, v []byte) error { + return s.dbCursor.Append(k, v) +} + +func (s *snCursor2) Delete(k, v []byte) error { + return s.dbCursor.Put(k, DeletedValue) +} + +func (s *snCursor2) DeleteCurrent() error { + panic("implement me") +} + +func (s *snCursor2) Reserve(k []byte, n int) ([]byte, error) { + panic("implement me") +} + +func (s *snCursor2) PutCurrent(key, value []byte) error { + panic("implement me") +} + +func (s *snCursor2) Count() (uint64, error) { + panic("implement me") +} + +func (s *snCursor2) Close() { + s.dbCursor.Close() + s.snCursor.Close() +} + +type snCursor2Dup struct { + snCursor2 + dbCursorDup CursorDupSort + sndbCursorDup CursorDupSort +} + +func (c *snCursor2Dup) SeekBothExact(key, value []byte) ([]byte, []byte, error) { + k, v, err := c.dbCursorDup.SeekBothExact(key, value) + if err != nil { + return nil, nil, err + } + if v == nil { + k, v, err = c.sndbCursorDup.SeekBothExact(key, value) + c.saveCurrent(k) + return k, v, err + } + c.saveCurrent(k) + return k, v, err + +} + +func (c *snCursor2Dup) SeekBothRange(key, value []byte) ([]byte, []byte, error) { + dbKey, dbVal, err := c.dbCursorDup.SeekBothRange(key, value) + if err != nil { + return nil, nil, err + } + snDBKey, snDBVal, err := c.sndbCursorDup.SeekBothRange(key, value) + if err != nil { + return nil, nil, err + } + + //todo Is it correct comparison + cmp, br := common.KeyCmp(dbKey, snDBKey) + if br { + return nil, nil, nil + } + if cmp >= 0 { + c.saveCurrent(dbKey) + return dbKey, dbVal, nil + } + return snDBKey, snDBVal, nil +} + +func (c *snCursor2Dup) FirstDup() ([]byte, error) { + panic("implement me") +} + +func (c *snCursor2Dup) NextDup() ([]byte, []byte, error) { + panic("implement me") +} + +func (c *snCursor2Dup) NextNoDup() ([]byte, []byte, error) { + panic("implement me") +} + +func (c *snCursor2Dup) LastDup(k []byte) ([]byte, error) { + panic("implement me") +} + +func (c *snCursor2Dup) CountDuplicates() (uint64, error) { + panic("implement me") +} + +func (c *snCursor2Dup) DeleteCurrentDuplicates() error { + panic("implement me") +} + +func (c *snCursor2Dup) AppendDup(key, value []byte) error { + panic("implement me") +} + +func (s *snCursor2) saveCurrent(k []byte) { + if k != nil { + s.currentKey = common.CopyBytes(k) + } +} + +func KeyCmpBackward(key1, key2 []byte) (int, bool) { + switch { + case len(key1) == 0 && len(key2) == 0: + return 0, true + case len(key1) == 0 && len(key2) != 0: + return -1, false + case len(key1) != 0 && len(key2) == 0: + return 1, false + default: + return bytes.Compare(key1, key2), false + } +} + +type KvData struct { + K []byte + V []byte +} + +func GenStateData(data []KvData) (KV, error) { + snapshot := NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { + return dbutils.BucketsCfg{ + dbutils.PlainStateBucket: dbutils.BucketConfigItem{}, + } + }).InMem().MustOpen() + + err := snapshot.Update(context.Background(), func(tx Tx) error { + c := tx.Cursor(dbutils.PlainStateBucket) + for i := range data { + innerErr := c.Put(data[i].K, data[i].V) + if innerErr != nil { + return innerErr + } + } + return nil + }) + if err != nil { + return nil, err + } + return snapshot, nil +} + +//type cursorSnapshotDupsort struct { +// +//} +// +//func (c *cursorSnapshotDupsort) Prefix(v []byte) Cursor { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Prefetch(v uint) Cursor { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) First() ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Seek(seek []byte) ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) SeekExact(key []byte) ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Next() ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Prev() ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Last() ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Current() ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Put(k, v []byte) error { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Append(k []byte, v []byte) error { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Delete(k, v []byte) error { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) DeleteCurrent() error { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Reserve(k []byte, n int) ([]byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) PutCurrent(key, value []byte) error { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Count() (uint64, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) Close() { +// panic("implement me") +//} +// +// +////dupsort +//func (c *cursorSnapshotDupsort) SeekBothExact(key, value []byte) ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) SeekBothRange(key, value []byte) ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) FirstDup() ([]byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) NextDup() ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) NextNoDup() ([]byte, []byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) LastDup(k []byte) ([]byte, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) CountDuplicates() (uint64, error) { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) DeleteCurrentDuplicates() error { +// panic("implement me") +//} +// +//func (c *cursorSnapshotDupsort) AppendDup(key, value []byte) error { +// panic("implement me") +//} diff --git a/ethdb/kv_snapshot_test.go b/ethdb/kv_snapshot_test.go index 599bfd751..533973ce9 100644 --- a/ethdb/kv_snapshot_test.go +++ b/ethdb/kv_snapshot_test.go @@ -3,12 +3,332 @@ package ethdb import ( "bytes" "context" + "fmt" "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" "testing" ) -func TestSnapshotGet(t *testing.T) { +//func TestSnapshotGet(t *testing.T) { +// sn1 := NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { +// return dbutils.BucketsCfg{ +// dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, +// } +// }).InMem().MustOpen() +// err := sn1.Update(context.Background(), func(tx Tx) error { +// bucket := tx.Cursor(dbutils.HeaderPrefix) +// innerErr := bucket.Put(dbutils.HeaderKey(1, common.Hash{1}), []byte{1}) +// if innerErr != nil { +// return innerErr +// } +// innerErr = bucket.Put(dbutils.HeaderKey(2, common.Hash{2}), []byte{2}) +// if innerErr != nil { +// return innerErr +// } +// +// return nil +// }) +// if err != nil { +// t.Fatal(err) +// } +// +// sn2 := NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { +// return dbutils.BucketsCfg{ +// dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{}, +// } +// }).InMem().MustOpen() +// err = sn2.Update(context.Background(), func(tx Tx) error { +// bucket := tx.Cursor(dbutils.BlockBodyPrefix) +// innerErr := bucket.Put(dbutils.BlockBodyKey(1, common.Hash{1}), []byte{1}) +// if innerErr != nil { +// return innerErr +// } +// innerErr = bucket.Put(dbutils.BlockBodyKey(2, common.Hash{2}), []byte{2}) +// if innerErr != nil { +// return innerErr +// } +// +// return nil +// }) +// if err != nil { +// t.Fatal(err) +// } +// +// mainDB := NewLMDB().InMem().MustOpen() +// err = mainDB.Update(context.Background(), func(tx Tx) error { +// bucket := tx.Cursor(dbutils.HeaderPrefix) +// innerErr := bucket.Put(dbutils.HeaderKey(2, common.Hash{2}), []byte{22}) +// if innerErr != nil { +// return innerErr +// } +// innerErr = bucket.Put(dbutils.HeaderKey(3, common.Hash{3}), []byte{33}) +// if innerErr != nil { +// return innerErr +// } +// +// bucket = tx.Cursor(dbutils.BlockBodyPrefix) +// innerErr = bucket.Put(dbutils.BlockBodyKey(2, common.Hash{2}), []byte{22}) +// if innerErr != nil { +// return innerErr +// } +// innerErr = bucket.Put(dbutils.BlockBodyKey(3, common.Hash{3}), []byte{33}) +// if innerErr != nil { +// return innerErr +// } +// +// return nil +// }) +// if err != nil { +// t.Fatal(err) +// } +// +// kv := NewSnapshotKV().For(dbutils.HeaderPrefix).SnapshotDB(sn1).DB(mainDB).MustOpen() +// kv = NewSnapshotKV().For(dbutils.BlockBodyPrefix).SnapshotDB(sn2).DB(kv).MustOpen() +// +// tx, err := kv.Begin(context.Background(), nil, RO) +// if err != nil { +// t.Fatal(err) +// } +// +// v, err := tx.GetOne(dbutils.HeaderPrefix, dbutils.HeaderKey(1, common.Hash{1})) +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(v, []byte{1}) { +// t.Fatal(v) +// } +// +// v, err = tx.GetOne(dbutils.HeaderPrefix, dbutils.HeaderKey(2, common.Hash{2})) +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(v, []byte{22}) { +// t.Fatal(v) +// } +// +// v, err = tx.GetOne(dbutils.HeaderPrefix, dbutils.HeaderKey(3, common.Hash{3})) +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(v, []byte{33}) { +// t.Fatal(v) +// } +// +// v, err = tx.GetOne(dbutils.BlockBodyPrefix, dbutils.BlockBodyKey(1, common.Hash{1})) +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(v, []byte{1}) { +// t.Fatal(v) +// } +// +// v, err = tx.GetOne(dbutils.BlockBodyPrefix, dbutils.BlockBodyKey(2, common.Hash{2})) +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(v, []byte{22}) { +// t.Fatal(v) +// } +// +// v, err = tx.GetOne(dbutils.BlockBodyPrefix, dbutils.BlockBodyKey(3, common.Hash{3})) +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(v, []byte{33}) { +// t.Fatal(v) +// } +// +// headerCursor := tx.Cursor(dbutils.HeaderPrefix) +// k, v, err := headerCursor.Last() +// if err != nil { +// t.Fatal(err) +// } +// if !(bytes.Equal(dbutils.HeaderKey(3, common.Hash{3}), k) && bytes.Equal(v, []byte{33})) { +// t.Fatal(k, v) +// } +// k, v, err = headerCursor.First() +// if err != nil { +// t.Fatal(err) +// } +// if !(bytes.Equal(dbutils.HeaderKey(1, common.Hash{1}), k) && bytes.Equal(v, []byte{1})) { +// t.Fatal(k, v) +// } +// +// k, v, err = headerCursor.Next() +// if err != nil { +// t.Fatal(err) +// } +// +// if !(bytes.Equal(dbutils.HeaderKey(2, common.Hash{2}), k) && bytes.Equal(v, []byte{22})) { +// t.Fatal(k, v) +// } +// +// k, v, err = headerCursor.Next() +// if err != nil { +// t.Fatal(err) +// } +// +// if !(bytes.Equal(dbutils.HeaderKey(3, common.Hash{3}), k) && bytes.Equal(v, []byte{33})) { +// t.Fatal(k, v) +// } +// +// k, v, err = headerCursor.Next() +// if err != nil { +// t.Fatal(err) +// } +// +// if !(bytes.Equal([]byte{}, k) && bytes.Equal(v, []byte{})) { +// t.Fatal(k, v) +// } +//} +// +//func TestSnapshotWritableTxAndGet(t *testing.T) { +// sn1 := NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { +// return dbutils.BucketsCfg{ +// dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, +// } +// }).InMem().MustOpen() +// err := sn1.Update(context.Background(), func(tx Tx) error { +// bucket := tx.Cursor(dbutils.HeaderPrefix) +// innerErr := bucket.Put(dbutils.HeaderKey(1, common.Hash{1}), []byte{1}) +// if innerErr != nil { +// return innerErr +// } +// innerErr = bucket.Put(dbutils.HeaderKey(2, common.Hash{2}), []byte{2}) +// if innerErr != nil { +// return innerErr +// } +// +// return nil +// }) +// if err != nil { +// t.Fatal(err) +// } +// +// sn2 := NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { +// return dbutils.BucketsCfg{ +// dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{}, +// } +// }).InMem().MustOpen() +// err = sn2.Update(context.Background(), func(tx Tx) error { +// bucket := tx.Cursor(dbutils.BlockBodyPrefix) +// innerErr := bucket.Put(dbutils.BlockBodyKey(1, common.Hash{1}), []byte{1}) +// if innerErr != nil { +// return innerErr +// } +// innerErr = bucket.Put(dbutils.BlockBodyKey(2, common.Hash{2}), []byte{2}) +// if innerErr != nil { +// return innerErr +// } +// +// return nil +// }) +// if err != nil { +// t.Fatal(err) +// } +// +// mainDB := NewLMDB().InMem().MustOpen() +// +// kv := NewSnapshotKV().For(dbutils.HeaderPrefix).SnapshotDB(sn1).DB(mainDB).MustOpen() +// kv = NewSnapshotKV().For(dbutils.BlockBodyPrefix).SnapshotDB(sn2).DB(kv).MustOpen() +// +// tx, err := kv.Begin(context.Background(), nil, RW) +// if err != nil { +// t.Fatal(err) +// } +// +// v, err := tx.GetOne(dbutils.HeaderPrefix, dbutils.HeaderKey(1, common.Hash{1})) +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(v, []byte{1}) { +// t.Fatal(v) +// } +// +// v, err = tx.GetOne(dbutils.BlockBodyPrefix, dbutils.BlockBodyKey(1, common.Hash{1})) +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(v, []byte{1}) { +// t.Fatal(v) +// } +// +// err = tx.Cursor(dbutils.BlockBodyPrefix).Put(dbutils.BlockBodyKey(4, common.Hash{4}), []byte{4}) +// if err != nil { +// t.Fatal(err) +// } +// err = tx.Cursor(dbutils.HeaderPrefix).Put(dbutils.HeaderKey(4, common.Hash{4}), []byte{4}) +// if err != nil { +// t.Fatal(err) +// } +// err = tx.Commit(context.Background()) +// if err != nil { +// t.Fatal(err) +// } +// tx, err = kv.Begin(context.Background(), nil, RO) +// if err != nil { +// t.Fatal(err) +// } +// c := tx.Cursor(dbutils.HeaderPrefix) +// k, v, err := c.First() +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(k, dbutils.HeaderKey(1, common.Hash{1})) { +// t.Fatal(k, v) +// } +// +// k, v, err = c.Next() +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(k, dbutils.HeaderKey(2, common.Hash{2})) { +// t.Fatal() +// } +// +// k, v, err = c.Next() +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(k, dbutils.HeaderKey(4, common.Hash{4})) { +// t.Fatal() +// } +// k, v, err = c.Next() +// if k != nil || v != nil || err != nil { +// t.Fatal(k, v, err) +// } +// +// c = tx.Cursor(dbutils.BlockBodyPrefix) +// k, v, err = c.First() +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(k, dbutils.BlockBodyKey(1, common.Hash{1})) { +// t.Fatal(k, v) +// } +// +// k, v, err = c.Next() +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(k, dbutils.BlockBodyKey(2, common.Hash{2})) { +// t.Fatal() +// } +// +// k, v, err = c.Next() +// if err != nil { +// t.Fatal(err) +// } +// if !bytes.Equal(k, dbutils.BlockBodyKey(4, common.Hash{4})) { +// t.Fatal() +// } +// k, v, err = c.Next() +// if k != nil || v != nil || err != nil { +// t.Fatal(k, v, err) +// } +//} + +func TestSnapshot2Get(t *testing.T) { sn1 := NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, @@ -81,8 +401,8 @@ func TestSnapshotGet(t *testing.T) { t.Fatal(err) } - kv := NewSnapshotKV().For(dbutils.HeaderPrefix).SnapshotDB(sn1).DB(mainDB).MustOpen() - kv = NewSnapshotKV().For(dbutils.BlockBodyPrefix).SnapshotDB(sn2).DB(kv).MustOpen() + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.HeaderPrefix}, sn1). + SnapshotDB([]string{dbutils.BlockBodyPrefix}, sn2).MustOpen() tx, err := kv.Begin(context.Background(), nil, RO) if err != nil { @@ -181,7 +501,7 @@ func TestSnapshotGet(t *testing.T) { } } -func TestSnapshotWritableTxAndGet(t *testing.T) { +func TestSnapshot2WritableTxAndGet(t *testing.T) { sn1 := NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, @@ -228,9 +548,8 @@ func TestSnapshotWritableTxAndGet(t *testing.T) { mainDB := NewLMDB().InMem().MustOpen() - kv := NewSnapshotKV().For(dbutils.HeaderPrefix).SnapshotDB(sn1).DB(mainDB).MustOpen() - kv = NewSnapshotKV().For(dbutils.BlockBodyPrefix).SnapshotDB(sn2).DB(kv).MustOpen() - + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.HeaderPrefix}, sn1). + SnapshotDB([]string{dbutils.BlockBodyPrefix}, sn2).MustOpen() tx, err := kv.Begin(context.Background(), nil, RW) if err != nil { t.Fatal(err) @@ -252,11 +571,11 @@ func TestSnapshotWritableTxAndGet(t *testing.T) { t.Fatal(v) } - err = tx.Cursor(dbutils.BlockBodyPrefix).Put(dbutils.BlockBodyKey(4, common.Hash{4}), []byte{2}) + err = tx.Cursor(dbutils.BlockBodyPrefix).Put(dbutils.BlockBodyKey(4, common.Hash{4}), []byte{4}) if err != nil { t.Fatal(err) } - err = tx.Cursor(dbutils.HeaderPrefix).Put(dbutils.HeaderKey(4, common.Hash{4}), []byte{2}) + err = tx.Cursor(dbutils.HeaderPrefix).Put(dbutils.HeaderKey(4, common.Hash{4}), []byte{4}) if err != nil { t.Fatal(err) } @@ -269,10 +588,732 @@ func TestSnapshotWritableTxAndGet(t *testing.T) { t.Fatal(err) } c := tx.Cursor(dbutils.HeaderPrefix) - t.Log(c.First()) - t.Log(c.Next()) - t.Log(c.Next()) - t.Log(c.Next()) - t.Log(c.Next()) - t.Log(c.Next()) + k, v, err := c.First() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(k, dbutils.HeaderKey(1, common.Hash{1})) { + t.Fatal(k, v) + } + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(k, dbutils.HeaderKey(2, common.Hash{2})) { + t.Fatal(common.Bytes2Hex(k)) + } + if !bytes.Equal(v, []byte{2}) { + t.Fatal(common.Bytes2Hex(k)) + } + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(k, dbutils.HeaderKey(4, common.Hash{4})) { + t.Fatal("invalid key", common.Bytes2Hex(k)) + } + if !bytes.Equal(v, []byte{4}) { + t.Fatal(common.Bytes2Hex(k), common.Bytes2Hex(v)) + } + + k, v, err = c.Next() + if k != nil || v != nil || err != nil { + t.Fatal(k, v, err) + } + + c = tx.Cursor(dbutils.BlockBodyPrefix) + k, v, err = c.First() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(k, dbutils.BlockBodyKey(1, common.Hash{1})) { + t.Fatal(k, v) + } + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(k, dbutils.BlockBodyKey(2, common.Hash{2})) { + t.Fatal() + } + if !bytes.Equal(v, []byte{2}) { + t.Fatal(common.Bytes2Hex(k), common.Bytes2Hex(v)) + } + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(k, dbutils.BlockBodyKey(4, common.Hash{4})) { + t.Fatal() + } + if !bytes.Equal(v, []byte{4}) { + t.Fatal(common.Bytes2Hex(k), common.Bytes2Hex(v)) + } + + k, v, err = c.Next() + if k != nil || v != nil || err != nil { + t.Fatal(k, v, err) + } +} + +func TestSnapshot2WritableTxWalkReplaceAndCreateNewKey(t *testing.T) { + data := []KvData{} + for i := 1; i < 3; i++ { + for j := 1; j < 3; j++ { + data = append(data, KvData{ + K: dbutils.PlainGenerateCompositeStorageKey([]byte{uint8(i) * 2}, 1, []byte{uint8(j) * 2}), + V: []byte{uint8(i) * 2, uint8(j) * 2}, + }) + } + } + snapshotDB, err := GenStateData(data) + if err != nil { + t.Fatal(err) + } + mainDB := NewLMDB().InMem().MustOpen() + + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.PlainStateBucket}, snapshotDB). + MustOpen() + + tx, err := kv.Begin(context.Background(), nil, RW) + if err != nil { + t.Fatal(err) + } + + c := tx.Cursor(dbutils.PlainStateBucket) + replaceKey := dbutils.PlainGenerateCompositeStorageKey([]byte{2}, 1, []byte{4}) + replaceValue := []byte{2, 4, 4} + newKey := dbutils.PlainGenerateCompositeStorageKey([]byte{2}, 1, []byte{5}) + newValue := []byte{2, 5} + + //get first correct k&v + k, v, err := c.First() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[0].K, data[0].V) + if !(bytes.Equal(k, data[0].K) || bytes.Equal(v, data[0].V)) { + t.Fatal(k, data[0].K, v, data[0].V) + } + err = c.Put(replaceKey, replaceValue) + if err != nil { + t.Fatal(err) + } + + // check the key that we've replaced value + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, replaceKey, replaceValue) + + err = c.Put(newKey, newValue) + if err != nil { + t.Fatal(err) + } + // check the key that we've inserted + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, newKey, newValue) + + //check the rest keys + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[2].K, data[2].V) +} + +func TestSnapshot2WritableTxWalkAndDeleteKey(t *testing.T) { + data := []KvData{ + {K: []byte{1}, V: []byte{1}}, + {K: []byte{2}, V: []byte{2}}, + {K: []byte{3}, V: []byte{3}}, + {K: []byte{4}, V: []byte{4}}, + {K: []byte{5}, V: []byte{5}}, + } + snapshotDB, err := GenStateData(data) + if err != nil { + t.Fatal(err) + } + + mainDB := NewLMDB().InMem().MustOpen() + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.PlainStateBucket}, snapshotDB). + MustOpen() + + tx, err := kv.Begin(context.Background(), nil, RW) + if err != nil { + t.Fatal(err) + } + + c := tx.Cursor(dbutils.PlainStateBucket) + deleteCursor := tx.Cursor(dbutils.PlainStateBucket) + + //get first correct k&v + k, v, err := c.First() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[0].K, data[0].V) + + //remove value + err = deleteCursor.Delete(data[1].K, nil) + if err != nil { + t.Fatal(err) + } + err = deleteCursor.Delete(data[2].K, nil) + if err != nil { + t.Fatal(err) + } + err = deleteCursor.Delete(data[4].K, nil) + if err != nil { + t.Fatal(err) + } + + // check the key that we've replaced value + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[3].K, data[3].V) + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, nil, nil) + + //2,3,5 removed. Current 4. Prev - + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[0].K, data[0].V) + + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, nil, nil) +} + +func TestSnapshot2WritableTxNextAndPrevAndDeleteKey(t *testing.T) { + data := []KvData{ + {K: []byte{1}, V: []byte{1}}, //to remove + {K: []byte{2}, V: []byte{2}}, + {K: []byte{3}, V: []byte{3}}, + {K: []byte{4}, V: []byte{4}}, //to remove + {K: []byte{5}, V: []byte{5}}, + } + snapshotDB, err := GenStateData(data) + if err != nil { + t.Fatal(err) + } + + mainDB := NewLMDB().InMem().MustOpen() + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.PlainStateBucket}, snapshotDB). + MustOpen() + + tx, err := kv.Begin(context.Background(), nil, RW) + if err != nil { + t.Fatal(err) + } + + c := tx.Cursor(dbutils.PlainStateBucket) + deleteCursor := tx.Cursor(dbutils.PlainStateBucket) + + //get first correct k&v + k, v, err := c.Last() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[len(data)-1].K, data[len(data)-1].V) + + for i := len(data) - 2; i >= 0; i-- { + k, v, err = c.Prev() + if err != nil { + t.Fatal(i, err) + } + checkKV(t, k, v, data[i].K, data[i].V) + + k, v, err = c.Current() + if err != nil { + t.Fatal(i, err) + } + checkKV(t, k, v, data[i].K, data[i].V) + } + + k, v, err = c.Last() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[4].K, data[4].V) + + //remove 4. Current on 5 + err = deleteCursor.Delete(data[3].K, nil) + if err != nil { + t.Fatal(err) + } + + //cursor on 3 after it + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[2].K, data[2].V) + + err = deleteCursor.Delete(data[0].K, nil) + if err != nil { + t.Fatal(err) + } + + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[1].K, data[1].V) + + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, nil, nil) + +} +func TestSnapshot2WritableTxWalkLastElementIsSnapshot(t *testing.T) { + snapshotData := []KvData{ + { + K: []byte{0, 1}, + V: []byte{1}, + }, + { + K: []byte{0, 4}, + V: []byte{4}, + }, + } + replacedValue := []byte{1, 1} + mainData := []KvData{ + { + K: []byte{0, 1}, + V: replacedValue, + }, + { + K: []byte{0, 2}, + V: []byte{2}, + }, + { + K: []byte{0, 3}, + V: []byte{3}, + }, + } + snapshotDB, err := GenStateData(snapshotData) + if err != nil { + t.Fatal(err) + } + mainDB, err := GenStateData(mainData) + if err != nil { + t.Fatal(err) + } + + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.PlainStateBucket}, snapshotDB). + MustOpen() + + tx, err := kv.Begin(context.Background(), nil, RW) + if err != nil { + t.Fatal(err) + } + + c := tx.Cursor(dbutils.PlainStateBucket) + //get first correct k&v + k, v, err := c.First() + if err != nil { + t.Fatal(err) + } + + checkKV(t, k, v, mainData[0].K, mainData[0].V) + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, mainData[1].K, mainData[1].V) + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, mainData[2].K, mainData[2].V) + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, snapshotData[1].K, snapshotData[1].V) + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, nil, nil) +} + +func TestSnapshot2WritableTxWalkForwardAndBackward(t *testing.T) { + snapshotData := []KvData{ + { + K: []byte{0, 1}, + V: []byte{1}, + }, + { + K: []byte{0, 4}, + V: []byte{4}, + }, + } + replacedValue := []byte{1, 1} + mainData := []KvData{ + { + K: []byte{0, 1}, + V: replacedValue, + }, + { + K: []byte{0, 2}, + V: []byte{2}, + }, + { + K: []byte{0, 3}, + V: []byte{3}, + }, + } + data := []KvData{ + mainData[0], + mainData[1], + mainData[2], + snapshotData[1], + } + snapshotDB, err := GenStateData(snapshotData) + if err != nil { + t.Fatal(err) + } + mainDB, err := GenStateData(mainData) + if err != nil { + t.Fatal(err) + } + + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.PlainStateBucket}, snapshotDB). + MustOpen() + + tx, err := kv.Begin(context.Background(), nil, RW) + if err != nil { + t.Fatal(err) + } + + c := tx.Cursor(dbutils.PlainStateBucket) + //get first correct k&v + k, v, err := c.First() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[0].K, data[0].V) + + for i := 1; i < len(data); i++ { + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + + k, v, err = c.Current() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + } + + for i := len(data) - 2; i > 0; i-- { + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + + k, v, err = c.Current() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + } + + k, v, err = c.Last() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[len(data)-1].K, data[len(data)-1].V) + k, v, err = c.Current() + if err != nil { + t.Fatal(err) + } + + checkKV(t, k, v, data[len(data)-1].K, data[len(data)-1].V) + + for i := len(data) - 2; i > 0; i-- { + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + + k, v, err = c.Current() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + } + + i := 0 + err = Walk(c, []byte{}, 0, func(k, v []byte) (bool, error) { + fmt.Println(common.Bytes2Hex(k), " => ", common.Bytes2Hex(v)) + checkKV(t, k, v, data[i].K, data[i].V) + i++ + return true, nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestSnapshot2WalkByEmptyDB(t *testing.T) { + data := []KvData{ + {K: []byte{1}, V: []byte{1}}, + {K: []byte{2}, V: []byte{2}}, + {K: []byte{3}, V: []byte{3}}, + {K: []byte{4}, V: []byte{4}}, + {K: []byte{5}, V: []byte{5}}, + } + snapshotDB, err := GenStateData(data) + if err != nil { + t.Fatal(err) + } + + mainDB := NewLMDB().InMem().MustOpen() + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.PlainStateBucket}, snapshotDB). + MustOpen() + + tx, err := kv.Begin(context.Background(), nil, RW) + if err != nil { + t.Fatal(err) + } + + c := tx.Cursor(dbutils.PlainStateBucket) + i := 0 + err = Walk(c, []byte{}, 0, func(k, v []byte) (bool, error) { + fmt.Println(common.Bytes2Hex(k), " => ", common.Bytes2Hex(v)) + checkKV(t, k, v, data[i].K, data[i].V) + i++ + return true, nil + }) + if err != nil { + t.Fatal(err) + } + +} + +func TestSnapshot2WritablePrevAndDeleteKey(t *testing.T) { + data := []KvData{ + {K: []byte{1}, V: []byte{1}}, + {K: []byte{2}, V: []byte{2}}, + {K: []byte{3}, V: []byte{3}}, + {K: []byte{4}, V: []byte{4}}, + {K: []byte{5}, V: []byte{5}}, + } + snapshotDB, err := GenStateData(data) + if err != nil { + t.Fatal(err) + } + + mainDB := NewLMDB().InMem().MustOpen() + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.PlainStateBucket}, snapshotDB). + MustOpen() + + tx, err := kv.Begin(context.Background(), nil, RW) + if err != nil { + t.Fatal(err) + } + + c := tx.Cursor(dbutils.PlainStateBucket) + + //get first correct k&v + k, v, err := c.First() + if err != nil { + printBucket(kv, dbutils.PlainStateBucket) + t.Fatal(err) + } + checkKV(t, k, v, data[0].K, data[0].V) + + for i := 1; i < len(data); i++ { + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + + k, v, err = c.Current() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + } + + // check the key that we've replaced value + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, nil, nil) + + for i := len(data) - 2; i >= 0; i-- { + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + + k, v, err = c.Current() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[i].K, data[i].V) + } +} + +func TestSnapshot2WritableTxNextAndPrevWithDeleteAndPutKeys(t *testing.T) { + data := []KvData{ + {K: []byte{1}, V: []byte{1}}, + {K: []byte{2}, V: []byte{2}}, + {K: []byte{3}, V: []byte{3}}, + {K: []byte{4}, V: []byte{4}}, + {K: []byte{5}, V: []byte{5}}, + } + snapshotDB, err := GenStateData(data) + if err != nil { + t.Fatal(err) + } + + mainDB := NewLMDB().InMem().MustOpen() + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.PlainStateBucket}, snapshotDB). + MustOpen() + + tx, err := kv.Begin(context.Background(), nil, RW) + if err != nil { + t.Fatal(err) + } + + c := tx.Cursor(dbutils.PlainStateBucket) + deleteCursor := tx.Cursor(dbutils.PlainStateBucket) + + //get first correct k&v + k, v, err := c.First() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[0].K, data[0].V) + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[1].K, data[1].V) + + err = deleteCursor.Delete(data[2].K, nil) + if err != nil { + t.Fatal(err) + } + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[3].K, data[3].V) + + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[1].K, data[1].V) + + err = deleteCursor.Put(data[2].K, data[2].V) + if err != nil { + t.Fatal(err) + } + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[2].K, data[2].V) + + k, v, err = c.Next() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[3].K, data[3].V) + + err = deleteCursor.Delete(data[2].K, nil) + if err != nil { + t.Fatal(err) + } + + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[1].K, data[1].V) + + k, v, err = c.Prev() + if err != nil { + t.Fatal(err) + } + checkKV(t, k, v, data[0].K, data[0].V) + +} + +func printBucket(kv KV, bucket string) { + fmt.Println("+Print bucket", bucket) + defer func() { + fmt.Println("-Print bucket", bucket) + }() + err := kv.View(context.Background(), func(tx Tx) error { + c := tx.Cursor(bucket) + k, v, err := c.First() + if err != nil { + panic(fmt.Errorf("first err: %w", err)) + } + for k != nil && v != nil { + fmt.Println("k:=", common.Bytes2Hex(k), "v:=", common.Bytes2Hex(v)) + k, v, err = c.Next() + if err != nil { + panic(fmt.Errorf("next err: %w", err)) + } + } + return nil + }) + fmt.Println("Print err", err) +} + +func checkKV(t *testing.T, key, val, expectedKey, expectedVal []byte) { + t.Helper() + if !bytes.Equal(key, expectedKey) { + t.Log("+", common.Bytes2Hex(expectedKey)) + t.Log("-", common.Bytes2Hex(key)) + t.Fatal("wrong key") + } + if !bytes.Equal(val, expectedVal) { + t.Log("+", common.Bytes2Hex(expectedVal)) + t.Log("-", common.Bytes2Hex(val)) + t.Fatal("wrong value for key", common.Bytes2Hex(key)) + } } diff --git a/turbo/snapshotsync/bittorrent/const.go b/turbo/snapshotsync/bittorrent/const.go index e4d7bf8a0..ab5f2ee96 100644 --- a/turbo/snapshotsync/bittorrent/const.go +++ b/turbo/snapshotsync/bittorrent/const.go @@ -13,9 +13,9 @@ const ( SnapshotBlock = 11_000_000 LmdbFilename = "data.mdb" - HeadersSnapshotHash = "7727174de470b7fe0bb3e36d35e85cc48853d470" //11кk block 1mb chunk - BlocksSnapshotHash = "0546b881c50de9984dd8865d0f18cc5153e4c21b" //11кk block 1mb chunk - StateSnapshotHash = "" + HeadersSnapshotHash = "460da4ffbc2b77f6662a8a7c15e21f4c5981656d" //11кk block 1mb chunk + BlocksSnapshotHash = "6353d013d614f1f8145d71e1479de9b4361d273f" //11кk block 1mb chunk + StateSnapshotHash = "fed1ef2b4d2cd8ea32eda24559b4d7eedaeb1b78" ReceiptsSnapshotHash = "" SnapshotInfoHashPrefix = "ih" @@ -27,6 +27,7 @@ var ( params.MainnetChainConfig.ChainID.Uint64(): { snapshotsync.SnapshotType_headers: metainfo.NewHashFromHex(HeadersSnapshotHash), snapshotsync.SnapshotType_bodies: metainfo.NewHashFromHex(BlocksSnapshotHash), + snapshotsync.SnapshotType_state: metainfo.NewHashFromHex(StateSnapshotHash), }, } ErrInvalidSnapshot = errors.New("this snapshot for this chainID not supported ") diff --git a/turbo/snapshotsync/bittorrent/downloader.go b/turbo/snapshotsync/bittorrent/downloader.go index beae88a4f..a9af5cdbf 100644 --- a/turbo/snapshotsync/bittorrent/downloader.go +++ b/turbo/snapshotsync/bittorrent/downloader.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "errors" "fmt" + "path/filepath" "time" lg "github.com/anacrolix/log" @@ -218,6 +219,53 @@ func (cli *Client) Download() { } } +func (cli *Client) GetSnapshots(db ethdb.Database, networkID uint64) (map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo, error) { + mp := make(map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo) + networkIDBytes := make([]byte, 8) + binary.BigEndian.PutUint64(networkIDBytes, networkID) + err := db.Walk(dbutils.SnapshotInfoBucket, append(networkIDBytes, []byte(SnapshotInfoHashPrefix)...), 8*8+16, func(k, v []byte) (bool, error) { + var hash metainfo.Hash + if len(v) != metainfo.HashSize { + return true, nil + } + copy(hash[:], v) + t, ok := cli.Cli.Torrent(hash) + if !ok { + return true, nil + } + + var gotInfo bool + readiness := int32(0) + select { + case <-t.GotInfo(): + gotInfo = true + readiness = int32(100 * (float64(t.BytesCompleted()) / float64(t.Info().TotalLength()))) + default: + } + + _, tpStr := ParseInfoHashKey(k) + tp, ok := snapshotsync.SnapshotType_value[tpStr] + if !ok { + return false, fmt.Errorf("incorrect type: %v", tpStr) + } + + val := &snapshotsync.SnapshotsInfo{ + Type: snapshotsync.SnapshotType(tp), + GotInfoByte: gotInfo, + Readiness: readiness, + SnapshotBlock: SnapshotBlock, + Dbpath: filepath.Join(cli.snapshotsDir, t.Files()[0].Path()), + } + mp[snapshotsync.SnapshotType(tp)] = val + return true, nil + }) + if err != nil { + return nil, err + } + + return mp, nil +} + func getTorrentSpec(db ethdb.Database, snapshotName string, networkID uint64) ([]byte, []byte, error) { var infohash, infobytes []byte var err error diff --git a/turbo/snapshotsync/bittorrent/server.go b/turbo/snapshotsync/bittorrent/server.go index 49f04e632..d44900012 100644 --- a/turbo/snapshotsync/bittorrent/server.go +++ b/turbo/snapshotsync/bittorrent/server.go @@ -2,14 +2,8 @@ package bittorrent import ( "context" - "encoding/binary" "errors" - "fmt" - "path/filepath" - - "github.com/anacrolix/torrent/metainfo" "github.com/golang/protobuf/ptypes/empty" - "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/turbo/snapshotsync" ) @@ -52,51 +46,12 @@ func (S *SNDownloaderServer) Load() error { func (S *SNDownloaderServer) Snapshots(ctx context.Context, request *snapshotsync.SnapshotsRequest) (*snapshotsync.SnapshotsInfoReply, error) { reply := snapshotsync.SnapshotsInfoReply{} - err := S.WalkThroughTorrents(request.NetworkId, func(k, v []byte) (bool, error) { - var hash metainfo.Hash - if len(v) != metainfo.HashSize { - return true, nil - } - copy(hash[:], v) - t, ok := S.t.Cli.Torrent(hash) - if !ok { - return true, nil - } - - var gotInfo bool - readiness := int32(0) - select { - case <-t.GotInfo(): - gotInfo = true - readiness = int32(100 * (float64(t.BytesCompleted()) / float64(t.Info().TotalLength()))) - default: - } - - _, tpStr := ParseInfoHashKey(k) - tp, ok := snapshotsync.SnapshotType_value[tpStr] - if !ok { - return false, fmt.Errorf("incorrect type: %v", tpStr) - } - - val := &snapshotsync.SnapshotsInfo{ - Type: snapshotsync.SnapshotType(tp), - GotInfoByte: gotInfo, - Readiness: readiness, - SnapshotBlock: SnapshotBlock, - Dbpath: filepath.Join(S.t.snapshotsDir, t.Files()[0].Path()), - } - reply.Info = append(reply.Info, val) - - return true, nil - }) + resp, err := S.t.GetSnapshots(S.db, request.NetworkId) if err != nil { return nil, err } + for i := range resp { + reply.Info = append(reply.Info, resp[i]) + } return &reply, nil } - -func (S *SNDownloaderServer) WalkThroughTorrents(networkID uint64, f func(k, v []byte) (bool, error)) error { - networkIDBytes := make([]byte, 8) - binary.BigEndian.PutUint64(networkIDBytes, networkID) - return S.db.Walk(dbutils.SnapshotInfoBucket, append(networkIDBytes, []byte(SnapshotInfoHashPrefix)...), 8*8+16, f) -} diff --git a/turbo/snapshotsync/postprocessing.go b/turbo/snapshotsync/postprocessing.go index 992af8225..c9e2af730 100644 --- a/turbo/snapshotsync/postprocessing.go +++ b/turbo/snapshotsync/postprocessing.go @@ -24,19 +24,27 @@ var ( HeaderCanonical = stages.SyncStage("snapshot_canonical") ) -func PostProcessing(db ethdb.Database, mode SnapshotMode) error { +func PostProcessing(db ethdb.Database, mode SnapshotMode, downloadedSnapshots map[SnapshotType]*SnapshotsInfo) error { if mode.Headers { err := GenerateHeaderIndexes(context.Background(), db) if err != nil { return err } } + if mode.Bodies { err := PostProcessBodies(db) if err != nil { return err } } + + if mode.State { + err := PostProcessState(db, downloadedSnapshots[SnapshotType_state]) + if err != nil { + return err + } + } return nil } @@ -67,6 +75,23 @@ func PostProcessBodies(db ethdb.Database) error { return nil } +func PostProcessState(db ethdb.GetterPutter, info *SnapshotsInfo) error { + v, _, err := stages.GetStageProgress(db, stages.Execution) + if err != nil { + return err + } + + if v > 0 { + return nil + } + + err = stages.SaveStageProgress(db, stages.Execution, info.SnapshotBlock, nil) + if err != nil { + return err + } + return nil +} + func GenerateHeaderIndexes(ctx context.Context, db ethdb.Database) error { var hash common.Hash var number uint64 @@ -78,12 +103,12 @@ func GenerateHeaderIndexes(ctx context.Context, db ethdb.Database) error { if v == 0 { log.Info("Generate headers hash to number index") - headHashBytes, innerErr := db.Get(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadHash)) + headHashBytes, innerErr := db.Get(dbutils.HeadersSnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadHash)) if innerErr != nil { return innerErr } - headNumberBytes, innerErr := db.Get(dbutils.SnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadNumber)) + headNumberBytes, innerErr := db.Get(dbutils.HeadersSnapshotInfoBucket, []byte(dbutils.SnapshotHeadersHeadNumber)) if innerErr != nil { return innerErr } diff --git a/turbo/snapshotsync/postprocessing_test.go b/turbo/snapshotsync/postprocessing_test.go index f02b3db4c..6aa6ee894 100644 --- a/turbo/snapshotsync/postprocessing_test.go +++ b/turbo/snapshotsync/postprocessing_test.go @@ -30,7 +30,7 @@ func TestHeadersGenerateIndex(t *testing.T) { panic(innerErr) } } - c := tx.Cursor(dbutils.SnapshotInfoBucket) + c := tx.Cursor(dbutils.HeadersSnapshotInfoBucket) innerErr := c.Put([]byte(dbutils.SnapshotHeadersHeadHash), headers[len(headers)-1].Hash().Bytes()) if innerErr != nil { return innerErr @@ -55,7 +55,7 @@ func TestHeadersGenerateIndex(t *testing.T) { } snKV := ethdb.NewLMDB().Path(snPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).WithBucketsConfig(ethdb.DefaultBucketConfigs).MustOpen() - snKV = ethdb.NewSnapshotKV().For(dbutils.HeaderPrefix).For(dbutils.SnapshotInfoBucket).SnapshotDB(snKV).DB(db).MustOpen() + snKV = ethdb.NewSnapshot2KV().SnapshotDB([]string{dbutils.HeadersSnapshotInfoBucket, dbutils.HeaderPrefix}, snKV).DB(db).MustOpen() err = GenerateHeaderIndexes(context.Background(), ethdb.NewObjectDatabase(snKV)) if err != nil { t.Fatal(err) diff --git a/turbo/snapshotsync/wrapdb.go b/turbo/snapshotsync/wrapdb.go index a4e775e83..b3a502472 100644 --- a/turbo/snapshotsync/wrapdb.go +++ b/turbo/snapshotsync/wrapdb.go @@ -10,18 +10,31 @@ import ( var ( bucketConfigs = map[SnapshotType]dbutils.BucketsCfg{ SnapshotType_bodies: { - dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{}, - dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{}, + dbutils.BlockBodyPrefix: dbutils.BucketConfigItem{}, + dbutils.BodiesSnapshotInfoBucket: dbutils.BucketConfigItem{}, }, SnapshotType_headers: { - dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, - dbutils.SnapshotInfoBucket: dbutils.BucketConfigItem{}, + dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, + dbutils.HeadersSnapshotInfoBucket: dbutils.BucketConfigItem{}, + }, + SnapshotType_state: { + dbutils.PlainStateBucket: dbutils.BucketConfigItem{ + Flags: dbutils.DupSort, + AutoDupSortKeysConversion: true, + DupFromLen: 60, + DupToLen: 28, + }, + dbutils.PlainContractCodeBucket: dbutils.BucketConfigItem{}, + dbutils.CodeBucket: dbutils.BucketConfigItem{}, + dbutils.StateSnapshotInfoBucket: dbutils.BucketConfigItem{}, }, } ) -func WrapBySnapshots(kv ethdb.KV, snapshotDir string, mode SnapshotMode) (ethdb.KV, error) { +func WrapBySnapshotsFromDir(kv ethdb.KV, snapshotDir string, mode SnapshotMode) (ethdb.KV, error) { log.Info("Wrap db to snapshots", "dir", snapshotDir, "mode", mode.ToString()) + snkv := ethdb.NewSnapshot2KV().DB(kv) + if mode.Bodies { snapshotKV, err := ethdb.NewLMDB().Flags(func(flags uint) uint { return flags | lmdb.Readonly }).Path(snapshotDir + "/bodies").WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return bucketConfigs[SnapshotType_bodies] @@ -30,10 +43,7 @@ func WrapBySnapshots(kv ethdb.KV, snapshotDir string, mode SnapshotMode) (ethdb. log.Error("Can't open body snapshot", "err", err) return nil, err } else { //nolint - kv = ethdb.NewSnapshotKV().SnapshotDB(snapshotKV). - For(dbutils.BlockBodyPrefix). - For(dbutils.SnapshotInfoBucket). - DB(kv).MustOpen() + snkv.SnapshotDB([]string{dbutils.BlockBodyPrefix, dbutils.BodiesSnapshotInfoBucket}, snapshotKV) } } @@ -45,33 +55,44 @@ func WrapBySnapshots(kv ethdb.KV, snapshotDir string, mode SnapshotMode) (ethdb. log.Error("Can't open headers snapshot", "err", err) return nil, err } else { //nolint - kv = ethdb.NewSnapshotKV().SnapshotDB(snapshotKV). - For(dbutils.HeaderPrefix). - For(dbutils.SnapshotInfoBucket). - DB(kv).MustOpen() + snkv.SnapshotDB([]string{dbutils.HeaderPrefix, dbutils.HeadersSnapshotInfoBucket}, snapshotKV) } } - return kv, nil + if mode.State { + snapshotKV, err := ethdb.NewLMDB().Flags(func(flags uint) uint { return flags | lmdb.Readonly }).Path(snapshotDir + "/headers").WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { + return bucketConfigs[SnapshotType_headers] + }).Open() + if err != nil { + log.Error("Can't open headers snapshot", "err", err) + return nil, err + } else { //nolint + snkv.SnapshotDB([]string{dbutils.StateSnapshotInfoBucket, dbutils.PlainStateBucket, dbutils.PlainContractCodeBucket, dbutils.CodeBucket}, snapshotKV) + } + } + return snkv.MustOpen(), nil } -func WrapBySnapshots2(kv ethdb.KV, snapshots map[SnapshotType]*SnapshotsInfo) (ethdb.KV, error) { - +func WrapBySnapshotsFromDownloader(kv ethdb.KV, snapshots map[SnapshotType]*SnapshotsInfo) (ethdb.KV, error) { + snKV := ethdb.NewSnapshot2KV().DB(kv) for k, v := range snapshots { log.Info("Wrap db by", "snapshot", k.String(), "dir", v.Dbpath) cfg := bucketConfigs[k] snapshotKV, err := ethdb.NewLMDB().Flags(func(flags uint) uint { return flags | lmdb.Readonly }).Path(v.Dbpath).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return cfg }).Open() + if err != nil { log.Error("Can't open snapshot", "err", err) return nil, err } else { //nolint - snKV := ethdb.NewSnapshotKV().SnapshotDB(snapshotKV) - for i := range bucketConfigs[k] { - snKV.For(i) + buckets := make([]string, 0, 1) + for bucket := range bucketConfigs[k] { + buckets = append(buckets, bucket) } - kv = snKV.DB(kv).MustOpen() + + snKV.SnapshotDB(buckets, snapshotKV) } } - return kv, nil + + return snKV.MustOpen(), nil }