diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 5fa7e7a77..48a42b941 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -8,6 +8,7 @@ pub mod test_utils; pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock}; pub use self::checkpoint::CheckPoint; pub use self::errors::BeaconChainError; +pub use attestation_aggregator::Outcome as AggregationOutcome; pub use db; pub use fork_choice; pub use parking_lot; diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 85ef7e06f..47e47f3eb 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -1,7 +1,7 @@ use ssz::{Decodable, DecodeError, Encodable, SszStream}; /// Available RPC methods types and ids. use ssz_derive::{Decode, Encode}; -use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; +use types::{Attestation, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; #[derive(Debug)] /// Available Serenity Libp2p RPC methods @@ -97,6 +97,12 @@ impl RPCResponse { } } +#[derive(Debug, Clone)] +pub enum IncomingGossip { + Block(BlockGossip), + Attestation(AttestationGossip), +} + /* Request/Response data structures for RPC methods */ /// The HELLO request/response handshake message. @@ -236,3 +242,15 @@ pub struct BeaconChainStateResponse { /// The values corresponding the to the requested tree hashes. pub values: bool, //TBD - stubbed with encodeable bool } + +/// Gossipsub message providing notification of a new block. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct BlockGossip { + pub root: BlockRootSlot, +} + +/// Gossipsub message providing notification of a new attestation. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct AttestationGossip { + pub attestation: Attestation, +} diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index a1573ec93..925c36616 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -11,7 +11,7 @@ use libp2p::core::swarm::{ ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p::{Multiaddr, PeerId}; -pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; +pub use methods::{HelloMessage, IncomingGossip, RPCMethod, RPCRequest, RPCResponse}; pub use protocol::{RPCEvent, RPCProtocol}; use slog::o; use std::marker::PhantomData; diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index cc54e8ae0..26cea0065 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -5,10 +5,10 @@ use beacon_chain::{ parking_lot::RwLockReadGuard, slot_clock::SlotClock, types::{BeaconState, ChainSpec}, - CheckPoint, + AggregationOutcome, CheckPoint, }; use eth2_libp2p::HelloMessage; -use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; +use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; @@ -37,6 +37,11 @@ pub trait BeaconChain: Send + Sync { fn process_block(&self, block: BeaconBlock) -> Result; + fn process_attestation( + &self, + attestation: Attestation, + ) -> Result; + fn get_block_roots( &self, start_slot: Slot, @@ -119,6 +124,18 @@ where self.process_block(block) } + fn process_attestation( + &self, + _attestation: Attestation, + ) -> Result { + // Awaiting a proper operations pool before we can import attestations. + // + // Returning a useless error for now. + // + // https://github.com/sigp/lighthouse/issues/281 + return Err(BeaconChainError::DBInconsistent("CANNOT PROCESS".into())); + } + fn get_block_roots( &self, start_slot: Slot, diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 57923b2c3..a788e83c9 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -4,7 +4,7 @@ use crate::service::{NetworkMessage, OutgoingMessage}; use crate::sync::SimpleSync; use crossbeam_channel::{unbounded as channel, Sender}; use eth2_libp2p::{ - rpc::{RPCRequest, RPCResponse}, + rpc::{IncomingGossip, RPCRequest, RPCResponse}, PeerId, RPCEvent, }; use futures::future; @@ -39,8 +39,8 @@ pub enum HandlerMessage { PeerDisconnected(PeerId), /// An RPC response/request has been received. RPC(PeerId, RPCEvent), - /// A block has been imported. - BlockImported(), //TODO: This comes from pub-sub - decide its contents + /// A gossip message has been received. + IncomingGossip(PeerId, IncomingGossip), } impl MessageHandler { @@ -90,6 +90,10 @@ impl MessageHandler { HandlerMessage::RPC(peer_id, rpc_event) => { self.handle_rpc_message(peer_id, rpc_event); } + // we have received an RPC message request/response + HandlerMessage::IncomingGossip(peer_id, gossip) => { + self.handle_gossip(peer_id, gossip); + } //TODO: Handle all messages _ => {} } @@ -186,6 +190,19 @@ impl MessageHandler { } }; } + + /// Handle RPC messages + fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: IncomingGossip) { + match gossip_message { + IncomingGossip::Block(message) => { + self.sync + .on_block_gossip(peer_id, message, &mut self.network_context) + } + IncomingGossip::Attestation(message) => { + // + } + } + } } pub struct NetworkContext { diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index 6508af89e..17cbd2f12 100644 --- a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -104,7 +104,7 @@ impl ImportQueue { } /// Returns `true` if `self.chain` has not yet processed this block. - fn is_new_block(&self, block_root: &Hash256) -> bool { + pub fn is_new_block(&self, block_root: &Hash256) -> bool { self.chain .is_new_block_root(&block_root) .unwrap_or_else(|_| { diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 4ee349043..06ccbafd3 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -204,6 +204,8 @@ impl SimpleSync { self.known_peers.insert(peer_id.clone(), remote); } + // TODO: boot peer if finalization is wrong. + match remote_status { PeerStatus::OnDifferentChain => { info!( @@ -462,6 +464,62 @@ impl SimpleSync { self.process_import_queue(network); } + /// Process a gossip message declaring a new block. + pub fn on_block_gossip( + &mut self, + peer_id: PeerId, + msg: BlockGossip, + network: &mut NetworkContext, + ) { + debug!( + self.log, + "BlockGossip"; + "peer" => format!("{:?}", peer_id), + ); + // TODO: filter out messages that a prior to the finalized slot. + // + // TODO: if the block is a few more slots ahead, try to get all block roots from then until + // now. + // + // Note: only requests the new block -- will fail if we don't have its parents. + if self.import_queue.is_new_block(&msg.root.block_root) { + self.request_block_headers( + peer_id, + BeaconBlockHeadersRequest { + start_root: msg.root.block_root, + start_slot: msg.root.slot, + max_headers: 1, + skip_slots: 0, + }, + network, + ) + } + } + + /// Process a gossip message declaring a new attestation. + /// + /// Not currently implemented. + pub fn on_attestation_gossip( + &mut self, + peer_id: PeerId, + msg: AttestationGossip, + _network: &mut NetworkContext, + ) { + debug!( + self.log, + "AttestationGossip"; + "peer" => format!("{:?}", peer_id), + ); + + // Awaiting a proper operations pool before we can import attestations. + // + // https://github.com/sigp/lighthouse/issues/281 + match self.chain.process_attestation(msg.attestation) { + Ok(_) => panic!("Impossible, method not implemented."), + Err(_) => error!(self.log, "Attestation processing not implemented!"), + } + } + /// Iterate through the `import_queue` and process any complete blocks. /// /// If a block is successfully processed it is removed from the queue, otherwise it remains in