diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 33198f0a3..eb8df6f2a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -46,6 +46,26 @@ pub enum BlockProcessingOutcome { InvalidBlock(InvalidBlock), } +impl BlockProcessingOutcome { + /// Returns `true` if the block was objectively invalid and we should disregard the peer who + /// sent it. + pub fn is_invalid(&self) -> bool { + match self { + BlockProcessingOutcome::ValidBlock(_) => false, + BlockProcessingOutcome::InvalidBlock(r) => match r { + InvalidBlock::FutureSlot => true, + InvalidBlock::StateRootMismatch => true, + InvalidBlock::ParentUnknown => false, + InvalidBlock::SlotProcessingError(_) => false, + InvalidBlock::PerBlockProcessingError(e) => match e { + BlockProcessingError::Invalid(_) => true, + BlockProcessingError::BeaconStateError(_) => false, + }, + }, + } + } +} + pub struct BeaconChain { pub block_store: Arc>, pub state_store: Arc>, @@ -685,10 +705,10 @@ where // TODO: check the block proposer signature BEFORE doing a state transition. This will // significantly lower exposure surface to DoS attacks. - // Transition the parent state to the present slot. + // Transition the parent state to the block slot. let mut state = parent_state; let previous_block_header = parent_block.block_header(); - for _ in state.slot.as_u64()..present_slot.as_u64() { + for _ in state.slot.as_u64()..block.slot.as_u64() { if let Err(e) = per_slot_processing(&mut state, &previous_block_header, &self.spec) { return Ok(BlockProcessingOutcome::InvalidBlock( InvalidBlock::SlotProcessingError(e), diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index e2829cfa6..bb4e8e71e 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -10,7 +10,7 @@ use beacon_chain::{ use eth2_libp2p::HelloMessage; use types::{BeaconBlock, BeaconStateError, Epoch, Hash256, Slot}; -pub use beacon_chain::BeaconChainError; +pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; /// The network's API to the beacon chain. pub trait BeaconChain: Send + Sync { @@ -34,6 +34,9 @@ pub trait BeaconChain: Send + Sync { fn hello_message(&self) -> HelloMessage; + fn process_block(&self, block: BeaconBlock) + -> Result; + fn get_block_roots( &self, start_slot: Slot, @@ -98,6 +101,13 @@ where } } + fn process_block( + &self, + block: BeaconBlock, + ) -> Result { + self.process_block(block) + } + fn get_block_roots( &self, start_slot: Slot, diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 1a790eee1..2a84616e5 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -161,6 +161,17 @@ impl MessageHandler { &mut self.network_context, ) } + RPCResponse::BeaconBlockBodies(response) => { + debug!( + self.log, + "BeaconBlockBodies response received"; "peer" => format!("{:?}", peer_id) + ); + self.sync.on_beacon_block_bodies_response( + peer_id, + response, + &mut self.network_context, + ) + } // TODO: Handle all responses _ => panic!("Unknown response: {:?}", response), } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index ee0646dbb..b190f787f 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -3,12 +3,12 @@ use crate::message_handler::NetworkContext; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse}; use eth2_libp2p::PeerId; -use slog::{debug, error, o, warn}; +use slog::{debug, error, info, o, warn}; use ssz::TreeHash; use std::collections::HashMap; use std::sync::Arc; -use std::time::Instant; -use types::{BeaconBlockHeader, Epoch, Hash256, Slot}; +use std::time::{Duration, Instant}; +use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; @@ -102,7 +102,11 @@ pub struct SimpleSync { impl SimpleSync { pub fn new(beacon_chain: Arc, log: &slog::Logger) -> Self { let sync_logger = log.new(o!("Service"=> "Sync")); - let import_queue = ImportQueue::new(beacon_chain.clone(), log.clone()); + + let queue_item_stale_time = Duration::from_secs(600); + + let import_queue = + ImportQueue::new(beacon_chain.clone(), queue_item_stale_time, log.clone()); SimpleSync { chain: beacon_chain.clone(), known_peers: HashMap::new(), @@ -229,13 +233,72 @@ impl SimpleSync { return; } - let block_roots = self.import_queue.enqueue_headers(response.headers); + let block_roots = self + .import_queue + .enqueue_headers(response.headers, peer_id.clone()); if !block_roots.is_empty() { self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); } } + pub fn on_beacon_block_bodies_response( + &mut self, + peer_id: PeerId, + response: BeaconBlockBodiesResponse, + network: &mut NetworkContext, + ) { + self.import_queue + .enqueue_bodies(response.block_bodies, peer_id.clone()); + self.process_import_queue(network); + } + + pub fn process_import_queue(&mut self, network: &mut NetworkContext) { + let mut blocks: Vec<(Hash256, BeaconBlock, PeerId)> = self + .import_queue + .partials + .iter() + .filter_map(|(key, partial)| { + if let Some(_) = partial.body { + let (block, _root) = partial.clone().complete().expect("Body must be Some"); + Some((*key, block, partial.sender.clone())) + } else { + None + } + }) + .collect(); + + // Sort the blocks to be in ascending slot order. + blocks.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap()); + + let mut imported_keys = vec![]; + + for (key, block, sender) in blocks { + match self.chain.process_block(block) { + Ok(outcome) => { + if outcome.is_invalid() { + warn!(self.log, "Invalid block: {:?}", outcome); + network.disconnect(sender); + } else { + imported_keys.push(key) + } + } + Err(e) => { + error!(self.log, "Error during block processing"; "error" => format!("{:?}", e)) + } + } + } + + println!("imported_keys.len: {:?}", imported_keys.len()); + + if !imported_keys.is_empty() { + info!(self.log, "Imported {} blocks", imported_keys.len()); + for key in imported_keys { + self.import_queue.partials.remove(&key); + } + } + } + fn request_block_roots( &mut self, peer_id: PeerId, @@ -298,19 +361,41 @@ pub struct ImportQueue { pub chain: Arc, /// Partially imported blocks, keyed by the root of `BeaconBlockBody`. pub partials: HashMap, + /// Time before a queue entry is consider state. + pub stale_time: Duration, /// Logging log: slog::Logger, } impl ImportQueue { - pub fn new(chain: Arc, log: slog::Logger) -> Self { + pub fn new(chain: Arc, stale_time: Duration, log: slog::Logger) -> Self { Self { chain, partials: HashMap::new(), + stale_time, log, } } + pub fn remove_stale(&mut self) { + let keys: Vec = self + .partials + .iter() + .filter_map(|(key, partial)| { + if partial.inserted + self.stale_time >= Instant::now() { + Some(*key) + } else { + None + } + }) + .collect(); + + keys.iter().for_each(|key| { + self.partials.remove(&key); + }); + } + + /// Returns `true` if `self.chain` has not yet processed this block. fn is_new_block(&self, block_root: &Hash256) -> bool { self.chain .is_new_block_root(&block_root) @@ -322,9 +407,6 @@ impl ImportQueue { /// Returns the index of the first new root in the list of block roots. pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option { - for root in roots { - println!("root {}", root.block_root); - } roots .iter() .position(|brs| self.is_new_block(&brs.block_root)) @@ -339,14 +421,18 @@ impl ImportQueue { /// If a `header` is already in the queue, but not yet processed by the chain the block root is /// included in the output and the `inserted` time for the partial record is set to /// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale. - pub fn enqueue_headers(&mut self, headers: Vec) -> Vec { + pub fn enqueue_headers( + &mut self, + headers: Vec, + sender: PeerId, + ) -> Vec { let mut required_bodies: Vec = vec![]; for header in headers { let block_root = Hash256::from_slice(&header.hash_tree_root()[..]); if self.is_new_block(&block_root) { - self.insert_partial(block_root, header); + self.insert_header(block_root, header, sender.clone()); required_bodies.push(block_root) } } @@ -354,20 +440,60 @@ impl ImportQueue { required_bodies } - fn insert_partial(&mut self, block_root: Hash256, header: BeaconBlockHeader) { - self.partials.insert( - header.block_body_root, - PartialBeaconBlock { + /// If there is a matching `header` for this `body`, adds it to the queue. + /// + /// If there is no `header` for the `body`, the body is simply discarded. + pub fn enqueue_bodies(&mut self, bodies: Vec, sender: PeerId) { + for body in bodies { + self.insert_body(body, sender.clone()); + } + } + + /// Inserts a header to the queue. + /// + /// If the header already exists, the `inserted` time is set to `now` and not other + /// modifications are made. + fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) { + self.partials + .entry(header.block_body_root) + .and_modify(|p| p.inserted = Instant::now()) + .or_insert(PartialBeaconBlock { block_root, header, + body: None, inserted: Instant::now(), - }, - ); + sender, + }); + } + + /// Updates an existing partial with the `body`. + /// + /// If there is no header for the `body`, the body is simply discarded. + fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) { + let body_root = Hash256::from_slice(&body.hash_tree_root()[..]); + + self.partials.entry(body_root).and_modify(|p| { + if body_root == p.header.block_body_root { + p.body = Some(body); + p.inserted = Instant::now(); + p.sender = sender; + } + }); } } +#[derive(Clone, Debug)] pub struct PartialBeaconBlock { pub block_root: Hash256, pub header: BeaconBlockHeader, + pub body: Option, pub inserted: Instant, + pub sender: PeerId, +} + +impl PartialBeaconBlock { + /// Given a `body`, consumes `self` and returns a complete `BeaconBlock` along with its root. + pub fn complete(self) -> Option<(BeaconBlock, Hash256)> { + Some((self.header.into_block(self.body?), self.block_root)) + } } diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index 076a3f529..57587717b 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -77,7 +77,16 @@ impl SyncNode { match request { RPCRequest::BeaconBlockHeaders(request) => request, - _ => panic!("Did not get block root request"), + _ => panic!("Did not get block headers request"), + } + } + + pub fn get_block_bodies_request(&self) -> BeaconBlockBodiesRequest { + let request = self.recv_rpc_request().expect("No block bodies request"); + + match request { + RPCRequest::BeaconBlockBodies(request) => request, + _ => panic!("Did not get block bodies request"), } } @@ -223,6 +232,29 @@ impl SyncMaster { self.send_rpc_response(node, response) } + pub fn respond_to_block_bodies_request( + &mut self, + node: &SyncNode, + request: BeaconBlockBodiesRequest, + ) { + let block_bodies: Vec = request + .block_roots + .iter() + .map(|root| { + let block = self + .harness + .beacon_chain + .get_block(root) + .expect("Failed to load block") + .expect("Block did not exist"); + block.body + }) + .collect(); + + let response = RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies }); + self.send_rpc_response(node, response) + } + fn send_rpc_response(&mut self, node: &SyncNode, rpc_response: RPCResponse) { node.send(self.rpc_response(node, rpc_response)); } @@ -311,6 +343,11 @@ fn first_test() { master.respond_to_block_headers_request(&nodes[0], headers_request); - std::thread::sleep(Duration::from_millis(500)); + let bodies_request = nodes[0].get_block_bodies_request(); + assert_eq!(bodies_request.block_roots.len(), 2); + + master.respond_to_block_bodies_request(&nodes[0], bodies_request); + + std::thread::sleep(Duration::from_millis(10000)); runtime.shutdown_now(); } diff --git a/eth2/types/src/beacon_block_header.rs b/eth2/types/src/beacon_block_header.rs index 3d8b08cc8..f4bee27e1 100644 --- a/eth2/types/src/beacon_block_header.rs +++ b/eth2/types/src/beacon_block_header.rs @@ -37,6 +37,19 @@ impl BeaconBlockHeader { pub fn canonical_root(&self) -> Hash256 { Hash256::from_slice(&self.hash_tree_root()[..]) } + + /// Given a `body`, consumes `self` and returns a complete `BeaconBlock`. + /// + /// Spec v0.5.0 + pub fn into_block(self, body: BeaconBlockBody) -> BeaconBlock { + BeaconBlock { + slot: self.slot, + previous_block_root: self.previous_block_root, + state_root: self.state_root, + body, + signature: self.signature, + } + } } #[cfg(test)]