diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index 6317414b0..29a61766e 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,3 +1,4 @@ +use crate::beacon_chain::BeaconChain; use crossbeam_channel; use eth2_libp2p::rpc::methods::BlockRootSlot; use eth2_libp2p::PubsubMessage; @@ -9,13 +10,15 @@ use protos::services::{ PublishBeaconBlockRequest, PublishBeaconBlockResponse, }; use protos::services_grpc::BeaconBlockService; -use slog::debug; use slog::Logger; +use slog::{debug, error, info, warn}; use ssz::{Decodable, TreeHash}; +use std::sync::Arc; use types::{BeaconBlock, Hash256, Slot}; #[derive(Clone)] pub struct BeaconBlockServiceInstance { + pub chain: Arc, pub network_chan: crossbeam_channel::Sender, pub log: Logger, } @@ -50,39 +53,91 @@ impl BeaconBlockService for BeaconBlockServiceInstance { req: PublishBeaconBlockRequest, sink: UnarySink, ) { - debug!(self.log, "PublishBeaconBlock"); + let mut resp = PublishBeaconBlockResponse::new(); - let block = req.get_block(); + let ssz_serialized_block = req.get_block().get_ssz(); - match BeaconBlock::ssz_decode(block.get_ssz(), 0) { + match BeaconBlock::ssz_decode(ssz_serialized_block, 0) { Ok((block, _i)) => { let block_root = Hash256::from_slice(&block.hash_tree_root()[..]); - // TODO: Obtain topics from the network service properly. - let topic = types::TopicBuilder::new("beacon_chain".to_string()).build(); - let message = PubsubMessage::Block(BlockRootSlot { - block_root, - slot: block.slot, - }); + match self.chain.process_block(block.clone()) { + Ok(outcome) => { + if outcome.sucessfully_processed() { + // Block was successfully processed. + info!( + self.log, + "PublishBeaconBlock"; + "type" => "invalid_block", + "outcome" => format!("{:?}", outcome) + ); - println!("Sending beacon block to gossipsub"); - self.network_chan.send(NetworkMessage::Publish { - topics: vec![topic], - message, - }); + // TODO: Obtain topics from the network service properly. + let topic = + types::TopicBuilder::new("beacon_chain".to_string()).build(); + let message = PubsubMessage::Block(BlockRootSlot { + block_root, + slot: block.slot, + }); + + println!("Sending beacon block to gossipsub"); + self.network_chan.send(NetworkMessage::Publish { + topics: vec![topic], + message, + }); + + resp.set_success(true); + } else if outcome.is_invalid() { + // Block was invalid. + warn!( + self.log, + "PublishBeaconBlock"; + "type" => "invalid_block", + "outcome" => format!("{:?}", outcome) + ); + + resp.set_success(false); + resp.set_msg( + format!("InvalidBlock: {:?}", outcome).as_bytes().to_vec(), + ); + } else { + // Some failure during processing. + error!( + self.log, + "PublishBeaconBlock"; + "type" => "other", + "outcome" => format!("{:?}", outcome) + ); + + resp.set_success(false); + resp.set_msg(format!("other: {:?}", outcome).as_bytes().to_vec()); + } + } + Err(e) => { + // Some failure during processing. + error!( + self.log, + "PublishBeaconBlock"; + "type" => "failed_to_process", + "error" => format!("{:?}", e) + ); + + resp.set_success(false); + resp.set_msg(format!("failed_to_process: {:?}", e).as_bytes().to_vec()); + } + } - // TODO: actually process the block. - let mut resp = PublishBeaconBlockResponse::new(); resp.set_success(true); + } + Err(_) => { + resp.set_success(false); + resp.set_msg(b"Invalid SSZ".to_vec()); + } + }; - let f = sink - .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } - Err(e) => { - // - } - } + let f = sink + .success(resp) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) } } diff --git a/beacon_node/rpc/src/beacon_chain.rs b/beacon_node/rpc/src/beacon_chain.rs index 9b2681876..0551a8024 100644 --- a/beacon_node/rpc/src/beacon_chain.rs +++ b/beacon_node/rpc/src/beacon_chain.rs @@ -5,14 +5,18 @@ use beacon_chain::{ parking_lot::RwLockReadGuard, slot_clock::SlotClock, types::{BeaconState, ChainSpec}, - CheckPoint, }; +pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; +use types::BeaconBlock; /// The RPC's API to the beacon chain. pub trait BeaconChain: Send + Sync { fn get_spec(&self) -> &ChainSpec; fn get_state(&self) -> RwLockReadGuard; + + fn process_block(&self, block: BeaconBlock) + -> Result; } impl BeaconChain for RawBeaconChain @@ -28,4 +32,11 @@ where fn get_state(&self) -> RwLockReadGuard { self.state.read() } + + fn process_block( + &self, + block: BeaconBlock, + ) -> Result { + self.process_block(block) + } } diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index 4dfd33487..289a81f3a 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -43,6 +43,7 @@ pub fn start_server( let beacon_block_service = { let instance = BeaconBlockServiceInstance { + chain: beacon_chain.clone(), network_chan, log: log.clone(), };