diff --git a/Cargo.lock b/Cargo.lock index 3dbe00565..3bdce9138 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2530,6 +2530,7 @@ dependencies = [ "safe_arith", "sensitive_url", "serde", + "serde_json", "slog", "slot_clock", "state_processing", diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 9dd2af7d1..07fb99239 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -39,6 +39,7 @@ environment = { path = "../../lighthouse/environment" } tree_hash = "0.4.1" sensitive_url = { path = "../../common/sensitive_url" } logging = { path = "../../common/logging" } +serde_json = "1.0.58" [[test]] name = "bn_http_api_tests" diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 379033a11..06dc96876 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -49,8 +49,8 @@ use types::{ BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockMerge, SignedBlindedBeaconBlock, - SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage, - SyncContributionData, + SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, + SyncCommitteeMessage, SyncContributionData, }; use version::{ add_consensus_version_header, fork_versioned_response, inconsistent_fork_rejection, @@ -2408,12 +2408,10 @@ pub fn serve( .and(warp::path::end()) .and(not_while_syncing_filter.clone()) .and(chain_filter.clone()) - .and(warp::addr::remote()) .and(log_filter.clone()) .and(warp::body::json()) .and_then( |chain: Arc>, - client_addr: Option, log: Logger, preparation_data: Vec| { blocking_json_task(move || { @@ -2430,9 +2428,6 @@ pub fn serve( log, "Received proposer preparation data"; "count" => preparation_data.len(), - "client" => client_addr - .map(|a| a.to_string()) - .unwrap_or_else(|| "unknown".to_string()), ); execution_layer @@ -2455,6 +2450,82 @@ pub fn serve( }, ); + // POST validator/register_validator + let post_validator_register_validator = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("register_validator")) + .and(warp::path::end()) + .and(chain_filter.clone()) + .and(log_filter.clone()) + .and(warp::body::json()) + .and_then( + |chain: Arc>, + log: Logger, + register_val_data: Vec| { + blocking_json_task(move || { + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing) + .map_err(warp_utils::reject::beacon_chain_error)?; + let current_epoch = chain + .slot_clock + .now_or_genesis() + .ok_or(BeaconChainError::UnableToReadSlot) + .map_err(warp_utils::reject::beacon_chain_error)? + .epoch(T::EthSpec::slots_per_epoch()); + + debug!( + log, + "Received register validator request"; + "count" => register_val_data.len(), + ); + + let preparation_data = register_val_data + .iter() + .filter_map(|register_data| { + chain + .validator_index(®ister_data.message.pubkey) + .ok() + .flatten() + .map(|validator_index| ProposerPreparationData { + validator_index: validator_index as u64, + fee_recipient: register_data.message.fee_recipient, + }) + }) + .collect::>(); + + debug!( + log, + "Resolved validator request pubkeys"; + "count" => preparation_data.len() + ); + + // Update the prepare beacon proposer cache based on this request. + execution_layer + .update_proposer_preparation_blocking(current_epoch, &preparation_data) + .map_err(|_e| { + warp_utils::reject::custom_bad_request( + "error processing proposer preparations".to_string(), + ) + })?; + + // Call prepare beacon proposer blocking with the latest update in order to make + // sure we have a local payload to fall back to in the event of the blined block + // flow failing. + chain.prepare_beacon_proposer_blocking().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "error updating proposer preparations: {:?}", + e + )) + })?; + + //TODO(sean): In the MEV-boost PR, add a call here to send the update request to the builder + + Ok(()) + }) + }, + ); // POST validator/sync_committee_subscriptions let post_validator_sync_committee_subscriptions = eth1_v1 .and(warp::path("validator")) @@ -3008,6 +3079,7 @@ pub fn serve( .or(post_validator_beacon_committee_subscriptions.boxed()) .or(post_validator_sync_committee_subscriptions.boxed()) .or(post_validator_prepare_beacon_proposer.boxed()) + .or(post_validator_register_validator.boxed()) .or(post_lighthouse_liveness.boxed()) .or(post_lighthouse_database_reconstruct.boxed()) .or(post_lighthouse_database_historical_blocks.boxed()) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 5f53a9615..2b0cfd7c4 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -11,6 +11,7 @@ use eth2::{ types::*, BeaconNodeHttpClient, Error, StatusCode, Timeouts, }; +use execution_layer::test_utils::MockExecutionLayer; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; use lighthouse_network::{Enr, EnrExt, PeerId}; @@ -24,6 +25,7 @@ use task_executor::test_utils::TestRuntime; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; use tree_hash::TreeHash; +use types::application_domain::ApplicationDomain; use types::{ AggregateSignature, BeaconState, BitList, Domain, EthSpec, Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, Slot, @@ -64,6 +66,9 @@ struct ApiTester { network_rx: mpsc::UnboundedReceiver>, local_enr: Enr, external_peer_id: PeerId, + // This is never directly accessed, but adding it creates a payload cache, which we use in tests here. + #[allow(dead_code)] + mock_el: Option>, _runtime: TestRuntime, } @@ -80,6 +85,7 @@ impl ApiTester { .spec(spec.clone()) .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() + .mock_execution_layer() .build(); harness.advance_slot(); @@ -214,6 +220,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + mock_el: harness.mock_execution_layer, _runtime: harness.runtime, } } @@ -293,6 +300,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + mock_el: None, _runtime: harness.runtime, } } @@ -2226,6 +2234,66 @@ impl ApiTester { self } + pub async fn test_post_validator_register_validator(self) -> Self { + let mut registrations = vec![]; + let mut fee_recipients = vec![]; + + let fork = self.chain.head().unwrap().beacon_state.fork(); + + for (val_index, keypair) in self.validator_keypairs.iter().enumerate() { + let pubkey = keypair.pk.compress(); + let fee_recipient = Address::from_low_u64_be(val_index as u64); + + let data = ValidatorRegistrationData { + fee_recipient, + gas_limit: 0, + timestamp: 0, + pubkey, + }; + let domain = self.chain.spec.get_domain( + Epoch::new(0), + Domain::ApplicationMask(ApplicationDomain::Builder), + &fork, + Hash256::zero(), + ); + let message = data.signing_root(domain); + let signature = keypair.sk.sign(message); + + fee_recipients.push(fee_recipient); + registrations.push(SignedValidatorRegistrationData { + message: data, + signature, + }); + } + + self.client + .post_validator_register_validator(®istrations) + .await + .unwrap(); + + for (val_index, (_, fee_recipient)) in self + .chain + .head() + .unwrap() + .beacon_state + .validators() + .into_iter() + .zip(fee_recipients.into_iter()) + .enumerate() + { + let actual = self + .chain + .execution_layer + .as_ref() + .unwrap() + .get_suggested_fee_recipient(val_index as u64) + .await; + assert_eq!(actual, fee_recipient); + } + + self + } + #[cfg(target_os = "linux")] pub async fn test_get_lighthouse_health(self) -> Self { self.client.get_lighthouse_health().await.unwrap(); @@ -2973,6 +3041,14 @@ async fn get_validator_beacon_committee_subscriptions() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn post_validator_register_validator() { + ApiTester::new() + .await + .test_post_validator_register_validator() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn lighthouse_endpoints() { ApiTester::new() diff --git a/book/src/api-vc-endpoints.md b/book/src/api-vc-endpoints.md index ae091130f..69cd83db5 100644 --- a/book/src/api-vc-endpoints.md +++ b/book/src/api-vc-endpoints.md @@ -134,6 +134,7 @@ Typical Responses | 200 "DOMAIN_VOLUNTARY_EXIT": "0x04000000", "DOMAIN_SELECTION_PROOF": "0x05000000", "DOMAIN_AGGREGATE_AND_PROOF": "0x06000000", + "DOMAIN_APPLICATION_MASK": "0x00000001", "MAX_VALIDATORS_PER_COMMITTEE": "2048", "SLOTS_PER_EPOCH": "32", "EPOCHS_PER_ETH1_VOTING_PERIOD": "32", diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 3e965a2bf..529bad1d8 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -929,6 +929,23 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST validator/register_validator` + pub async fn post_validator_register_validator( + &self, + registration_data: &[SignedValidatorRegistrationData], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("register_validator"); + + self.post(path, ®istration_data).await?; + + Ok(()) + } + /// `GET config/fork_schedule` pub async fn get_config_fork_schedule(&self) -> Result>, Error> { let mut path = self.eth_path(V1)?; diff --git a/consensus/types/src/application_domain.rs b/consensus/types/src/application_domain.rs new file mode 100644 index 000000000..5e33f2dfd --- /dev/null +++ b/consensus/types/src/application_domain.rs @@ -0,0 +1,16 @@ +/// This value is an application index of 0 with the bitmask applied (so it's equivalent to the bit mask). +/// Little endian hex: 0x00000001, Binary: 1000000000000000000000000 +pub const APPLICATION_DOMAIN_BUILDER: u32 = 16777216; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum ApplicationDomain { + Builder, +} + +impl ApplicationDomain { + pub fn get_domain_constant(&self) -> u32 { + match self { + ApplicationDomain::Builder => APPLICATION_DOMAIN_BUILDER, + } + } +} diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index c283d4cb4..8a69505a5 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -1,3 +1,4 @@ +use crate::application_domain::{ApplicationDomain, APPLICATION_DOMAIN_BUILDER}; use crate::*; use eth2_serde_utils::quoted_u64::MaybeQuoted; use int_to_bytes::int_to_bytes4; @@ -20,6 +21,7 @@ pub enum Domain { SyncCommittee, ContributionAndProof, SyncCommitteeSelectionProof, + ApplicationMask(ApplicationDomain), } /// Lighthouse's internal configuration struct. @@ -159,6 +161,11 @@ pub struct ChainSpec { pub attestation_subnet_count: u64, pub random_subnets_per_validator: u64, pub epochs_per_random_subnet_subscription: u64, + + /* + * Application params + */ + pub(crate) domain_application_mask: u32, } impl ChainSpec { @@ -326,6 +333,7 @@ impl ChainSpec { Domain::SyncCommittee => self.domain_sync_committee, Domain::ContributionAndProof => self.domain_contribution_and_proof, Domain::SyncCommitteeSelectionProof => self.domain_sync_committee_selection_proof, + Domain::ApplicationMask(application_domain) => application_domain.get_domain_constant(), } } @@ -353,6 +361,17 @@ impl ChainSpec { self.compute_domain(Domain::Deposit, self.genesis_fork_version, Hash256::zero()) } + // This should be updated to include the current fork and the genesis validators root, but discussion is ongoing: + // + // https://github.com/ethereum/builder-specs/issues/14 + pub fn get_builder_domain(&self) -> Hash256 { + self.compute_domain( + Domain::ApplicationMask(ApplicationDomain::Builder), + self.genesis_fork_version, + Hash256::zero(), + ) + } + /// Return the 32-byte fork data root for the `current_version` and `genesis_validators_root`. /// /// This is used primarily in signature domains to avoid collisions across forks/chains. @@ -565,6 +584,11 @@ impl ChainSpec { maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, epochs_per_random_subnet_subscription: 256, + + /* + * Application specific + */ + domain_application_mask: APPLICATION_DOMAIN_BUILDER, } } @@ -763,6 +787,11 @@ impl ChainSpec { maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, epochs_per_random_subnet_subscription: 256, + + /* + * Application specific + */ + domain_application_mask: APPLICATION_DOMAIN_BUILDER, } } } @@ -1119,6 +1148,27 @@ mod tests { &spec, ); test_domain(Domain::SyncCommittee, spec.domain_sync_committee, &spec); + + // The builder domain index is zero + let builder_domain_pre_mask = [0; 4]; + test_domain( + Domain::ApplicationMask(ApplicationDomain::Builder), + apply_bit_mask(builder_domain_pre_mask, &spec), + &spec, + ); + } + + fn apply_bit_mask(domain_bytes: [u8; 4], spec: &ChainSpec) -> u32 { + let mut domain = [0; 4]; + let mask_bytes = int_to_bytes4(spec.domain_application_mask); + + // Apply application bit mask + for (i, (domain_byte, mask_byte)) in domain_bytes.iter().zip(mask_bytes.iter()).enumerate() + { + domain[i] = domain_byte | mask_byte; + } + + u32::from_le_bytes(domain) } // Test that `fork_name_at_epoch` and `fork_epoch` are consistent. diff --git a/consensus/types/src/config_and_preset.rs b/consensus/types/src/config_and_preset.rs index f721e6c3b..8b3a753bd 100644 --- a/consensus/types/src/config_and_preset.rs +++ b/consensus/types/src/config_and_preset.rs @@ -69,6 +69,10 @@ impl ConfigAndPreset { "domain_aggregate_and_proof", u32_hex(spec.domain_aggregate_and_proof), ), + ( + "domain_application_mask", + u32_hex(spec.domain_application_mask), + ), ( "target_aggregators_per_committee", spec.target_aggregators_per_committee.to_string(), diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 22e429a58..ecfd77d7a 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -18,6 +18,7 @@ extern crate lazy_static; pub mod test_utils; pub mod aggregate_and_proof; +pub mod application_domain; pub mod attestation; pub mod attestation_data; pub mod attestation_duty; @@ -82,6 +83,7 @@ pub mod sync_committee_message; pub mod sync_selection_proof; pub mod sync_subnet_id; mod tree_hash_impls; +pub mod validator_registration_data; pub mod slot_data; #[cfg(feature = "sqlite")] @@ -157,6 +159,7 @@ pub use crate::sync_duty::SyncDuty; pub use crate::sync_selection_proof::SyncSelectionProof; pub use crate::sync_subnet_id::SyncSubnetId; pub use crate::validator::Validator; +pub use crate::validator_registration_data::*; pub use crate::validator_subscription::ValidatorSubscription; pub use crate::voluntary_exit::VoluntaryExit; diff --git a/consensus/types/src/validator_registration_data.rs b/consensus/types/src/validator_registration_data.rs new file mode 100644 index 000000000..5a3450df0 --- /dev/null +++ b/consensus/types/src/validator_registration_data.rs @@ -0,0 +1,23 @@ +use crate::*; +use serde::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; +use tree_hash_derive::TreeHash; + +/// Validator registration, for use in interacting with servers implementing the builder API. +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SignedValidatorRegistrationData { + pub message: ValidatorRegistrationData, + pub signature: Signature, +} + +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode, TreeHash)] +pub struct ValidatorRegistrationData { + pub fee_recipient: Address, + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub gas_limit: u64, + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub timestamp: u64, + pub pubkey: PublicKeyBytes, +} + +impl SignedRoot for ValidatorRegistrationData {} diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index 800f98865..e39e6515f 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -67,6 +67,7 @@ mod tests { impl SignedObject for SyncSelectionProof {} impl SignedObject for SyncCommitteeMessage {} impl SignedObject for SignedContributionAndProof {} + impl SignedObject for SignedValidatorRegistrationData {} /// A file format used by Web3Signer to discover and unlock keystores. #[derive(Serialize)] @@ -448,6 +449,18 @@ mod tests { } } + //TODO: remove this once the consensys web3signer includes the `validator_registration` method + #[allow(dead_code)] + fn get_validator_registration(pubkey: PublicKeyBytes) -> ValidatorRegistrationData { + let fee_recipient = Address::repeat_byte(42); + ValidatorRegistrationData { + fee_recipient, + gas_limit: 30_000_000, + timestamp: 100, + pubkey, + } + } + /// Test all the "base" (phase 0) types. async fn test_base_types(network: &str, listen_port: u16) { let network_config = Eth2NetworkConfig::constant(network).unwrap().unwrap(); @@ -499,6 +512,16 @@ mod tests { .await .unwrap() }) + //TODO: uncomment this once the consensys web3signer includes the `validator_registration` method + // + // .await + // .assert_signatures_match("validator_registration", |pubkey, validator_store| async move { + // let val_reg_data = get_validator_registration(pubkey); + // validator_store + // .sign_validator_registration_data(val_reg_data) + // .await + // .unwrap() + // }) .await; } @@ -575,6 +598,16 @@ mod tests { .unwrap() }, ) + //TODO: uncomment this once the consensys web3signer includes the `validator_registration` method + // + // .await + // .assert_signatures_match("validator_registration", |pubkey, validator_store| async move { + // let val_reg_data = get_validator_registration(pubkey); + // validator_store + // .sign_validator_registration_data(val_reg_data) + // .await + // .unwrap() + // }) .await; } diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index f405f1a2b..836aab4c1 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -85,6 +85,11 @@ lazy_static::lazy_static! { "Total count of attempted SyncSelectionProof signings", &["status"] ); + pub static ref SIGNED_VALIDATOR_REGISTRATIONS_TOTAL: Result = try_create_int_counter_vec( + "builder_validator_registrations_total", + "Total count of ValidatorRegistrationData signings", + &["status"] + ); pub static ref DUTIES_SERVICE_TIMES: Result = try_create_histogram_vec( "vc_duties_service_task_times_seconds", "Duration to perform duties service tasks", diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index ce35a0035..5e4584759 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -485,7 +485,10 @@ impl ProductionValidatorClient { self.preparation_service .clone() - .start_update_service(&self.context.eth2_config.spec) + .start_update_service( + self.config.private_tx_proposals, + &self.context.eth2_config.spec, + ) .map_err(|e| format!("Unable to start preparation service: {}", e))?; if let Some(doppelganger_service) = self.doppelganger_service.clone() { diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs index b4b6caa05..34201180c 100644 --- a/validator_client/src/preparation_service.rs +++ b/validator_client/src/preparation_service.rs @@ -3,17 +3,28 @@ use crate::{ fee_recipient_file::FeeRecipientFile, validator_store::{DoppelgangerStatus, ValidatorStore}, }; +use bls::PublicKeyBytes; use environment::RuntimeContext; +use parking_lot::RwLock; use slog::{debug, error, info}; use slot_clock::SlotClock; +use std::collections::HashMap; +use std::hash::Hash; use std::ops::Deref; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{sleep, Duration}; -use types::{Address, ChainSpec, EthSpec, ProposerPreparationData}; +use types::{ + Address, ChainSpec, EthSpec, ProposerPreparationData, SignedValidatorRegistrationData, + ValidatorRegistrationData, +}; /// Number of epochs before the Bellatrix hard fork to begin posting proposer preparations. const PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS: u64 = 2; +/// Number of epochs to wait before re-submitting validator registration. +const EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION: u64 = 1; + /// Builds an `PreparationService`. pub struct PreparationServiceBuilder { validator_store: Option>>, @@ -83,6 +94,7 @@ impl PreparationServiceBuilder { .ok_or("Cannot build PreparationService without runtime_context")?, fee_recipient: self.fee_recipient, fee_recipient_file: self.fee_recipient_file, + validator_registration_cache: RwLock::new(HashMap::new()), }), }) } @@ -96,6 +108,32 @@ pub struct Inner { context: RuntimeContext, fee_recipient: Option
, fee_recipient_file: Option, + // Used to track unpublished validator registration changes. + validator_registration_cache: + RwLock>, +} + +#[derive(Hash, Eq, PartialEq, Debug, Clone)] +pub struct ValidatorRegistrationKey { + pub fee_recipient: Address, + pub gas_limit: u64, + pub pubkey: PublicKeyBytes, +} + +impl From for ValidatorRegistrationKey { + fn from(data: ValidatorRegistrationData) -> Self { + let ValidatorRegistrationData { + fee_recipient, + gas_limit, + timestamp: _, + pubkey, + } = data; + Self { + fee_recipient, + gas_limit, + pubkey, + } + } } /// Attempts to produce proposer preparations for all known validators at the beginning of each epoch. @@ -120,8 +158,19 @@ impl Deref for PreparationService { } impl PreparationService { + pub fn start_update_service( + self, + start_registration_service: bool, + spec: &ChainSpec, + ) -> Result<(), String> { + if start_registration_service { + self.clone().start_validator_registration_service(spec)?; + } + self.start_proposer_prepare_service(spec) + } + /// Starts the service which periodically produces proposer preparations. - pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + pub fn start_proposer_prepare_service(self, spec: &ChainSpec) -> Result<(), String> { let log = self.context.log().clone(); let slot_duration = Duration::from_secs(spec.seconds_per_slot); @@ -163,6 +212,41 @@ impl PreparationService { Ok(()) } + /// Starts the service which periodically sends connected beacon nodes validator registration information. + pub fn start_validator_registration_service(self, spec: &ChainSpec) -> Result<(), String> { + let log = self.context.log().clone(); + + info!( + log, + "Validator registration service started"; + ); + + let spec = spec.clone(); + let slot_duration = Duration::from_secs(spec.seconds_per_slot); + + let executor = self.context.executor.clone(); + + let validator_registration_fut = async move { + loop { + // Poll the endpoint immediately to ensure fee recipients are received. + if let Err(e) = self.register_validators(&spec).await { + error!(log,"Error during validator registration";"error" => ?e); + } + + // Wait one slot if the register validator request fails or if we should not publish at the current slot. + if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() { + sleep(duration_to_next_slot).await; + } else { + error!(log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + } + } + }; + executor.spawn(validator_registration_fut, "validator_registration_service"); + Ok(()) + } + /// Return `true` if the current slot is close to or past the Bellatrix fork epoch. /// /// This avoids spamming the BN with preparations before the Bellatrix fork epoch, which may @@ -188,6 +272,33 @@ impl PreparationService { } fn collect_preparation_data(&self, spec: &ChainSpec) -> Vec { + self.collect_data(spec, |_, validator_index, fee_recipient| { + ProposerPreparationData { + validator_index, + fee_recipient, + } + }) + } + + fn collect_validator_registration_keys( + &self, + spec: &ChainSpec, + ) -> Vec { + self.collect_data(spec, |pubkey, _, fee_recipient| { + ValidatorRegistrationKey { + fee_recipient, + //TODO(sean) this is geth's default, we should make this configurable and maybe have the default be dynamic. + // Discussion here: https://github.com/ethereum/builder-specs/issues/17 + gas_limit: 30_000_000, + pubkey, + } + }) + } + + fn collect_data(&self, spec: &ChainSpec, map_fn: G) -> Vec + where + G: Fn(PublicKeyBytes, u64, Address) -> U, + { let log = self.context.log(); let fee_recipient_file = self @@ -234,10 +345,7 @@ impl PreparationService { .or(self.fee_recipient); if let Some(fee_recipient) = fee_recipient { - Some(ProposerPreparationData { - validator_index, - fee_recipient, - }) + Some(map_fn(pubkey, validator_index, fee_recipient)) } else { if spec.bellatrix_fork_epoch.is_some() { error!( @@ -284,4 +392,116 @@ impl PreparationService { } Ok(()) } + + /// Register validators with builders, used in the blinded block proposal flow. + async fn register_validators(&self, spec: &ChainSpec) -> Result<(), String> { + let registration_keys = self.collect_validator_registration_keys(spec); + + let mut changed_keys = vec![]; + + // Need to scope this so the read lock is not held across an await point (I don't know why + // but the explicit `drop` is not enough). + { + let guard = self.validator_registration_cache.read(); + for key in registration_keys.iter() { + if !guard.contains_key(key) { + changed_keys.push(key.clone()); + } + } + drop(guard); + } + + // Check if any have changed or it's been `EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION`. + if let Some(slot) = self.slot_clock.now() { + if slot % (E::slots_per_epoch() * EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION) == 0 { + self.publish_validator_registration_data(registration_keys) + .await?; + } else if !changed_keys.is_empty() { + self.publish_validator_registration_data(changed_keys) + .await?; + } + } + + Ok(()) + } + + async fn publish_validator_registration_data( + &self, + registration_keys: Vec, + ) -> Result<(), String> { + let log = self.context.log(); + + let registration_data_len = registration_keys.len(); + let mut signed = Vec::with_capacity(registration_data_len); + + for key in registration_keys { + let cached_registration_opt = + self.validator_registration_cache.read().get(&key).cloned(); + + let signed_data = if let Some(signed_data) = cached_registration_opt { + signed_data + } else { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| format!("{e:?}"))? + .as_secs(); + + let ValidatorRegistrationKey { + fee_recipient, + gas_limit, + pubkey, + } = key.clone(); + + let signed_data = match self + .validator_store + .sign_validator_registration_data(ValidatorRegistrationData { + fee_recipient, + gas_limit, + timestamp, + pubkey, + }) + .await + { + Ok(data) => data, + Err(e) => { + error!(log, "Unable to sign validator registration data"; "error" => ?e, "pubkey" => ?pubkey); + continue; + } + }; + + self.validator_registration_cache + .write() + .insert(key, signed_data.clone()); + + signed_data + }; + signed.push(signed_data); + } + + if !signed.is_empty() { + let signed_ref = signed.as_slice(); + + match self + .beacon_nodes + .first_success(RequireSynced::Yes, |beacon_node| async move { + beacon_node + .post_validator_register_validator(signed_ref) + .await + }) + .await + { + Ok(()) => debug!( + log, + "Published validator registration"; + "count" => registration_data_len, + ), + Err(e) => error!( + log, + "Unable to publish validator registration"; + "error" => %e, + ), + } + } + Ok(()) + } } diff --git a/validator_client/src/signing_method.rs b/validator_client/src/signing_method.rs index 0daefc43c..de69d9900 100644 --- a/validator_client/src/signing_method.rs +++ b/validator_client/src/signing_method.rs @@ -30,6 +30,7 @@ pub enum Error { ShuttingDown, TokioJoin(String), MergeForkNotSupported, + GenesisForkVersionRequired, } /// Enumerates all messages that can be signed by a validator. @@ -45,6 +46,7 @@ pub enum SignableMessage<'a, T: EthSpec, Payload: ExecPayload = FullPayload), + ValidatorRegistration(&'a ValidatorRegistrationData), } impl<'a, T: EthSpec, Payload: ExecPayload> SignableMessage<'a, T, Payload> { @@ -64,6 +66,7 @@ impl<'a, T: EthSpec, Payload: ExecPayload> SignableMessage<'a, T, Payload> { beacon_block_root, .. } => beacon_block_root.signing_root(domain), SignableMessage::SignedContributionAndProof(c) => c.signing_root(domain), + SignableMessage::ValidatorRegistration(v) => v.signing_root(domain), } } } @@ -129,6 +132,22 @@ impl SigningMethod { let signing_root = signable_message.signing_root(domain_hash); + let fork_info = Some(ForkInfo { + fork, + genesis_validators_root, + }); + + self.get_signature_from_root(signable_message, signing_root, executor, fork_info) + .await + } + + pub async fn get_signature_from_root>( + &self, + signable_message: SignableMessage<'_, T, Payload>, + signing_root: Hash256, + executor: &TaskExecutor, + fork_info: Option, + ) -> Result { match self { SigningMethod::LocalKeystore { voting_keypair, .. } => { let _timer = @@ -181,21 +200,21 @@ impl SigningMethod { SignableMessage::SignedContributionAndProof(c) => { Web3SignerObject::ContributionAndProof(c) } + SignableMessage::ValidatorRegistration(v) => { + Web3SignerObject::ValidatorRegistration(v) + } }; // Determine the Web3Signer message type. let message_type = object.message_type(); - // The `fork_info` field is not required for deposits since they sign across the - // genesis fork version. - let fork_info = if let Web3SignerObject::Deposit { .. } = &object { - None - } else { - Some(ForkInfo { - fork, - genesis_validators_root, - }) - }; + if matches!( + object, + Web3SignerObject::Deposit { .. } | Web3SignerObject::ValidatorRegistration(_) + ) && fork_info.is_some() + { + return Err(Error::GenesisForkVersionRequired); + } let request = SigningRequest { message_type, diff --git a/validator_client/src/signing_method/web3signer.rs b/validator_client/src/signing_method/web3signer.rs index 9ac1655cc..0ab37484b 100644 --- a/validator_client/src/signing_method/web3signer.rs +++ b/validator_client/src/signing_method/web3signer.rs @@ -17,6 +17,7 @@ pub enum MessageType { SyncCommitteeMessage, SyncCommitteeSelectionProof, SyncCommitteeContributionAndProof, + ValidatorRegistration, } #[derive(Debug, PartialEq, Copy, Clone, Serialize)] @@ -64,6 +65,7 @@ pub enum Web3SignerObject<'a, T: EthSpec, Payload: ExecPayload> { }, SyncAggregatorSelectionData(&'a SyncAggregatorSelectionData), ContributionAndProof(&'a ContributionAndProof), + ValidatorRegistration(&'a ValidatorRegistrationData), } impl<'a, T: EthSpec, Payload: ExecPayload> Web3SignerObject<'a, T, Payload> { @@ -93,6 +95,7 @@ impl<'a, T: EthSpec, Payload: ExecPayload> Web3SignerObject<'a, T, Payload> { Web3SignerObject::ContributionAndProof(_) => { MessageType::SyncCommitteeContributionAndProof } + Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration, } } } diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index b39ef9ef8..36ec5e895 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -20,9 +20,9 @@ use types::{ attestation::Error as AttestationError, graffiti::GraffitiString, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecPayload, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, - Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, Slot, - SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, - SyncSelectionProof, SyncSubnetId, + Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot, + SignedValidatorRegistrationData, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, + SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, }; use validator_dir::ValidatorDir; @@ -524,6 +524,35 @@ impl ValidatorStore { } } + pub async fn sign_validator_registration_data( + &self, + validator_registration_data: ValidatorRegistrationData, + ) -> Result { + let domain_hash = self.spec.get_builder_domain(); + let signing_root = validator_registration_data.signing_root(domain_hash); + + let signing_method = + self.doppelganger_bypassed_signing_method(validator_registration_data.pubkey)?; + let signature = signing_method + .get_signature_from_root::>( + SignableMessage::ValidatorRegistration(&validator_registration_data), + signing_root, + &self.task_executor, + None, + ) + .await?; + + metrics::inc_counter_vec( + &metrics::SIGNED_VALIDATOR_REGISTRATIONS_TOTAL, + &[metrics::SUCCESS], + ); + + Ok(SignedValidatorRegistrationData { + message: validator_registration_data, + signature, + }) + } + /// Signs an `AggregateAndProof` for a given validator. /// /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be