blockReaders in tests, step4 (#7570)

This commit is contained in:
Alex Sharov 2023-05-24 15:52:51 +07:00 committed by GitHub
parent 2181b6abfd
commit 31687be599
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 148 additions and 82 deletions

View File

@ -294,12 +294,21 @@ func (b *SimulatedBackend) TransactionByHash(ctx context.Context, txHash libcomm
if blockNumber == nil { if blockNumber == nil {
return nil, false, ethereum.NotFound return nil, false, ethereum.NotFound
} }
txn, _, _, _, err = rawdb.ReadTransaction(tx, txHash, *blockNumber) blockHash, err := rawdb.ReadCanonicalHash(tx, *blockNumber)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
if txn != nil { body, err := b.BlockReader().BodyWithTransactions(ctx, tx, blockHash, *blockNumber)
return txn, false, nil if err != nil {
return nil, false, err
}
if body == nil {
return nil, false, ethereum.NotFound
}
for _, txn = range body.Transactions {
if txn.Hash() == txHash {
return txn, false, nil
}
} }
return nil, false, ethereum.NotFound return nil, false, ethereum.NotFound
} }

View File

@ -33,7 +33,7 @@ func (eri *ErigonNode) Serve() error {
} }
func (eri *ErigonNode) run() { func (eri *ErigonNode) run() {
utils.StartNode(eri.stack) node.StartNode(eri.stack)
// we don't have accounts locally and we don't do mining // we don't have accounts locally and we don't do mining
// so these parts are ignored // so these parts are ignored
// see cmd/geth/daemon.go#startNode for full implementation // see cmd/geth/daemon.go#startNode for full implementation

View File

@ -100,7 +100,7 @@ func NewStagedSync(ctx context.Context,
snapshots, snapshots,
blockReader, blockReader,
cfg.HistoryV3, cfg.HistoryV3,
cfg.TransactionsV3, blockWriter,
), ),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, blockWriter, controlServer.Hd), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, blockWriter, controlServer.Hd),
stagedsync.StageExecuteBlocksCfg( stagedsync.StageExecuteBlocksCfg(

View File

@ -25,11 +25,14 @@ import (
"github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2" "github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2"
"github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/recsplit"
"github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32"
librlp "github.com/ledgerwatch/erigon-lib/rlp" librlp "github.com/ledgerwatch/erigon-lib/rlp"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
@ -131,16 +134,30 @@ func printCurrentBlockNumber(chaindata string) {
}) })
} }
func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) {
var transactionsV3 bool
if err := db.View(context.Background(), func(tx kv.Tx) error {
transactionsV3, _ = kvcfg.TransactionsV3.Enabled(tx)
return nil
}); err != nil {
panic(err)
}
br := snapshotsync.NewBlockReader(snapshotsync.NewRoSnapshots(ethconfig.Snapshot{Enabled: false}, "", log.New()), transactionsV3)
bw := blockio.NewBlockWriter(transactionsV3)
return br, bw
}
func printTxHashes(chaindata string, block uint64) error { func printTxHashes(chaindata string, block uint64) error {
db := mdbx.MustOpen(chaindata) db := mdbx.MustOpen(chaindata)
defer db.Close() defer db.Close()
br, _ := blocksIO(db)
if err := db.View(context.Background(), func(tx kv.Tx) error { if err := db.View(context.Background(), func(tx kv.Tx) error {
for b := block; b < block+1; b++ { for b := block; b < block+1; b++ {
hash, e := rawdb.ReadCanonicalHash(tx, b) hash, e := rawdb.ReadCanonicalHash(tx, b)
if e != nil { if e != nil {
return e return e
} }
block := rawdb.ReadBlock(tx, hash, b) block, _, _ := br.BlockWithSenders(context.Background(), tx, hash, b)
if block == nil { if block == nil {
break break
} }

View File

@ -5,7 +5,6 @@ import (
"github.com/ledgerwatch/erigon-lib/chain" "github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cmd/hack/tool" "github.com/ledgerwatch/erigon/cmd/hack/tool"
"github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/ethdb/prune"
) )

View File

@ -5,7 +5,6 @@ import (
"github.com/ledgerwatch/erigon-lib/chain" "github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/rawdb"
) )
@ -21,9 +20,9 @@ func ParseFloat64(str string) float64 {
} }
func ChainConfig(tx kv.Tx) *chain.Config { func ChainConfig(tx kv.Tx) *chain.Config {
genesisBlock, err := rawdb.ReadBlockByNumber(tx, 0) genesisBlockHash, err := rawdb.ReadCanonicalHash(tx, 0)
Check(err) Check(err)
chainConfig, err := rawdb.ReadChainConfig(tx, genesisBlock.Hash()) chainConfig, err := rawdb.ReadChainConfig(tx, genesisBlockHash)
Check(err) Check(err)
return chainConfig return chainConfig
} }

View File

@ -107,21 +107,18 @@ func printStages(tx kv.Tx, snapshots *snapshotsync.RoSnapshots, agg *state.Aggre
if err != nil { if err != nil {
return err return err
} }
txs3, err := kvcfg.TransactionsV3.Enabled(tx)
if err != nil {
return err
}
lastK, lastV, err := rawdbv3.Last(tx, kv.MaxTxNum) lastK, lastV, err := rawdbv3.Last(tx, kv.MaxTxNum)
if err != nil { if err != nil {
return err return err
} }
_, lastBlockInHistSnap, _ := rawdbv3.TxNums.FindBlockNum(tx, agg.EndTxNumMinimax()) _, lastBlockInHistSnap, _ := rawdbv3.TxNums.FindBlockNum(tx, agg.EndTxNumMinimax())
fmt.Fprintf(w, "history.v3: %t, idx steps: %.02f, lastMaxTxNum=%d->%d, lastBlockInSnap=%d\n\n", h3, rawdbhelpers.IdxStepsCountV3(tx), u64or0(lastK), u64or0(lastV), lastBlockInHistSnap) fmt.Fprintf(w, "history.v3: %t, txs.v3: %t, idx steps: %.02f, lastMaxTxNum=%d->%d, lastBlockInSnap=%d\n\n", h3, txs3, rawdbhelpers.IdxStepsCountV3(tx), u64or0(lastK), u64or0(lastV), lastBlockInHistSnap)
s1, err := tx.ReadSequence(kv.EthTx)
transactionsV3, _ := kvcfg.TransactionsV3.Enabled(tx)
var s1 uint64
if transactionsV3 {
s1, err = tx.ReadSequence(kv.EthTxV3)
} else {
s1, err = tx.ReadSequence(kv.EthTx)
}
if err != nil { if err != nil {
return err return err
} }

View File

@ -768,8 +768,9 @@ func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
sn, agg := allSnapshots(ctx, db, logger) sn, agg := allSnapshots(ctx, db, logger)
defer sn.Close() defer sn.Close()
defer agg.Close() defer agg.Close()
chainConfig, historyV3, transactionsV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), kvcfg.TransactionsV3.FromDB(db) chainConfig, historyV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db)
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
br, bw := blocksIO(db, logger)
if err := db.Update(ctx, func(tx kv.RwTx) error { if err := db.Update(ctx, func(tx kv.RwTx) error {
s := stage(sync, tx, nil, stages.Bodies) s := stage(sync, tx, nil, stages.Bodies)
@ -780,8 +781,7 @@ func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
} }
u := sync.NewUnwindState(stages.Bodies, s.BlockNumber-unwind, s.BlockNumber) u := sync.NewUnwindState(stages.Bodies, s.BlockNumber-unwind, s.BlockNumber)
br, _ := blocksIO(db, logger) cfg := stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, br, historyV3, bw)
cfg := stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, br, historyV3, transactionsV3)
if err := stagedsync.UnwindBodiesStage(u, tx, cfg, ctx); err != nil { if err := stagedsync.UnwindBodiesStage(u, tx, cfg, ctx); err != nil {
return err return err
} }
@ -811,6 +811,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
must(sync.SetCurrentStage(stages.Senders)) must(sync.SetCurrentStage(stages.Senders))
br, bw := blocksIO(db, logger)
if reset { if reset {
_, bw := blocksIO(db, logger) _, bw := blocksIO(db, logger)
return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx, bw) }) return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx, bw) })
@ -828,12 +829,16 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
if err := common2.Stopped(ctx.Done()); err != nil { if err := common2.Stopped(ctx.Done()); err != nil {
return err return err
} }
withoutSenders, _ := rawdb.ReadBlockByNumber(tx, i) h, _ := br.HeaderByNumber(ctx, tx, i)
if withoutSenders == nil { if h == nil {
break break
} }
withoutSenders, senders, err := br.BlockWithSenders(ctx, tx, h.Hash(), h.Number.Uint64())
if err != nil {
return err
}
withoutSenders.Body().SendersFromTxs() //remove senders info from txs
txs := withoutSenders.Transactions() txs := withoutSenders.Transactions()
_, senders, _ := rawdb.CanonicalBlockByNumberWithSenders(tx, i)
if txs.Len() != len(senders) { if txs.Len() != len(senders) {
logger.Error("not equal amount of senders", "block", i, "db", len(senders), "expect", txs.Len()) logger.Error("not equal amount of senders", "block", i, "db", len(senders), "expect", txs.Len())
return nil return nil
@ -861,9 +866,9 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
s := stage(sync, tx, nil, stages.Senders) s := stage(sync, tx, nil, stages.Senders)
logger.Info("Stage", "name", s.ID, "progress", s.BlockNumber) logger.Info("Stage", "name", s.ID, "progress", s.BlockNumber)
var br *snapshotsync.BlockRetire var blockRetire *snapshotsync.BlockRetire
if sn.Cfg().Enabled { if sn.Cfg().Enabled {
br = snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), tmpdir, sn, db, nil, nil, logger) blockRetire = snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), tmpdir, sn, db, nil, nil, logger)
} }
pm, err := prune.Get(tx) pm, err := prune.Get(tx)
@ -871,8 +876,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return err return err
} }
_, bw := blocksIO(db, logger) cfg := stagedsync.StageSendersCfg(db, chainConfig, false, tmpdir, pm, blockRetire, bw, nil)
cfg := stagedsync.StageSendersCfg(db, chainConfig, false, tmpdir, pm, br, bw, nil)
if unwind > 0 { if unwind > 0 {
u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber) u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil { if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {

View File

@ -22,9 +22,6 @@ import (
"io" "io"
"os" "os"
"runtime" "runtime"
"github.com/ledgerwatch/erigon/node"
"github.com/ledgerwatch/erigon/turbo/debug"
) )
// Fatalf formats a message to standard error and exits the program. // Fatalf formats a message to standard error and exits the program.
@ -46,11 +43,3 @@ func Fatalf(format string, args ...interface{}) {
fmt.Fprintf(w, "Fatal: "+format+"\n", args...) fmt.Fprintf(w, "Fatal: "+format+"\n", args...)
os.Exit(1) os.Exit(1)
} }
func StartNode(stack *node.Node) {
if err := stack.Start(); err != nil {
Fatalf("Error starting protocol stack: %v", err)
}
go debug.ListenSignals(stack)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3" "github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/rawdb"
@ -35,12 +36,13 @@ type BodiesCfg struct {
chanConfig chain.Config chanConfig chain.Config
snapshots *snapshotsync.RoSnapshots snapshots *snapshotsync.RoSnapshots
blockReader services.FullBlockReader blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
historyV3 bool historyV3 bool
transactionsV3 bool transactionsV3 bool
} }
func StageBodiesCfg(db kv.RwDB, bd *bodydownload.BodyDownload, bodyReqSend func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool), penalise func(context.Context, []headerdownload.PenaltyItem), blockPropagator adapter.BlockPropagator, timeout int, chanConfig chain.Config, snapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader, historyV3 bool, transactionsV3 bool) BodiesCfg { func StageBodiesCfg(db kv.RwDB, bd *bodydownload.BodyDownload, bodyReqSend func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool), penalise func(context.Context, []headerdownload.PenaltyItem), blockPropagator adapter.BlockPropagator, timeout int, chanConfig chain.Config, snapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader, historyV3 bool, blockWriter *blockio.BlockWriter) BodiesCfg {
return BodiesCfg{db: db, bd: bd, bodyReqSend: bodyReqSend, penalise: penalise, blockPropagator: blockPropagator, timeout: timeout, chanConfig: chanConfig, snapshots: snapshots, blockReader: blockReader, historyV3: historyV3, transactionsV3: transactionsV3} return BodiesCfg{db: db, bd: bd, bodyReqSend: bodyReqSend, penalise: penalise, blockPropagator: blockPropagator, timeout: timeout, chanConfig: chanConfig, snapshots: snapshots, blockReader: blockReader, historyV3: historyV3, blockWriter: blockWriter}
} }
// BodiesForward progresses Bodies stage in the forward direction // BodiesForward progresses Bodies stage in the forward direction
@ -205,7 +207,7 @@ func BodiesForward(
} }
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call) // Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody) ok, lastTxnNum, err := cfg.blockWriter.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
if err != nil { if err != nil {
return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err) return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
} }

