diff --git a/cmd/sentry/sentry/sentry_multi_client.go b/cmd/sentry/sentry/sentry_multi_client.go index ecf2b7818..9f8a41bfc 100644 --- a/cmd/sentry/sentry/sentry_multi_client.go +++ b/cmd/sentry/sentry/sentry_multi_client.go @@ -552,7 +552,7 @@ func (cs *MultiClient) blockBodies66(ctx context.Context, inreq *proto_sentry.In if err := rlp.DecodeBytes(inreq.Data, &request); err != nil { return fmt.Errorf("decode BlockBodiesPacket66: %w", err) } - txs, uncles := request.BlockRawBodiesPacket.Unpack() + txs, uncles, withdrawals := request.BlockRawBodiesPacket.Unpack() if len(txs) == 0 && len(uncles) == 0 { outreq := proto_sentry.PeerUselessRequest{ PeerId: inreq.PeerId, @@ -564,7 +564,7 @@ func (cs *MultiClient) blockBodies66(ctx context.Context, inreq *proto_sentry.In // No point processing empty response return nil } - cs.Bd.DeliverBodies(&txs, &uncles, uint64(len(inreq.Data)), ConvertH512ToPeerID(inreq.PeerId)) + cs.Bd.DeliverBodies(txs, uncles, withdrawals, uint64(len(inreq.Data)), ConvertH512ToPeerID(inreq.PeerId)) return nil } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index c9d41bde3..a42db7ac4 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -707,17 +707,18 @@ func (rb *BlockRawBody) DecodeRLP(s *rlp.Stream) error { return s.ListEnd() } -// Unpack retrieves the transactions and uncles from the range packet and returns +// Unpack retrieves the transactions, uncles, and withdrawals from the range packet and returns // them in a split flat format that's more consistent with the internal data structures. -func (p *BlockRawBodiesPacket) Unpack() ([][][]byte, [][]*types.Header) { +func (p *BlockRawBodiesPacket) Unpack() ([][][]byte, [][]*types.Header, []types.Withdrawals) { var ( - txset = make([][][]byte, len(*p)) - uncleset = make([][]*types.Header, len(*p)) + txSet = make([][][]byte, len(*p)) + uncleSet = make([][]*types.Header, len(*p)) + withdrawalSet = make([]types.Withdrawals, len(*p)) ) for i, body := range *p { - txset[i], uncleset[i] = body.Transactions, body.Uncles + txSet[i], uncleSet[i], withdrawalSet[i] = body.Transactions, body.Uncles, body.Withdrawals } - return txset, uncleset + return txSet, uncleSet, withdrawalSet } // GetNodeDataPacket represents a trie node data query. diff --git a/turbo/stages/bodydownload/body_algos.go b/turbo/stages/bodydownload/body_algos.go index ace801843..e2dd8b32d 100644 --- a/turbo/stages/bodydownload/body_algos.go +++ b/turbo/stages/bodydownload/body_algos.go @@ -41,7 +41,7 @@ func (bd *BodyDownload) UpdateFromDb(db kv.Tx) (headHeight, headTime uint64, hea bd.requestedLow = bodyProgress + 1 bd.lowWaitUntil = 0 bd.requestHigh = bd.requestedLow + (bd.outstandingLimit / 2) - bd.requestedMap = make(map[DoubleHash]uint64) + bd.requestedMap = make(map[TripleHash]uint64) bd.delivered.Clear() bd.deliveredCount = 0 bd.wastedCount = 0 @@ -159,14 +159,20 @@ func (bd *BodyDownload) RequestMoreBodies(tx kv.RwTx, blockReader services.FullB request = false } else { bd.deliveriesH[blockNum] = header - if header.UncleHash != types.EmptyUncleHash || header.TxHash != types.EmptyRootHash { + if header.UncleHash != types.EmptyUncleHash || header.TxHash != types.EmptyRootHash || + (header.WithdrawalsHash != nil && *header.WithdrawalsHash != types.EmptyRootHash) { // Perhaps we already have this block block := rawdb.ReadBlock(tx, hash, blockNum) if block == nil { - var doubleHash DoubleHash - copy(doubleHash[:], header.UncleHash.Bytes()) - copy(doubleHash[length.Hash:], header.TxHash.Bytes()) - bd.requestedMap[doubleHash] = blockNum + var tripleHash TripleHash + copy(tripleHash[:], header.UncleHash.Bytes()) + copy(tripleHash[length.Hash:], header.TxHash.Bytes()) + if header.WithdrawalsHash != nil { + copy(tripleHash[2*length.Hash:], header.WithdrawalsHash.Bytes()) + } else { + copy(tripleHash[2*length.Hash:], types.EmptyRootHash.Bytes()) + } + bd.requestedMap[tripleHash] = blockNum } else { err = bd.addBodyToBucket(tx, blockNum, block.RawBody()) if err != nil { @@ -188,7 +194,7 @@ func (bd *BodyDownload) RequestMoreBodies(tx kv.RwTx, blockReader services.FullB blockNums = append(blockNums, blockNum) hashes = append(hashes, hash) } else { - // Both uncleHash and txHash are empty (or block is prefetched), no need to request + // uncleHash, txHash, and withdrawalsHash are all empty (or block is prefetched), no need to request bd.delivered.Add(blockNum) } } @@ -245,8 +251,8 @@ func (bd *BodyDownload) RequestSent(bodyReq *BodyRequest, timeWithTimeout uint64 } // DeliverBodies takes the block body received from a peer and adds it to the various data structures -func (bd *BodyDownload) DeliverBodies(txs *[][][]byte, uncles *[][]*types.Header, lenOfP2PMsg uint64, peerID [64]byte) { - bd.deliveryCh <- Delivery{txs: txs, uncles: uncles, lenOfP2PMessage: lenOfP2PMsg, peerID: peerID} +func (bd *BodyDownload) DeliverBodies(txs [][][]byte, uncles [][]*types.Header, withdrawals []types.Withdrawals, lenOfP2PMsg uint64, peerID [64]byte) { + bd.deliveryCh <- Delivery{txs: txs, uncles: uncles, withdrawals: withdrawals, lenOfP2PMessage: lenOfP2PMsg, peerID: peerID} select { case bd.DeliveryNotify <- struct{}{}: @@ -302,25 +308,29 @@ Loop: if delivery.uncles == nil { log.Warn("nil uncles delivered", "peer_id", delivery.peerID, "p2p_msg_len", delivery.lenOfP2PMessage) } - if delivery.txs == nil || delivery.uncles == nil { + if delivery.withdrawals == nil { + log.Warn("nil withdrawals delivered", "peer_id", delivery.peerID, "p2p_msg_len", delivery.lenOfP2PMessage) + } + if delivery.txs == nil || delivery.uncles == nil || delivery.withdrawals == nil { log.Debug("delivery body processing has been skipped due to nil tx|data") continue } - // TODO(yperbasis): withdrawals reqMap := make(map[uint64]*BodyRequest) - txs, uncles, lenOfP2PMessage, _ := *delivery.txs, *delivery.uncles, delivery.lenOfP2PMessage, delivery.peerID + txs, uncles, withdrawals, lenOfP2PMessage := delivery.txs, delivery.uncles, delivery.withdrawals, delivery.lenOfP2PMessage for i := range txs { uncleHash := types.CalcUncleHash(uncles[i]) txHash := types.DeriveSha(RawTransactions(txs[i])) - var doubleHash DoubleHash - copy(doubleHash[:], uncleHash.Bytes()) - copy(doubleHash[length.Hash:], txHash.Bytes()) + withdrawalsHash := types.DeriveSha(withdrawals[i]) + var tripleHash TripleHash + copy(tripleHash[:], uncleHash.Bytes()) + copy(tripleHash[length.Hash:], txHash.Bytes()) + copy(tripleHash[2*length.Hash:], withdrawalsHash.Bytes()) // Block numbers are added to the bd.delivered bitmap here, only for blocks for which the body has been received, and their double hashes are present in the bd.requestedMap // Also, block numbers can be added to bd.delivered for empty blocks, above - blockNum, ok := bd.requestedMap[doubleHash] + blockNum, ok := bd.requestedMap[tripleHash] if !ok { undelivered++ continue @@ -331,9 +341,9 @@ Loop: reqMap[req.BlockNums[0]] = req } } - delete(bd.requestedMap, doubleHash) // Delivered, cleaning up + delete(bd.requestedMap, tripleHash) // Delivered, cleaning up - err := bd.addBodyToBucket(tx, blockNum, &types.RawBody{Transactions: txs[i], Uncles: uncles[i]}) + err := bd.addBodyToBucket(tx, blockNum, &types.RawBody{Transactions: txs[i], Uncles: uncles[i], Withdrawals: withdrawals[i]}) if err != nil { return 0, 0, err } diff --git a/turbo/stages/bodydownload/body_data_struct.go b/turbo/stages/bodydownload/body_data_struct.go index 9280b3148..7f04b677e 100644 --- a/turbo/stages/bodydownload/body_data_struct.go +++ b/turbo/stages/bodydownload/body_data_struct.go @@ -8,22 +8,23 @@ import ( "github.com/ledgerwatch/erigon/core/types" ) -// DoubleHash is type to be used for the mapping between TxHash and UncleHash to the block header -type DoubleHash [2 * common.HashLength]byte +// TripleHash is type to be used for the mapping between TxHash, UncleHash, and WithdrawalsHash to the block header +type TripleHash [3 * common.HashLength]byte const MaxBodiesInRequest = 1024 type Delivery struct { peerID [64]byte - txs *[][][]byte - uncles *[][]*types.Header + txs [][][]byte + uncles [][]*types.Header + withdrawals []types.Withdrawals lenOfP2PMessage uint64 } // BodyDownload represents the state of body downloading process type BodyDownload struct { peerMap map[[64]byte]int - requestedMap map[DoubleHash]uint64 + requestedMap map[TripleHash]uint64 DeliveryNotify chan struct{} deliveryCh chan Delivery Engine consensus.Engine @@ -54,7 +55,7 @@ type BodyRequest struct { // NewBodyDownload create a new body download state object func NewBodyDownload(outstandingLimit int, engine consensus.Engine) *BodyDownload { bd := &BodyDownload{ - requestedMap: make(map[DoubleHash]uint64), + requestedMap: make(map[TripleHash]uint64), outstandingLimit: uint64(outstandingLimit), delivered: roaring64.New(), deliveriesH: make(map[uint64]*types.Header),