From f6ec44f0dd38ff86309ace4a4e246c0ea42f4e86 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Thu, 30 Jun 2022 00:49:21 +0000 Subject: [PATCH] Register validator api (#3194) ## Issue Addressed Lays the groundwork for builder API changes by implementing the beacon-API's new `register_validator` endpoint ## Proposed Changes - Add a routine in the VC that runs on startup (re-try until success), once per epoch or whenever `suggested_fee_recipient` is updated, signing `ValidatorRegistrationData` and sending it to the BN. - TODO: `gas_limit` config options https://github.com/ethereum/builder-specs/issues/17 - BN only sends VC registration data to builders on demand, but VC registration data *does update* the BN's prepare proposer cache and send an updated fcU to a local EE. This is necessary for fee recipient consistency between the blinded and full block flow in the event of fallback. Having the BN only send registration data to builders on demand gives feedback directly to the VC about relay status. Also, since the BN has no ability to sign these messages anyways (so couldn't refresh them if it wanted), and validator registration is independent of the BN head, I think this approach makes sense. - Adds upcoming consensus spec changes for this PR https://github.com/ethereum/consensus-specs/pull/2884 - I initially applied the bit mask based on a configured application domain.. but I ended up just hard coding it here instead because that's how it's spec'd in the builder repo. - Should application mask appear in the api? Co-authored-by: realbigsean --- Cargo.lock | 1 + beacon_node/http_api/Cargo.toml | 1 + beacon_node/http_api/src/lib.rs | 86 ++++++- beacon_node/http_api/tests/tests.rs | 76 ++++++ book/src/api-vc-endpoints.md | 1 + common/eth2/src/lib.rs | 17 ++ consensus/types/src/application_domain.rs | 16 ++ consensus/types/src/chain_spec.rs | 50 ++++ consensus/types/src/config_and_preset.rs | 4 + consensus/types/src/lib.rs | 3 + .../types/src/validator_registration_data.rs | 23 ++ testing/web3signer_tests/src/lib.rs | 33 +++ validator_client/src/http_metrics/metrics.rs | 5 + validator_client/src/lib.rs | 5 +- validator_client/src/preparation_service.rs | 232 +++++++++++++++++- validator_client/src/signing_method.rs | 39 ++- .../src/signing_method/web3signer.rs | 3 + validator_client/src/validator_store.rs | 35 ++- 18 files changed, 603 insertions(+), 27 deletions(-) create mode 100644 consensus/types/src/application_domain.rs create mode 100644 consensus/types/src/validator_registration_data.rs diff --git a/Cargo.lock b/Cargo.lock index 3dbe00565..3bdce9138 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2530,6 +2530,7 @@ dependencies = [ "safe_arith", "sensitive_url", "serde", + "serde_json", "slog", "slot_clock", "state_processing", diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 9dd2af7d1..07fb99239 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -39,6 +39,7 @@ environment = { path = "../../lighthouse/environment" } tree_hash = "0.4.1" sensitive_url = { path = "../../common/sensitive_url" } logging = { path = "../../common/logging" } +serde_json = "1.0.58" [[test]] name = "bn_http_api_tests" diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 379033a11..06dc96876 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -49,8 +49,8 @@ use types::{ BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockMerge, SignedBlindedBeaconBlock, - SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage, - SyncContributionData, + SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, + SyncCommitteeMessage, SyncContributionData, }; use version::{ add_consensus_version_header, fork_versioned_response, inconsistent_fork_rejection, @@ -2408,12 +2408,10 @@ pub fn serve( .and(warp::path::end()) .and(not_while_syncing_filter.clone()) .and(chain_filter.clone()) - .and(warp::addr::remote()) .and(log_filter.clone()) .and(warp::body::json()) .and_then( |chain: Arc>, - client_addr: Option, log: Logger, preparation_data: Vec| { blocking_json_task(move || { @@ -2430,9 +2428,6 @@ pub fn serve( log, "Received proposer preparation data"; "count" => preparation_data.len(), - "client" => client_addr - .map(|a| a.to_string()) - .unwrap_or_else(|| "unknown".to_string()), ); execution_layer @@ -2455,6 +2450,82 @@ pub fn serve( }, ); + // POST validator/register_validator + let post_validator_register_validator = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("register_validator")) + .and(warp::path::end()) + .and(chain_filter.clone()) + .and(log_filter.clone()) + .and(warp::body::json()) + .and_then( + |chain: Arc>, + log: Logger, + register_val_data: Vec| { + blocking_json_task(move || { + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing) + .map_err(warp_utils::reject::beacon_chain_error)?; + let current_epoch = chain + .slot_clock + .now_or_genesis() + .ok_or(BeaconChainError::UnableToReadSlot) + .map_err(warp_utils::reject::beacon_chain_error)? + .epoch(T::EthSpec::slots_per_epoch()); + + debug!( + log, + "Received register validator request"; + "count" => register_val_data.len(), + ); + + let preparation_data = register_val_data + .iter() + .filter_map(|register_data| { + chain + .validator_index(®ister_data.message.pubkey) + .ok() + .flatten() + .map(|validator_index| ProposerPreparationData { + validator_index: validator_index as u64, + fee_recipient: register_data.message.fee_recipient, + }) + }) + .collect::>(); + + debug!( + log, + "Resolved validator request pubkeys"; + "count" => preparation_data.len() + ); + + // Update the prepare beacon proposer cache based on this request. + execution_layer + .update_proposer_preparation_blocking(current_epoch, &preparation_data) + .map_err(|_e| { + warp_utils::reject::custom_bad_request( + "error processing proposer preparations".to_string(), + ) + })?; + + // Call prepare beacon proposer blocking with the latest update in order to make + // sure we have a local payload to fall back to in the event of the blined block + // flow failing. + chain.prepare_beacon_proposer_blocking().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "error updating proposer preparations: {:?}", + e + )) + })?; + + //TODO(sean): In the MEV-boost PR, add a call here to send the update request to the builder + + Ok(()) + }) + }, + ); // POST validator/sync_committee_subscriptions let post_validator_sync_committee_subscriptions = eth1_v1 .and(warp::path("validator")) @@ -3008,6 +3079,7 @@ pub fn serve( .or(post_validator_beacon_committee_subscriptions.boxed()) .or(post_validator_sync_committee_subscriptions.boxed()) .or(post_validator_prepare_beacon_proposer.boxed()) + .or(post_validator_register_validator.boxed()) .or(post_lighthouse_liveness.boxed()) .or(post_lighthouse_database_reconstruct.boxed()) .or(post_lighthouse_database_historical_blocks.boxed()) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 5f53a9615..2b0cfd7c4 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -11,6 +11,7 @@ use eth2::{ types::*, BeaconNodeHttpClient, Error, StatusCode, Timeouts, }; +use execution_layer::test_utils::MockExecutionLayer; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; use lighthouse_network::{Enr, EnrExt, PeerId}; @@ -24,6 +25,7 @@ use task_executor::test_utils::TestRuntime; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; use tree_hash::TreeHash; +use types::application_domain::ApplicationDomain; use types::{ AggregateSignature, BeaconState, BitList, Domain, EthSpec, Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, Slot, @@ -64,6 +66,9 @@ struct ApiTester { network_rx: mpsc::UnboundedReceiver>, local_enr: Enr, external_peer_id: PeerId, + // This is never directly accessed, but adding it creates a payload cache, which we use in tests here. + #[allow(dead_code)] + mock_el: Option>, _runtime: TestRuntime, } @@ -80,6 +85,7 @@ impl ApiTester { .spec(spec.clone()) .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() + .mock_execution_layer() .build(); harness.advance_slot(); @@ -214,6 +220,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + mock_el: harness.mock_execution_layer, _runtime: harness.runtime, } } @@ -293,6 +300,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + mock_el: None, _runtime: harness.runtime, } } @@ -2226,6 +2234,66 @@ impl ApiTester { self } + pub async fn test_post_validator_register_validator(self) -> Self { + let mut registrations = vec![]; + let mut fee_recipients = vec![]; + + let fork = self.chain.head().unwrap().beacon_state.fork(); + + for (val_index, keypair) in self.validator_keypairs.iter().enumerate() { + let pubkey = keypair.pk.compress(); + let fee_recipient = Address::from_low_u64_be(val_index as u64); + + let data = ValidatorRegistrationData { + fee_recipient, + gas_limit: 0, + timestamp: 0, + pubkey, + }; + let domain = self.chain.spec.get_domain( + Epoch::new(0), + Domain::ApplicationMask(ApplicationDomain::Builder), + &fork, + Hash256::zero(), + ); + let message = data.signing_root(domain); + let signature = keypair.sk.sign(message); + + fee_recipients.push(fee_recipient); + registrations.push(SignedValidatorRegistrationData { + message: data, + signature, + }); + } + + self.client + .post_validator_register_validator(®istrations) + .await + .unwrap(); + + for (val_index, (_, fee_recipient)) in self + .chain + .head() + .unwrap() + .beacon_state + .validators() + .into_iter() + .zip(fee_recipients.into_iter()) + .enumerate() + { + let actual = self + .chain + .execution_layer + .as_ref() + .unwrap() + .get_suggested_fee_recipient(val_index as u64) + .await; + assert_eq!(actual, fee_recipient); + } + + self + } + #[cfg(target_os = "linux")] pub async fn test_get_lighthouse_health(self) -> Self { self.client.get_lighthouse_health().await.unwrap(); @@ -2973,6 +3041,14 @@ async fn get_validator_beacon_committee_subscriptions() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn post_validator_register_validator() { + ApiTester::new() + .await + .test_post_validator_register_validator() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn lighthouse_endpoints() { ApiTester::new() diff --git a/book/src/api-vc-endpoints.md b/book/src/api-vc-endpoints.md index ae091130f..69cd83db5 100644 --- a/book/src/api-vc-endpoints.md +++ b/book/src/api-vc-endpoints.md @@ -134,6 +134,7 @@ Typical Responses | 200 "DOMAIN_VOLUNTARY_EXIT": "0x04000000", "DOMAIN_SELECTION_PROOF": "0x05000000", "DOMAIN_AGGREGATE_AND_PROOF": "0x06000000", + "DOMAIN_APPLICATION_MASK": "0x00000001", "MAX_VALIDATORS_PER_COMMITTEE": "2048", "SLOTS_PER_EPOCH": "32", "EPOCHS_PER_ETH1_VOTING_PERIOD": "32", diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 3e965a2bf..529bad1d8 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -929,6 +929,23 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST validator/register_validator` + pub async fn post_validator_register_validator( + &self, + registration_data: &[SignedValidatorRegistrationData], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("register_validator"); + + self.post(path, ®istration_data).await?; + + Ok(()) + } + /// `GET config/fork_schedule` pub async fn get_config_fork_schedule(&self) -> Result>, Error> { let mut path = self.eth_path(V1)?; diff --git a/consensus/types/src/application_domain.rs b/consensus/types/src/application_domain.rs new file mode 100644 index 000000000..5e33f2dfd --- /dev/null +++ b/consensus/types/src/application_domain.rs @@ -0,0 +1,16 @@ +/// This value is an application index of 0 with the bitmask applied (so it's equivalent to the bit mask). +/// Little endian hex: 0x00000001, Binary: 1000000000000000000000000 +pub const APPLICATION_DOMAIN_BUILDER: u32 = 16777216; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum ApplicationDomain { + Builder, +} + +impl ApplicationDomain { + pub fn get_domain_constant(&self) -> u32 { + match self { + ApplicationDomain::Builder => APPLICATION_DOMAIN_BUILDER, + } + } +} diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index c283d4cb4..8a69505a5 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -1,3 +1,4 @@ +use crate::application_domain::{ApplicationDomain, APPLICATION_DOMAIN_BUILDER}; use crate::*; use eth2_serde_utils::quoted_u64::MaybeQuoted; use int_to_bytes::int_to_bytes4; @@ -20,6 +21,7 @@ pub enum Domain { SyncCommittee, ContributionAndProof, SyncCommitteeSelectionProof, + ApplicationMask(ApplicationDomain), } /// Lighthouse's internal configuration struct. @@ -159,6 +161,11 @@ pub struct ChainSpec { pub attestation_subnet_count: u64, pub random_subnets_per_validator: u64, pub epochs_per_random_subnet_subscription: u64, + + /* + * Application params + */ + pub(crate) domain_application_mask: u32, } impl ChainSpec { @@ -326,6 +333,7 @@ impl ChainSpec { Domain::SyncCommittee => self.domain_sync_committee, Domain::ContributionAndProof => self.domain_contribution_and_proof, Domain::SyncCommitteeSelectionProof => self.domain_sync_committee_selection_proof, + Domain::ApplicationMask(application_domain) => application_domain.get_domain_constant(), } } @@ -353,6 +361,17 @@ impl ChainSpec { self.compute_domain(Domain::Deposit, self.genesis_fork_version, Hash256::zero()) } + // This should be updated to include the current fork and the genesis validators root, but discussion is ongoing: + // + // https://github.com/ethereum/builder-specs/issues/14 + pub fn get_builder_domain(&self) -> Hash256 { + self.compute_domain( + Domain::ApplicationMask(ApplicationDomain::Builder), + self.genesis_fork_version, + Hash256::zero(), + ) + } + /// Return the 32-byte fork data root for the `current_version` and `genesis_validators_root`. /// /// This is used primarily in signature domains to avoid collisions across forks/chains. @@ -565,6 +584,11 @@ impl ChainSpec { maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, epochs_per_random_subnet_subscription: 256, + + /* + * Application specific + */ + domain_application_mask: APPLICATION_DOMAIN_BUILDER, } } @@ -763,6 +787,11 @@ impl ChainSpec { maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, epochs_per_random_subnet_subscription: 256, + + /* + * Application specific + */ + domain_application_mask: APPLICATION_DOMAIN_BUILDER, } } } @@ -1119,6 +1148,27 @@ mod tests { &spec, ); test_domain(Domain::SyncCommittee, spec.domain_sync_committee, &spec); + + // The builder domain index is zero + let builder_domain_pre_mask = [0; 4]; + test_domain( + Domain::ApplicationMask(ApplicationDomain::Builder), + apply_bit_mask(builder_domain_pre_mask, &spec), + &spec, + ); + } + + fn apply_bit_mask(domain_bytes: [u8; 4], spec: &ChainSpec) -> u32 { + let mut domain = [0; 4]; + let mask_bytes = int_to_bytes4(spec.domain_application_mask); + + // Apply application bit mask + for (i, (domain_byte, mask_byte)) in domain_bytes.iter().zip(mask_bytes.iter()).enumerate() + { + domain[i] = domain_byte | mask_byte; + } + + u32::from_le_bytes(domain) } // Test that `fork_name_at_epoch` and `fork_epoch` are consistent. diff --git a/consensus/types/src/config_and_preset.rs b/consensus/types/src/config_and_preset.rs index f721e6c3b..8b3a753bd 100644 --- a/consensus/types/src/config_and_preset.rs +++ b/consensus/types/src/config_and_preset.rs @@ -69,6 +69,10 @@ impl ConfigAndPreset { "domain_aggregate_and_proof", u32_hex(spec.domain_aggregate_and_proof), ), + ( + "domain_application_mask", + u32_hex(spec.domain_application_mask), + ), ( "target_aggregators_per_committee", spec.target_aggregators_per_committee.to_string(), diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 22e429a58..ecfd77d7a 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -18,6 +18,7 @@ extern crate lazy_static; pub mod test_utils; pub mod aggregate_and_proof; +pub mod application_domain; pub mod attestation; pub mod attestation_data; pub mod attestation_duty; @@ -82,6 +83,7 @@ pub mod sync_committee_message; pub mod sync_selection_proof; pub mod sync_subnet_id; mod tree_hash_impls; +pub mod validator_registration_data; pub mod slot_data; #[cfg(feature = "sqlite")] @@ -157,6 +159,7 @@ pub use crate::sync_duty::SyncDuty; pub use crate::sync_selection_proof::SyncSelectionProof; pub use crate::sync_subnet_id::SyncSubnetId; pub use crate::validator::Validator; +pub use crate::validator_registration_data::*; pub use crate::validator_subscription::ValidatorSubscription; pub use crate::voluntary_exit::VoluntaryExit; diff --git a/consensus/types/src/validator_registration_data.rs b/consensus/types/src/validator_registration_data.rs new file mode 100644 index 000000000..5a3450df0 --- /dev/null +++ b/consensus/types/src/validator_registration_data.rs @@ -0,0 +1,23 @@ +use crate::*; +use serde::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; +use tree_hash_derive::TreeHash; + +/// Validator registration, for use in interacting with servers implementing the builder API. +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SignedValidatorRegistrationData { + pub message: ValidatorRegistrationData, + pub signature: Signature, +} + +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode, TreeHash)] +pub struct ValidatorRegistrationData { + pub fee_recipient: Address, + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub gas_limit: u64, + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub timestamp: u64, + pub pubkey: PublicKeyBytes, +} + +impl SignedRoot for ValidatorRegistrationData {} diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index 800f98865..e39e6515f 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -67,6 +67,7 @@ mod tests { impl SignedObject for SyncSelectionProof {} impl SignedObject for SyncCommitteeMessage {} impl SignedObject for SignedContributionAndProof {} + impl SignedObject for SignedValidatorRegistrationData {} /// A file format used by Web3Signer to discover and unlock keystores. #[derive(Serialize)] @@ -448,6 +449,18 @@ mod tests { } } + //TODO: remove this once the consensys web3signer includes the `validator_registration` method + #[allow(dead_code)] + fn get_validator_registration(pubkey: PublicKeyBytes) -> ValidatorRegistrationData { + let fee_recipient = Address::repeat_byte(42); + ValidatorRegistrationData { + fee_recipient, + gas_limit: 30_000_000, + timestamp: 100, + pubkey, + } + } + /// Test all the "base" (phase 0) types. async fn test_base_types(network: &str, listen_port: u16) { let network_config = Eth2NetworkConfig::constant(network).unwrap().unwrap(); @@ -499,6 +512,16 @@ mod tests { .await .unwrap() }) + //TODO: uncomment this once the consensys web3signer includes the `validator_registration` method + // + // .await + // .assert_signatures_match("validator_registration", |pubkey, validator_store| async move { + // let val_reg_data = get_validator_registration(pubkey); + // validator_store + // .sign_validator_registration_data(val_reg_data) + // .await + // .unwrap() + // }) .await; } @@ -575,6 +598,16 @@ mod tests { .unwrap() }, ) + //TODO: uncomment this once the consensys web3signer includes the `validator_registration` method + // + // .await + // .assert_signatures_match("validator_registration", |pubkey, validator_store| async move { + // let val_reg_data = get_validator_registration(pubkey); + // validator_store + // .sign_validator_registration_data(val_reg_data) + // .await + // .unwrap() + // }) .await; } diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index f405f1a2b..836aab4c1 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -85,6 +85,11 @@ lazy_static::lazy_static! { "Total count of attempted SyncSelectionProof signings", &["status"] ); + pub static ref SIGNED_VALIDATOR_REGISTRATIONS_TOTAL: Result = try_create_int_counter_vec( + "builder_validator_registrations_total", + "Total count of ValidatorRegistrationData signings", + &["status"] + ); pub static ref DUTIES_SERVICE_TIMES: Result = try_create_histogram_vec( "vc_duties_service_task_times_seconds", "Duration to perform duties service tasks", diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index ce35a0035..5e4584759 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -485,7 +485,10 @@ impl ProductionValidatorClient { self.preparation_service .clone() - .start_update_service(&self.context.eth2_config.spec) + .start_update_service( + self.config.private_tx_proposals, + &self.context.eth2_config.spec, + ) .map_err(|e| format!("Unable to start preparation service: {}", e))?; if let Some(doppelganger_service) = self.doppelganger_service.clone() { diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs index b4b6caa05..34201180c 100644 --- a/validator_client/src/preparation_service.rs +++ b/validator_client/src/preparation_service.rs @@ -3,17 +3,28 @@ use crate::{ fee_recipient_file::FeeRecipientFile, validator_store::{DoppelgangerStatus, ValidatorStore}, }; +use bls::PublicKeyBytes; use environment::RuntimeContext; +use parking_lot::RwLock; use slog::{debug, error, info}; use slot_clock::SlotClock; +use std::collections::HashMap; +use std::hash::Hash; use std::ops::Deref; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{sleep, Duration}; -use types::{Address, ChainSpec, EthSpec, ProposerPreparationData}; +use types::{ + Address, ChainSpec, EthSpec, ProposerPreparationData, SignedValidatorRegistrationData, + ValidatorRegistrationData, +}; /// Number of epochs before the Bellatrix hard fork to begin posting proposer preparations. const PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS: u64 = 2; +/// Number of epochs to wait before re-submitting validator registration. +const EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION: u64 = 1; + /// Builds an `PreparationService`. pub struct PreparationServiceBuilder { validator_store: Option>>, @@ -83,6 +94,7 @@ impl PreparationServiceBuilder { .ok_or("Cannot build PreparationService without runtime_context")?, fee_recipient: self.fee_recipient, fee_recipient_file: self.fee_recipient_file, + validator_registration_cache: RwLock::new(HashMap::new()), }), }) } @@ -96,6 +108,32 @@ pub struct Inner { context: RuntimeContext, fee_recipient: Option
, fee_recipient_file: Option, + // Used to track unpublished validator registration changes. + validator_registration_cache: + RwLock>, +} + +#[derive(Hash, Eq, PartialEq, Debug, Clone)] +pub struct ValidatorRegistrationKey { + pub fee_recipient: Address, + pub gas_limit: u64, + pub pubkey: PublicKeyBytes, +} + +impl From for ValidatorRegistrationKey { + fn from(data: ValidatorRegistrationData) -> Self { + let ValidatorRegistrationData { + fee_recipient, + gas_limit, + timestamp: _, + pubkey, + } = data; + Self { + fee_recipient, + gas_limit, + pubkey, + } + } } /// Attempts to produce proposer preparations for all known validators at the beginning of each epoch. @@ -120,8 +158,19 @@ impl Deref for PreparationService { } impl PreparationService { + pub fn start_update_service( + self, + start_registration_service: bool, + spec: &ChainSpec, + ) -> Result<(), String> { + if start_registration_service { + self.clone().start_validator_registration_service(spec)?; + } + self.start_proposer_prepare_service(spec) + } + /// Starts the service which periodically produces proposer preparations. - pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + pub fn start_proposer_prepare_service(self, spec: &ChainSpec) -> Result<(), String> { let log = self.context.log().clone(); let slot_duration = Duration::from_secs(spec.seconds_per_slot); @@ -163,6 +212,41 @@ impl PreparationService { Ok(()) } + /// Starts the service which periodically sends connected beacon nodes validator registration information. + pub fn start_validator_registration_service(self, spec: &ChainSpec) -> Result<(), String> { + let log = self.context.log().clone(); + + info!( + log, + "Validator registration service started"; + ); + + let spec = spec.clone(); + let slot_duration = Duration::from_secs(spec.seconds_per_slot); + + let executor = self.context.executor.clone(); + + let validator_registration_fut = async move { + loop { + // Poll the endpoint immediately to ensure fee recipients are received. + if let Err(e) = self.register_validators(&spec).await { + error!(log,"Error during validator registration";"error" => ?e); + } + + // Wait one slot if the register validator request fails or if we should not publish at the current slot. + if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() { + sleep(duration_to_next_slot).await; + } else { + error!(log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + } + } + }; + executor.spawn(validator_registration_fut, "validator_registration_service"); + Ok(()) + } + /// Return `true` if the current slot is close to or past the Bellatrix fork epoch. /// /// This avoids spamming the BN with preparations before the Bellatrix fork epoch, which may @@ -188,6 +272,33 @@ impl PreparationService { } fn collect_preparation_data(&self, spec: &ChainSpec) -> Vec { + self.collect_data(spec, |_, validator_index, fee_recipient| { + ProposerPreparationData { + validator_index, + fee_recipient, + } + }) + } + + fn collect_validator_registration_keys( + &self, + spec: &ChainSpec, + ) -> Vec { + self.collect_data(spec, |pubkey, _, fee_recipient| { + ValidatorRegistrationKey { + fee_recipient, + //TODO(sean) this is geth's default, we should make this configurable and maybe have the default be dynamic. + // Discussion here: https://github.com/ethereum/builder-specs/issues/17 + gas_limit: 30_000_000, + pubkey, + } + }) + } + + fn collect_data(&self, spec: &ChainSpec, map_fn: G) -> Vec + where + G: Fn(PublicKeyBytes, u64, Address) -> U, + { let log = self.context.log(); let fee_recipient_file = self @@ -234,10 +345,7 @@ impl PreparationService { .or(self.fee_recipient); if let Some(fee_recipient) = fee_recipient { - Some(ProposerPreparationData { - validator_index, - fee_recipient, - }) + Some(map_fn(pubkey, validator_index, fee_recipient)) } else { if spec.bellatrix_fork_epoch.is_some() { error!( @@ -284,4 +392,116 @@ impl PreparationService { } Ok(()) } + + /// Register validators with builders, used in the blinded block proposal flow. + async fn register_validators(&self, spec: &ChainSpec) -> Result<(), String> { + let registration_keys = self.collect_validator_registration_keys(spec); + + let mut changed_keys = vec![]; + + // Need to scope this so the read lock is not held across an await point (I don't know why + // but the explicit `drop` is not enough). + { + let guard = self.validator_registration_cache.read(); + for key in registration_keys.iter() { + if !guard.contains_key(key) { + changed_keys.push(key.clone()); + } + } + drop(guard); + } + + // Check if any have changed or it's been `EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION`. + if let Some(slot) = self.slot_clock.now() { + if slot % (E::slots_per_epoch() * EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION) == 0 { + self.publish_validator_registration_data(registration_keys) + .await?; + } else if !changed_keys.is_empty() { + self.publish_validator_registration_data(changed_keys) + .await?; + } + } + + Ok(()) + } + + async fn publish_validator_registration_data( + &self, + registration_keys: Vec, + ) -> Result<(), String> { + let log = self.context.log(); + + let registration_data_len = registration_keys.len(); + let mut signed = Vec::with_capacity(registration_data_len); + + for key in registration_keys { + let cached_registration_opt = + self.validator_registration_cache.read().get(&key).cloned(); + + let signed_data = if let Some(signed_data) = cached_registration_opt { + signed_data + } else { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| format!("{e:?}"))? + .as_secs(); + + let ValidatorRegistrationKey { + fee_recipient, + gas_limit, + pubkey, + } = key.clone(); + + let signed_data = match self + .validator_store + .sign_validator_registration_data(ValidatorRegistrationData { + fee_recipient, + gas_limit, + timestamp, + pubkey, + }) + .await + { + Ok(data) => data, + Err(e) => { + error!(log, "Unable to sign validator registration data"; "error" => ?e, "pubkey" => ?pubkey); + continue; + } + }; + + self.validator_registration_cache + .write() + .insert(key, signed_data.clone()); + + signed_data + }; + signed.push(signed_data); + } + + if !signed.is_empty() { + let signed_ref = signed.as_slice(); + + match self + .beacon_nodes + .first_success(RequireSynced::Yes, |beacon_node| async move { + beacon_node + .post_validator_register_validator(signed_ref) + .await + }) + .await + { + Ok(()) => debug!( + log, + "Published validator registration"; + "count" => registration_data_len, + ), + Err(e) => error!( + log, + "Unable to publish validator registration"; + "error" => %e, + ), + } + } + Ok(()) + } } diff --git a/validator_client/src/signing_method.rs b/validator_client/src/signing_method.rs index 0daefc43c..de69d9900 100644 --- a/validator_client/src/signing_method.rs +++ b/validator_client/src/signing_method.rs @@ -30,6 +30,7 @@ pub enum Error { ShuttingDown, TokioJoin(String), MergeForkNotSupported, + GenesisForkVersionRequired, } /// Enumerates all messages that can be signed by a validator. @@ -45,6 +46,7 @@ pub enum SignableMessage<'a, T: EthSpec, Payload: ExecPayload = FullPayload), + ValidatorRegistration(&'a ValidatorRegistrationData), } impl<'a, T: EthSpec, Payload: ExecPayload> SignableMessage<'a, T, Payload> { @@ -64,6 +66,7 @@ impl<'a, T: EthSpec, Payload: ExecPayload> SignableMessage<'a, T, Payload> { beacon_block_root, .. } => beacon_block_root.signing_root(domain), SignableMessage::SignedContributionAndProof(c) => c.signing_root(domain), + SignableMessage::ValidatorRegistration(v) => v.signing_root(domain), } } } @@ -129,6 +132,22 @@ impl SigningMethod { let signing_root = signable_message.signing_root(domain_hash); + let fork_info = Some(ForkInfo { + fork, + genesis_validators_root, + }); + + self.get_signature_from_root(signable_message, signing_root, executor, fork_info) + .await + } + + pub async fn get_signature_from_root>( + &self, + signable_message: SignableMessage<'_, T, Payload>, + signing_root: Hash256, + executor: &TaskExecutor, + fork_info: Option, + ) -> Result { match self { SigningMethod::LocalKeystore { voting_keypair, .. } => { let _timer = @@ -181,21 +200,21 @@ impl SigningMethod { SignableMessage::SignedContributionAndProof(c) => { Web3SignerObject::ContributionAndProof(c) } + SignableMessage::ValidatorRegistration(v) => { + Web3SignerObject::ValidatorRegistration(v) + } }; // Determine the Web3Signer message type. let message_type = object.message_type(); - // The `fork_info` field is not required for deposits since they sign across the - // genesis fork version. - let fork_info = if let Web3SignerObject::Deposit { .. } = &object { - None - } else { - Some(ForkInfo { - fork, - genesis_validators_root, - }) - }; + if matches!( + object, + Web3SignerObject::Deposit { .. } | Web3SignerObject::ValidatorRegistration(_) + ) && fork_info.is_some() + { + return Err(Error::GenesisForkVersionRequired); + } let request = SigningRequest { message_type, diff --git a/validator_client/src/signing_method/web3signer.rs b/validator_client/src/signing_method/web3signer.rs index 9ac1655cc..0ab37484b 100644 --- a/validator_client/src/signing_method/web3signer.rs +++ b/validator_client/src/signing_method/web3signer.rs @@ -17,6 +17,7 @@ pub enum MessageType { SyncCommitteeMessage, SyncCommitteeSelectionProof, SyncCommitteeContributionAndProof, + ValidatorRegistration, } #[derive(Debug, PartialEq, Copy, Clone, Serialize)] @@ -64,6 +65,7 @@ pub enum Web3SignerObject<'a, T: EthSpec, Payload: ExecPayload> { }, SyncAggregatorSelectionData(&'a SyncAggregatorSelectionData), ContributionAndProof(&'a ContributionAndProof), + ValidatorRegistration(&'a ValidatorRegistrationData), } impl<'a, T: EthSpec, Payload: ExecPayload> Web3SignerObject<'a, T, Payload> { @@ -93,6 +95,7 @@ impl<'a, T: EthSpec, Payload: ExecPayload> Web3SignerObject<'a, T, Payload> { Web3SignerObject::ContributionAndProof(_) => { MessageType::SyncCommitteeContributionAndProof } + Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration, } } } diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index b39ef9ef8..36ec5e895 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -20,9 +20,9 @@ use types::{ attestation::Error as AttestationError, graffiti::GraffitiString, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecPayload, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, - Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, Slot, - SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, - SyncSelectionProof, SyncSubnetId, + Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot, + SignedValidatorRegistrationData, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, + SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, }; use validator_dir::ValidatorDir; @@ -524,6 +524,35 @@ impl ValidatorStore { } } + pub async fn sign_validator_registration_data( + &self, + validator_registration_data: ValidatorRegistrationData, + ) -> Result { + let domain_hash = self.spec.get_builder_domain(); + let signing_root = validator_registration_data.signing_root(domain_hash); + + let signing_method = + self.doppelganger_bypassed_signing_method(validator_registration_data.pubkey)?; + let signature = signing_method + .get_signature_from_root::>( + SignableMessage::ValidatorRegistration(&validator_registration_data), + signing_root, + &self.task_executor, + None, + ) + .await?; + + metrics::inc_counter_vec( + &metrics::SIGNED_VALIDATOR_REGISTRATIONS_TOTAL, + &[metrics::SUCCESS], + ); + + Ok(SignedValidatorRegistrationData { + message: validator_registration_data, + signature, + }) + } + /// Signs an `AggregateAndProof` for a given validator. /// /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be