From d8fd7c8803f3ed780177463a060fb0e3fa44a7d8 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sat, 30 Mar 2019 18:36:38 +1100 Subject: [PATCH] Implement beacon node side of attestation production gRPC --- beacon_node/rpc/src/beacon_attester.rs | 100 +++++++++++++++++++------ 1 file changed, 76 insertions(+), 24 deletions(-) diff --git a/beacon_node/rpc/src/beacon_attester.rs b/beacon_node/rpc/src/beacon_attester.rs index 4166c30d4..d787d1e5f 100644 --- a/beacon_node/rpc/src/beacon_attester.rs +++ b/beacon_node/rpc/src/beacon_attester.rs @@ -7,9 +7,7 @@ use protos::services::{ PublishAttestation }; use protos::services_grpc::BeaconBlockService; -use slog::{Logger, info, warn, error}; - -const TEST_SHARD_PHASE_ZERO: u8 = 0; +use slog::{Logger, info, warn, error, trace}; #[derive(Clone)] pub struct AttestationServiceInstance { @@ -25,30 +23,47 @@ impl AttestationService for AttestationServiceInstance { req: ProduceAttestationDataRequest, sink: UnarySink, ) { - info!(&self.log, "Attempting to produce attestation at slot {}", req.get_slot()); + trace!(&self.log, "Attempting to produce attestation at slot {}", req.get_slot()); - // get the chain spec & state - let spec = self.chain.get_spec(); - let state = self.chain.get_state(); + // verify the slot, drop lock on state afterwards + { + let slot_requested = req.get_slot(); + let state = self.chain.get_state(); - let slot_requested = req.get_slot(); - - // Start by performing some checks - // Check that the the AttestionData is for the current slot (otherwise it will not be valid) - if slot_requested != state.slot { - let f = sink - .fail(RpcStatus::new( - RpcStatusCode::OutOfRange, - "AttestationData request for a slot that is not the current slot." - )) - .map_err(move |e| error!(&self.log, "Failed to reply with failure {:?}: {:?}", req, e)); + // Start by performing some checks + // Check that the the AttestionData is for the current slot (otherwise it will not be valid) + if slot_requested != state.slot { + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::OutOfRange, + "AttestationData request for a slot that is not the current slot." + )) + .map_err(move |e| error!(&self.log, "Failed to reply with failure {:?}: {:?}", req, e)); + } } - // Then get the AttestationData from the beacon chain (for shard 0 for now) - let attestation_data = self.chain.produce_attestation_data(TEST_SHARD_PHASE_ZERO); + // Then get the AttestationData from the beacon chain + let attestation_data = match self.chain.produce_attestation_data(req.get_shard()){ + Ok(v) => v, + Err(e) => { + // Could not produce an attestation + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::Unknown + Some(format!("Could not produce an attestation: {:?}",e)), + )) + .map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; + + + let mut attestation_data_proto = AttestationDataProto::new(); + attestation_data_proto.set_ssz(ssz_encode(&attestation_data)); let mut resp = ProduceAttestationDataResponse::new(); - resp.set_attestation_data(attestation_data); + resp.set_attestation_data(attestation_data_proto); let f = sink .success(resp) @@ -64,12 +79,49 @@ impl AttestationService for AttestationServiceInstance { req: PublishAttestationRequest, sink: UnarySink, ) { - println!("publishing attestation {:?}", req.get_block()); + trace!(self.log, "Publishing attestation"); - // TODO: actually process the block. let mut resp = PublishAttestationResponse::new(); + let ssz_serialized_attestation = req.get_attestation().get_ssz(); - resp.set_success(true); + let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) { + Ok((v, _index)) => v, + Err(_) => { + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::InvalidArgument, + Some("Invalid attestation".to_string()), + )) + .map_err(move |e| warn!(log_clone, "failed to reply {:?}", req)); + return ctx.spawn(f); + } + }; + + match self.chain.process_attestation(attestation) { + Ok(_) => { + // Attestation was successfully processed. + info!( + self.log, + "PublishAttestation"; + "type" => "valid_attestation", + ); + + resp.set_success(true); + }, + Err(e)=> { + // Attestation was invalid + warn!( + self.log, + "PublishAttestation"; + "type" => "invalid_attestation", + ); + resp.set_success(false); + resp.set_msg( + format!("InvalidAttestation: {:?}", e).as_bytes().to_vec(), + ); + } + }; let f = sink .success(resp)