diff --git a/Cargo.lock b/Cargo.lock index 6665ed5ec..1ecfd872c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1612,6 +1612,7 @@ dependencies = [ "slog-async", "slog-stdlog", "slog-term", + "slot_clock", "smallvec 1.4.2", "snap", "tempdir", diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 36b799c8d..ae6959437 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -3,13 +3,14 @@ use crate::beacon_processor::{ }; use crate::service::NetworkMessage; use crate::sync::{PeerSyncInfo, SyncMessage}; -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2_libp2p::rpc::*; use eth2_libp2p::{ MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, }; use itertools::process_results; use slog::{debug, error, o, trace, warn}; +use slot_clock::SlotClock; use std::cmp; use std::sync::Arc; use tokio::sync::mpsc; @@ -158,7 +159,9 @@ impl Processor { ); } - self.process_status(peer_id, status); + if let Err(e) = self.process_status(peer_id, status) { + error!(self.log, "Could not process status message"; "error" => format!("{:?}", e)); + } } /// Process a `Status` response from a peer. @@ -175,22 +178,29 @@ impl Processor { ); // Process the status message, without sending back another status. - self.process_status(peer_id, status); + if let Err(e) = self.process_status(peer_id, status) { + error!(self.log, "Could not process status message"; "error" => format!("{:?}", e)); + } } /// Process a `Status` message, requesting new blocks if appropriate. /// /// Disconnects the peer if required. - fn process_status(&mut self, peer_id: PeerId, status: StatusMessage) { + fn process_status( + &mut self, + peer_id: PeerId, + status: StatusMessage, + ) -> Result<(), BeaconChainError> { let remote = PeerSyncInfo::from(status); let local = match PeerSyncInfo::from_chain(&self.chain) { Some(local) => local, None => { - return error!( + error!( self.log, "Failed to get peer sync info"; "msg" => "likely due to head lock contention" - ) + ); + return Err(BeaconChainError::CannotAttestToFutureState); } }; @@ -209,7 +219,11 @@ impl Processor { self.network .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); } else if remote.head_slot - > self.chain.slot().unwrap_or_else(|_| Slot::from(0u64)) + FUTURE_SLOT_TOLERANCE + > self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()) + + FUTURE_SLOT_TOLERANCE { // Note: If the slot_clock cannot be read, this will not error. Other system // components will deal with an invalid slot clock error. @@ -230,8 +244,7 @@ impl Processor { && self .chain .root_at_slot(start_slot(remote.finalized_epoch)) - .map(|root_opt| root_opt != Some(remote.finalized_root)) - .unwrap_or_else(|_| false) + .map(|root_opt| root_opt != Some(remote.finalized_root))? { // The remotes finalized epoch is less than or greater than ours, but the block root is // different to the one in our chain. @@ -267,8 +280,7 @@ impl Processor { } else if self .chain .store - .item_exists::>(&remote.head_root) - .unwrap_or_else(|_| false) + .item_exists::>(&remote.head_root)? { debug!( self.log, "Peer with known chain found"; @@ -293,6 +305,8 @@ impl Processor { ); self.send_to_sync(SyncMessage::AddPeer(peer_id, remote)); } + + Ok(()) } /// Handle a `BlocksByRoot` request from the peer. @@ -438,6 +452,11 @@ impl Processor { } } + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + if blocks_sent < (req.count as usize) { debug!( self.log, @@ -445,7 +464,7 @@ impl Processor { "peer" => peer_id.to_string(), "msg" => "Failed to return all requested blocks", "start_slot" => req.start_slot, - "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), + "current_slot" => current_slot, "requested" => req.count, "returned" => blocks_sent); } else { @@ -454,7 +473,7 @@ impl Processor { "Sending BlocksByRange Response"; "peer" => peer_id.to_string(), "start_slot" => req.start_slot, - "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), + "current_slot" => current_slot, "requested" => req.count, "returned" => blocks_sent); }