From 61fc946d54a0de8b937152bf2d799c428ed7b29a Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 22 Mar 2019 22:50:16 +1100 Subject: [PATCH] Adds initial connection to beacon node with retries --- validator_client/src/service.rs | 79 +++++++++++++++++++++++++++------ 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 95f2a2361..3bf2d6104 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -13,16 +13,75 @@ use protos::services_grpc::{ AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, ValidatorServiceClient, }; -use slog::{info, o, Drain}; +use slog::{info, o, warn, Drain}; use slot_clock::SystemTimeSlotClock; use std::sync::Arc; use std::thread; +use std::time::Duration; +use types::{Epoch, Fork}; /// The validator service. This is the main thread that executes and maintains validator /// duties. -pub struct Service {} +#[derive(Debug)] +pub struct Service { + /// The node we currently connected to. + connected_node_version: String, + /// The chain id we are processing on. + chain_id: u16, + /// The fork state we processing on. + fork: Fork, + // /// The slot clock keeping track of time. + // slot_clock: Arc, +} impl Service { + /// Initial connection to the beacon node to determine its properties. + fn connect_to_node( + node_client: Arc, + seconds_per_slot: u64, + log: &slog::Logger, + ) -> Self { + // retrieve node information + let node_info = loop { + let info = match node_client.info(&Empty::new()) { + Err(e) => { + warn!(log, "Could not connect to node. Error: {}", e); + info!(log, "Retrying in 5 seconds..."); + std::thread::sleep(Duration::from_secs(5)); + continue; + } + Ok(info) => break info, + }; + }; + + info!(log,"Beacon node connected"; "Node Version:" => node_info.version.clone(), "Chain ID:" => node_info.chain_id); + + let proto_fork = node_info.get_fork(); + let mut previous_version: [u8; 4] = [0; 4]; + let mut current_version: [u8; 4] = [0; 4]; + previous_version.copy_from_slice(&proto_fork.get_previous_version()[..4]); + current_version.copy_from_slice(&proto_fork.get_current_version()[..4]); + let fork = Fork { + previous_version, + current_version, + epoch: Epoch::from(proto_fork.get_epoch()), + }; + + let genesis_time = 1_549_935_547; + let slot_clock = { + info!(log, "Genesis time"; "unix_epoch_seconds" => genesis_time); + let clock = SystemTimeSlotClock::new(genesis_time, seconds_per_slot) + .expect("Unable to instantiate SystemTimeSlotClock."); + Arc::new(clock) + }; + + Self { + connected_node_version: node_info.version, + chain_id: node_info.chain_id as u16, + fork, + } + } + pub fn start(config: ValidatorConfig, log: slog::Logger) { // initialize the RPC clients @@ -51,16 +110,13 @@ impl Service { Arc::new(AttestationServiceClient::new(ch)) }; + let spec = Arc::new(config.spec); // connect to the node and retrieve its properties - // node_info = connect_to_node(beacon_ndoe_grpc_client); + let service = + Service::connect_to_node(beacon_node_grpc_client, spec.seconds_per_slot, &log); - // retrieve node information - let node_info = beacon_node_grpc_client.info(&Empty::new()); - - info!(log, "Beacon node info: {:?}", node_info); - - // Spec - let spec = Arc::new(config.spec.clone()); + let poll_interval_millis = spec.seconds_per_slot * 1000 / 10; // 10% epoch time precision. + info!(log, "Starting block producer service"; "polls_per_epoch" => spec.seconds_per_slot * 1000 / poll_interval_millis); let genesis_time = 1_549_935_547; let slot_clock = { @@ -70,9 +126,6 @@ impl Service { Arc::new(clock) }; - let poll_interval_millis = spec.seconds_per_slot * 1000 / 10; // 10% epoch time precision. - info!(log, "Starting block producer service"; "polls_per_epoch" => spec.seconds_per_slot * 1000 / poll_interval_millis); - /* * Start threads. */