View File

@ -22,7 +22,7 @@ func TestBodiesUnwind(t *testing.T) {
tx, err := db.BeginRw(m.Ctx) tx, err := db.BeginRw(m.Ctx)
require.NoError(err) require.NoError(err)
defer tx.Rollback() defer tx.Rollback()
ctx := m.Ctx br, bw := m.NewBlocksIO()
txn := &types.DynamicFeeTransaction{Tip: u256.N1, FeeCap: u256.N1, CommonTx: types.CommonTx{ChainID: u256.N1, Value: u256.N1, Gas: 1, Nonce: 1}} txn := &types.DynamicFeeTransaction{Tip: u256.N1, FeeCap: u256.N1, CommonTx: types.CommonTx{ChainID: u256.N1, Value: u256.N1, Gas: 1, Nonce: 1}}
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
@ -35,13 +35,13 @@ func TestBodiesUnwind(t *testing.T) {
b := &types.RawBody{Transactions: [][]byte{rlpTxn, rlpTxn, rlpTxn}} b := &types.RawBody{Transactions: [][]byte{rlpTxn, rlpTxn, rlpTxn}}
for i := uint64(1); i <= 10; i++ { for i := uint64(1); i <= 10; i++ {
_, _, err = rawdb.WriteRawBody(tx, libcommon.Hash{byte(i)}, i, b) _, _, err = bw.WriteRawBodyIfNotExists(tx, libcommon.Hash{byte(i)}, i, b)
require.NoError(err) require.NoError(err)
err = rawdb.WriteCanonicalHash(tx, libcommon.Hash{byte(i)}, i) err = rawdb.WriteCanonicalHash(tx, libcommon.Hash{byte(i)}, i)
require.NoError(err) require.NoError(err)
} }
{ {
err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, ctx, "test", logEvery) // block 5 already canonical, start from next one err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, m.Ctx, "test", logEvery) // block 5 already canonical, start from next one
require.NoError(err) require.NoError(err)
n, err := tx.ReadSequence(kv.EthTx) n, err := tx.ReadSequence(kv.EthTx)
@ -49,13 +49,13 @@ func TestBodiesUnwind(t *testing.T) {
require.Equal(2+5*(3+2), int(n)) // genesis 2 system txs + from 1, 5 block with 3 txn in each require.Equal(2+5*(3+2), int(n)) // genesis 2 system txs + from 1, 5 block with 3 txn in each
} }
{ {
err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery, false, nil) // block 5 already canonical, start from next one err = rawdb.MakeBodiesCanonical(tx, 5+1, m.Ctx, "test", logEvery, br.TxsV3Enabled(), nil) // block 5 already canonical, start from next one
require.NoError(err) require.NoError(err)
n, err := tx.ReadSequence(kv.EthTx) n, err := tx.ReadSequence(kv.EthTx)
require.NoError(err) require.NoError(err)
require.Equal(2+10*(3+2), int(n)) require.Equal(2+10*(3+2), int(n))
_, _, err = rawdb.WriteRawBody(tx, libcommon.Hash{11}, 11, b) _, _, err = bw.WriteRawBodyIfNotExists(tx, libcommon.Hash{11}, 11, b)
require.NoError(err) require.NoError(err)
err = rawdb.WriteCanonicalHash(tx, libcommon.Hash{11}, 11) err = rawdb.WriteCanonicalHash(tx, libcommon.Hash{11}, 11)
require.NoError(err) require.NoError(err)
@ -67,14 +67,14 @@ func TestBodiesUnwind(t *testing.T) {
{ {
// unwind to block 5, means mark blocks >= 6 as non-canonical // unwind to block 5, means mark blocks >= 6 as non-canonical
err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, ctx, "test", logEvery) err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, m.Ctx, "test", logEvery)
require.NoError(err) require.NoError(err)
n, err := tx.ReadSequence(kv.EthTx) n, err := tx.ReadSequence(kv.EthTx)
require.NoError(err) require.NoError(err)
require.Equal(2+5*(3+2), int(n)) // from 0, 5 block with 3 txn in each require.Equal(2+5*(3+2), int(n)) // from 0, 5 block with 3 txn in each
err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery, false, nil) // block 5 already canonical, start from next one err = rawdb.MakeBodiesCanonical(tx, 5+1, m.Ctx, "test", logEvery, br.TxsV3Enabled(), nil) // block 5 already canonical, start from next one
require.NoError(err) require.NoError(err)
n, err = tx.ReadSequence(kv.EthTx) n, err = tx.ReadSequence(kv.EthTx)
require.NoError(err) require.NoError(err)

View File

@ -33,7 +33,7 @@ import (
var migrations = map[kv.Label][]Migration{ var migrations = map[kv.Label][]Migration{
kv.ChainDB: { kv.ChainDB: {
dbSchemaVersion5, dbSchemaVersion5,
txsBeginEnd, TxsBeginEnd,
}, },
kv.TxPoolDB: {}, kv.TxPoolDB: {},
kv.SentryDB: {}, kv.SentryDB: {},

View File

@ -25,7 +25,7 @@ import (
var ErrTxsBeginEndNoMigration = fmt.Errorf("in this Erigon version DB format was changed: added additional first/last system-txs to blocks. There is no DB migration for this change. Please re-sync or switch to earlier version") var ErrTxsBeginEndNoMigration = fmt.Errorf("in this Erigon version DB format was changed: added additional first/last system-txs to blocks. There is no DB migration for this change. Please re-sync or switch to earlier version")
var txsBeginEnd = Migration{ var TxsBeginEnd = Migration{
Name: "txs_begin_end", Name: "txs_begin_end",
Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) {
logEvery := time.NewTicker(10 * time.Second) logEvery := time.NewTicker(10 * time.Second)
@ -89,7 +89,7 @@ var txsBeginEnd = Migration{
continue continue
} }
txs, err := rawdb.CanonicalTransactions(tx, b.BaseTxId, b.TxAmount) txs, err := canonicalTransactions(tx, b.BaseTxId, b.TxAmount)
if err != nil { if err != nil {
return err return err
} }
@ -114,7 +114,7 @@ var txsBeginEnd = Migration{
if assert.Enable { if assert.Enable {
newBlock, baseTxId, txAmount := rawdb.ReadBody(tx, canonicalHash, blockNum) newBlock, baseTxId, txAmount := rawdb.ReadBody(tx, canonicalHash, blockNum)
newBlock.Transactions, err = rawdb.CanonicalTransactions(tx, baseTxId, txAmount) newBlock.Transactions, err = canonicalTransactions(tx, baseTxId, txAmount)
for i, oldTx := range oldBlock.Transactions { for i, oldTx := range oldBlock.Transactions {
newTx := newBlock.Transactions[i] newTx := newBlock.Transactions[i]
if oldTx.GetNonce() != newTx.GetNonce() { if oldTx.GetNonce() != newTx.GetNonce() {
@ -226,24 +226,6 @@ var txsBeginEnd = Migration{
}, },
} }
func writeRawBodyDeprecated(db kv.RwTx, hash common2.Hash, number uint64, body *types.RawBody) error {
baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions)))
if err != nil {
return err
}
data := types.BodyForStorage{
BaseTxId: baseTxId,
TxAmount: uint32(len(body.Transactions)),
Uncles: body.Uncles,
}
if err = rawdb.WriteBodyForStorage(db, hash, number, &data); err != nil {
return fmt.Errorf("failed to write body: %w", err)
}
if err = rawdb.WriteRawTransactions(db, body.Transactions, baseTxId, &hash); err != nil {
return fmt.Errorf("failed to WriteRawTransactions: %w, blockNum=%d", err, number)
}
return nil
}
func writeTransactionsNewDeprecated(db kv.RwTx, txs []types.Transaction, baseTxId uint64) error { func writeTransactionsNewDeprecated(db kv.RwTx, txs []types.Transaction, baseTxId uint64) error {
txId := baseTxId txId := baseTxId
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
@ -277,7 +259,7 @@ func readCanonicalBodyWithTransactionsDeprecated(db kv.Getter, hash common2.Hash
} }
body := new(types.Body) body := new(types.Body)
body.Uncles = bodyForStorage.Uncles body.Uncles = bodyForStorage.Uncles
body.Transactions, err = rawdb.CanonicalTransactions(db, bodyForStorage.BaseTxId, bodyForStorage.TxAmount) body.Transactions, err = canonicalTransactions(db, bodyForStorage.BaseTxId, bodyForStorage.TxAmount)
if err != nil { if err != nil {
log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err) log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err)
return nil return nil
@ -285,7 +267,7 @@ func readCanonicalBodyWithTransactionsDeprecated(db kv.Getter, hash common2.Hash
return body return body
} }
func makeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error { func MakeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
for blockNum := from; ; blockNum++ { for blockNum := from; ; blockNum++ {
h, err := rawdb.ReadCanonicalHash(tx, blockNum) h, err := rawdb.ReadCanonicalHash(tx, blockNum)
if err != nil { if err != nil {
@ -354,3 +336,26 @@ func makeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Conte
return nil return nil
} }
func canonicalTransactions(db kv.Getter, baseTxId uint64, amount uint32) ([]types.Transaction, error) {
if amount == 0 {
return []types.Transaction{}, nil
}
txIdKey := make([]byte, 8)
txs := make([]types.Transaction, amount)
binary.BigEndian.PutUint64(txIdKey, baseTxId)
i := uint32(0)
if err := db.ForAmount(kv.EthTx, txIdKey, amount, func(k, v []byte) error {
var decodeErr error
if txs[i], decodeErr = types.UnmarshalTransactionFromBinary(v); decodeErr != nil {
return decodeErr
}
i++
return nil
}); err != nil {
return nil, err
}
txs = txs[:i] // user may request big "amount", but db can return small "amount". Return as much as we found.
return txs, nil
}

View File

@ -1,9 +1,10 @@
package migrations package migrations_test
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"fmt"
"testing" "testing"
"time" "time"
@ -12,6 +13,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/u256" "github.com/ledgerwatch/erigon-lib/common/u256"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/migrations"
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -39,7 +41,7 @@ func TestTxsBeginEnd(t *testing.T) {
err = rawdb.WriteCanonicalHash(tx, hash, i) err = rawdb.WriteCanonicalHash(tx, hash, i)
require.NoError(err) require.NoError(err)
} }
if err := makeBodiesNonCanonicalDeprecated(tx, 7, context.Background(), "", logEvery); err != nil { if err := migrations.MakeBodiesNonCanonicalDeprecated(tx, 7, context.Background(), "", logEvery); err != nil {
return err return err
} }
@ -59,8 +61,8 @@ func TestTxsBeginEnd(t *testing.T) {
}) })
require.NoError(err) require.NoError(err)
migrator := NewMigrator(kv.ChainDB) migrator := migrations.NewMigrator(kv.ChainDB)
migrator.Migrations = []Migration{txsBeginEnd} migrator.Migrations = []migrations.Migration{migrations.TxsBeginEnd}
logger := log.New() logger := log.New()
err = migrator.Apply(db, tmpDir, logger) err = migrator.Apply(db, tmpDir, logger)
require.NoError(err) require.NoError(err)
@ -104,3 +106,36 @@ func TestTxsBeginEnd(t *testing.T) {
require.NoError(err) require.NoError(err)
} }
func writeRawBodyDeprecated(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.RawBody) error {
baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions)))
if err != nil {
return err
}
data := types.BodyForStorage{
BaseTxId: baseTxId,
TxAmount: uint32(len(body.Transactions)),
Uncles: body.Uncles,
}
if err = rawdb.WriteBodyForStorage(db, hash, number, &data); err != nil {
return fmt.Errorf("failed to write body: %w", err)
}
if err = writeRawTransactionsDeprecated(db, body.Transactions, baseTxId); err != nil {
return fmt.Errorf("failed to WriteRawTransactions: %w, blockNum=%d", err, number)
}
return nil
}
func writeRawTransactionsDeprecated(tx kv.RwTx, txs [][]byte, baseTxId uint64) error {
txId := baseTxId
for _, txn := range txs {
txIdKey := make([]byte, 8)
binary.BigEndian.PutUint64(txIdKey, txId)
// If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine.
if err := tx.Append(kv.EthTx, txIdKey, txn); err != nil {
return fmt.Errorf("txId=%d, baseTxId=%d, %w", txId, baseTxId, err)
}
txId++
}
return nil
}

View File

@ -27,8 +27,10 @@ import (
"sync" "sync"
"github.com/c2h5oh/datasize" "github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/node/nodecfg" "github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/debug"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
"github.com/gofrs/flock" "github.com/gofrs/flock"
@ -368,3 +370,11 @@ func OpenDatabase(config *nodecfg.Config, label kv.Label, logger log.Logger) (kv
func (n *Node) ResolvePath(x string) string { func (n *Node) ResolvePath(x string) string {
return n.config.ResolvePath(x) return n.config.ResolvePath(x)
} }
func StartNode(stack *Node) {
if err := stack.Start(); err != nil {
utils.Fatalf("Error starting protocol stack: %v", err)
}
go debug.ListenSignals(stack)
}

View File

@ -35,7 +35,7 @@ func (eri *ErigonNode) Serve() error {
} }
func (eri *ErigonNode) run() { func (eri *ErigonNode) run() {
utils.StartNode(eri.stack) node.StartNode(eri.stack)
// we don't have accounts locally and we don't do mining // we don't have accounts locally and we don't do mining
// so these parts are ignored // so these parts are ignored
// see cmd/geth/daemon.go#startNode for full implementation // see cmd/geth/daemon.go#startNode for full implementation

View File

@ -415,7 +415,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
mock.BlockSnapshots, mock.BlockSnapshots,
blockReader, blockReader,
cfg.HistoryV3, cfg.HistoryV3,
cfg.TransactionsV3, blockWriter,
), ),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, blockRetire, blockWriter, mock.sentriesClient.Hd), stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, blockRetire, blockWriter, mock.sentriesClient.Hd),
stagedsync.StageExecuteBlocksCfg( stagedsync.StageExecuteBlocksCfg(

View File

@ -458,7 +458,7 @@ func NewDefaultStages(ctx context.Context,
snapshots, snapshots,
blockReader, blockReader,
cfg.HistoryV3, cfg.HistoryV3,
cfg.TransactionsV3, blockWriter,
), ),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, blockWriter, controlServer.Hd), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, blockWriter, controlServer.Hd),
stagedsync.StageExecuteBlocksCfg( stagedsync.StageExecuteBlocksCfg(
@ -513,7 +513,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
dirs.Tmp, dirs.Tmp,
nil, nil, nil, nil,
), ),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, snapshots, blockReader, cfg.HistoryV3, cfg.TransactionsV3), stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, snapshots, blockReader, cfg.HistoryV3, blockWriter),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, true, dirs.Tmp, cfg.Prune, nil, blockWriter, controlServer.Hd), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, true, dirs.Tmp, cfg.Prune, nil, blockWriter, controlServer.Hd),
stagedsync.StageExecuteBlocksCfg( stagedsync.StageExecuteBlocksCfg(