From f03d08c5ce688352d3598f36d502a6a5cde7d3dc Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 12 Mar 2022 17:26:11 +0700 Subject: [PATCH] Snapshots: retire testing tool (#3684) --- cmd/downloader/debug/debug_test.go | 375 ------- cmd/downloader/downloader/downloader.go | 6 +- cmd/downloader/downloader/util.go | 4 +- eth/stagedsync/stage_senders.go | 2 - ethdb/snapshotdb/kv_snapshot.go | 1032 ------------------ ethdb/snapshotdb/kv_snapshot_test.go | 1305 ----------------------- go.mod | 2 +- go.sum | 4 +- turbo/app/snapshots.go | 20 +- turbo/snapshotsync/block_snapshots.go | 16 +- turbo/snapshotsync/wrapdb.go | 60 -- 11 files changed, 34 insertions(+), 2792 deletions(-) delete mode 100644 cmd/downloader/debug/debug_test.go delete mode 100644 ethdb/snapshotdb/kv_snapshot.go delete mode 100644 ethdb/snapshotdb/kv_snapshot_test.go delete mode 100644 turbo/snapshotsync/wrapdb.go diff --git a/cmd/downloader/debug/debug_test.go b/cmd/downloader/debug/debug_test.go deleted file mode 100644 index cee5e5b59..000000000 --- a/cmd/downloader/debug/debug_test.go +++ /dev/null @@ -1,375 +0,0 @@ -package debug - -import ( - "context" - "encoding/binary" - "fmt" - "os" - "testing" - "time" - - "github.com/holiman/uint256" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/consensus/ethash" - "github.com/ledgerwatch/erigon/core" - "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/core/state" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/core/types/accounts" - "github.com/ledgerwatch/erigon/core/vm" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/ethdb/snapshotdb" - "github.com/ledgerwatch/erigon/rlp" - "github.com/ledgerwatch/log/v3" -) - -const ( - AccountDiff = "accdiff" - StorageDiff = "stdiff" - ContractDiff = "contractdiff" - Deleted = "it is deleted" -) - -func WithBlock(block uint64, key []byte) []byte { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, block) - return append(b, key...) -} -func TestMatreshkaStream(t *testing.T) { - t.Skip() - chaindatadir := "/media/b00ris/nvme/fresh_sync/tg/chaindata" - tmpDbDir := "/home/b00ris/event_stream" - - chaindata, err := mdbx.Open(chaindatadir, log.New(), true) - if err != nil { - t.Fatal(err) - } - //tmpDb:=ethdb.NewMemDatabase() - os.RemoveAll(tmpDbDir) - - db, err := mdbx.NewMDBX(log.New()).Path(tmpDbDir).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - defaultBuckets[AccountDiff] = kv.TableCfgItem{} - defaultBuckets[StorageDiff] = kv.TableCfgItem{} - defaultBuckets[ContractDiff] = kv.TableCfgItem{} - return defaultBuckets - }).Open() - if err != nil { - t.Fatal(err) - } - - chainConfig, _, genesisErr := core.CommitGenesisBlock(db, core.DefaultGenesisBlock()) - if genesisErr != nil { - t.Fatal(err) - } - if err := db.Update(context.Background(), func(tx kv.RwTx) error { - return tx.ClearBucket(kv.HeadHeaderKey) - }); err != nil { - t.Fatal(err) - } - - snkv := snapshotdb.NewSnapshotKV().DB(db). - //broken - //SnapshotDB([]string{dbutils.Headers, dbutils.HeaderCanonical, dbutils.HeaderTD, dbutils.HeaderNumber, dbutils.BlockBody, dbutils.HeadHeaderKey, dbutils.Senders}, chaindata.RwDB()). 
- Open() - _ = chaindata - defer snkv.Close() - - tx, err := snkv.BeginRw(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx.Rollback() - // - //tx, err := db.Begin(context.Background(), ethdb.RW) - //if err != nil { - // t.Fatal(err) - //} - psCursor, err := tx.Cursor(kv.PlainState) - if err != nil { - t.Fatal(err) - } - - i := 5 - err = ethdb.Walk(psCursor, []byte{}, 0, func(k, v []byte) (bool, error) { - fmt.Println(common.Bytes2Hex(k)) - i-- - if i == 0 { - return false, nil - } - return true, nil - }) - if err != nil { - t.Fatal(err) - } - - currentBlock := rawdb.ReadCurrentHeader(tx) - fmt.Println("currentBlock", currentBlock.Number.Uint64()) - blockNum := uint64(1) - limit := currentBlock.Number.Uint64() - getHeader := func(hash common.Hash, number uint64) *types.Header { return rawdb.ReadHeader(tx, hash, number) } - - stateReaderWriter := NewDebugReaderWriter(state.NewPlainStateReader(tx), state.NewPlainStateWriter(tx, tx, blockNum)) - tt := time.Now() - ttt := time.Now() - for currentBlock := blockNum; currentBlock < blockNum+limit; currentBlock++ { - stateReaderWriter.UpdateWriter(state.NewPlainStateWriter(tx, tx, currentBlock)) - block, err := rawdb.ReadBlockByNumber(tx, currentBlock) - if err != nil { - t.Fatal(err, currentBlock) - } - - contractHasTEVM := ethdb.GetHasTEVM(tx) - - _, _, err = core.ExecuteBlockEphemerally(chainConfig, &vm.Config{NoReceipts: true}, getHeader, ethash.NewFaker(), block, stateReaderWriter, stateReaderWriter, nil, nil, contractHasTEVM) - if err != nil { - t.Fatal(err, currentBlock) - } - cs := stateReaderWriter.UpdatedAccouts() - accDiffLen := len(cs) - for i := range cs { - if len(cs[i].Value) == 0 { - cs[i].Value = []byte(Deleted) - } - err = tx.Put(AccountDiff, WithBlock(currentBlock, cs[i].Key), cs[i].Value) - if err != nil { - t.Fatal(err, cs[i].Key, currentBlock) - } - } - cs = stateReaderWriter.UpdatedStorage() - stDiffLen := len(cs) - for i := range cs { - if len(cs[i].Value) == 0 { - cs[i].Value = []byte(Deleted) - } - err = tx.Put(StorageDiff, WithBlock(currentBlock, cs[i].Key), cs[i].Value) - if err != nil { - t.Fatal(err, cs[i].Key, currentBlock) - } - } - cs = stateReaderWriter.UpdatedCodes() - codesDiffLen := len(cs) - for i := range cs { - if len(cs[i].Value) == 0 { - cs[i].Value = []byte(Deleted) - } - err = tx.Put(ContractDiff, WithBlock(currentBlock, cs[i].Key), cs[i].Value) - if err != nil { - t.Fatal(err, cs[i].Key, currentBlock) - } - } - - stateReaderWriter.Reset() - if currentBlock%10000 == 0 { - err = tx.Commit() - if err != nil { - t.Fatal(err, currentBlock) - } - tx, err = snkv.BeginRw(context.Background()) - if err != nil { - t.Fatal(err, currentBlock) - } - defer tx.Rollback() - - dr := time.Since(ttt) - fmt.Println(currentBlock, "finished", "acc-", accDiffLen, "st-", stDiffLen, "codes - ", codesDiffLen, "all -", time.Since(tt), "chunk - ", dr, "blocks/s", 10000/dr.Seconds()) - ttt = time.Now() - } - } - err = tx.Commit() - if err != nil { - t.Fatal(err) - } - - fmt.Println("End") - //spew.Dump("readAcc",len(stateReaderWriter.readAcc)) - //spew.Dump("readStr",len(stateReaderWriter.readStorage)) - //spew.Dump("createdContracts", len(stateReaderWriter.createdContracts)) - //spew.Dump("deleted",len(stateReaderWriter.deletedAcc)) - -} - -var _ state.StateReader = &DebugReaderWriter{} -var _ state.WriterWithChangeSets = &DebugReaderWriter{} - -func NewDebugReaderWriter(r state.StateReader, w state.WriterWithChangeSets) *DebugReaderWriter { - return &DebugReaderWriter{ - r: r, - w: w, - //readAcc: 
make(map[common.Address]struct{}), - //readStorage: make(map[string]struct{}), - //readCodes: make(map[common.Hash]struct{}), - //readIncarnations: make(map[common.Address]struct{}), - - updatedAcc: make(map[common.Address][]byte), - updatedStorage: make(map[string][]byte), - updatedCodes: make(map[common.Hash][]byte), - //deletedAcc: make(map[common.Address]struct{}), - //createdContracts: make(map[common.Address]struct{}), - - } -} - -type DebugReaderWriter struct { - r state.StateReader - w state.WriterWithChangeSets - //readAcc map[common.Address]struct{} - //readStorage map[string]struct{} - //readCodes map[common.Hash] struct{} - //readIncarnations map[common.Address] struct{} - updatedAcc map[common.Address][]byte - updatedStorage map[string][]byte - updatedCodes map[common.Hash][]byte - //deletedAcc map[common.Address]struct{} - //createdContracts map[common.Address]struct{} -} - -func (d *DebugReaderWriter) Reset() { - d.updatedAcc = map[common.Address][]byte{} - d.updatedStorage = map[string][]byte{} - d.updatedCodes = map[common.Hash][]byte{} -} -func (d *DebugReaderWriter) UpdateWriter(w state.WriterWithChangeSets) { - d.w = w -} - -func (d *DebugReaderWriter) ReadAccountData(address common.Address) (*accounts.Account, error) { - //d.readAcc[address] = struct{}{} - return d.r.ReadAccountData(address) -} - -func (d *DebugReaderWriter) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) { - //d.readStorage[string(dbutils.PlainGenerateCompositeStorageKey(address.Bytes(),incarnation, key.Bytes()))] = struct{}{} - return d.r.ReadAccountStorage(address, incarnation, key) -} - -func (d *DebugReaderWriter) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) { - //d.readCodes[codeHash] = struct{}{} - return d.r.ReadAccountCode(address, incarnation, codeHash) -} - -func (d *DebugReaderWriter) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) { - return d.r.ReadAccountCodeSize(address, incarnation, codeHash) -} - -func (d *DebugReaderWriter) ReadAccountIncarnation(address common.Address) (uint64, error) { - //d.readIncarnations[address] = struct{}{} - return d.r.ReadAccountIncarnation(address) -} - -func (d *DebugReaderWriter) WriteChangeSets() error { - return d.w.WriteChangeSets() -} - -func (d *DebugReaderWriter) WriteHistory() error { - return d.w.WriteHistory() -} - -func (d *DebugReaderWriter) UpdateAccountData(address common.Address, original, account *accounts.Account) error { - b, err := rlp.EncodeToBytes(account) - if err != nil { - return err - } - d.updatedAcc[address] = b - return d.w.UpdateAccountData(address, original, account) -} - -func (d *DebugReaderWriter) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error { - d.updatedCodes[codeHash] = code - return d.w.UpdateAccountCode(address, incarnation, codeHash, code) -} - -func (d *DebugReaderWriter) DeleteAccount(address common.Address, original *accounts.Account) error { - d.updatedAcc[address] = nil - //d.deletedAcc[address]= struct{}{} - return d.w.DeleteAccount(address, original) -} - -func (d *DebugReaderWriter) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error { - d.updatedStorage[string(dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes()))] = value.Bytes() - return d.w.WriteAccountStorage(address, incarnation, key, original, value) 
-} - -func (d *DebugReaderWriter) CreateContract(address common.Address) error { - //d.createdContracts[address] = struct{}{} - return d.w.CreateContract(address) -} - -type Change struct { - Key []byte - Value []byte -} - -func (d *DebugReaderWriter) UpdatedAccouts() []Change { - ch := make([]Change, 0, len(d.updatedAcc)) - for k, v := range d.updatedAcc { - ch = append(ch, Change{ - Key: common.CopyBytes(k.Bytes()), - Value: common.CopyBytes(v), - }) - } - return ch -} -func (d *DebugReaderWriter) UpdatedStorage() []Change { - ch := make([]Change, 0, len(d.updatedStorage)) - for k, v := range d.updatedStorage { - ch = append(ch, Change{ - Key: common.CopyBytes([]byte(k)), - Value: common.CopyBytes(v), - }) - } - return ch - -} -func (d *DebugReaderWriter) UpdatedCodes() []Change { - ch := make([]Change, 0, len(d.updatedCodes)) - for k, v := range d.updatedCodes { - ch = append(ch, Change{ - Key: common.CopyBytes(k.Bytes()), - Value: common.CopyBytes(v), - }) - } - return ch -} - -//func (d *DebugReaderWriter) AllAccounts() map[common.Address]struct{} { -// accs:=make(map[common.Address]struct{}) -// for i:=range d.readAcc { -// accs[i]=struct{}{} -// } -// for i:=range d.updatedAcc { -// accs[i]=struct{}{} -// } -// for i:=range d.readIncarnations { -// accs[i]=struct{}{} -// } -// for i:=range d.deletedAcc { -// accs[i]=struct{}{} -// } -// for i:=range d.createdContracts { -// accs[i]=struct{}{} -// } -// return accs -//} -//func (d *DebugReaderWriter) AllStorage() map[string]struct{} { -// st:=make(map[string]struct{}) -// for i:=range d.readStorage { -// st[i]=struct{}{} -// } -// for i:=range d.updatedStorage { -// st[i]=struct{}{} -// } -// return st -//} -//func (d *DebugReaderWriter) AllCodes() map[common.Hash]struct{} { -// c:=make(map[common.Hash]struct{}) -// for i:=range d.readCodes { -// c[i]=struct{}{} -// } -// for i:=range d.updatedCodes { -// c[i]=struct{}{} -// } -// return c -//} diff --git a/cmd/downloader/downloader/downloader.go b/cmd/downloader/downloader/downloader.go index 3d36b8f8c..d8593e162 100644 --- a/cmd/downloader/downloader/downloader.go +++ b/cmd/downloader/downloader/downloader.go @@ -243,11 +243,11 @@ func AddTorrentFiles(ctx context.Context, snapshotsDir *dir.Rw, torrentClient *t // ResolveAbsentTorrents - add hard-coded hashes (if client doesn't have) as magnet links and download everything func ResolveAbsentTorrents(ctx context.Context, torrentClient *torrent.Client, preverifiedHashes []metainfo.Hash, snapshotDir *dir.Rw, silent bool) error { mi := &metainfo.MetaInfo{AnnounceList: Trackers} - for _, infoHash := range preverifiedHashes { - if _, ok := torrentClient.Torrent(infoHash); ok { + for i := range preverifiedHashes { + if _, ok := torrentClient.Torrent(preverifiedHashes[i]); ok { continue } - magnet := mi.Magnet(&infoHash, nil) + magnet := mi.Magnet(&preverifiedHashes[i], nil) t, err := torrentClient.AddMagnet(magnet.String()) if err != nil { return err diff --git a/cmd/downloader/downloader/util.go b/cmd/downloader/downloader/util.go index 27995fdc5..ad055ff87 100644 --- a/cmd/downloader/downloader/util.go +++ b/cmd/downloader/downloader/util.go @@ -3,7 +3,7 @@ package downloader import ( "bytes" "context" - "crypto/sha1" + "crypto/sha1" //nolint:gosec "errors" "fmt" "io" @@ -206,7 +206,7 @@ func verifyTorrent(info *metainfo.Info, root string, consumer func(i int, good b span.InitIndex() for i, numPieces := 0, info.NumPieces(); i < numPieces; i += 1 { p := info.Piece(i) - hash := sha1.New() + hash := sha1.New() //nolint:gosec _, err := 
io.Copy(hash, io.NewSectionReader(span, p.Offset(), p.Length())) if err != nil { return err diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index c95888454..b01c2b588 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -403,7 +403,6 @@ func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context } //TODO: avoid too large deletes - log.Info("[snapshots] Retire blocks", "from", blockFrom, "to", blockTo) chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) wg := sync.WaitGroup{} wg.Add(1) @@ -432,6 +431,5 @@ func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context defer wg.Done() }() wg.Wait() - fmt.Printf("sn runtime dump: %d-%d\n", blockFrom, blockTo) return nil } diff --git a/ethdb/snapshotdb/kv_snapshot.go b/ethdb/snapshotdb/kv_snapshot.go deleted file mode 100644 index 955d20700..000000000 --- a/ethdb/snapshotdb/kv_snapshot.go +++ /dev/null @@ -1,1032 +0,0 @@ -package snapshotdb - -import ( - "bytes" - "context" - "errors" - "fmt" - "sync" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" -) - -var ( - _ kv.RwDB = &SnapshotKV{} - _ kv.RoDB = &SnapshotKV{} - _ kv.Tx = &snTX{} - _ kv.BucketMigrator = &snTX{} - _ kv.RwCursor = &snCursor{} - _ kv.Cursor = &snCursor{} -) - -type SnapshotUpdater interface { - UpdateSnapshots(tp string, snapshotKV kv.RoDB, done chan struct{}) - HeadersSnapshot() kv.RoDB - BodiesSnapshot() kv.RoDB - StateSnapshot() kv.RoDB -} - -type WriteDB interface { - WriteDB() kv.RwDB -} - -func NewSnapshotKV() snapshotOpts { - - return snapshotOpts{} -} - -type snapshotOpts struct { - db kv.RwDB - headersSnapshot kv.RoDB - bodiesSnapshot kv.RoDB - stateSnapshot kv.RoDB -} - -func (opts snapshotOpts) HeadersSnapshot(kv kv.RoDB) snapshotOpts { - opts.headersSnapshot = kv - return opts -} -func (opts snapshotOpts) BodiesSnapshot(kv kv.RoDB) snapshotOpts { - opts.bodiesSnapshot = kv - return opts -} -func (opts snapshotOpts) StateSnapshot(kv kv.RoDB) snapshotOpts { - opts.stateSnapshot = kv - return opts -} - -func (opts snapshotOpts) DB(db kv.RwDB) snapshotOpts { - opts.db = db - return opts -} - -func (opts snapshotOpts) Open() *SnapshotKV { - return &SnapshotKV{ - headersSnapshot: opts.headersSnapshot, - bodiesSnapshot: opts.bodiesSnapshot, - stateSnapshot: opts.stateSnapshot, - db: opts.db, - } -} - -type SnapshotKV struct { - db kv.RwDB - headersSnapshot kv.RoDB - bodiesSnapshot kv.RoDB - stateSnapshot kv.RoDB - mtx sync.RWMutex - - tmpDB kv.RwDB - tmpDBBuckets map[string]struct{} -} - -func (s *SnapshotKV) View(ctx context.Context, f func(tx kv.Tx) error) error { - snTX, err := s.BeginRo(ctx) - if err != nil { - return err - } - defer snTX.Rollback() - return f(snTX) -} - -func (s *SnapshotKV) Update(ctx context.Context, f func(tx kv.RwTx) error) error { - tx, err := s.BeginRw(ctx) - if err != nil { - return err - } - defer tx.Rollback() - - err = f(tx) - if err == nil { - return tx.Commit() - } - return err -} - -func (s *SnapshotKV) Close() { - defer s.db.Close() - s.mtx.Lock() - defer s.mtx.Unlock() - if s.headersSnapshot != nil { - defer s.headersSnapshot.Close() - } - if s.bodiesSnapshot != nil { - defer s.bodiesSnapshot.Close() - } - if s.stateSnapshot != nil { - defer s.stateSnapshot.Close() - } -} - -func (s *SnapshotKV) UpdateSnapshots(tp string, snapshotKV kv.RoDB, done chan struct{}) { - var toClose kv.RoDB - s.mtx.Lock() - defer s.mtx.Unlock() 
- switch { - case tp == "headers": - toClose = s.headersSnapshot - s.headersSnapshot = snapshotKV - case tp == "bodies": - toClose = s.bodiesSnapshot - s.bodiesSnapshot = snapshotKV - case tp == "state": - toClose = s.stateSnapshot - s.stateSnapshot = snapshotKV - default: - log.Error("incorrect type", "tp", tp) - } - - go func() { - if toClose != nil { - toClose.Close() - } - done <- struct{}{} - log.Info("old snapshot closed", "tp", tp) - }() -} - -func (s *SnapshotKV) WriteDB() kv.RwDB { - return s.db -} - -func (s *SnapshotKV) TempDB() kv.RwDB { - return s.tmpDB -} - -func (s *SnapshotKV) SetTempDB(kv kv.RwDB, buckets []string) { - bucketsMap := make(map[string]struct{}, len(buckets)) - for _, bucket := range buckets { - bucketsMap[bucket] = struct{}{} - } - s.tmpDB = kv - s.tmpDBBuckets = bucketsMap -} - -//todo -func (s *SnapshotKV) HeadersSnapshot() kv.RoDB { - return s.headersSnapshot -} -func (s *SnapshotKV) BodiesSnapshot() kv.RoDB { - return s.bodiesSnapshot -} -func (s *SnapshotKV) StateSnapshot() kv.RoDB { - return s.stateSnapshot -} - -func (s *SnapshotKV) snapsthotsTx(ctx context.Context) (kv.Tx, kv.Tx, kv.Tx, error) { - var headersTX, bodiesTX, stateTX kv.Tx - var err error - defer func() { - if err != nil { - if headersTX != nil { - headersTX.Rollback() - } - if bodiesTX != nil { - bodiesTX.Rollback() - } - if stateTX != nil { - stateTX.Rollback() - } - } - }() - if s.headersSnapshot != nil { - headersTX, err = s.headersSnapshot.BeginRo(ctx) - if err != nil { - return nil, nil, nil, err - } - } - if s.bodiesSnapshot != nil { - bodiesTX, err = s.bodiesSnapshot.BeginRo(ctx) - if err != nil { - return nil, nil, nil, err - } - } - if s.stateSnapshot != nil { - stateTX, err = s.stateSnapshot.BeginRo(ctx) - if err != nil { - return nil, nil, nil, err - } - } - return headersTX, bodiesTX, stateTX, nil -} -func (s *SnapshotKV) BeginRo(ctx context.Context) (kv.Tx, error) { - dbTx, err := s.db.BeginRo(ctx) - if err != nil { - return nil, err - } - var tmpTX kv.Tx - if s.tmpDB != nil { - tmpTX, err = s.tmpDB.BeginRo(context.Background()) - if err != nil { - return nil, err - } - } - headersTX, bodiesTX, stateTX, err := s.snapsthotsTx(ctx) - if err != nil { - return nil, err - } - return &snTX{ - dbTX: dbTx, - headersTX: headersTX, - bodiesTX: bodiesTX, - stateTX: stateTX, - tmpTX: tmpTX, - buckets: s.tmpDBBuckets, - }, nil -} - -func (s *SnapshotKV) BeginRw(ctx context.Context) (kv.RwTx, error) { - dbTx, err := s.db.BeginRw(ctx) //nolint - if err != nil { - return nil, err - } - - var tmpTX kv.Tx - if s.tmpDB != nil { - tmpTX, err = s.tmpDB.BeginRw(context.Background()) - if err != nil { - return nil, err - } - } - - headersTX, bodiesTX, stateTX, err := s.snapsthotsTx(ctx) - if err != nil { - return nil, err - } - - return &snTX{ - dbTX: dbTx, - headersTX: headersTX, - bodiesTX: bodiesTX, - stateTX: stateTX, - tmpTX: tmpTX, - buckets: s.tmpDBBuckets, - }, nil -} - -func (s *SnapshotKV) AllBuckets() kv.TableCfg { - return s.db.AllBuckets() -} - -var ErrUnavailableSnapshot = errors.New("unavailable snapshot") - -type snTX struct { - dbTX kv.Tx - headersTX kv.Tx - bodiesTX kv.Tx - stateTX kv.Tx - - //just an experiment with temp db for state snapshot migration. 
- tmpTX kv.Tx - buckets map[string]struct{} -} - -type DBTX interface { - DBTX() kv.RwTx -} - -func (s *snTX) DBTX() kv.RwTx { return s.dbTX.(kv.RwTx) } -func (s *snTX) ViewID() uint64 { return s.dbTX.ViewID() } - -func (s *snTX) RwCursor(bucket string) (kv.RwCursor, error) { - if !IsSnapshotBucket(bucket) { - return s.dbTX.(kv.RwTx).RwCursor(bucket) - } - tx, err := s.getSnapshotTX(bucket) - if err != nil && !errors.Is(err, ErrUnavailableSnapshot) { - panic(err.Error()) - } - //process only db buckets - if errors.Is(err, ErrUnavailableSnapshot) { - return s.dbTX.(kv.RwTx).RwCursor(bucket) - } - - snCursor2, err := tx.Cursor(bucket) - if err != nil { - return nil, err - } - - if IsStateSnapshotSnapshotBucket(bucket) && s.tmpTX != nil { - mainDBCursor, err := s.dbTX.Cursor(bucket) - if err != nil { - return nil, err - } - tmpDBCursor, err := s.tmpTX.(kv.RwTx).RwCursor(bucket) - if err != nil { - return nil, err - } - - return &snCursor{ - dbCursor: &snCursor{ - dbCursor: tmpDBCursor, - snCursor: mainDBCursor, - }, - snCursor: snCursor2, - }, nil - } - dbCursor, err := s.dbTX.(kv.RwTx).RwCursor(bucket) - if err != nil { - return nil, err - } - - return &snCursor{ - dbCursor: dbCursor, - snCursor: snCursor2, - }, nil - -} - -func (s *snTX) DropBucket(bucket string) error { - return s.dbTX.(kv.BucketMigrator).DropBucket(bucket) -} - -func (s *snTX) CreateBucket(bucket string) error { - return s.dbTX.(kv.BucketMigrator).CreateBucket(bucket) -} - -func (s *snTX) ExistsBucket(bucket string) (bool, error) { - return s.dbTX.(kv.BucketMigrator).ExistsBucket(bucket) -} - -func (s *snTX) ClearBucket(bucket string) error { - return s.dbTX.(kv.BucketMigrator).ClearBucket(bucket) -} - -func (s *snTX) ListBuckets() ([]string, error) { - return s.dbTX.(kv.BucketMigrator).ListBuckets() -} - -func (s *snTX) Cursor(bucket string) (kv.Cursor, error) { - if !IsSnapshotBucket(bucket) { - return s.dbTX.Cursor(bucket) - } - - tx, err := s.getSnapshotTX(bucket) - if err != nil && !errors.Is(err, ErrUnavailableSnapshot) { - panic(err.Error()) - } - //process only db buckets - if errors.Is(err, ErrUnavailableSnapshot) { - return s.dbTX.Cursor(bucket) - } - dbCursor, err := s.dbTX.Cursor(bucket) - if err != nil { - return nil, err - } - snCursor2, err := tx.Cursor(bucket) - if err != nil { - return nil, err - } - if IsStateSnapshotSnapshotBucket(bucket) && s.tmpTX != nil { - tmpDBCursor, err := s.tmpTX.Cursor(bucket) - if err != nil { - return nil, err - } - - return &snCursor{ - dbCursor: &snCursor{ - dbCursor: tmpDBCursor, - snCursor: dbCursor, - }, - snCursor: snCursor2, - }, nil - } - return &snCursor{ - dbCursor: dbCursor, - snCursor: snCursor2, - }, nil -} - -func (s *snTX) CursorDupSort(bucket string) (kv.CursorDupSort, error) { - tx, err := s.getSnapshotTX(bucket) - if err != nil && !errors.Is(err, ErrUnavailableSnapshot) { - panic(err.Error()) - } - //process only db buckets - if errors.Is(err, ErrUnavailableSnapshot) { - return s.dbTX.CursorDupSort(bucket) - } - dbc, err := s.dbTX.CursorDupSort(bucket) - if err != nil { - return nil, err - } - sncbc, err := tx.CursorDupSort(bucket) - if err != nil { - return nil, err - } - return &snCursorDup{ - dbc, - sncbc, - snCursor{ - dbCursor: dbc, - snCursor: sncbc, - }, - }, nil -} - -func (s *snTX) RwCursorDupSort(bucket string) (kv.RwCursorDupSort, error) { - c, err := s.CursorDupSort(bucket) - if err != nil { - return nil, err - } - return c.(kv.RwCursorDupSort), nil -} -func (s *snTX) GetOne(bucket string, key []byte) (val []byte, err error) { - v, err := 
s.dbTX.GetOne(bucket, key) - if err != nil { - return nil, err - } - if len(v) == 0 { - snTx, innerErr := s.getSnapshotTX(bucket) - if innerErr != nil && !errors.Is(innerErr, ErrUnavailableSnapshot) { - return nil, innerErr - } - //process only db buckets - if errors.Is(innerErr, ErrUnavailableSnapshot) { - return v, nil - } - v, err = snTx.GetOne(bucket, key) - if err != nil { - return nil, err - } - if bytes.Equal(v, DeletedValue) { - return nil, nil - } - return v, nil - } - return v, nil -} - -func (s *snTX) Put(bucket string, k, v []byte) error { - if s.tmpTX != nil && IsStateSnapshotSnapshotBucket(bucket) { - return s.tmpTX.(kv.RwTx).Put(bucket, k, v) - } - return s.dbTX.(kv.RwTx).Put(bucket, k, v) -} -func (s *snTX) Append(bucket string, k, v []byte) error { - if s.tmpTX != nil && IsStateSnapshotSnapshotBucket(bucket) { - return s.tmpTX.(kv.RwTx).Put(bucket, k, v) - } - return s.dbTX.(kv.RwTx).Append(bucket, k, v) -} -func (s *snTX) AppendDup(bucket string, k, v []byte) error { - if s.tmpTX != nil && IsStateSnapshotSnapshotBucket(bucket) { - return s.tmpTX.(kv.RwTx).Put(bucket, k, v) - } - return s.dbTX.(kv.RwTx).AppendDup(bucket, k, v) -} -func (s *snTX) Delete(bucket string, k, v []byte) error { - //note we can't use Delete here, because we can't change snapshots - //if we delete in main database we can find the value in snapshot - //so we are just marking that this value is deleted. - //this value will be removed on snapshot merging - if s.tmpTX != nil && IsStateSnapshotSnapshotBucket(bucket) { - return s.tmpTX.(kv.RwTx).Put(bucket, k, DeletedValue) - } - - return s.dbTX.(kv.RwTx).Put(bucket, k, DeletedValue) -} - -func (s *snTX) CollectMetrics() { - if rw, ok := s.dbTX.(kv.RwTx); ok { - rw.CollectMetrics() - } -} - -func (s *snTX) getSnapshotTX(bucket string) (kv.Tx, error) { - var tx kv.Tx - switch bucket { - case kv.Headers: - tx = s.headersTX - case kv.BlockBody, kv.EthTx: - tx = s.bodiesTX - case kv.PlainState, kv.PlainContractCode, kv.Code: - tx = s.stateTX - } - if tx == nil { - return nil, fmt.Errorf("%s %w", bucket, ErrUnavailableSnapshot) - } - return tx, nil -} - -func (s *snTX) Has(bucket string, key []byte) (bool, error) { - v, err := s.dbTX.Has(bucket, key) - if err != nil { - return false, err - } - if !v { - snTx, err := s.getSnapshotTX(bucket) - if err != nil && !errors.Is(err, ErrUnavailableSnapshot) { - return false, err - } - //process only db buckets - if errors.Is(err, ErrUnavailableSnapshot) { - return v, nil - } - - v, err := snTx.GetOne(bucket, key) - if err != nil { - return false, err - } - if bytes.Equal(v, DeletedValue) { - return false, nil - } - - return true, nil - } - return v, nil -} - -func (s *snTX) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { - c, err := s.Cursor(bucket) - if err != nil { - return err - } - defer c.Close() - - for k, v, err := c.Seek(fromPrefix); k != nil; k, v, err = c.Next() { - if err != nil { - return err - } - if err := walker(k, v); err != nil { - return err - } - } - return nil -} - -func (s *snTX) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { - c, err := s.Cursor(bucket) - if err != nil { - return err - } - defer c.Close() - - for k, v, err := c.Seek(prefix); k != nil; k, v, err = c.Next() { - if err != nil { - return err - } - if !bytes.HasPrefix(k, prefix) { - break - } - if err := walker(k, v); err != nil { - return err - } - } - return nil -} -func (s *snTX) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) 
error) error { - c, err := s.Cursor(bucket) - if err != nil { - return err - } - defer c.Close() - - for k, v, err := c.Seek(fromPrefix); k != nil && amount > 0; k, v, err = c.Next() { - if err != nil { - return err - } - if err := walker(k, v); err != nil { - return err - } - amount-- - } - return nil -} - -func (s *snTX) Commit() error { - defer s.snapshotsRollback() - if s.tmpTX != nil { - err := s.tmpTX.Commit() - if err != nil { - s.dbTX.Rollback() - return err - } - } - return s.dbTX.Commit() -} -func (s *snTX) snapshotsRollback() { - if s.headersTX != nil { - defer s.headersTX.Rollback() - } - if s.bodiesTX != nil { - defer s.bodiesTX.Rollback() - } - if s.stateTX != nil { - defer s.stateTX.Rollback() - } -} -func (s *snTX) Rollback() { - defer s.snapshotsRollback() - defer func() { - if s.tmpTX != nil { - s.tmpTX.Rollback() - } - }() - s.dbTX.Rollback() -} - -func (s *snTX) BucketSize(bucket string) (uint64, error) { - return s.dbTX.BucketSize(bucket) -} - -func (s *snTX) IncrementSequence(bucket string, amount uint64) (uint64, error) { - return s.dbTX.(kv.RwTx).IncrementSequence(bucket, amount) -} - -func (s *snTX) ReadSequence(bucket string) (uint64, error) { - return s.dbTX.ReadSequence(bucket) -} - -func (s *snTX) BucketExists(bucket string) (bool, error) { - return s.dbTX.(ethdb.BucketsMigrator).BucketExists(bucket) -} - -func (s *snTX) ClearBuckets(buckets ...string) error { - return s.dbTX.(ethdb.BucketsMigrator).ClearBuckets(buckets...) -} - -func (s *snTX) DropBuckets(buckets ...string) error { - return s.dbTX.(ethdb.BucketsMigrator).DropBuckets(buckets...) -} - -var DeletedValue = []byte{0} - -type snCursor struct { - dbCursor kv.Cursor - snCursor kv.Cursor - - currentKey []byte -} - -func (s *snCursor) First() ([]byte, []byte, error) { - var err error - lastDBKey, lastDBVal, err := s.dbCursor.First() - if err != nil { - return nil, nil, err - } - - for bytes.Equal(lastDBVal, DeletedValue) { - lastDBKey, lastDBVal, err = s.dbCursor.Next() - if err != nil { - return nil, nil, err - } - - } - - lastSNDBKey, lastSNDBVal, err := s.snCursor.First() - if err != nil { - return nil, nil, err - } - cmp, br := common.KeyCmp(lastDBKey, lastSNDBKey) - if br { - return nil, nil, nil - } - - if cmp <= 0 { - s.saveCurrent(lastDBKey) - return lastDBKey, lastDBVal, nil - } - s.saveCurrent(lastSNDBKey) - return lastSNDBKey, lastSNDBVal, nil -} - -func (s *snCursor) Seek(seek []byte) ([]byte, []byte, error) { - dbKey, dbVal, err := s.dbCursor.Seek(seek) - if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { - return nil, nil, err - } - - for bytes.Equal(dbVal, DeletedValue) { - dbKey, dbVal, err = s.dbCursor.Next() - if err != nil { - return nil, nil, err - } - } - - sndbKey, sndbVal, err := s.snCursor.Seek(seek) - if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { - return nil, nil, err - } - - if bytes.Equal(dbKey, seek) && dbVal != nil { - return dbKey, dbVal, err - } - if bytes.Equal(sndbKey, seek) && sndbVal != nil { - return sndbKey, sndbVal, err - } - cmp, _ := common.KeyCmp(dbKey, sndbKey) - if cmp <= 0 { - s.saveCurrent(dbKey) - return dbKey, dbVal, nil - } - s.saveCurrent(sndbKey) - return sndbKey, sndbVal, nil -} - -func (s *snCursor) SeekExact(key []byte) ([]byte, []byte, error) { - k, v, err := s.dbCursor.SeekExact(key) - if err != nil { - return nil, nil, err - } - if bytes.Equal(v, DeletedValue) { - return nil, nil, nil - } - if v == nil { - k, v, err = s.snCursor.SeekExact(key) - s.saveCurrent(k) - return k, v, err - } - s.saveCurrent(k) - return k, v, err -} - 
-func (s *snCursor) iteration(dbNextElement func() ([]byte, []byte, error), sndbNextElement func() ([]byte, []byte, error), cmpFunc func(kdb, ksndb []byte) (int, bool)) ([]byte, []byte, error) { - var err error - var noDBNext, noSnDBNext bool - //current returns error on empty bucket - lastDBKey, lastDBVal, err := s.dbCursor.Current() - if err != nil { - var innerErr error - lastDBKey, lastDBVal, innerErr = dbNextElement() - if innerErr != nil { - return nil, nil, fmt.Errorf("get current from db %w inner %v", err, innerErr) - } - noDBNext = true - } - - lastSNDBKey, lastSNDBVal, err := s.snCursor.Current() - if err != nil { - var innerErr error - lastSNDBKey, lastSNDBVal, innerErr = sndbNextElement() - if innerErr != nil { - return nil, nil, fmt.Errorf("get current from snapshot %w inner %v", err, innerErr) - } - noSnDBNext = true - } - - cmp, br := cmpFunc(lastDBKey, lastSNDBKey) - if br { - return nil, nil, nil - } - - //todo Seek fastpath - if cmp > 0 { - if !noSnDBNext { - lastSNDBKey, lastSNDBVal, err = sndbNextElement() - if err != nil { - return nil, nil, err - } - - if currentKeyCmp, _ := common.KeyCmp(s.currentKey, lastDBKey); len(lastSNDBKey) == 0 && currentKeyCmp >= 0 && len(s.currentKey) > 0 { - lastDBKey, lastDBVal, err = dbNextElement() - } - if err != nil { - return nil, nil, err - } - } - } - - //current receives last acceptable key. If it is empty - if cmp < 0 { - if !noDBNext { - lastDBKey, lastDBVal, err = dbNextElement() - if err != nil { - return nil, nil, err - } - if currentKeyCmp, _ := common.KeyCmp(s.currentKey, lastSNDBKey); len(lastDBKey) == 0 && currentKeyCmp >= 0 && len(s.currentKey) > 0 { - lastSNDBKey, lastSNDBVal, err = sndbNextElement() - } - if err != nil { - return nil, nil, err - } - } - } - if cmp == 0 { - if !noDBNext { - lastDBKey, lastDBVal, err = dbNextElement() - if err != nil { - return nil, nil, err - } - } - if !noSnDBNext { - lastSNDBKey, lastSNDBVal, err = sndbNextElement() - if err != nil { - return nil, nil, err - } - } - } - - cmp, br = cmpFunc(lastDBKey, lastSNDBKey) - if br { - return nil, nil, nil - } - if cmp <= 0 { - return lastDBKey, lastDBVal, nil - } - - return lastSNDBKey, lastSNDBVal, nil -} - -func (s *snCursor) Next() ([]byte, []byte, error) { - k, v, err := s.iteration(s.dbCursor.Next, s.snCursor.Next, common.KeyCmp) //f(s.dbCursor.Next, s.snCursor.Next) - if err != nil { - return nil, nil, err - } - for bytes.Equal(v, DeletedValue) { - k, v, err = s.iteration(s.dbCursor.Next, s.snCursor.Next, common.KeyCmp) // f(s.dbCursor.Next, s.snCursor.Next) - if err != nil { - return nil, nil, err - } - - } - s.saveCurrent(k) - return k, v, nil -} - -func (s *snCursor) Prev() ([]byte, []byte, error) { - k, v, err := s.iteration(s.dbCursor.Prev, s.snCursor.Prev, func(kdb, ksndb []byte) (int, bool) { - cmp, br := KeyCmpBackward(kdb, ksndb) - return -1 * cmp, br - }) - if err != nil { - return nil, nil, err - } - for cmp, _ := KeyCmpBackward(k, s.currentKey); bytes.Equal(v, DeletedValue) || cmp >= 0; cmp, _ = KeyCmpBackward(k, s.currentKey) { - k, v, err = s.iteration(s.dbCursor.Prev, s.snCursor.Prev, func(kdb, ksndb []byte) (int, bool) { - cmp, br := KeyCmpBackward(kdb, ksndb) - return -1 * cmp, br - }) - if err != nil { - return nil, nil, err - } - } - s.saveCurrent(k) - return k, v, nil -} - -func (s *snCursor) Last() ([]byte, []byte, error) { - var err error - lastSNDBKey, lastSNDBVal, err := s.snCursor.Last() - if err != nil { - return nil, nil, err - } - lastDBKey, lastDBVal, err := s.dbCursor.Last() - if err != nil { - return nil, 
nil, err - } - - for bytes.Equal(lastDBVal, DeletedValue) { - lastDBKey, lastDBVal, err = s.dbCursor.Prev() - if err != nil { - return nil, nil, err - } - } - - cmp, br := KeyCmpBackward(lastDBKey, lastSNDBKey) - if br { - return nil, nil, nil - } - - if cmp >= 0 { - s.saveCurrent(lastDBKey) - return lastDBKey, lastDBVal, nil - } - s.saveCurrent(lastSNDBKey) - return lastSNDBKey, lastSNDBVal, nil -} - -func (s *snCursor) Current() ([]byte, []byte, error) { - k, v, err := s.dbCursor.Current() - if bytes.Equal(k, s.currentKey) { - return k, v, err - } - return s.snCursor.Current() -} - -func (s *snCursor) Put(k, v []byte) error { - return s.dbCursor.(kv.RwCursor).Put(k, v) -} - -func (s *snCursor) Append(k []byte, v []byte) error { - return s.dbCursor.(kv.RwCursor).Append(k, v) -} - -func (s *snCursor) Delete(k, v []byte) error { - return s.dbCursor.(kv.RwCursor).Put(k, DeletedValue) -} - -func (s *snCursor) DeleteCurrent() error { - panic("implement me") -} - -func (s *snCursor) Count() (uint64, error) { - panic("implement me") -} - -func (s *snCursor) Close() { - s.dbCursor.Close() - s.snCursor.Close() -} - -type snCursorDup struct { - dbCursorDup kv.CursorDupSort - sndbCursorDup kv.CursorDupSort - snCursor -} - -func (c *snCursorDup) SeekBothExact(key, value []byte) ([]byte, []byte, error) { - k, v, err := c.dbCursorDup.SeekBothExact(key, value) - if err != nil { - return nil, nil, err - } - if v == nil { - k, v, err = c.sndbCursorDup.SeekBothExact(key, value) - c.saveCurrent(k) - return k, v, err - } - c.saveCurrent(k) - return k, v, err - -} - -func (c *snCursorDup) SeekBothRange(key, value []byte) ([]byte, error) { - dbVal, err := c.dbCursorDup.SeekBothRange(key, value) - if err != nil { - return nil, err - } - snDBVal, err := c.sndbCursorDup.SeekBothRange(key, value) - if err != nil { - return nil, err - } - - if dbVal == nil { - c.saveCurrent(key) - return dbVal, nil - } - - return snDBVal, nil -} - -func (c *snCursorDup) FirstDup() ([]byte, error) { - panic("implement me") -} - -func (c *snCursorDup) NextDup() ([]byte, []byte, error) { - panic("implement me") -} - -func (c *snCursorDup) NextNoDup() ([]byte, []byte, error) { - panic("implement me") -} - -func (c *snCursorDup) LastDup() ([]byte, error) { - panic("implement me") -} - -func (c *snCursorDup) CountDuplicates() (uint64, error) { - panic("implement me") -} - -func (c *snCursorDup) DeleteCurrentDuplicates() error { - panic("implement me") -} - -func (c *snCursorDup) AppendDup(key, value []byte) error { - panic("implement me") -} - -func (s *snCursor) saveCurrent(k []byte) { - if k != nil { - s.currentKey = common.CopyBytes(k) - } -} - -func KeyCmpBackward(key1, key2 []byte) (int, bool) { - switch { - case len(key1) == 0 && len(key2) == 0: - return 0, true - case len(key1) == 0 && len(key2) != 0: - return -1, false - case len(key1) != 0 && len(key2) == 0: - return 1, false - default: - return bytes.Compare(key1, key2), false - } -} - -func IsSnapshotBucket(bucket string) bool { - return IsStateSnapshotSnapshotBucket(bucket) || IsHeaderSnapshotSnapshotBucket(bucket) || IsBodiesSnapshotSnapshotBucket(bucket) -} -func IsHeaderSnapshotSnapshotBucket(bucket string) bool { - return bucket == kv.Headers -} -func IsBodiesSnapshotSnapshotBucket(bucket string) bool { - return bucket == kv.BlockBody || bucket == kv.EthTx -} -func IsStateSnapshotSnapshotBucket(bucket string) bool { - return bucket == kv.PlainState || bucket == kv.PlainContractCode || bucket == kv.Code -} diff --git a/ethdb/snapshotdb/kv_snapshot_test.go 
b/ethdb/snapshotdb/kv_snapshot_test.go deleted file mode 100644 index ce39dee72..000000000 --- a/ethdb/snapshotdb/kv_snapshot_test.go +++ /dev/null @@ -1,1305 +0,0 @@ -package snapshotdb - -import ( - "bytes" - "context" - "fmt" - "testing" - "time" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" - kv2 "github.com/ledgerwatch/erigon-lib/kv/memdb" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" - "github.com/stretchr/testify/require" -) - -func TestSnapshot2Get(t *testing.T) { - logger := log.New() - sn1 := mdbx.NewMDBX(logger).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.Headers: kv.TableCfgItem{}, - } - }).InMem().MustOpen() - defer sn1.Close() - err := sn1.Update(context.Background(), func(tx kv.RwTx) error { - bucket, err := tx.RwCursor(kv.Headers) - if err != nil { - return err - } - innerErr := bucket.Put(dbutils.HeaderKey(1, common.Hash{1}), []byte{1}) - if innerErr != nil { - return innerErr - } - innerErr = bucket.Put(dbutils.HeaderKey(2, common.Hash{2}), []byte{2}) - if innerErr != nil { - return innerErr - } - - return nil - }) - if err != nil { - t.Fatal(err) - } - - sn2 := mdbx.NewMDBX(logger).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.BlockBody: kv.TableCfgItem{}, - } - }).InMem().MustOpen() - defer sn2.Close() - err = sn2.Update(context.Background(), func(tx kv.RwTx) error { - bucket, err := tx.RwCursor(kv.BlockBody) - require.NoError(t, err) - innerErr := bucket.Put(dbutils.BlockBodyKey(1, common.Hash{1}), []byte{1}) - if innerErr != nil { - return innerErr - } - innerErr = bucket.Put(dbutils.BlockBodyKey(2, common.Hash{2}), []byte{2}) - if innerErr != nil { - return innerErr - } - - return nil - }) - if err != nil { - t.Fatal(err) - } - - mainDB := kv2.NewTestDB(t) - err = mainDB.Update(context.Background(), func(tx kv.RwTx) error { - bucket, err := tx.RwCursor(kv.Headers) - if err != nil { - return err - } - innerErr := bucket.Put(dbutils.HeaderKey(2, common.Hash{2}), []byte{22}) - if innerErr != nil { - return innerErr - } - innerErr = bucket.Put(dbutils.HeaderKey(3, common.Hash{3}), []byte{33}) - if innerErr != nil { - return innerErr - } - - bucket, err = tx.RwCursor(kv.BlockBody) - if err != nil { - return err - } - - innerErr = bucket.Put(dbutils.BlockBodyKey(2, common.Hash{2}), []byte{22}) - if innerErr != nil { - return innerErr - } - innerErr = bucket.Put(dbutils.BlockBodyKey(3, common.Hash{3}), []byte{33}) - if innerErr != nil { - return innerErr - } - - return nil - }) - if err != nil { - t.Fatal(err) - } - - db := NewSnapshotKV().DB(mainDB).HeadersSnapshot(sn1). 
- BodiesSnapshot(sn2).Open() - - tx, err := db.BeginRo(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx.Rollback() - - v, err := tx.GetOne(kv.Headers, dbutils.HeaderKey(1, common.Hash{1})) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(v, []byte{1}) { - t.Fatal(v) - } - - v, err = tx.GetOne(kv.Headers, dbutils.HeaderKey(2, common.Hash{2})) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(v, []byte{22}) { - t.Fatal(v) - } - - v, err = tx.GetOne(kv.Headers, dbutils.HeaderKey(3, common.Hash{3})) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(v, []byte{33}) { - t.Fatal(v) - } - - v, err = tx.GetOne(kv.BlockBody, dbutils.BlockBodyKey(1, common.Hash{1})) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(v, []byte{1}) { - t.Fatal(v) - } - - v, err = tx.GetOne(kv.BlockBody, dbutils.BlockBodyKey(2, common.Hash{2})) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(v, []byte{22}) { - t.Fatal(v) - } - - v, err = tx.GetOne(kv.BlockBody, dbutils.BlockBodyKey(3, common.Hash{3})) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(v, []byte{33}) { - t.Fatal(v) - } - - headerCursor, err := tx.Cursor(kv.Headers) - require.NoError(t, err) - k, v, err := headerCursor.Last() - require.NoError(t, err) - if !(bytes.Equal(dbutils.HeaderKey(3, common.Hash{3}), k) && bytes.Equal(v, []byte{33})) { - t.Fatal(k, v) - } - k, v, err = headerCursor.First() - require.NoError(t, err) - if !(bytes.Equal(dbutils.HeaderKey(1, common.Hash{1}), k) && bytes.Equal(v, []byte{1})) { - t.Fatal(k, v) - } - - k, v, err = headerCursor.Next() - require.NoError(t, err) - - if !(bytes.Equal(dbutils.HeaderKey(2, common.Hash{2}), k) && bytes.Equal(v, []byte{22})) { - t.Fatal(k, v) - } - - k, v, err = headerCursor.Next() - require.NoError(t, err) - - if !(bytes.Equal(dbutils.HeaderKey(3, common.Hash{3}), k) && bytes.Equal(v, []byte{33})) { - t.Fatal(k, v) - } - - k, v, err = headerCursor.Next() - require.NoError(t, err) - - if !(bytes.Equal([]byte{}, k) && bytes.Equal(v, []byte{})) { - t.Fatal(k, v) - } -} - -func TestSnapshot2WritableTxAndGet(t *testing.T) { - logger := log.New() - sn1 := mdbx.NewMDBX(logger).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.Headers: kv.TableCfgItem{}, - } - }).InMem().MustOpen() - defer sn1.Close() - - { - err := sn1.Update(context.Background(), func(tx kv.RwTx) error { - bucket, err := tx.RwCursor(kv.Headers) - require.NoError(t, err) - innerErr := bucket.Put(dbutils.HeaderKey(1, common.Hash{1}), []byte{1}) - if innerErr != nil { - return innerErr - } - innerErr = bucket.Put(dbutils.HeaderKey(2, common.Hash{2}), []byte{2}) - if innerErr != nil { - return innerErr - } - - return nil - }) - if err != nil { - t.Fatal(err) - } - } - - sn2 := mdbx.NewMDBX(logger).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.BlockBody: kv.TableCfgItem{}, - } - }).InMem().MustOpen() - defer sn2.Close() - { - err := sn2.Update(context.Background(), func(tx kv.RwTx) error { - bucket, err := tx.RwCursor(kv.BlockBody) - require.NoError(t, err) - innerErr := bucket.Put(dbutils.BlockBodyKey(1, common.Hash{1}), []byte{1}) - if innerErr != nil { - return innerErr - } - innerErr = bucket.Put(dbutils.BlockBodyKey(2, common.Hash{2}), []byte{2}) - if innerErr != nil { - return innerErr - } - - return nil - }) - require.NoError(t, err) - } - - mainDB := kv2.NewTestDB(t) - - db := NewSnapshotKV().DB(mainDB).HeadersSnapshot(sn1).BodiesSnapshot(sn2).Open() - { - tx, err := db.BeginRw(context.Background()) - 
require.NoError(t, err) - defer tx.Rollback() - - v, err := tx.GetOne(kv.Headers, dbutils.HeaderKey(1, common.Hash{1})) - require.NoError(t, err) - if !bytes.Equal(v, []byte{1}) { - t.Fatal(v) - } - - v, err = tx.GetOne(kv.BlockBody, dbutils.BlockBodyKey(1, common.Hash{1})) - require.NoError(t, err) - if !bytes.Equal(v, []byte{1}) { - t.Fatal(v) - } - - err = tx.Put(kv.BlockBody, dbutils.BlockBodyKey(4, common.Hash{4}), []byte{4}) - require.NoError(t, err) - err = tx.Put(kv.Headers, dbutils.HeaderKey(4, common.Hash{4}), []byte{4}) - require.NoError(t, err) - err = tx.Commit() - require.NoError(t, err) - } - tx, err := db.BeginRo(context.Background()) - require.NoError(t, err) - defer tx.Rollback() - c, err := tx.Cursor(kv.Headers) - require.NoError(t, err) - k, v, err := c.First() - require.NoError(t, err) - if !bytes.Equal(k, dbutils.HeaderKey(1, common.Hash{1})) { - t.Fatal(k, v) - } - - k, v, err = c.Next() - require.NoError(t, err) - if !bytes.Equal(k, dbutils.HeaderKey(2, common.Hash{2})) { - t.Fatal(common.Bytes2Hex(k)) - } - if !bytes.Equal(v, []byte{2}) { - t.Fatal(common.Bytes2Hex(k)) - } - - k, v, err = c.Next() - require.NoError(t, err) - if !bytes.Equal(k, dbutils.HeaderKey(4, common.Hash{4})) { - t.Fatal("invalid key", common.Bytes2Hex(k)) - } - if !bytes.Equal(v, []byte{4}) { - t.Fatal(common.Bytes2Hex(k), common.Bytes2Hex(v)) - } - - k, v, err = c.Next() - if k != nil || v != nil || err != nil { - t.Fatal(k, v, err) - } - - c, err = tx.Cursor(kv.BlockBody) - require.NoError(t, err) - k, v, err = c.First() - require.NoError(t, err) - if !bytes.Equal(k, dbutils.BlockBodyKey(1, common.Hash{1})) { - t.Fatal(k, v) - } - - k, v, err = c.Next() - require.NoError(t, err) - if !bytes.Equal(k, dbutils.BlockBodyKey(2, common.Hash{2})) { - t.Fatal() - } - if !bytes.Equal(v, []byte{2}) { - t.Fatal(common.Bytes2Hex(k), common.Bytes2Hex(v)) - } - - k, v, err = c.Next() - require.NoError(t, err) - if !bytes.Equal(k, dbutils.BlockBodyKey(4, common.Hash{4})) { - t.Fatal() - } - if !bytes.Equal(v, []byte{4}) { - t.Fatal(common.Bytes2Hex(k), common.Bytes2Hex(v)) - } - - k, v, err = c.Next() - if k != nil || v != nil || err != nil { - t.Fatal(k, v, err) - } -} - -func TestSnapshot2WritableTxWalkReplaceAndCreateNewKey(t *testing.T) { - data := []KvData{} - for i := 1; i < 3; i++ { - for j := 1; j < 3; j++ { - data = append(data, KvData{ - K: dbutils.PlainGenerateCompositeStorageKey([]byte{uint8(i) * 2}, 1, []byte{uint8(j) * 2}), - V: []byte{uint8(i) * 2, uint8(j) * 2}, - }) - } - } - snapshotDB, err := GenStateData(data) - if err != nil { - t.Fatal(err) - } - mainDB := kv2.NewTestDB(t) - - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). 
- Open() - defer db.Close() - - tx, err := db.BeginRw(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx.Rollback() - - c, err := tx.RwCursor(kv.PlainState) - require.NoError(t, err) - replaceKey := dbutils.PlainGenerateCompositeStorageKey([]byte{2}, 1, []byte{4}) - replaceValue := []byte{2, 4, 4} - newKey := dbutils.PlainGenerateCompositeStorageKey([]byte{2}, 1, []byte{5}) - newValue := []byte{2, 5} - - //get first correct k&v - k, v, err := c.First() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[0].K, data[0].V) - if !(bytes.Equal(k, data[0].K) || bytes.Equal(v, data[0].V)) { - t.Fatal(k, data[0].K, v, data[0].V) - } - err = c.Put(replaceKey, replaceValue) - if err != nil { - t.Fatal(err) - } - - // check the key that we've replaced value - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, replaceKey, replaceValue) - - err = c.Put(newKey, newValue) - if err != nil { - t.Fatal(err) - } - // check the key that we've inserted - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, newKey, newValue) - - //check the rest keys - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[2].K, data[2].V) -} - -func TestSnapshot2WritableTxWalkAndDeleteKey(t *testing.T) { - data := []KvData{ - {K: []byte{1}, V: []byte{1}}, - {K: []byte{2}, V: []byte{2}}, - {K: []byte{3}, V: []byte{3}}, - {K: []byte{4}, V: []byte{4}}, - {K: []byte{5}, V: []byte{5}}, - } - snapshotDB, err := GenStateData(data) - if err != nil { - t.Fatal(err) - } - - mainDB := kv2.NewTestDB(t) - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). - Open() - - tx, err := db.BeginRw(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx.Rollback() - - c, err := tx.Cursor(kv.PlainState) - require.NoError(t, err) - deleteCursor, err := tx.RwCursor(kv.PlainState) - require.NoError(t, err) - - //get first correct k&v - k, v, err := c.First() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[0].K, data[0].V) - - //remove value - err = deleteCursor.Delete(data[1].K, nil) - if err != nil { - t.Fatal(err) - } - err = deleteCursor.Delete(data[2].K, nil) - if err != nil { - t.Fatal(err) - } - err = deleteCursor.Delete(data[4].K, nil) - if err != nil { - t.Fatal(err) - } - - // check the key that we've replaced value - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[3].K, data[3].V) - - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, nil, nil) - - //2,3,5 removed. Current 4. Prev - - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[0].K, data[0].V) - - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, nil, nil) -} - -func TestSnapshot2WritableTxNextAndPrevAndDeleteKey(t *testing.T) { - data := []KvData{ - {K: []byte{1}, V: []byte{1}}, //to remove - {K: []byte{2}, V: []byte{2}}, - {K: []byte{3}, V: []byte{3}}, - {K: []byte{4}, V: []byte{4}}, //to remove - {K: []byte{5}, V: []byte{5}}, - } - snapshotDB, err := GenStateData(data) - if err != nil { - t.Fatal(err) - } - - mainDB := kv2.NewTestDB(t) - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). 
- Open() - - tx, err := db.BeginRw(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx.Rollback() - - c, err := tx.Cursor(kv.PlainState) - require.NoError(t, err) - deleteCursor, err := tx.RwCursor(kv.PlainState) - require.NoError(t, err) - - //get first correct k&v - k, v, err := c.Last() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[len(data)-1].K, data[len(data)-1].V) - - for i := len(data) - 2; i >= 0; i-- { - k, v, err = c.Prev() - if err != nil { - t.Fatal(i, err) - } - checkKV(t, k, v, data[i].K, data[i].V) - - k, v, err = c.Current() - if err != nil { - t.Fatal(i, err) - } - checkKV(t, k, v, data[i].K, data[i].V) - } - - k, v, err = c.Last() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[4].K, data[4].V) - - //remove 4. Current on 5 - err = deleteCursor.Delete(data[3].K, nil) - if err != nil { - t.Fatal(err) - } - - //cursor on 3 after it - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[2].K, data[2].V) - - err = deleteCursor.Delete(data[0].K, nil) - if err != nil { - t.Fatal(err) - } - - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[1].K, data[1].V) - - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, nil, nil) - -} -func TestSnapshot2WritableTxWalkLastElementIsSnapshot(t *testing.T) { - snapshotData := []KvData{ - { - K: []byte{0, 1}, - V: []byte{1}, - }, - { - K: []byte{0, 4}, - V: []byte{4}, - }, - } - replacedValue := []byte{1, 1} - mainData := []KvData{ - { - K: []byte{0, 1}, - V: replacedValue, - }, - { - K: []byte{0, 2}, - V: []byte{2}, - }, - { - K: []byte{0, 3}, - V: []byte{3}, - }, - } - snapshotDB, err := GenStateData(snapshotData) - if err != nil { - t.Fatal(err) - } - mainDB, err := GenStateData(mainData) - if err != nil { - t.Fatal(err) - } - - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). - Open() - - tx, err := db.BeginRw(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx.Rollback() - - c, err := tx.Cursor(kv.PlainState) - require.NoError(t, err) - //get first correct k&v - k, v, err := c.First() - if err != nil { - t.Fatal(err) - } - - checkKV(t, k, v, mainData[0].K, mainData[0].V) - - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, mainData[1].K, mainData[1].V) - - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, mainData[2].K, mainData[2].V) - - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, snapshotData[1].K, snapshotData[1].V) - - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, nil, nil) -} - -func TestSnapshot2WritableTxWalkForwardAndBackward(t *testing.T) { - snapshotData := []KvData{ - { - K: []byte{0, 1}, - V: []byte{1}, - }, - { - K: []byte{0, 4}, - V: []byte{4}, - }, - } - replacedValue := []byte{1, 1} - mainData := []KvData{ - { - K: []byte{0, 1}, - V: replacedValue, - }, - { - K: []byte{0, 2}, - V: []byte{2}, - }, - { - K: []byte{0, 3}, - V: []byte{3}, - }, - } - data := []KvData{ - mainData[0], - mainData[1], - mainData[2], - snapshotData[1], - } - snapshotDB, err := GenStateData(snapshotData) - if err != nil { - t.Fatal(err) - } - mainDB, err := GenStateData(mainData) - if err != nil { - t.Fatal(err) - } - - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). 
- Open() - - tx, err := db.BeginRw(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx.Rollback() - - c, err := tx.Cursor(kv.PlainState) - require.NoError(t, err) - //get first correct k&v - k, v, err := c.First() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[0].K, data[0].V) - - for i := 1; i < len(data); i++ { - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[i].K, data[i].V) - - k, v, err = c.Current() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[i].K, data[i].V) - } - - for i := len(data) - 2; i > 0; i-- { - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[i].K, data[i].V) - - k, v, err = c.Current() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[i].K, data[i].V) - } - - k, v, err = c.Last() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[len(data)-1].K, data[len(data)-1].V) - k, v, err = c.Current() - if err != nil { - t.Fatal(err) - } - - checkKV(t, k, v, data[len(data)-1].K, data[len(data)-1].V) - - for i := len(data) - 2; i > 0; i-- { - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[i].K, data[i].V) - - k, v, err = c.Current() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[i].K, data[i].V) - } - - i := 0 - err = ethdb.Walk(c, []byte{}, 0, func(k, v []byte) (bool, error) { - checkKV(t, k, v, data[i].K, data[i].V) - i++ - return true, nil - }) - if err != nil { - t.Fatal(err) - } -} - -func TestSnapshot2WalkByEmptyDB(t *testing.T) { - data := []KvData{ - {K: []byte{1}, V: []byte{1}}, - {K: []byte{2}, V: []byte{2}}, - {K: []byte{3}, V: []byte{3}}, - {K: []byte{4}, V: []byte{4}}, - {K: []byte{5}, V: []byte{5}}, - } - snapshotDB, err := GenStateData(data) - if err != nil { - t.Fatal(err) - } - - mainDB := kv2.NewTestDB(t) - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). - Open() - - tx, err := db.BeginRw(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx.Rollback() - - c, err := tx.Cursor(kv.PlainState) - require.NoError(t, err) - - i := 0 - err = ethdb.Walk(c, []byte{}, 0, func(k, v []byte) (bool, error) { - checkKV(t, k, v, data[i].K, data[i].V) - i++ - return true, nil - }) - if err != nil { - t.Fatal(err) - } - -} - -func TestSnapshot2WritablePrevAndDeleteKey(t *testing.T) { - data := []KvData{ - {K: []byte{1}, V: []byte{1}}, - {K: []byte{2}, V: []byte{2}}, - {K: []byte{3}, V: []byte{3}}, - {K: []byte{4}, V: []byte{4}}, - {K: []byte{5}, V: []byte{5}}, - } - snapshotDB, err := GenStateData(data) - if err != nil { - t.Fatal(err) - } - - mainDB := kv2.NewTestDB(t) - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). 
- Open() - - tx, err := db.BeginRw(context.Background()) - require.NoError(t, err) - defer tx.Rollback() - c, err := tx.Cursor(kv.PlainState) - require.NoError(t, err) - - //get first correct k&v - k, v, err := c.First() - if err != nil { - printBucket(db, kv.PlainState) - t.Fatal(err) - } - checkKV(t, k, v, data[0].K, data[0].V) - - for i := 1; i < len(data); i++ { - k, v, err = c.Next() - require.NoError(t, err) - checkKV(t, k, v, data[i].K, data[i].V) - - k, v, err = c.Current() - require.NoError(t, err) - checkKV(t, k, v, data[i].K, data[i].V) - } - - // check the key that we've replaced value - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, nil, nil) - - for i := len(data) - 2; i >= 0; i-- { - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[i].K, data[i].V) - - k, v, err = c.Current() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[i].K, data[i].V) - } -} - -func TestSnapshot2WritableTxNextAndPrevWithDeleteAndPutKeys(t *testing.T) { - data := []KvData{ - {K: []byte{1}, V: []byte{1}}, - {K: []byte{2}, V: []byte{2}}, - {K: []byte{3}, V: []byte{3}}, - {K: []byte{4}, V: []byte{4}}, - {K: []byte{5}, V: []byte{5}}, - } - snapshotDB, err := GenStateData(data) - if err != nil { - t.Fatal(err) - } - - mainDB := kv2.NewTestDB(t) - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). - Open() - - tx, err := db.BeginRw(context.Background()) - require.NoError(t, err) - defer tx.Rollback() - c, err := tx.Cursor(kv.PlainState) - require.NoError(t, err) - deleteCursor, err := tx.RwCursor(kv.PlainState) - require.NoError(t, err) - - //get first correct k&v - k, v, err := c.First() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[0].K, data[0].V) - - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[1].K, data[1].V) - - err = deleteCursor.Delete(data[2].K, nil) - if err != nil { - t.Fatal(err) - } - - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[3].K, data[3].V) - - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[1].K, data[1].V) - - err = deleteCursor.Put(data[2].K, data[2].V) - if err != nil { - t.Fatal(err) - } - - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[2].K, data[2].V) - - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[3].K, data[3].V) - - err = deleteCursor.Delete(data[2].K, nil) - if err != nil { - t.Fatal(err) - } - - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[1].K, data[1].V) - - k, v, err = c.Prev() - if err != nil { - t.Fatal(err) - } - checkKV(t, k, v, data[0].K, data[0].V) - -} - -func TestSnapshotUpdateSnapshot(t *testing.T) { - data := []KvData{ - {K: []byte{1}, V: []byte{1}}, - {K: []byte{2}, V: []byte{2}}, - {K: []byte{3}, V: []byte{3}}, - {K: []byte{4}, V: []byte{4}}, - {K: []byte{5}, V: []byte{5}}, - } - snapshotDB, err := GenStateData(data) - if err != nil { - t.Fatal(err) - } - - data2 := append(data, []KvData{ - {K: []byte{6}, V: []byte{6}}, - {K: []byte{7}, V: []byte{7}}, - }...) - snapshotDB2, err := GenStateData(data2) - if err != nil { - t.Fatal(err) - } - - mainDB := kv2.NewTestDB(t) - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). 
- Open() - - tx, err := db.BeginRo(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx.Rollback() - c, err := tx.Cursor(kv.PlainState) - if err != nil { - t.Fatal(err) - } - - k, v, err := c.First() - if err != nil { - t.Fatal(err) - } - checkKVErr(t, k, v, err, []byte{1}, []byte{1}) - - done := make(chan struct{}) - db.UpdateSnapshots("state", snapshotDB2, done) - - tx2, err := db.BeginRo(context.Background()) - if err != nil { - t.Fatal(err) - } - defer tx2.Rollback() - - c2, err := tx2.Cursor(kv.PlainState) - if err != nil { - t.Fatal(err) - } - - k2, v2, err2 := c2.First() - if err2 != nil { - t.Fatal(err2) - } - checkKVErr(t, k2, v2, err2, []byte{1}, []byte{1}) - - i := 2 - for { - k, v, err = c.Next() - if err != nil { - t.Fatal(err) - } - if k == nil { - break - } - checkKVErr(t, k, v, err, []byte{uint8(i)}, []byte{uint8(i)}) - i++ - } - //data[maxK]+1 - if i != 6 { - t.Fatal("incorrect last key", i) - } - tx.Rollback() - - i = 2 - for { - k2, v2, err2 = c2.Next() - if err2 != nil { - t.Fatal(err2) - } - if k2 == nil { - break - } - checkKVErr(t, k2, v2, err2, []byte{uint8(i)}, []byte{uint8(i)}) - i++ - } - //data2[maxK]+1 - if i != 8 { - t.Fatal("incorrect last key", i) - } - - //a short delay to close - time.Sleep(time.Second) - select { - case <-done: - default: - t.Fatal("Hasn't closed database") - - } -} -func TestPlainStateProxy(t *testing.T) { - snapshotData := []KvData{ - {K: []byte{1}, V: []byte{1}}, - {K: []byte{2}, V: []byte{2}}, - } - - writeDBData := []KvData{ - {K: []byte{3}, V: []byte{3}}, - } - - tmpDBData := []KvData{ - {K: []byte{4}, V: []byte{4}}, - } - - snapshotDB, err := GenStateData(snapshotData) - if err != nil { - t.Fatal(err) - } - - mainDB := kv2.NewTestDB(t) - db := NewSnapshotKV().DB(mainDB).StateSnapshot(snapshotDB). - Open() - err = db.Update(context.Background(), func(tx kv.RwTx) error { - c, err := tx.RwCursor(kv.PlainState) - if err != nil { - return err - } - for i := range writeDBData { - innerErr := c.Put(writeDBData[i].K, writeDBData[i].V) - if innerErr != nil { - return innerErr - } - } - return nil - }) - if err != nil { - t.Fatal(err) - } - - tmpDB := kv2.NewTestDB(t) - db.SetTempDB(tmpDB, []string{kv.PlainState}) - - nonStateKey := []byte{11} - nonStateValue := []byte{99} - - err = db.Update(context.Background(), func(tx kv.RwTx) error { - err = tx.Put(kv.BlockBody, nonStateKey, nonStateValue) - if err != nil { - return err - } - - c, err := tx.RwCursor(kv.PlainState) - if err != nil { - return err - } - for i := range tmpDBData { - innerErr := c.Put(tmpDBData[i].K, tmpDBData[i].V) - if innerErr != nil { - return innerErr - } - } - return nil - - }) - if err != nil { - t.Fatal(err) - } - - fullStateResult := []KvData{} - err = db.View(context.Background(), func(tx kv.Tx) error { - v, err := tx.GetOne(kv.BlockBody, nonStateKey) - if err != nil { - t.Error(err) - } - if !bytes.Equal(v, nonStateValue) { - t.Error(v, nonStateValue) - } - - return tx.ForEach(kv.PlainState, []byte{}, func(k, v []byte) error { - fullStateResult = append(fullStateResult, KvData{ - K: k, - V: v, - }) - return nil - }) - }) - if err != nil { - t.Fatal(err) - } - fullStateExpected := append(append(snapshotData, writeDBData...), tmpDBData...) 
- require.Equal(t, fullStateExpected, fullStateResult) - - tmpDBResult := []KvData{} - err = db.tmpDB.View(context.Background(), func(tx kv.Tx) error { - v, err := tx.GetOne(kv.BlockBody, nonStateKey) - if err != nil { - t.Error(err) - } - if len(v) != 0 { - t.Error(v) - } - - return tx.ForEach(kv.PlainState, []byte{}, func(k, v []byte) error { - tmpDBResult = append(tmpDBResult, KvData{ - K: k, - V: v, - }) - return nil - }) - }) - if err != nil { - t.Fatal(err) - } - require.Equal(t, tmpDBData, tmpDBData) - - writeDBResult := []KvData{} - err = db.WriteDB().View(context.Background(), func(tx kv.Tx) error { - v, err := tx.GetOne(kv.BlockBody, nonStateKey) - if err != nil { - t.Error(err) - } - if !bytes.Equal(v, nonStateValue) { - t.Error(v, nonStateValue) - } - - return tx.ForEach(kv.PlainState, []byte{}, func(k, v []byte) error { - writeDBResult = append(writeDBResult, KvData{ - K: k, - V: v, - }) - - return nil - }) - }) - if err != nil { - t.Fatal(err) - } - - require.Equal(t, writeDBData, writeDBResult) - -} - -func printBucket(db kv.RoDB, bucket string) { - fmt.Println("+Print bucket", bucket) - defer func() { - fmt.Println("-Print bucket", bucket) - }() - err := db.View(context.Background(), func(tx kv.Tx) error { - c, err := tx.Cursor(bucket) - if err != nil { - return err - } - k, v, err := c.First() - if err != nil { - panic(fmt.Errorf("first err: %w", err)) - } - for k != nil && v != nil { - fmt.Println("k:=", common.Bytes2Hex(k), "v:=", common.Bytes2Hex(v)) - k, v, err = c.Next() - if err != nil { - panic(fmt.Errorf("next err: %w", err)) - } - } - return nil - }) - fmt.Println("Print err", err) -} - -func checkKVErr(t *testing.T, k, v []byte, err error, expectedK, expectedV []byte) { - t.Helper() - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(k, expectedK) { - t.Error("k!= expected", k, expectedK) - } - if !bytes.Equal(v, expectedV) { - t.Error("v!= expected", v, expectedV) - } -} - -func checkKV(t *testing.T, key, val, expectedKey, expectedVal []byte) { - t.Helper() - if !bytes.Equal(key, expectedKey) { - t.Log("+", common.Bytes2Hex(expectedKey)) - t.Log("-", common.Bytes2Hex(key)) - t.Fatal("wrong key") - } - if !bytes.Equal(val, expectedVal) { - t.Log("+", common.Bytes2Hex(expectedVal)) - t.Log("-", common.Bytes2Hex(val)) - t.Fatal("wrong value for key", common.Bytes2Hex(key)) - } -} - -type KvData struct { - K []byte - V []byte -} - -func GenStateData(data []KvData) (kv.RwDB, error) { - snapshot := mdbx.NewMDBX(log.New()).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.PlainState: kv.TableCfgItem{}, - } - }).InMem().MustOpen() - - err := snapshot.Update(context.Background(), func(tx kv.RwTx) error { - c, err := tx.RwCursor(kv.PlainState) - if err != nil { - return err - } - for i := range data { - innerErr := c.Put(data[i].K, data[i].V) - if innerErr != nil { - return innerErr - } - } - return nil - }) - if err != nil { - return nil, err - } - return snapshot, nil -} diff --git a/go.mod b/go.mod index a6ce46b3b..6d166d4ea 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible - github.com/ledgerwatch/erigon-lib v0.0.0-20220310121515-3123b6d895c5 + github.com/ledgerwatch/erigon-lib v0.0.0-20220312093458-c1f1365f9224 github.com/ledgerwatch/log/v3 v3.4.1 github.com/ledgerwatch/secp256k1 v1.0.0 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect diff --git a/go.sum 
b/go.sum index edc6d5fce..42d4700b8 100644 --- a/go.sum +++ b/go.sum @@ -641,8 +641,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20220310121515-3123b6d895c5 h1:f81JYvbRxP0T//AG+wmfZNxiwz2mDPK1cVJuhwcAiYc= -github.com/ledgerwatch/erigon-lib v0.0.0-20220310121515-3123b6d895c5/go.mod h1:mag5WaGOUTVOLvFkT4wpjR5YHMmm4hynWJ3YfQ44Elg= +github.com/ledgerwatch/erigon-lib v0.0.0-20220312093458-c1f1365f9224 h1:GqWMSF+pwVuW7cAosXMOzH/h3sIrEQ3TM4Ivwdl+D4M= +github.com/ledgerwatch/erigon-lib v0.0.0-20220312093458-c1f1365f9224/go.mod h1:mag5WaGOUTVOLvFkT4wpjR5YHMmm4hynWJ3YfQ44Elg= github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc= github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ= diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index 56eae4283..530d822b8 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -72,6 +72,7 @@ var snapshotCommand = cli.Command{ utils.DataDirFlag, SnapshotFromFlag, SnapshotToFlag, + SnapshotEveryFlag, }, debug.Flags...), }, }, @@ -88,6 +89,11 @@ var ( Usage: "To block number. Zero - means unlimited.", Value: 0, } + SnapshotEveryFlag = cli.Uint64Flag{ + Name: "every", + Usage: "Do operation every N blocks", + Value: 1_000, + } SnapshotSegmentSizeFlag = cli.Uint64Flag{ Name: "segment.size", Usage: "Amount of blocks in each segment", @@ -135,8 +141,9 @@ func doRetireCommand(cliCtx *cli.Context) error { tmpDir := filepath.Join(datadir, etl.TmpDirName) from := cliCtx.Uint64(SnapshotFromFlag.Name) to := cliCtx.Uint64(SnapshotToFlag.Name) + every := cliCtx.Uint64(SnapshotEveryFlag.Name) - chainDB := mdbx.NewMDBX(log.New()).Path(path.Join(datadir, "chaindata")).Readonly().MustOpen() + chainDB := mdbx.NewMDBX(log.New()).Label(kv.ChainDB).Path(path.Join(datadir, "chaindata")).Readonly().MustOpen() defer chainDB.Close() cfg := ethconfig.NewSnapshotCfg(true, true) @@ -148,10 +155,13 @@ func doRetireCommand(cliCtx *cli.Context) error { chainConfig := tool.ChainConfigFromDB(chainDB) chainID, _ := uint256.FromBig(chainConfig.ChainID) snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir) + snapshots.ReopenSegments() - if err := snapshotsync.RetireBlocks(ctx, from, to, *chainID, tmpDir, snapshots, chainDB, 1, log.LvlInfo); err != nil { - panic(err) - //return err + for i := from; i < to; i += every { + if err := snapshotsync.RetireBlocks(ctx, i, i+every, *chainID, tmpDir, snapshots, chainDB, runtime.NumCPU()/2, log.LvlInfo); err != nil { + panic(err) + //return err + } } return nil } @@ -171,7 +181,7 @@ func doSnapshotCommand(cliCtx *cli.Context) error { tmpDir := filepath.Join(datadir, etl.TmpDirName) dir.MustExist(tmpDir) - chainDB := mdbx.NewMDBX(log.New()).Path(filepath.Join(datadir, "chaindata")).Readonly().MustOpen() + chainDB := mdbx.NewMDBX(log.New()).Label(kv.ChainDB).Path(filepath.Join(datadir, "chaindata")).Readonly().MustOpen() defer chainDB.Close() if err := snapshotBlocks(ctx, chainDB, fromBlock, toBlock, segmentSize, snapshotDir, tmpDir); err != nil { diff --git a/turbo/snapshotsync/block_snapshots.go 
b/turbo/snapshotsync/block_snapshots.go index a41bc1a02..2c9d35fba 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -666,7 +666,7 @@ func min(a, b uint64) uint64 { } func RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, lvl log.Lvl) error { - log.Log(lvl, "[snapshots] Retire Blocks", "from", blockFrom, "to", blockTo) + log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000)) // in future we will do it in background if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil { return fmt.Errorf("DumpBlocks: %w", err) @@ -679,7 +679,6 @@ func RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 if len(ranges) == 0 { return nil } - if err := merger.Merge(ctx, snapshots, ranges, &dir.Rw{Path: snapshots.Dir()}); err != nil { return err } @@ -878,7 +877,8 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF } _, fileName := filepath.Split(segmentFile) - log.Log(lvl, "[snapshots] Compression", "ratio", f.Ratio.String(), "file", fileName) + ext := filepath.Ext(fileName) + log.Log(lvl, "[snapshots] Compression", "ratio", f.Ratio.String(), "file", fileName[:len(fileName)-len(ext)]) return firstTxID, nil } @@ -1427,6 +1427,8 @@ type mergeRange struct { from, to uint64 } +func (r mergeRange) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) } + func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) { for i := len(snapshots.blocks) - 1; i > 0; i-- { sn := snapshots.blocks[i] @@ -1469,6 +1471,7 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeH } func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []mergeRange, snapshotDir *dir.Rw) error { + log.Log(m.lvl, "[snapshots] Merge segments", "ranges", fmt.Sprintf("%v", mergeRanges)) for _, r := range mergeRanges { toMergeHeaders, toMergeBodies, toMergeTxs := m.filesByRange(snapshots, r.from, r.to) if err := m.merge(ctx, toMergeBodies, filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Bodies))); err != nil { @@ -1497,8 +1500,6 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string) _, fName := filepath.Split(f) fileNames[i] = fName } - _, fName := filepath.Split(targetFile) - log.Log(m.lvl, "[snapshots] Merging", "files", fileNames, "to", fName) f, err := compress.NewCompressor(ctx, "merge", targetFile, m.tmpDir, compress.MinPatternScore, m.workers) if err != nil { return err @@ -1518,6 +1519,11 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string) if err := f.AddWord(word); err != nil { return err } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } } return nil }); err != nil { diff --git a/turbo/snapshotsync/wrapdb.go b/turbo/snapshotsync/wrapdb.go deleted file mode 100644 index 21eadf0b2..000000000 --- a/turbo/snapshotsync/wrapdb.go +++ /dev/null @@ -1,60 +0,0 @@ -package snapshotsync - -/* -import ( - "github.com/ledgerwatch/erigon-lib/gointerfaces/snapshotsync" - "github.com/ledgerwatch/erigon-lib/kv" - kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/ethdb/snapshotdb" - "github.com/ledgerwatch/log/v3" -) - -var ( - BucketConfigs = map[snapshotsync.Type]kv.TableCfg{ - snapshotsync.SnapshotType_bodies: { - kv.BlockBody: 
kv.TableCfgItem{}, - kv.EthTx: kv.TableCfgItem{}, - }, - snapshotsync.SnapshotType_headers: { - kv.Headers: kv.TableCfgItem{}, - }, - snapshotsync.SnapshotType_state: { - kv.PlainState: kv.TableCfgItem{ - Flags: kv.DupSort, - AutoDupSortKeysConversion: true, - DupFromLen: 60, - DupToLen: 28, - }, - kv.PlainContractCode: kv.TableCfgItem{}, - kv.Code: kv.TableCfgItem{}, - }, - } -) - -func WrapBySnapshotsFromDownloader(db kv.RwDB, snapshots map[snapshotsync.Type]*snapshotsync.SnapshotsInfo) (kv.RwDB, error) { - snKV := snapshotdb.NewSnapshotKV().DB(db) - for k, v := range snapshots { - log.Info("Wrap db by", "snapshot", k.String(), "dir", v.Dbpath) - chainSnapshotCfg := BucketConfigs[k] - snapshotKV, err := kv2.NewMDBX(log.New()).Readonly().Path(v.Dbpath).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return chainSnapshotCfg - }).Open() - - if err != nil { - log.Error("Can't open snapshot", "err", err) - return nil, err - } else { //nolint - switch k { - case snapshotsync.SnapshotType_headers: - snKV = snKV.HeadersSnapshot(snapshotKV) - case snapshotsync.SnapshotType_bodies: - snKV = snKV.BodiesSnapshot(snapshotKV) - case snapshotsync.SnapshotType_state: - snKV = snKV.StateSnapshot(snapshotKV) - } - } - } - - return snKV.Open(), nil -} -*/
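
The deleted kv_snapshot_test.go cases above (TestPlainStateProxy in particular) asserted that a SnapshotKV presents one merged PlainState view over three layers: the read-only state snapshot, the writable main DB, and the temporary overlay installed via SetTempDB. A minimal sketch of that merged-view idea, using plain Go maps instead of MDBX and a hypothetical layeredGet helper; the precedence shown (overlay over write DB over snapshot) is an assumption for illustration, since the deleted test only used disjoint key sets:

package main

import "fmt"

// layeredGet illustrates the read-through order assumed above: temporary
// overlay first, then the writable DB, then the immutable snapshot.
// The real SnapshotKV operates on MDBX transactions and cursors, not maps.
func layeredGet(key string, tmpDB, writeDB, snapshot map[string]string) (string, bool) {
	if v, ok := tmpDB[key]; ok {
		return v, true
	}
	if v, ok := writeDB[key]; ok {
		return v, true
	}
	if v, ok := snapshot[key]; ok {
		return v, true
	}
	return "", false
}

func main() {
	snapshot := map[string]string{"1": "1", "2": "2"} // like snapshotData in TestPlainStateProxy
	writeDB := map[string]string{"3": "3"}            // like writeDBData
	tmpDB := map[string]string{"4": "4"}              // like tmpDBData

	// The merged view contains all three layers; key "5" is absent everywhere.
	for _, k := range []string{"1", "2", "3", "4", "5"} {
		v, ok := layeredGet(k, tmpDB, writeDB, snapshot)
		fmt.Println(k, v, ok)
	}
}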
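
The hunks in turbo/app/snapshots.go and turbo/snapshotsync/block_snapshots.go replace one large RetireBlocks call with fixed-size windows driven by the new --every flag (default 1_000 blocks via SnapshotEveryFlag) and add a context check inside the merge loop. A self-contained sketch of that pattern, with retireBlocks as a hypothetical stand-in for snapshotsync.RetireBlocks:

package main

import (
	"context"
	"fmt"
)

// retireBlocks is a hypothetical stand-in for snapshotsync.RetireBlocks;
// it only reports the window it was asked to retire, using the same
// "%dk-%dk" range formatting the patch introduces for log output.
func retireBlocks(ctx context.Context, from, to uint64) error {
	fmt.Printf("retiring blocks %dk-%dk\n", from/1000, to/1000)
	return nil
}

// retireInWindows walks [from, to) in steps of `every` blocks, mirroring the
// loop added to doRetireCommand, and stops early if the context is cancelled,
// the same pattern as the select added to Merger.merge.
func retireInWindows(ctx context.Context, from, to, every uint64) error {
	for i := from; i < to; i += every {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if err := retireBlocks(ctx, i, i+every); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	if err := retireInWindows(context.Background(), 0, 5_000, 1_000); err != nil {
		fmt.Println("error:", err)
	}
}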