preparation for --txs.v3, step 2 (#7636)

This commit is contained in:
Alex Sharov 2023-06-03 07:20:22 +07:00 committed by GitHub
parent 190bc9dfd8
commit d40317c905
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 174 additions and 127 deletions

View File

@ -88,7 +88,7 @@ func (e *Eth1Execution) InsertBodies(ctx context.Context, req *execution.InsertB
Amount: withdrawal.Amount,
})
}
if _, _, err := e.blockWriter.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(body.BlockHash),
if _, err := e.blockWriter.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(body.BlockHash),
body.BlockNumber, &types.RawBody{
Transactions: body.Transactions,
Uncles: uncles,

View File

@ -84,7 +84,7 @@ func (e *Eth1Execution) InsertBodies(ctx context.Context, req *execution.InsertB
uncles = append(uncles, h)
}
// Withdrawals processing
if _, _, err := e.blockWriter.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(body.BlockHash),
if _, err := e.blockWriter.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(body.BlockHash),
body.BlockNumber, &types.RawBody{
Transactions: body.Transactions,
Uncles: uncles,

View File

@ -53,7 +53,7 @@ func TestEthSubscribe(t *testing.T) {
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
hook := stages.NewHook(m.Ctx, m.Notifications, m.Sync, m.ChainConfig, m.Log, m.UpdateHead)
hook := stages.NewHook(m.Ctx, m.Notifications, m.Sync, br, m.ChainConfig, m.Log, m.UpdateHead)
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, initialCycle, logger, nil, hook); err != nil {
t.Fatal(err)
}

View File

@ -202,6 +202,9 @@ func (back *RemoteBackend) SubscribeLogs(ctx context.Context, onNewLogs func(rep
func (back *RemoteBackend) TxnLookup(ctx context.Context, tx kv.Getter, txnHash libcommon.Hash) (uint64, bool, error) {
return back.blockReader.TxnLookup(ctx, tx, txnHash)
}
func (back *RemoteBackend) HasSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (bool, error) {
panic("HasSenders is low-level method, don't use it in RPCDaemon")
}
func (back *RemoteBackend) BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) {
return back.blockReader.BlockWithSenders(ctx, tx, hash, blockHeight)
}

View File

@ -677,7 +677,7 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In
return err
}
defer tx.Rollback()
receipts, err := eth.AnswerGetReceiptsQuery(tx, query.GetReceiptsPacket)
receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket)
if err != nil {
return err
}

View File

@ -258,10 +258,7 @@ func write(tx kv.RwTx, g *types.Genesis, tmpDir string) (*types.Block, *state.In
}
blockWriter := blockio.NewBlockWriter(histV3, transactionV3)
if err := blockWriter.WriteHeader(tx, block.HeaderNoCopy()); err != nil {
return nil, nil, err
}
if err := blockWriter.WriteBody(tx, block.Hash(), block.NumberU64(), block.Body()); err != nil {
if err := blockWriter.WriteBlock(tx, block); err != nil {
return nil, nil, err
}
if err := blockWriter.WriteTd(tx, block.Hash(), block.NumberU64(), g.Difficulty); err != nil {

View File

@ -26,8 +26,6 @@ import (
"math/big"
"time"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/gballet/go-verkle"
common2 "github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
@ -36,6 +34,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/common"
@ -409,17 +408,12 @@ func ReadStorageBody(db kv.Getter, hash libcommon.Hash, number uint64) (types.Bo
return *bodyForStorage, nil
}
func CanonicalTxnByID(db kv.Getter, id uint64, blockHash libcommon.Hash, transactionsV3 bool) (types.Transaction, error) {
func CanonicalTxnByID(db kv.Getter, id uint64, blockHash libcommon.Hash) (types.Transaction, error) {
txIdKey := make([]byte, 8)
binary.BigEndian.PutUint64(txIdKey, id)
var v []byte
var err error
if transactionsV3 {
key := append(txIdKey, blockHash.Bytes()...)
v, err = db.GetOne(kv.EthTxV3, key)
} else {
v, err = db.GetOne(kv.EthTx, txIdKey)
}
v, err = db.GetOne(kv.EthTx, txIdKey)
if err != nil {
return nil, err
}
@ -437,12 +431,9 @@ func CanonicalTransactions(db kv.Getter, baseTxId uint64, amount uint32) ([]type
if amount == 0 {
return []types.Transaction{}, nil
}
txIdKey := make([]byte, 8)
txs := make([]types.Transaction, amount)
binary.BigEndian.PutUint64(txIdKey, baseTxId)
i := uint32(0)
if err := db.ForAmount(kv.EthTx, txIdKey, amount, func(k, v []byte) error {
if err := db.ForAmount(kv.EthTx, hexutility.EncodeTs(baseTxId), amount, func(k, v []byte) error {
var decodeErr error
if txs[i], decodeErr = types.UnmarshalTransactionFromBinary(v); decodeErr != nil {
return decodeErr
@ -460,12 +451,9 @@ func NonCanonicalTransactions(db kv.Getter, baseTxId uint64, amount uint32) ([]t
if amount == 0 {
return []types.Transaction{}, nil
}
txIdKey := make([]byte, 8)
txs := make([]types.Transaction, amount)
binary.BigEndian.PutUint64(txIdKey, baseTxId)
i := uint32(0)
if err := db.ForAmount(kv.NonCanonicalTxs, txIdKey, amount, func(k, v []byte) error {
if err := db.ForAmount(kv.NonCanonicalTxs, hexutility.EncodeTs(baseTxId), amount, func(k, v []byte) error {
var decodeErr error
if txs[i], decodeErr = types.DecodeTransaction(v); decodeErr != nil {
return decodeErr
@ -479,11 +467,11 @@ func NonCanonicalTransactions(db kv.Getter, baseTxId uint64, amount uint32) ([]t
return txs, nil
}
func WriteTransactions(db kv.RwTx, txs []types.Transaction, baseTxId uint64, blockHash *libcommon.Hash) error {
func WriteTransactions(db kv.RwTx, txs []types.Transaction, baseTxId uint64) error {
txId := baseTxId
txIdKey := make([]byte, 8)
buf := bytes.NewBuffer(nil)
for _, tx := range txs {
txIdKey := make([]byte, 8)
binary.BigEndian.PutUint64(txIdKey, txId)
txId++
@ -492,36 +480,21 @@ func WriteTransactions(db kv.RwTx, txs []types.Transaction, baseTxId uint64, blo
return fmt.Errorf("broken tx rlp: %w", err)
}
// If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine.
if blockHash != nil {
key := append(txIdKey, blockHash.Bytes()...)
if err := db.Append(kv.EthTxV3, key, common.CopyBytes(buf.Bytes())); err != nil {
return err
}
} else {
if err := db.Append(kv.EthTx, txIdKey, common.CopyBytes(buf.Bytes())); err != nil {
return err
}
if err := db.Append(kv.EthTx, txIdKey, buf.Bytes()); err != nil {
return err
}
}
return nil
}
func WriteRawTransactions(tx kv.RwTx, txs [][]byte, baseTxId uint64, blockHash *common2.Hash) error {
func WriteRawTransactions(tx kv.RwTx, txs [][]byte, baseTxId uint64) error {
txId := baseTxId
txIdKey := make([]byte, 8)
for _, txn := range txs {
txIdKey := make([]byte, 8)
binary.BigEndian.PutUint64(txIdKey, txId)
// If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine.
if blockHash != nil {
if err := tx.Append(kv.EthTx, txIdKey, txn); err != nil {
return fmt.Errorf("txId=%d, baseTxId=%d, %w", txId, baseTxId, err)
}
} else {
key := append(txIdKey, blockHash.Bytes()...)
if err := tx.Append(kv.EthTxV3, key, txn); err != nil {
return fmt.Errorf("txId=%d, baseTxId=%d, %w", txId, baseTxId, err)
}
if err := tx.Append(kv.EthTx, txIdKey, txn); err != nil {
return fmt.Errorf("txId=%d, baseTxId=%d, %w", txId, baseTxId, err)
}
txId++
}
@ -674,6 +647,10 @@ func ReadBody(db kv.Getter, hash libcommon.Hash, number uint64) (*types.Body, ui
return body, bodyForStorage.BaseTxId + 1, bodyForStorage.TxAmount - 2 // 1 system txn in the begining of block, and 1 at the end
}
func HasSenders(db kv.Getter, hash libcommon.Hash, number uint64) (bool, error) {
return db.Has(kv.Senders, dbutils.BlockBodyKey(number, hash))
}
func ReadSenders(db kv.Getter, hash libcommon.Hash, number uint64) ([]libcommon.Address, error) {
data, err := db.GetOne(kv.Senders, dbutils.BlockBodyKey(number, hash))
if err != nil {
@ -686,21 +663,21 @@ func ReadSenders(db kv.Getter, hash libcommon.Hash, number uint64) ([]libcommon.
return senders, nil
}
func WriteRawBodyIfNotExists(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.RawBody) (ok bool, lastTxnNum uint64, err error) {
func WriteRawBodyIfNotExists(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.RawBody) (ok bool, err error) {
exists, err := db.Has(kv.BlockBody, dbutils.BlockBodyKey(number, hash))
if err != nil {
return false, 0, err
return false, err
}
if exists {
return false, 0, nil
return false, nil
}
return WriteRawBody(db, hash, number, body)
}
func WriteRawBody(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.RawBody) (ok bool, lastTxnID uint64, err error) {
func WriteRawBody(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.RawBody) (ok bool, err error) {
baseTxnID, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))+2)
if err != nil {
return false, 0, err
return false, err
}
data := types.BodyForStorage{
BaseTxId: baseTxnID,
@ -709,14 +686,13 @@ func WriteRawBody(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.Ra
Withdrawals: body.Withdrawals,
}
if err = WriteBodyForStorage(db, hash, number, &data); err != nil {
return false, 0, fmt.Errorf("WriteBodyForStorage: %w", err)
return false, fmt.Errorf("WriteBodyForStorage: %w", err)
}
lastTxnID = baseTxnID + uint64(data.TxAmount) - 1
firstNonSystemTxnID := baseTxnID + 1
if err = WriteRawTransactions(db, body.Transactions, firstNonSystemTxnID, &hash); err != nil {
return false, 0, fmt.Errorf("WriteRawTransactions: %w", err)
if err = WriteRawTransactions(db, body.Transactions, firstNonSystemTxnID); err != nil {
return false, fmt.Errorf("WriteRawTransactions: %w", err)
}
return true, lastTxnID, nil
return true, nil
}
func WriteBody(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.Body) error {
@ -735,12 +711,7 @@ func WriteBody(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.Body)
if err := WriteBodyForStorage(db, hash, number, &data); err != nil {
return fmt.Errorf("failed to write body: %w", err)
}
transactionV3, _ := kvcfg.TransactionsV3.Enabled(db.(kv.Tx))
if transactionV3 {
err = WriteTransactions(db, body.Transactions, baseTxId+1, &hash)
} else {
err = WriteTransactions(db, body.Transactions, baseTxId+1, nil)
}
err = WriteTransactions(db, body.Transactions, baseTxId+1)
if err != nil {
return fmt.Errorf("failed to WriteTransactions: %w", err)
}
@ -765,8 +736,44 @@ func DeleteBody(db kv.Deleter, hash libcommon.Hash, number uint64) {
}
}
func AppendCanonicalTxNums(tx kv.RwTx, from uint64) (err error) {
nextBaseTxNum := uint64(0)
if from > 0 {
nextBaseTxNum, err = rawdbv3.TxNums.Max(tx, from-1)
if err != nil {
return err
}
nextBaseTxNum++
}
for blockNum := from; ; blockNum++ {
h, err := ReadCanonicalHash(tx, blockNum)
if err != nil {
return err
}
if h == (libcommon.Hash{}) {
break
}
data := ReadStorageBodyRLP(tx, h, blockNum)
if len(data) == 0 {
break
}
bodyForStorage := new(types.BodyForStorage)
if err := rlp.DecodeBytes(data, bodyForStorage); err != nil {
return err
}
nextBaseTxNum += uint64(bodyForStorage.TxAmount)
err = rawdbv3.TxNums.Append(tx, blockNum, nextBaseTxNum-1)
if err != nil {
return err
}
}
return nil
}
// MakeBodiesCanonical - move all txs of non-canonical blocks from NonCanonicalTxs table to EthTx table
func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker, transactionsV3 bool, cb func(blockNum, lastTxnNum uint64) error) error {
func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker, cb func(blockNum uint64, lastTxnNum uint64) error) error {
for blockNum := from; ; blockNum++ {
h, err := ReadCanonicalHash(tx, blockNum)
if err != nil {
@ -785,9 +792,6 @@ func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix
return err
}
ethTx := kv.EthTx
if transactionsV3 {
ethTx = kv.EthTxV3
}
newBaseId, err := tx.IncrementSequence(ethTx, uint64(bodyForStorage.TxAmount))
if err != nil {
return err
@ -797,15 +801,8 @@ func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix
i := uint64(0)
if err := tx.ForAmount(kv.NonCanonicalTxs, hexutility.EncodeTs(bodyForStorage.BaseTxId+1), bodyForStorage.TxAmount-2, func(k, v []byte) error {
id := newBaseId + 1 + i
if transactionsV3 {
key := append(hexutility.EncodeTs(id), h.Bytes()...)
if err := tx.Put(kv.EthTxV3, key, v); err != nil {
return err
}
} else {
if err := tx.Put(kv.EthTx, hexutility.EncodeTs(id), v); err != nil {
return err
}
if err := tx.Put(kv.EthTx, hexutility.EncodeTs(id), v); err != nil {
return err
}
if err := tx.Delete(kv.NonCanonicalTxs, k); err != nil {
return err
@ -872,6 +869,7 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, deleteBodies bool, ctx cont
return err
}
}
// next loop does move only non-system txs. need move system-txs manually (because they may not exist)
i := uint64(0)
if err := tx.ForAmount(kv.EthTx, hexutility.EncodeTs(bodyForStorage.BaseTxId+1), bodyForStorage.TxAmount-2, func(k, v []byte) error {
@ -926,6 +924,7 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, deleteBodies bool, ctx cont
return err
}
}
return nil
}

View File

@ -78,25 +78,42 @@ func (w *BlockWriter) FillHeaderNumberIndex(logPrefix string, tx kv.RwTx, tmpDir
}
func (w *BlockWriter) MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
// Property of blockchain: same block in different forks will have different hashes.
// Means - can mark all canonical blocks as non-canonical on unwind, and
// do opposite here - without storing any meta-info.
if err := rawdb.MakeBodiesCanonical(tx, from, ctx, logPrefix, logEvery, w.txsV3, func(blockNum, lastTxnNum uint64) error {
if w.historyV3 {
if err := rawdbv3.TxNums.Append(tx, blockNum, lastTxnNum); err != nil {
return err
if !w.txsV3 {
// Property of blockchain: same block in different forks will have different hashes.
// Means - can mark all canonical blocks as non-canonical on unwind, and
// do opposite here - without storing any meta-info.
if err := rawdb.MakeBodiesCanonical(tx, from, ctx, logPrefix, logEvery, func(blockNum, lastTxnNum uint64) error {
if w.historyV3 {
if err := rawdbv3.TxNums.Append(tx, blockNum, lastTxnNum); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("make block canonical: %w", err)
}
return nil
}); err != nil {
return fmt.Errorf("make block canonical: %w", err)
}
if w.historyV3 {
if err := rawdb.AppendCanonicalTxNums(tx, from); err != nil {
return err
}
}
return nil
}
func (w *BlockWriter) MakeBodiesNonCanonical(tx kv.RwTx, from uint64, deleteBodies bool, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
if err := rawdb.MakeBodiesNonCanonical(tx, from, deleteBodies, ctx, logPrefix, logEvery); err != nil {
return err
if !w.txsV3 {
if err := rawdb.MakeBodiesNonCanonical(tx, from, deleteBodies, ctx, logPrefix, logEvery); err != nil {
return err
}
if w.historyV3 {
if err := rawdbv3.TxNums.Truncate(tx, from); err != nil {
return err
}
}
return nil
}
if w.historyV3 {
if err := rawdbv3.TxNums.Truncate(tx, from); err != nil {
return err
@ -110,10 +127,10 @@ func extractHeaders(k []byte, v []byte, next etl.ExtractNextFunc) error {
if len(k) != 40 {
return nil
}
return next(k, common.Copy(k[8:]), common.Copy(k[:8]))
return next(k, k[8:], k[:8])
}
func (w *BlockWriter) WriteRawBodyIfNotExists(tx kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) (ok bool, lastTxnNum uint64, err error) {
func (w *BlockWriter) WriteRawBodyIfNotExists(tx kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) (ok bool, err error) {
return rawdb.WriteRawBodyIfNotExists(tx, hash, number, body)
}
func (w *BlockWriter) WriteBody(tx kv.RwTx, hash common.Hash, number uint64, body *types.Body) error {

View File

@ -1078,7 +1078,7 @@ func (s *Ethereum) Start() error {
s.sentriesClient.StartStreamLoops(s.sentryCtx)
time.Sleep(10 * time.Millisecond) // just to reduce logs order confusion
hook := stages2.NewHook(s.sentryCtx, s.notifications, s.stagedSync, s.chainConfig, s.logger, s.sentriesClient.UpdateHead)
hook := stages2.NewHook(s.sentryCtx, s.notifications, s.stagedSync, s.blockReader, s.chainConfig, s.logger, s.sentriesClient.UpdateHead)
go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.waitForStageLoopStop, s.config.Sync.LoopThrottle, s.logger, s.blockSnapshots, hook)
return nil

View File

@ -91,7 +91,6 @@ func TestGetBlockReceipts(t *testing.T) {
receipts []rlp.RawValue
)
br, _ := m.NewBlocksIO()
err := m.DB.View(m.Ctx, func(tx kv.Tx) error {
for i := uint64(0); i <= rawdb.ReadCurrentHeader(tx).Number.Uint64(); i++ {
block, err := br.BlockByNumber(m.Ctx, tx, i)

View File

@ -157,7 +157,7 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader
return bodies
}
func AnswerGetReceiptsQuery(db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
// Gather state data until the fetch or network limits is reached
var (
bytes int
@ -168,16 +168,19 @@ func AnswerGetReceiptsQuery(db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue,
lookups >= 2*maxReceiptsServe {
break
}
// Retrieve the requested block's receipts
number := rawdb.ReadHeaderNumber(db, hash)
if number == nil {
return nil, nil
}
block, senders, err := rawdb.ReadBlockWithSenders(db, hash, *number)
// Retrieve the requested block's receipts
b, s, err := br.BlockWithSenders(context.Background(), db, hash, *number)
if err != nil {
return nil, err
}
results := rawdb.ReadReceipts(db, block, senders)
if b == nil {
return nil, nil
}
results := rawdb.ReadReceipts(db, b, s)
if results == nil {
header, err := rawdb.ReadHeaderByHash(db, hash)
if err != nil {

View File

@ -11,6 +11,8 @@ import (
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/log/v3"
@ -202,14 +204,19 @@ func BodiesForward(
}
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
ok, lastTxnNum, err := cfg.blockWriter.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
ok, err := cfg.blockWriter.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
if err != nil {
return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
}
if cfg.historyV3 && ok {
if err := rawdbv3.TxNums.Append(tx, blockHeight, lastTxnNum); err != nil {
body, _ := rawdb.ReadBodyForStorageByKey(tx, dbutils.BlockBodyKey(blockHeight, header.Hash()))
lastTxnID := body.BaseTxId + uint64(body.TxAmount) - 1
if err := rawdbv3.TxNums.Append(tx, blockHeight, lastTxnID); err != nil {
return false, err
}
//if err := rawdb.AppendCanonicalTxNums(tx, blockHeight); err != nil {
// return false, err
//}
}
if ok {
dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared)

View File

@ -35,7 +35,7 @@ func TestBodiesUnwind(t *testing.T) {
b := &types.RawBody{Transactions: [][]byte{rlpTxn, rlpTxn, rlpTxn}}
for i := uint64(1); i <= 10; i++ {
_, _, err = bw.WriteRawBodyIfNotExists(tx, libcommon.Hash{byte(i)}, i, b)
_, err = bw.WriteRawBodyIfNotExists(tx, libcommon.Hash{byte(i)}, i, b)
require.NoError(err)
err = rawdb.WriteCanonicalHash(tx, libcommon.Hash{byte(i)}, i)
require.NoError(err)
@ -55,7 +55,7 @@ func TestBodiesUnwind(t *testing.T) {
require.NoError(err)
require.Equal(2+10*(3+2), int(n))
_, _, err = bw.WriteRawBodyIfNotExists(tx, libcommon.Hash{11}, 11, b)
_, err = bw.WriteRawBodyIfNotExists(tx, libcommon.Hash{11}, 11, b)
require.NoError(err)
err = rawdb.WriteCanonicalHash(tx, libcommon.Hash{11}, 11)
require.NoError(err)

View File

@ -682,7 +682,7 @@ func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context
if err != nil {
return fmt.Errorf("read canonical hash of unwind point: %w", err)
}
txs, err := rawdb.RawTransactionsRange(tx, u.UnwindPoint, s.BlockNumber)
txs, err := cfg.blockReader.RawTransactions(ctx, tx, u.UnwindPoint, s.BlockNumber)
if err != nil {
return err
}

View File

@ -22,6 +22,7 @@ import (
"github.com/ledgerwatch/erigon/ethdb/cbor"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
)
type FinishCfg struct {
@ -123,7 +124,7 @@ func PruneFinish(u *PruneState, tx kv.RwTx, cfg FinishCfg, ctx context.Context)
return nil
}
func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishStageAfterSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx, logger log.Logger) error {
func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishStageAfterSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx, logger log.Logger, blockReader services.FullBlockReader) error {
t := time.Now()
if notifier == nil {
logger.Trace("RPC Daemon notification channel not set. No headers notifications will be sent")
@ -174,7 +175,7 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS
t = time.Now()
if notifier.HasLogSubsriptions() {
logs, err := ReadLogs(tx, notifyFrom, isUnwind)
logs, err := ReadLogs(tx, notifyFrom, isUnwind, blockReader)
if err != nil {
return err
}
@ -186,7 +187,7 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS
return nil
}
func ReadLogs(tx kv.Tx, from uint64, isUnwind bool) ([]*remote.SubscribeLogsReply, error) {
func ReadLogs(tx kv.Tx, from uint64, isUnwind bool, blockReader services.FullBlockReader) ([]*remote.SubscribeLogsReply, error) {
logs, err := tx.Cursor(kv.Log)
if err != nil {
return nil, err
@ -206,7 +207,7 @@ func ReadLogs(tx kv.Tx, from uint64, isUnwind bool) ([]*remote.SubscribeLogsRepl
if block == nil || blockNum != prevBlockNum {
logIndex = 0
prevBlockNum = blockNum
if block, err = rawdb.ReadBlockByNumber(tx, blockNum); err != nil {
if block, err = blockReader.BlockByNumber(context.Background(), tx, blockNum); err != nil {
return nil, err
}
}

View File

@ -767,11 +767,10 @@ func (s *EthBackendServer) EngineGetPayloadBodiesByHashV1(ctx context.Context, r
for hashIdx, hash := range request.Hashes {
h := gointerfaces.ConvertH256ToHash(hash)
block, err := rawdb.ReadBlockByHash(tx, h)
block, err := s.blockReader.BlockByHash(ctx, tx, h)
if err != nil {
return nil, err
}
body, err := extractPayloadBodyFromBlock(block)
if err != nil {
return nil, err
@ -801,7 +800,10 @@ func (s *EthBackendServer) EngineGetPayloadBodiesByRangeV1(ctx context.Context,
break
}
block := rawdb.ReadBlock(tx, hash, request.Start+i)
block, _, err := s.blockReader.BlockWithSenders(ctx, tx, hash, request.Start+i)
if err != nil {
return nil, err
}
body, err := extractPayloadBodyFromBlock(block)
if err != nil {
return nil, err

View File

@ -222,7 +222,7 @@ func InsertChain(ethereum *eth.Ethereum, chain *core.ChainPack, logger log.Logge
sentryControlServer.Hd.MarkAllVerified()
blockReader, _ := ethereum.BlockIO()
hook := stages.NewHook(ethereum.SentryCtx(), ethereum.Notifications(), ethereum.StagedSync(), ethereum.ChainConfig(), logger, sentryControlServer.UpdateHead)
hook := stages.NewHook(ethereum.SentryCtx(), ethereum.Notifications(), ethereum.StagedSync(), blockReader, ethereum.ChainConfig(), logger, sentryControlServer.UpdateHead)
_, err := stages.StageLoopStep(ethereum.SentryCtx(), ethereum.ChainDB(), ethereum.StagedSync(), initialCycle, logger, blockReader.Snapshots().(*snapshotsync.RoSnapshots), hook)
if err != nil {
return err

View File

@ -35,6 +35,7 @@ type BodyReader interface {
BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error)
BodyRlp(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error)
Body(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error)
HasSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (bool, error)
}
type TxnReader interface {

View File

@ -123,6 +123,9 @@ func (r *RemoteBlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, b
return b.Transactions[i], nil
}
func (r *RemoteBlockReader) HasSenders(ctx context.Context, _ kv.Getter, hash libcommon.Hash, blockHeight uint64) (bool, error) {
panic("HasSenders is low-level method, don't use it in RPCDaemon")
}
func (r *RemoteBlockReader) BlockWithSenders(ctx context.Context, _ kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) {
reply, err := r.client.Block(ctx, &remote.BlockRequest{BlockHash: gointerfaces.ConvertHashToH256(hash), BlockHeight: blockHeight})
if err != nil {
@ -379,6 +382,14 @@ func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash libcommon.Has
return body, txAmount, nil
}
func (r *BlockReader) HasSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (bool, error) {
blocksAvailable := r.sn.BlocksAvailable()
if blocksAvailable == 0 || blockHeight > blocksAvailable {
return rawdb.HasSenders(tx, hash, blockHeight)
}
return true, nil
}
func (r *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) {
return r.blockWithSenders(ctx, tx, hash, blockHeight, false)
}
@ -703,7 +714,7 @@ func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNu
return nil, nil
}
txn, err = rawdb.CanonicalTxnByID(tx, b.BaseTxId+1+uint64(i), canonicalHash, r.TransactionsV3)
txn, err = rawdb.CanonicalTxnByID(tx, b.BaseTxId+1+uint64(i), canonicalHash)
if err != nil {
return nil, err
}

View File

@ -1334,7 +1334,7 @@ func DumpTxs(ctx context.Context, db kv.RoDB, blockFrom, blockTo uint64, chainCo
chainID, _ := uint256.FromBig(chainConfig.ChainID)
var prevTxID uint64
numBuf := make([]byte, binary.MaxVarintLen64)
numBuf := make([]byte, 8)
parseCtx := types2.NewTxParseContext(*chainID)
parseCtx.WithSender(false)
slot := types2.TxSlot{}
@ -1356,7 +1356,7 @@ func DumpTxs(ctx context.Context, db kv.RoDB, blockFrom, blockTo uint64, chainCo
valueBuf := make([]byte, 16*4096)
addSystemTx := func(tx kv.Tx, txId uint64) error {
binary.BigEndian.PutUint64(numBuf, txId)
tv, err := tx.GetOne(kv.EthTx, numBuf[:8])
tv, err := tx.GetOne(kv.EthTx, numBuf)
if err != nil {
return err
}
@ -1424,7 +1424,7 @@ func DumpTxs(ctx context.Context, db kv.RoDB, blockFrom, blockTo uint64, chainCo
prevTxID = body.BaseTxId
}
binary.BigEndian.PutUint64(numBuf, body.BaseTxId+1)
if err := tx.ForAmount(kv.EthTx, numBuf[:8], body.TxAmount-2, func(tk, tv []byte) error {
if err := tx.ForAmount(kv.EthTx, numBuf, body.TxAmount-2, func(tk, tv []byte) error {
id := binary.BigEndian.Uint64(tk)
if prevTxID != 0 && id != prevTxID+1 {
panic(fmt.Sprintf("no gaps in tx ids are allowed: block %d does jump from %d to %d", blockNum, prevTxID, id))
@ -1540,21 +1540,25 @@ func DumpBodies(ctx context.Context, db kv.RoDB, blockFrom, blockTo uint64, firs
}
copy(key, k)
copy(key[8:], v)
dataRLP, err := tx.GetOne(kv.BlockBody, key)
// Important: DB does store canonical and non-canonical txs in same table. And using same body.BaseTxID
// But snapshots using canonical TxNum in field body.BaseTxID
// So, we manually calc this field here and serialize again.
//
// FYI: we also have other table to map canonical BlockNum->TxNum: kv.MaxTxNum
body, err := rawdb.ReadBodyForStorageByKey(tx, key)
if err != nil {
return false, err
}
if dataRLP == nil {
logger.Warn("header missed", "block_num", blockNum, "hash", hex.EncodeToString(v))
if body == nil {
logger.Warn("body missed", "block_num", blockNum, "hash", hex.EncodeToString(v))
return true, nil
}
body := &types.BodyForStorage{}
if err = rlp.DecodeBytes(dataRLP, body); err != nil {
return false, err
}
body.BaseTxId = firstTxNum
firstTxNum += uint64(body.TxAmount)
dataRLP, err = rlp.EncodeToBytes(body)
dataRLP, err := rlp.EncodeToBytes(body)
if err != nil {
return false, err
}

View File

@ -588,7 +588,7 @@ func (ms *MockSentry) insertPoWBlocks(chain *core.ChainPack) error {
}
initialCycle := MockInsertAsInitialCycle
blockReader, _ := ms.NewBlocksIO()
hook := NewHook(ms.Ctx, ms.Notifications, ms.Sync, ms.ChainConfig, ms.Log, ms.UpdateHead)
hook := NewHook(ms.Ctx, ms.Notifications, ms.Sync, blockReader, ms.ChainConfig, ms.Log, ms.UpdateHead)
if _, err = StageLoopStep(ms.Ctx, ms.DB, ms.Sync, initialCycle, ms.Log, blockReader.Snapshots().(*snapshotsync.RoSnapshots), hook); err != nil {
return err
}
@ -613,7 +613,7 @@ func (ms *MockSentry) insertPoSBlocks(chain *core.ChainPack) error {
initialCycle := false
blockReader, _ := ms.NewBlocksIO()
hook := NewHook(ms.Ctx, ms.Notifications, ms.Sync, ms.ChainConfig, ms.Log, ms.UpdateHead)
hook := NewHook(ms.Ctx, ms.Notifications, ms.Sync, blockReader, ms.ChainConfig, ms.Log, ms.UpdateHead)
headBlockHash, err := StageLoopStep(ms.Ctx, ms.DB, ms.Sync, initialCycle, ms.Log, blockReader.Snapshots().(*snapshotsync.RoSnapshots), hook)
if err != nil {
return err

View File

@ -496,8 +496,9 @@ func TestAnchorReplace2(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
br, _ := m.NewBlocksIO()
initialCycle := true
hook := stages.NewHook(m.Ctx, m.Notifications, m.Sync, m.ChainConfig, m.Log, m.UpdateHead)
hook := stages.NewHook(m.Ctx, m.Notifications, m.Sync, br, m.ChainConfig, m.Log, m.UpdateHead)
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, initialCycle, m.Log, m.BlockSnapshots, hook); err != nil {
t.Fatal(err)
}

View File

@ -231,11 +231,12 @@ type Hook struct {
sync *stagedsync.Sync
chainConfig *chain.Config
logger log.Logger
blockReader services.FullBlockReader
updateHead func(ctx context.Context, headHeight uint64, headTime uint64, hash libcommon.Hash, td *uint256.Int)
}
func NewHook(ctx context.Context, notifications *shards.Notifications, sync *stagedsync.Sync, chainConfig *chain.Config, logger log.Logger, updateHead func(ctx context.Context, headHeight uint64, headTime uint64, hash libcommon.Hash, td *uint256.Int)) *Hook {
return &Hook{ctx: ctx, notifications: notifications, sync: sync, chainConfig: chainConfig, logger: logger, updateHead: updateHead}
func NewHook(ctx context.Context, notifications *shards.Notifications, sync *stagedsync.Sync, blockReader services.FullBlockReader, chainConfig *chain.Config, logger log.Logger, updateHead func(ctx context.Context, headHeight uint64, headTime uint64, hash libcommon.Hash, td *uint256.Int)) *Hook {
return &Hook{ctx: ctx, notifications: notifications, sync: sync, blockReader: blockReader, chainConfig: chainConfig, logger: logger, updateHead: updateHead}
}
func (h *Hook) BeforeRun(tx kv.Tx, canRunCycleInOneTransaction bool) error {
notifications := h.notifications
@ -250,6 +251,7 @@ func (h *Hook) BeforeRun(tx kv.Tx, canRunCycleInOneTransaction bool) error {
}
func (h *Hook) AfterRun(tx kv.Tx, finishProgressBefore uint64) error {
notifications := h.notifications
blockReader := h.blockReader
// -- send notifications START
//TODO: can this 2 headers be 1
var headHeader, currentHeder *types.Header
@ -287,7 +289,7 @@ func (h *Hook) AfterRun(tx kv.Tx, finishProgressBefore uint64) error {
}
if notifications != nil && notifications.Events != nil {
if err = stagedsync.NotifyNewHeaders(h.ctx, finishProgressBefore, head, h.sync.PrevUnwindPoint(), notifications.Events, tx, h.logger); err != nil {
if err = stagedsync.NotifyNewHeaders(h.ctx, finishProgressBefore, head, h.sync.PrevUnwindPoint(), notifications.Events, tx, h.logger, blockReader); err != nil {
return nil
}
}