diff --git a/validator_client/src/duties/epoch_duties.rs b/validator_client/src/duties/epoch_duties.rs index 24b01e620..a74ded18b 100644 --- a/validator_client/src/duties/epoch_duties.rs +++ b/validator_client/src/duties/epoch_duties.rs @@ -5,8 +5,8 @@ use types::{Epoch, PublicKey, Slot}; /// The type of work a validator is required to do in a given slot. #[derive(Debug, Clone)] pub struct WorkType { - produce_block: bool, - produce_attestation: bool, + pub produce_block: bool, + pub produce_attestation: bool, } /// The information required for a validator to propose and attest during some epoch. @@ -85,7 +85,7 @@ impl DerefMut for EpochDutiesMap { impl EpochDutiesMap { /// Checks if the validator has work to do. - fn is_work_slot( + pub fn is_work_slot( &self, slot: Slot, pubkey: &PublicKey, diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index 20a477910..94b845083 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -5,15 +5,14 @@ mod grpc; //mod test_node; mod traits; -pub use self::epoch_duties::EpochDutiesMap; use self::epoch_duties::{EpochDuties, EpochDutiesMapError}; +pub use self::epoch_duties::{EpochDutiesMap, WorkType}; use self::traits::{BeaconNode, BeaconNodeError}; -use bls::PublicKey; use futures::Async; use slog::{debug, error, info}; use std::sync::Arc; use std::sync::RwLock; -use types::Epoch; +use types::{Epoch, PublicKey, Slot}; #[derive(Debug, PartialEq, Clone)] pub enum UpdateOutcome { @@ -24,9 +23,6 @@ pub enum UpdateOutcome { /// New `EpochDuties` were obtained, different to those which were previously known. This is /// likely to be the result of chain re-organisation. DutiesChanged(Epoch, EpochDuties), - /// The Beacon Node was unable to return the duties as the validator is unknown, or the - /// shuffling for the epoch is unknown. - UnknownValidatorOrEpoch(Epoch), } #[derive(Debug, PartialEq)] @@ -56,18 +52,19 @@ impl DutiesManager { fn update(&self, epoch: Epoch) -> Result { let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?; // If these duties were known, check to see if they're updates or identical. - let result = if let Some(known_duties) = self.duties_map.read()?.get(&epoch) { + if let Some(known_duties) = self.duties_map.read()?.get(&epoch) { if *known_duties == duties { return Ok(UpdateOutcome::NoChange(epoch)); } else { //TODO: Duties could be large here. Remove from display and avoid the clone. - return Ok(UpdateOutcome::DutiesChanged(epoch, duties.clone())); + self.duties_map.write()?.insert(epoch, duties.clone()); + return Ok(UpdateOutcome::DutiesChanged(epoch, duties)); } } else { - Ok(UpdateOutcome::NewDuties(epoch, duties.clone())) + //TODO: Remove clone by removing duties from outcome + self.duties_map.write()?.insert(epoch, duties.clone()); + return Ok(UpdateOutcome::NewDuties(epoch, duties)); }; - self.duties_map.write()?.insert(epoch, duties); - result } /// A future wrapping around `update()`. This will perform logic based upon the update @@ -84,12 +81,30 @@ impl DutiesManager { Ok(UpdateOutcome::NewDuties(epoch, duties)) => { info!(log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties)) } - Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch)) => { - error!(log, "Epoch or validator unknown"; "epoch" => epoch) - } }; Ok(Async::Ready(())) } + + /// Returns a list of (Public, WorkType) indicating all the validators that have work to perform + /// this slot. + pub fn get_current_work(&self, slot: Slot) -> Option> { + let mut current_work: Vec<(PublicKey, WorkType)> = Vec::new(); + + // if the map is poisoned, return None + let duties = self.duties_map.read().ok()?; + + for validator_pk in &self.pubkeys { + match duties.is_work_slot(slot, &validator_pk) { + Ok(Some(work_type)) => current_work.push((validator_pk.clone(), work_type)), + Ok(None) => {} // No work for this validator + Err(_) => {} // Unknown epoch or validator, no work + } + } + if current_work.is_empty() { + return None; + } + Some(current_work) + } } //TODO: Use error_chain to handle errors @@ -101,7 +116,7 @@ impl From for Error { //TODO: Use error_chain to handle errors impl From> for Error { - fn from(e: std::sync::PoisonError) -> Error { + fn from(_e: std::sync::PoisonError) -> Error { Error::DutiesMapPoisoned } } diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index f47570f74..07d88d344 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -240,7 +240,7 @@ impl Service { .block_on(interval.for_each(move |_| { let log = service.log.clone(); - // get the current slot + /* get the current slot and epoch */ let current_slot = match service.slot_clock.present_slot() { Err(e) => { error!(log, "SystemTimeError {:?}", e); @@ -258,13 +258,25 @@ impl Service { info!(log, "Processing slot: {}", current_slot.as_u64()); - // check for new duties - let mut cloned_manager = manager.clone(); + /* check for new duties */ + + let cloned_manager = manager.clone(); tokio::spawn(futures::future::poll_fn(move || { cloned_manager.run_update(current_epoch.clone(), log.clone()) })); - // execute any specified duties + /* execute any specified duties */ + + if let Some(work) = manager.get_current_work(current_slot) { + for (_public_key, work_type) in work { + if work_type.produce_block { + // TODO: Produce a beacon block in a new thread + } + if work_type.produce_attestation { + //TODO: Produce an attestation in a new thread + } + } + } Ok(()) }))