diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 752ba3b7b..74059d428 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -602,7 +602,7 @@ where /// /// If type inference errors are being raised, see the comment on the definition of `Self`. #[allow(clippy::type_complexity)] - pub fn build( + pub fn build( mut self, ) -> Result>, String> { diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index f477878ac..63c414e2e 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -59,7 +59,8 @@ use std::task::Context; use std::time::Duration; use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; +use lighthouse_network::rpc::methods::TxBlobsByRangeRequest; use types::{ Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, @@ -152,6 +153,8 @@ const MAX_STATUS_QUEUE_LEN: usize = 1_024; /// will be stored before we start dropping them. const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024; +const MAX_TX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1_024; + /// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that /// will be stored before we start dropping them. const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; @@ -194,6 +197,7 @@ pub const RPC_BLOCK: &str = "rpc_block"; pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const STATUS_PROCESSING: &str = "status_processing"; pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; +pub const TX_BLOBS_BY_RANGE_REQUEST: &str = "tx_blobs_by_range_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; @@ -541,6 +545,21 @@ impl WorkEvent { } } + pub fn tx_blob_by_range_request( + peer_id: PeerId, + request_id: PeerRequestId, + request: TxBlobsByRangeRequest, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::TxBlobsByRangeRequest { + peer_id, + request_id, + request, + }, + } + } + /// Create a new work event to process `BlocksByRootRequest`s from the RPC network. pub fn blocks_by_roots_request( peer_id: PeerId, @@ -728,6 +747,11 @@ pub enum Work { request_id: PeerRequestId, request: BlocksByRangeRequest, }, + TxBlobsByRangeRequest { + peer_id: PeerId, + request_id: PeerRequestId, + request: TxBlobsByRangeRequest, + }, BlocksByRootsRequest { peer_id: PeerId, request_id: PeerRequestId, @@ -754,6 +778,7 @@ impl Work { Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::Status { .. } => STATUS_PROCESSING, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, + Work::TxBlobsByRangeRequest { .. } => TX_BLOBS_BY_RANGE_REQUEST, Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, @@ -897,6 +922,7 @@ impl BeaconProcessor { let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); + let mut txbbrange_queue = FifoQueue::new(MAX_TX_BLOBS_BY_RANGE_QUEUE_LEN); let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to @@ -1119,6 +1145,8 @@ impl BeaconProcessor { self.spawn_worker(item, toolbox); } else if let Some(item) = bbrange_queue.pop() { self.spawn_worker(item, toolbox); + } else if let Some(item) = txbbrange_queue.pop() { + self.spawn_worker(item, toolbox); } else if let Some(item) = bbroots_queue.pop() { self.spawn_worker(item, toolbox); // Check slashings after all other consensus messages so we prioritize @@ -1234,6 +1262,9 @@ impl BeaconProcessor { Work::BlocksByRangeRequest { .. } => { bbrange_queue.push(work, work_id, &self.log) } + Work::TxBlobsByRangeRequest { .. } => { + txbbrange_queue.push(work, work_id, &self.log) + } Work::BlocksByRootsRequest { .. } => { bbroots_queue.push(work, work_id, &self.log) } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 37aee0171..d480004f5 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -8,6 +8,7 @@ use lighthouse_network::rpc::StatusMessage; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error}; +use lighthouse_network::rpc::methods::TxBlobsByRangeRequest; use slot_clock::SlotClock; use std::sync::Arc; use task_executor::TaskExecutor; @@ -122,6 +123,15 @@ impl Worker { } } + pub fn handle_tx_blobs_by_range_request( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + mut req: TxBlobsByRangeRequest, + ) { + //FIXME(sean) + } + /// Handle a `BlocksByRoot` request from the peer. pub fn handle_blocks_by_root_request( self, diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 6bc642206..80b3a20d9 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -209,7 +209,9 @@ impl Processor { request_id: PeerRequestId, req: TxBlobsByRangeRequest, ) { - //FIXME(sean) + self.send_beacon_processor_work(BeaconWorkEvent::tx_blob_by_range_request( + peer_id, request_id, req, + )) } pub fn on_tx_blobs_by_range_response( @@ -218,7 +220,24 @@ impl Processor { request_id: RequestId, blob_wrapper: Option>>, ) { - //FIXME(sean) + trace!( + self.log, + "Received TxBlobsByRange Response"; + "peer" => %peer_id, + ); + + if let RequestId::Sync(id) = request_id { + self.send_to_sync(SyncMessage::TxBlobsByRangeResponse { + peer_id, + request_id: id, + blob_wrapper, + }); + } else { + debug!( + self.log, + "All tx blobs by range responses should belong to sync" + ); + } } /// Handle a `BlocksByRoot` response from the peer. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index cdef90471..6110bdf52 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -53,7 +53,7 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{BlobWrapper, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -88,6 +88,18 @@ pub enum SyncMessage { /// A block has been received from the RPC. RpcBlock { request_id: RequestId, + beacon_block: Option>>, + }, + + /// A [`TxBlobsByRangeResponse`] response has been received. + TxBlobsByRangeResponse { + peer_id: PeerId, + request_id: RequestId, + blob_wrapper: Option>>, + }, + + /// A [`BlocksByRoot`] response has been received. + BlocksByRootResponse { peer_id: PeerId, beacon_block: Option>>, seen_timestamp: Duration, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 253145438..d7a0d86cd 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -55,7 +55,7 @@ use lru_cache::LRUTimeCache; use slog::{crit, debug, trace, warn}; use std::collections::HashMap; use std::sync::Arc; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{BlobWrapper, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// For how long we store failed finalized chains to prevent retries. const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;