diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index a033da87b..44eab4fe2 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -64,8 +64,6 @@ impl Client { )); } - println!("Here"); - Ok(Client { config, beacon_chain, diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 2fc175ad9..eace153fa 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -30,4 +30,5 @@ slog-term = "^2.4.0" slog-async = "^2.3.0" tokio = "0.1.18" tokio-timer = "0.2.10" +error-chain = "0.12.0" bincode = "^1.1.2" diff --git a/validator_client/src/error.rs b/validator_client/src/error.rs new file mode 100644 index 000000000..29d7ba882 --- /dev/null +++ b/validator_client/src/error.rs @@ -0,0 +1,22 @@ +use slot_clock; + +use error_chain::{ + error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, + impl_extract_backtrace, +}; + +error_chain! { + links { } + + errors { + SlotClockError(e: slot_clock::SystemTimeSlotClockError) { + description("Error reading system time"), + display("SlotClockError: '{:?}'", e) + } + + SystemTimeError(t: String ) { + description("Error reading system time"), + display("SystemTimeError: '{}'", t) + } + } +} diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 36e1514fe..84d0cbff7 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -2,12 +2,13 @@ mod attester_service; mod block_producer_service; mod config; mod duties; +pub mod error; mod service; use crate::config::Config as ValidatorClientConfig; use clap::{App, Arg}; use service::Service as ValidatorService; -use slog::{o, Drain}; +use slog::{error, info, o, Drain}; fn main() { // Logging @@ -51,5 +52,8 @@ fn main() { .expect("Unable to build a configuration for the validator client."); // start the validator service. - ValidatorService::start(config, log); + match ValidatorService::start(config, log.clone()) { + Ok(_) => info!(log, "Validator client shutdown successfully."), + Err(e) => error!(log, "Validator exited due to: {}", e.to_string()), + } } diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 49acd8ad2..720388a61 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -2,7 +2,10 @@ use crate::attester_service::{AttestationGrpcClient, AttesterService}; use crate::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; use crate::config::Config as ValidatorConfig; +use crate::duties::PollOutcome; use crate::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; +use crate::error as error_chain; +use crate::error::ErrorKind; use attester::test_utils::EpochMap; use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; @@ -13,9 +16,8 @@ use protos::services_grpc::{ AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, ValidatorServiceClient, }; -use slog::{debug, info, warn}; +use slog::{debug, error, info, warn}; use slot_clock::{SlotClock, SystemTimeSlotClock}; -use std::ops::Sub; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use tokio::prelude::*; @@ -39,8 +41,8 @@ pub struct Service { slot_clock: Arc, /// The current slot we are processing. current_slot: Slot, - /// Micro seconds until the next slot. This is used for initializing the tokio timer interval. - micros_to_next_slot: Duration, + /// Duration until the next slot. This is used for initializing the tokio timer interval. + duration_to_next_slot: Duration, // GRPC Clients /// The beacon block GRPC client. beacon_block_client: Arc, @@ -57,7 +59,10 @@ impl Service { /// /// This tries to connect to a beacon node. Once connected, it initialised the gRPC clients /// and returns an instance of the service. - fn initialize_service(config: &ValidatorConfig, log: slog::Logger) -> Self { + fn initialize_service( + config: &ValidatorConfig, + log: slog::Logger, + ) -> error_chain::Result { // initialise the beacon node client to check for a connection let env = Arc::new(EnvBuilder::new().build()); @@ -76,13 +81,26 @@ impl Service { std::thread::sleep(Duration::from_secs(5)); continue; } - Ok(info) => break info, + Ok(info) => { + if SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + < info.genesis_time + { + warn!( + log, + "Beacon Node's genesis time is in the future. No work to do.\n Exiting" + ); + return Err("Genesis time in the future".into()); + } + break info; + } }; }; // build requisite objects to form Self let genesis_time = node_info.get_genesis_time(); - let genesis_time = 1_549_935_547; info!(log,"Beacon node connected"; "Node Version" => node_info.version.clone(), "Chain ID" => node_info.chain_id, "Genesis time" => genesis_time); @@ -124,60 +142,54 @@ impl Service { Arc::new(AttestationServiceClient::new(ch)) }; - //TODO: Add error chain. Handle errors - let current_slot = slot_clock.present_slot().unwrap().unwrap().sub(1); + let current_slot = slot_clock + .present_slot() + .map_err(|e| ErrorKind::SlotClockError(e))? + .expect("Genesis must be in the future"); - // calculate seconds to the next slot - let micros_to_next_slot = { + // calculate the duration to the next slot + let duration_to_next_slot = { + let seconds_per_slot = config.spec.seconds_per_slot; let syslot_time = SystemTime::now(); - let duration_since_epoch = syslot_time.duration_since(SystemTime::UNIX_EPOCH).unwrap(); - debug!(log, "Duration since unix epoch {:?}", duration_since_epoch); - let mut micros_to_slot = None; - if let Some(duration_since_genesis) = - duration_since_epoch.checked_sub(Duration::from_secs(genesis_time)) - { - // seconds till next slot - debug!(log, "Genesis Time {:?}", genesis_time); - debug!(log, "Duration since genesis {:?}", duration_since_genesis); - micros_to_slot = duration_since_genesis - .as_secs() - .checked_rem(config.spec.seconds_per_slot); - } - micros_to_slot.unwrap_or_else(|| 0) - /* - let duration_to_slot = duration_since_genesis - .checked_sub(Duration::from( - duration_since_genesis - .checked_div(config.spec.seconds_per_slot as u64) - .unwrap() - .as_secs() - .checked_mul(config.spec.seconds_per_slot) - .unwrap(), - )) - .unwrap(); - */ + let duration_since_epoch = syslot_time + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|e| ErrorKind::SystemTimeError(e.to_string()))?; + let duration_since_genesis = duration_since_epoch + .checked_sub(Duration::from_secs(genesis_time)) + .expect("Genesis must be in the future. Checked on connection"); + let elapsed_slots = duration_since_epoch + .as_secs() + .checked_div(seconds_per_slot as u64) + .expect("Seconds per slot should not be 0"); + + // the duration to the next slot + Duration::from_secs( + (elapsed_slots + 1) + .checked_mul(seconds_per_slot) + .expect("Next slot time should not overflow u64"), + ) + .checked_sub(duration_since_genesis) + .expect("This should never saturate") }; - info!(log, ""; "Micro Seconds to next slot"=>micros_to_next_slot); - - Self { + Ok(Self { connected_node_version: node_info.version, chain_id: node_info.chain_id as u16, fork, slot_clock, current_slot, - micros_to_next_slot: Duration::from_micros(micros_to_next_slot), + duration_to_next_slot, beacon_block_client, validator_client, attester_client, log, - } + }) } /// Initialise the service then run the core thread. - pub fn start(config: ValidatorConfig, log: slog::Logger) { + pub fn start(config: ValidatorConfig, log: slog::Logger) -> error_chain::Result<()> { // connect to the node and retrieve its properties and initialize the gRPC clients - let service = Service::initialize_service(&config, log); + let service = Service::initialize_service(&config, log)?; // we have connected to a node and established its parameters. Spin up the core service @@ -186,15 +198,17 @@ impl Service { .clock(Clock::system()) .name_prefix("validator-client-") .build() - .unwrap(); + .map_err(|e| format!("Tokio runtime failed: {}", e))?; // set up the validator work interval - start at next slot and proceed every slot - // TODO: Error chain handle errors. let interval = { // Set the interval to start at the next slot, and every slot after let slot_duration = Duration::from_secs(config.spec.seconds_per_slot); //TODO: Handle checked add correctly - Interval::new(Instant::now() + service.micros_to_next_slot, slot_duration) + Interval::new( + Instant::now() + service.duration_to_next_slot, + slot_duration, + ) }; // kick off core service @@ -216,16 +230,47 @@ impl Service { beacon_node: service.validator_client.clone(), }; - runtime.block_on(interval.for_each(move |_| { - // update duties - debug!( - service.log, - "Processing slot: {}", - service.slot_clock.present_slot().unwrap().unwrap().as_u64() - ); - manager.poll(); - Ok(()) - })); + runtime + .block_on(interval.for_each(move |_| { + // update duties + let current_slot = match service.slot_clock.present_slot() { + Err(e) => { + error!(service.log, "SystemTimeError {:?}", e); + return Ok(()); + } + Ok(slot) => slot.expect("Genesis is in the future"), + }; + + debug_assert!( + current_slot > service.current_slot, + "The Timer should poll a new slot" + ); + + debug!(service.log, "Processing slot: {}", current_slot.as_u64()); + + // check for new duties + match manager.poll() { + Err(error) => { + error!(service.log, "Epoch duties poll error"; "error" => format!("{:?}", error)) + } + Ok(PollOutcome::NoChange(epoch)) => { + debug!(service.log, "No change in duties"; "epoch" => epoch) + } + Ok(PollOutcome::DutiesChanged(epoch, duties)) => { + info!(service.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + } + Ok(PollOutcome::NewDuties(epoch, duties)) => { + info!(service.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + } + Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) => { + error!(service.log, "Epoch or validator unknown"; "epoch" => epoch) + } + }; + + Ok(()) + })) + .map_err(|e| format!("Service thread failed: {:?}", e))?; + Ok(()) } /*