From 31687be5990e919892bb7d77167c5c6c39c7c38e Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 24 May 2023 15:52:51 +0700 Subject: [PATCH] blockReaders in tests, step4 (#7570) --- accounts/abi/bind/backends/simulated.go | 15 ++++++-- cmd/erigon-el/backend/node.go | 2 +- cmd/erigon-el/stages/stages.go | 2 +- cmd/hack/hack.go | 19 ++++++++- cmd/hack/tool/fromdb/tool.go | 1 - cmd/hack/tool/tool.go | 5 +-- cmd/integration/commands/reset_state.go | 15 +++----- cmd/integration/commands/stages.go | 24 +++++++----- cmd/utils/cmd.go | 11 ------ eth/stagedsync/stage_bodies.go | 8 ++-- eth/stagedsync/stage_bodies_test.go | 14 +++---- migrations/migrations.go | 2 +- migrations/txs_begin_end.go | 51 ++++++++++++++----------- migrations/txs_begin_end_test.go | 43 +++++++++++++++++++-- node/node.go | 10 +++++ turbo/node/node.go | 2 +- turbo/stages/mock_sentry.go | 2 +- turbo/stages/stageloop.go | 4 +- 18 files changed, 148 insertions(+), 82 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index bd97639ea..0a078a655 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -294,12 +294,21 @@ func (b *SimulatedBackend) TransactionByHash(ctx context.Context, txHash libcomm if blockNumber == nil { return nil, false, ethereum.NotFound } - txn, _, _, _, err = rawdb.ReadTransaction(tx, txHash, *blockNumber) + blockHash, err := rawdb.ReadCanonicalHash(tx, *blockNumber) if err != nil { return nil, false, err } - if txn != nil { - return txn, false, nil + body, err := b.BlockReader().BodyWithTransactions(ctx, tx, blockHash, *blockNumber) + if err != nil { + return nil, false, err + } + if body == nil { + return nil, false, ethereum.NotFound + } + for _, txn = range body.Transactions { + if txn.Hash() == txHash { + return txn, false, nil + } } return nil, false, ethereum.NotFound } diff --git a/cmd/erigon-el/backend/node.go b/cmd/erigon-el/backend/node.go index 9b2706319..74a065bc9 100644 --- a/cmd/erigon-el/backend/node.go +++ b/cmd/erigon-el/backend/node.go @@ -33,7 +33,7 @@ func (eri *ErigonNode) Serve() error { } func (eri *ErigonNode) run() { - utils.StartNode(eri.stack) + node.StartNode(eri.stack) // we don't have accounts locally and we don't do mining // so these parts are ignored // see cmd/geth/daemon.go#startNode for full implementation diff --git a/cmd/erigon-el/stages/stages.go b/cmd/erigon-el/stages/stages.go index 2476506f3..ad9c01007 100644 --- a/cmd/erigon-el/stages/stages.go +++ b/cmd/erigon-el/stages/stages.go @@ -100,7 +100,7 @@ func NewStagedSync(ctx context.Context, snapshots, blockReader, cfg.HistoryV3, - cfg.TransactionsV3, + blockWriter, ), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, blockWriter, controlServer.Hd), stagedsync.StageExecuteBlocksCfg( diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go index bbb7847bf..ca58c9466 100644 --- a/cmd/hack/hack.go +++ b/cmd/hack/hack.go @@ -25,11 +25,14 @@ import ( "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/kvcfg" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" librlp "github.com/ledgerwatch/erigon-lib/rlp" + "github.com/ledgerwatch/erigon/core/rawdb/blockio" + "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/log/v3" "golang.org/x/exp/slices" @@ -131,16 +134,30 @@ func printCurrentBlockNumber(chaindata string) { }) } +func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) { + var transactionsV3 bool + if err := db.View(context.Background(), func(tx kv.Tx) error { + transactionsV3, _ = kvcfg.TransactionsV3.Enabled(tx) + return nil + }); err != nil { + panic(err) + } + br := snapshotsync.NewBlockReader(snapshotsync.NewRoSnapshots(ethconfig.Snapshot{Enabled: false}, "", log.New()), transactionsV3) + bw := blockio.NewBlockWriter(transactionsV3) + return br, bw +} + func printTxHashes(chaindata string, block uint64) error { db := mdbx.MustOpen(chaindata) defer db.Close() + br, _ := blocksIO(db) if err := db.View(context.Background(), func(tx kv.Tx) error { for b := block; b < block+1; b++ { hash, e := rawdb.ReadCanonicalHash(tx, b) if e != nil { return e } - block := rawdb.ReadBlock(tx, hash, b) + block, _, _ := br.BlockWithSenders(context.Background(), tx, hash, b) if block == nil { break } diff --git a/cmd/hack/tool/fromdb/tool.go b/cmd/hack/tool/fromdb/tool.go index ad883c23b..d93833c21 100644 --- a/cmd/hack/tool/fromdb/tool.go +++ b/cmd/hack/tool/fromdb/tool.go @@ -5,7 +5,6 @@ import ( "github.com/ledgerwatch/erigon-lib/chain" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/cmd/hack/tool" "github.com/ledgerwatch/erigon/ethdb/prune" ) diff --git a/cmd/hack/tool/tool.go b/cmd/hack/tool/tool.go index 9373350b7..dfd2f4d87 100644 --- a/cmd/hack/tool/tool.go +++ b/cmd/hack/tool/tool.go @@ -5,7 +5,6 @@ import ( "github.com/ledgerwatch/erigon-lib/chain" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/core/rawdb" ) @@ -21,9 +20,9 @@ func ParseFloat64(str string) float64 { } func ChainConfig(tx kv.Tx) *chain.Config { - genesisBlock, err := rawdb.ReadBlockByNumber(tx, 0) + genesisBlockHash, err := rawdb.ReadCanonicalHash(tx, 0) Check(err) - chainConfig, err := rawdb.ReadChainConfig(tx, genesisBlock.Hash()) + chainConfig, err := rawdb.ReadChainConfig(tx, genesisBlockHash) Check(err) return chainConfig } diff --git a/cmd/integration/commands/reset_state.go b/cmd/integration/commands/reset_state.go index 2a372c04a..f88c617cd 100644 --- a/cmd/integration/commands/reset_state.go +++ b/cmd/integration/commands/reset_state.go @@ -107,21 +107,18 @@ func printStages(tx kv.Tx, snapshots *snapshotsync.RoSnapshots, agg *state.Aggre if err != nil { return err } + txs3, err := kvcfg.TransactionsV3.Enabled(tx) + if err != nil { + return err + } lastK, lastV, err := rawdbv3.Last(tx, kv.MaxTxNum) if err != nil { return err } _, lastBlockInHistSnap, _ := rawdbv3.TxNums.FindBlockNum(tx, agg.EndTxNumMinimax()) - fmt.Fprintf(w, "history.v3: %t, idx steps: %.02f, lastMaxTxNum=%d->%d, lastBlockInSnap=%d\n\n", h3, rawdbhelpers.IdxStepsCountV3(tx), u64or0(lastK), u64or0(lastV), lastBlockInHistSnap) - - transactionsV3, _ := kvcfg.TransactionsV3.Enabled(tx) - var s1 uint64 - if transactionsV3 { - s1, err = tx.ReadSequence(kv.EthTxV3) - } else { - s1, err = tx.ReadSequence(kv.EthTx) - } + fmt.Fprintf(w, "history.v3: %t, txs.v3: %t, idx steps: %.02f, lastMaxTxNum=%d->%d, lastBlockInSnap=%d\n\n", h3, txs3, rawdbhelpers.IdxStepsCountV3(tx), u64or0(lastK), u64or0(lastV), lastBlockInHistSnap) + s1, err := tx.ReadSequence(kv.EthTx) if err != nil { return err } diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index ad8f6aadb..3e3c00fb8 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -768,8 +768,9 @@ func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error { sn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer agg.Close() - chainConfig, historyV3, transactionsV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), kvcfg.TransactionsV3.FromDB(db) + chainConfig, historyV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db) _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) + br, bw := blocksIO(db, logger) if err := db.Update(ctx, func(tx kv.RwTx) error { s := stage(sync, tx, nil, stages.Bodies) @@ -780,8 +781,7 @@ func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error { } u := sync.NewUnwindState(stages.Bodies, s.BlockNumber-unwind, s.BlockNumber) - br, _ := blocksIO(db, logger) - cfg := stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, br, historyV3, transactionsV3) + cfg := stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, br, historyV3, bw) if err := stagedsync.UnwindBodiesStage(u, tx, cfg, ctx); err != nil { return err } @@ -811,6 +811,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error { must(sync.SetCurrentStage(stages.Senders)) + br, bw := blocksIO(db, logger) if reset { _, bw := blocksIO(db, logger) return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx, bw) }) @@ -828,12 +829,16 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error { if err := common2.Stopped(ctx.Done()); err != nil { return err } - withoutSenders, _ := rawdb.ReadBlockByNumber(tx, i) - if withoutSenders == nil { + h, _ := br.HeaderByNumber(ctx, tx, i) + if h == nil { break } + withoutSenders, senders, err := br.BlockWithSenders(ctx, tx, h.Hash(), h.Number.Uint64()) + if err != nil { + return err + } + withoutSenders.Body().SendersFromTxs() //remove senders info from txs txs := withoutSenders.Transactions() - _, senders, _ := rawdb.CanonicalBlockByNumberWithSenders(tx, i) if txs.Len() != len(senders) { logger.Error("not equal amount of senders", "block", i, "db", len(senders), "expect", txs.Len()) return nil @@ -861,9 +866,9 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error { s := stage(sync, tx, nil, stages.Senders) logger.Info("Stage", "name", s.ID, "progress", s.BlockNumber) - var br *snapshotsync.BlockRetire + var blockRetire *snapshotsync.BlockRetire if sn.Cfg().Enabled { - br = snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), tmpdir, sn, db, nil, nil, logger) + blockRetire = snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), tmpdir, sn, db, nil, nil, logger) } pm, err := prune.Get(tx) @@ -871,8 +876,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error { return err } - _, bw := blocksIO(db, logger) - cfg := stagedsync.StageSendersCfg(db, chainConfig, false, tmpdir, pm, br, bw, nil) + cfg := stagedsync.StageSendersCfg(db, chainConfig, false, tmpdir, pm, blockRetire, bw, nil) if unwind > 0 { u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber) if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil { diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index e9f544301..2bd555943 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -22,9 +22,6 @@ import ( "io" "os" "runtime" - - "github.com/ledgerwatch/erigon/node" - "github.com/ledgerwatch/erigon/turbo/debug" ) // Fatalf formats a message to standard error and exits the program. @@ -46,11 +43,3 @@ func Fatalf(format string, args ...interface{}) { fmt.Fprintf(w, "Fatal: "+format+"\n", args...) os.Exit(1) } - -func StartNode(stack *node.Node) { - if err := stack.Start(); err != nil { - Fatalf("Error starting protocol stack: %v", err) - } - - go debug.ListenSignals(stack) -} diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 83a9d7cfe..6b3cf1648 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -11,6 +11,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/rawdbv3" + "github.com/ledgerwatch/erigon/core/rawdb/blockio" "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon/core/rawdb" @@ -35,12 +36,13 @@ type BodiesCfg struct { chanConfig chain.Config snapshots *snapshotsync.RoSnapshots blockReader services.FullBlockReader + blockWriter *blockio.BlockWriter historyV3 bool transactionsV3 bool } -func StageBodiesCfg(db kv.RwDB, bd *bodydownload.BodyDownload, bodyReqSend func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool), penalise func(context.Context, []headerdownload.PenaltyItem), blockPropagator adapter.BlockPropagator, timeout int, chanConfig chain.Config, snapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader, historyV3 bool, transactionsV3 bool) BodiesCfg { - return BodiesCfg{db: db, bd: bd, bodyReqSend: bodyReqSend, penalise: penalise, blockPropagator: blockPropagator, timeout: timeout, chanConfig: chanConfig, snapshots: snapshots, blockReader: blockReader, historyV3: historyV3, transactionsV3: transactionsV3} +func StageBodiesCfg(db kv.RwDB, bd *bodydownload.BodyDownload, bodyReqSend func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool), penalise func(context.Context, []headerdownload.PenaltyItem), blockPropagator adapter.BlockPropagator, timeout int, chanConfig chain.Config, snapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader, historyV3 bool, blockWriter *blockio.BlockWriter) BodiesCfg { + return BodiesCfg{db: db, bd: bd, bodyReqSend: bodyReqSend, penalise: penalise, blockPropagator: blockPropagator, timeout: timeout, chanConfig: chanConfig, snapshots: snapshots, blockReader: blockReader, historyV3: historyV3, blockWriter: blockWriter} } // BodiesForward progresses Bodies stage in the forward direction @@ -205,7 +207,7 @@ func BodiesForward( } // Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call) - ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody) + ok, lastTxnNum, err := cfg.blockWriter.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody) if err != nil { return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err) } diff --git a/eth/stagedsync/stage_bodies_test.go b/eth/stagedsync/stage_bodies_test.go index 76b31bbde..1da50f258 100644 --- a/eth/stagedsync/stage_bodies_test.go +++ b/eth/stagedsync/stage_bodies_test.go @@ -22,7 +22,7 @@ func TestBodiesUnwind(t *testing.T) { tx, err := db.BeginRw(m.Ctx) require.NoError(err) defer tx.Rollback() - ctx := m.Ctx + br, bw := m.NewBlocksIO() txn := &types.DynamicFeeTransaction{Tip: u256.N1, FeeCap: u256.N1, CommonTx: types.CommonTx{ChainID: u256.N1, Value: u256.N1, Gas: 1, Nonce: 1}} buf := bytes.NewBuffer(nil) @@ -35,13 +35,13 @@ func TestBodiesUnwind(t *testing.T) { b := &types.RawBody{Transactions: [][]byte{rlpTxn, rlpTxn, rlpTxn}} for i := uint64(1); i <= 10; i++ { - _, _, err = rawdb.WriteRawBody(tx, libcommon.Hash{byte(i)}, i, b) + _, _, err = bw.WriteRawBodyIfNotExists(tx, libcommon.Hash{byte(i)}, i, b) require.NoError(err) err = rawdb.WriteCanonicalHash(tx, libcommon.Hash{byte(i)}, i) require.NoError(err) } { - err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, ctx, "test", logEvery) // block 5 already canonical, start from next one + err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, m.Ctx, "test", logEvery) // block 5 already canonical, start from next one require.NoError(err) n, err := tx.ReadSequence(kv.EthTx) @@ -49,13 +49,13 @@ func TestBodiesUnwind(t *testing.T) { require.Equal(2+5*(3+2), int(n)) // genesis 2 system txs + from 1, 5 block with 3 txn in each } { - err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery, false, nil) // block 5 already canonical, start from next one + err = rawdb.MakeBodiesCanonical(tx, 5+1, m.Ctx, "test", logEvery, br.TxsV3Enabled(), nil) // block 5 already canonical, start from next one require.NoError(err) n, err := tx.ReadSequence(kv.EthTx) require.NoError(err) require.Equal(2+10*(3+2), int(n)) - _, _, err = rawdb.WriteRawBody(tx, libcommon.Hash{11}, 11, b) + _, _, err = bw.WriteRawBodyIfNotExists(tx, libcommon.Hash{11}, 11, b) require.NoError(err) err = rawdb.WriteCanonicalHash(tx, libcommon.Hash{11}, 11) require.NoError(err) @@ -67,14 +67,14 @@ func TestBodiesUnwind(t *testing.T) { { // unwind to block 5, means mark blocks >= 6 as non-canonical - err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, ctx, "test", logEvery) + err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, m.Ctx, "test", logEvery) require.NoError(err) n, err := tx.ReadSequence(kv.EthTx) require.NoError(err) require.Equal(2+5*(3+2), int(n)) // from 0, 5 block with 3 txn in each - err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery, false, nil) // block 5 already canonical, start from next one + err = rawdb.MakeBodiesCanonical(tx, 5+1, m.Ctx, "test", logEvery, br.TxsV3Enabled(), nil) // block 5 already canonical, start from next one require.NoError(err) n, err = tx.ReadSequence(kv.EthTx) require.NoError(err) diff --git a/migrations/migrations.go b/migrations/migrations.go index bbf4630cf..2940003f4 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -33,7 +33,7 @@ import ( var migrations = map[kv.Label][]Migration{ kv.ChainDB: { dbSchemaVersion5, - txsBeginEnd, + TxsBeginEnd, }, kv.TxPoolDB: {}, kv.SentryDB: {}, diff --git a/migrations/txs_begin_end.go b/migrations/txs_begin_end.go index e748a4470..bae1019f2 100644 --- a/migrations/txs_begin_end.go +++ b/migrations/txs_begin_end.go @@ -25,7 +25,7 @@ import ( var ErrTxsBeginEndNoMigration = fmt.Errorf("in this Erigon version DB format was changed: added additional first/last system-txs to blocks. There is no DB migration for this change. Please re-sync or switch to earlier version") -var txsBeginEnd = Migration{ +var TxsBeginEnd = Migration{ Name: "txs_begin_end", Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { logEvery := time.NewTicker(10 * time.Second) @@ -89,7 +89,7 @@ var txsBeginEnd = Migration{ continue } - txs, err := rawdb.CanonicalTransactions(tx, b.BaseTxId, b.TxAmount) + txs, err := canonicalTransactions(tx, b.BaseTxId, b.TxAmount) if err != nil { return err } @@ -114,7 +114,7 @@ var txsBeginEnd = Migration{ if assert.Enable { newBlock, baseTxId, txAmount := rawdb.ReadBody(tx, canonicalHash, blockNum) - newBlock.Transactions, err = rawdb.CanonicalTransactions(tx, baseTxId, txAmount) + newBlock.Transactions, err = canonicalTransactions(tx, baseTxId, txAmount) for i, oldTx := range oldBlock.Transactions { newTx := newBlock.Transactions[i] if oldTx.GetNonce() != newTx.GetNonce() { @@ -226,24 +226,6 @@ var txsBeginEnd = Migration{ }, } -func writeRawBodyDeprecated(db kv.RwTx, hash common2.Hash, number uint64, body *types.RawBody) error { - baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))) - if err != nil { - return err - } - data := types.BodyForStorage{ - BaseTxId: baseTxId, - TxAmount: uint32(len(body.Transactions)), - Uncles: body.Uncles, - } - if err = rawdb.WriteBodyForStorage(db, hash, number, &data); err != nil { - return fmt.Errorf("failed to write body: %w", err) - } - if err = rawdb.WriteRawTransactions(db, body.Transactions, baseTxId, &hash); err != nil { - return fmt.Errorf("failed to WriteRawTransactions: %w, blockNum=%d", err, number) - } - return nil -} func writeTransactionsNewDeprecated(db kv.RwTx, txs []types.Transaction, baseTxId uint64) error { txId := baseTxId buf := bytes.NewBuffer(nil) @@ -277,7 +259,7 @@ func readCanonicalBodyWithTransactionsDeprecated(db kv.Getter, hash common2.Hash } body := new(types.Body) body.Uncles = bodyForStorage.Uncles - body.Transactions, err = rawdb.CanonicalTransactions(db, bodyForStorage.BaseTxId, bodyForStorage.TxAmount) + body.Transactions, err = canonicalTransactions(db, bodyForStorage.BaseTxId, bodyForStorage.TxAmount) if err != nil { log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err) return nil @@ -285,7 +267,7 @@ func readCanonicalBodyWithTransactionsDeprecated(db kv.Getter, hash common2.Hash return body } -func makeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error { +func MakeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error { for blockNum := from; ; blockNum++ { h, err := rawdb.ReadCanonicalHash(tx, blockNum) if err != nil { @@ -354,3 +336,26 @@ func makeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Conte return nil } + +func canonicalTransactions(db kv.Getter, baseTxId uint64, amount uint32) ([]types.Transaction, error) { + if amount == 0 { + return []types.Transaction{}, nil + } + txIdKey := make([]byte, 8) + txs := make([]types.Transaction, amount) + binary.BigEndian.PutUint64(txIdKey, baseTxId) + i := uint32(0) + + if err := db.ForAmount(kv.EthTx, txIdKey, amount, func(k, v []byte) error { + var decodeErr error + if txs[i], decodeErr = types.UnmarshalTransactionFromBinary(v); decodeErr != nil { + return decodeErr + } + i++ + return nil + }); err != nil { + return nil, err + } + txs = txs[:i] // user may request big "amount", but db can return small "amount". Return as much as we found. + return txs, nil +} diff --git a/migrations/txs_begin_end_test.go b/migrations/txs_begin_end_test.go index 97fd05785..0094a3c3a 100644 --- a/migrations/txs_begin_end_test.go +++ b/migrations/txs_begin_end_test.go @@ -1,9 +1,10 @@ -package migrations +package migrations_test import ( "bytes" "context" "encoding/binary" + "fmt" "testing" "time" @@ -12,6 +13,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common/u256" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/memdb" + "github.com/ledgerwatch/erigon/migrations" "github.com/ledgerwatch/log/v3" "github.com/stretchr/testify/require" @@ -39,7 +41,7 @@ func TestTxsBeginEnd(t *testing.T) { err = rawdb.WriteCanonicalHash(tx, hash, i) require.NoError(err) } - if err := makeBodiesNonCanonicalDeprecated(tx, 7, context.Background(), "", logEvery); err != nil { + if err := migrations.MakeBodiesNonCanonicalDeprecated(tx, 7, context.Background(), "", logEvery); err != nil { return err } @@ -59,8 +61,8 @@ func TestTxsBeginEnd(t *testing.T) { }) require.NoError(err) - migrator := NewMigrator(kv.ChainDB) - migrator.Migrations = []Migration{txsBeginEnd} + migrator := migrations.NewMigrator(kv.ChainDB) + migrator.Migrations = []migrations.Migration{migrations.TxsBeginEnd} logger := log.New() err = migrator.Apply(db, tmpDir, logger) require.NoError(err) @@ -104,3 +106,36 @@ func TestTxsBeginEnd(t *testing.T) { require.NoError(err) } + +func writeRawBodyDeprecated(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.RawBody) error { + baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))) + if err != nil { + return err + } + data := types.BodyForStorage{ + BaseTxId: baseTxId, + TxAmount: uint32(len(body.Transactions)), + Uncles: body.Uncles, + } + if err = rawdb.WriteBodyForStorage(db, hash, number, &data); err != nil { + return fmt.Errorf("failed to write body: %w", err) + } + if err = writeRawTransactionsDeprecated(db, body.Transactions, baseTxId); err != nil { + return fmt.Errorf("failed to WriteRawTransactions: %w, blockNum=%d", err, number) + } + return nil +} + +func writeRawTransactionsDeprecated(tx kv.RwTx, txs [][]byte, baseTxId uint64) error { + txId := baseTxId + for _, txn := range txs { + txIdKey := make([]byte, 8) + binary.BigEndian.PutUint64(txIdKey, txId) + // If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine. + if err := tx.Append(kv.EthTx, txIdKey, txn); err != nil { + return fmt.Errorf("txId=%d, baseTxId=%d, %w", txId, baseTxId, err) + } + txId++ + } + return nil +} diff --git a/node/node.go b/node/node.go index e86f75ddc..ab571471c 100644 --- a/node/node.go +++ b/node/node.go @@ -27,8 +27,10 @@ import ( "sync" "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/node/nodecfg" "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/erigon/turbo/debug" "golang.org/x/sync/semaphore" "github.com/gofrs/flock" @@ -368,3 +370,11 @@ func OpenDatabase(config *nodecfg.Config, label kv.Label, logger log.Logger) (kv func (n *Node) ResolvePath(x string) string { return n.config.ResolvePath(x) } + +func StartNode(stack *Node) { + if err := stack.Start(); err != nil { + utils.Fatalf("Error starting protocol stack: %v", err) + } + + go debug.ListenSignals(stack) +} diff --git a/turbo/node/node.go b/turbo/node/node.go index f897e716e..77faf09f7 100644 --- a/turbo/node/node.go +++ b/turbo/node/node.go @@ -35,7 +35,7 @@ func (eri *ErigonNode) Serve() error { } func (eri *ErigonNode) run() { - utils.StartNode(eri.stack) + node.StartNode(eri.stack) // we don't have accounts locally and we don't do mining // so these parts are ignored // see cmd/geth/daemon.go#startNode for full implementation diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 2434b0f65..081e6b160 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -415,7 +415,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK mock.BlockSnapshots, blockReader, cfg.HistoryV3, - cfg.TransactionsV3, + blockWriter, ), stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, blockRetire, blockWriter, mock.sentriesClient.Hd), stagedsync.StageExecuteBlocksCfg( diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 072463141..53a8bb54c 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -458,7 +458,7 @@ func NewDefaultStages(ctx context.Context, snapshots, blockReader, cfg.HistoryV3, - cfg.TransactionsV3, + blockWriter, ), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, blockWriter, controlServer.Hd), stagedsync.StageExecuteBlocksCfg( @@ -513,7 +513,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config dirs.Tmp, nil, nil, ), - stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, snapshots, blockReader, cfg.HistoryV3, cfg.TransactionsV3), + stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, snapshots, blockReader, cfg.HistoryV3, blockWriter), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, true, dirs.Tmp, cfg.Prune, nil, blockWriter, controlServer.Hd), stagedsync.StageExecuteBlocksCfg(