From ca9094e79a4bf62a34b8349e5868782e5a1a557f Mon Sep 17 00:00:00 2001 From: Luke Anderson Date: Tue, 10 Sep 2019 10:54:37 +1000 Subject: [PATCH] WIP: Made block publishing validator function, which sends to a network channel. Untested. --- beacon_node/rest_api/Cargo.toml | 4 +- beacon_node/rest_api/src/helpers.rs | 30 +++++++++------ beacon_node/rest_api/src/lib.rs | 5 ++- beacon_node/rest_api/src/validator.rs | 54 +++++++++++++++++++++------ 4 files changed, 68 insertions(+), 25 deletions(-) diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index 863ea04da..a3d31e410 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -25,7 +25,7 @@ types = { path = "../../eth2/types" } clap = "2.32.0" http = "^0.1.17" prometheus = { version = "^0.6", features = ["process"] } -hyper = "0.12.32" +hyper = "0.12.34" futures = "0.1" exit-future = "0.1.3" tokio = "0.1.17" @@ -35,3 +35,5 @@ eth2_config = { path = "../../eth2/utils/eth2_config" } lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } slot_clock = { path = "../../eth2/utils/slot_clock" } hex = "0.3.2" +parking_lot = "0.9" + diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index d6ea0397f..bff7d9ece 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -4,10 +4,11 @@ use bls::PublicKey; use eth2_libp2p::{PubsubMessage, Topic}; use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX}; use hex; +use http::header; use hyper::{Body, Request}; use network::NetworkMessage; +use parking_lot::RwLock; use ssz::Encode; -use std::borrow::BorrowMut; use std::sync::Arc; use store::{iter::AncestorIter, Store}; use tokio::sync::mpsc; @@ -41,6 +42,21 @@ pub fn parse_root(string: &str) -> Result { } } +/// Checks the provided request to ensure that the `content-type` header. +/// +/// The content-type header should either be omitted, in which case JSON is assumed, or it should +/// explicity specify `application/json`. If anything else is provided, an error is returned. +pub fn check_content_type_for_json(req: &Request) -> Result<(), ApiError> { + match req.headers().get(header::CONTENT_TYPE) { + Some(h) if h == "application/json" => Ok(()), + Some(h) => Err(ApiError::InvalidQueryParams(format!( + "The provided content-type {:?} is not available, it must be JSON.", + h + ))), + _ => Ok(()), + } +} + /// Parse a PublicKey from a `0x` prefixed hex string pub fn parse_pubkey(string: &str) -> Result { const PREFIX: &str = "0x"; @@ -204,17 +220,9 @@ pub fn get_logger_from_request(req: &Request) -> slog::Logger { } pub fn publish_beacon_block_to_network( - req: &Request, + chan: Arc>>, block: BeaconBlock, ) -> Result<(), ApiError> { - // Get the network service from the request - let mut network_chan = req - .extensions() - .get::>() - .expect( - "Should always get the network channel from the request, since we put it in there.", - ); - // create the network topic to send on let topic_string = format!( "/{}/{}/{}", @@ -224,7 +232,7 @@ pub fn publish_beacon_block_to_network( let message = PubsubMessage::Block(block.as_ssz_bytes()); // Publish the block to the p2p network via gossipsub. - if let Err(e) = &network_chan.try_send(NetworkMessage::Publish { + if let Err(e) = chan.write().try_send(NetworkMessage::Publish { topics: vec![topic], message: message, }) { diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index c0927dde3..adab0c3bb 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -18,8 +18,9 @@ use client_network::NetworkMessage; use client_network::Service as NetworkService; use eth2_config::Eth2Config; use hyper::rt::Future; -use hyper::service::service_fn_ok; -use hyper::{Body, Method, Response, Server, StatusCode}; +use hyper::service::Service; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use parking_lot::RwLock; use response_builder::ResponseBuilder; use slog::{info, o, warn}; use std::ops::Deref; diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 632aee0ac..2ead55d14 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -5,8 +5,13 @@ use bls::{AggregateSignature, PublicKey, Signature}; use futures::future::Future; use futures::stream::Stream; use hyper::{Body, Error, Request}; +use network::NetworkMessage; +use parking_lot::RwLock; use serde::{Deserialize, Serialize}; -use slog::info; +use slog::{info, trace, warn}; +use std::sync::Arc; +use tokio; +use tokio::sync::mpsc; use types::beacon_state::EthSpec; use types::{Attestation, BeaconBlock, BitList, Epoch, RelativeEpoch, Shard, Slot}; @@ -200,17 +205,41 @@ pub fn get_new_beacon_block(req: Request) - /// HTTP Handler to publish a BeaconBlock, which has been signed by a validator. pub fn publish_beacon_block(req: Request) -> ApiResult { + let _ = check_content_type_for_json(&req)?; let log = get_logger_from_request(&req); let (beacon_chain, _head_state) = get_beacon_chain_from_request::(&req)?; + // Get the network sending channel from the request, for later transmission + let network_chan = req + .extensions() + .get::>>>() + .expect("Should always get the network channel from the request, since we put it in there.") + .clone(); - let (_head, body) = req.into_parts(); - let block_future = body - .fold(Vec::new(), |mut acc, chunk| { - acc.extend_from_slice(&*chunk); - futures::future::ok::<_, Error>(acc) + let body = req.into_body(); + trace!( + log, + "Got the request body, now going to parse it into a block." + ); + let block = body + .concat2() + .map(move |chunk| chunk.iter().cloned().collect::>()) + .map(|chunks| { + let block_result: Result, ApiError> = + serde_json::from_slice(&chunks.as_slice()).map_err(|e| { + ApiError::InvalidQueryParams(format!( + "Unable to deserialize JSON into a BeaconBlock: {:?}", + e + )) + }); + block_result }) + .unwrap() + .unwrap(); + + /* .map_err(|e| ApiError::ServerError(format!("Unable parse request body: {:?}", e))) .and_then(|body| { + trace!(log, "parsing json"); let block_result: Result, ApiError> = serde_json::from_slice(&body.as_slice()).map_err(|e| { ApiError::InvalidQueryParams(format!( @@ -220,16 +249,19 @@ pub fn publish_beacon_block(req: Request) - }); block_result }); + tokio::run(block_future); let block = block_future.wait()?; + */ + trace!(log, "BeaconBlock successfully parsed from JSON"; "block" => serde_json::to_string(&block).expect("We should always be able to serialize a block that we just created.")); match beacon_chain.process_block(block.clone()) { - Ok(BlockProcessingOutcome::Processed { - block_root: block_root, - }) => { + Ok(BlockProcessingOutcome::Processed { block_root }) => { // Block was processed, publish via gossipsub - info!(log, "Processed valid block from API"; "block_slot" => block.slot, "block_root" => format!("{}", block_root)); - publish_beacon_block_to_network::(&req, block)?; + info!(log, "Processed valid block from API, transmitting to network."; "block_slot" => block.slot, "block_root" => format!("{}", block_root)); + publish_beacon_block_to_network::(network_chan, block)?; } Ok(outcome) => { + warn!(log, "Block could not be processed, but is being sent to the network anyway."; "block_slot" => block.slot, "outcome" => format!("{:?}", outcome)); + //TODO need to send to network and return http 202 return Err(ApiError::InvalidQueryParams(format!( "The BeaconBlock could not be processed: {:?}", outcome