From 64ad2af100ead7af752af843ee541c2068d04626 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 17 Sep 2021 01:11:16 +0000 Subject: [PATCH] Subscribe to altair gossip topics 2 slots before fork (#2532) ## Issue Addressed N/A ## Proposed Changes Add a fork_digest to `ForkContext` only if it is set in the config. Reject gossip messages on post fork topics before the fork happens. Edit: Instead of rejecting gossip messages on post fork topics, we now subscribe to post fork topics 2 slots before the fork. Co-authored-by: Age Manning --- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 9 +++ .../eth2_libp2p/src/rpc/codec/ssz_snappy.rs | 6 +- beacon_node/eth2_libp2p/tests/common/mod.rs | 8 ++- beacon_node/eth2_libp2p/tests/rpc_tests.rs | 37 ++++++++--- beacon_node/network/src/service.rs | 62 ++++++++++++++++--- consensus/types/src/chain_spec.rs | 42 +++++++++++-- consensus/types/src/fork_context.rs | 5 +- 7 files changed, 145 insertions(+), 24 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 8d0c9ee5b..864886174 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -326,6 +326,15 @@ impl Behaviour { self.unsubscribe(gossip_topic) } + /// Subscribe to all currently subscribed topics with the new fork digest. + pub fn subscribe_new_fork_topics(&mut self, new_fork_digest: [u8; 4]) { + let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); + for mut topic in subscriptions.into_iter() { + topic.fork_digest = new_fork_digest; + self.subscribe(topic); + } + } + /// Unsubscribe from all topics that doesn't have the given fork_digest pub fn unsubscribe_from_fork_topics_except(&mut self, except: [u8; 4]) { let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs index 915572fd1..6d5985931 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs @@ -615,7 +615,11 @@ mod tests { type Spec = types::MainnetEthSpec; fn fork_context() -> ForkContext { - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &Spec::default_spec()) + let mut chain_spec = Spec::default_spec(); + // Set fork_epoch to `Some` to ensure that the `ForkContext` object + // includes altair in the list of forks + chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); + ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) } fn base_block() -> SignedBeaconBlock { diff --git a/beacon_node/eth2_libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs index 8c28512d0..1023bbacd 100644 --- a/beacon_node/eth2_libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -11,14 +11,18 @@ use std::sync::Arc; use std::sync::Weak; use std::time::Duration; use tokio::runtime::Runtime; -use types::{ChainSpec, EnrForkId, ForkContext, Hash256, MinimalEthSpec}; +use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec}; type E = MinimalEthSpec; use tempfile::Builder as TempBuilder; /// Returns a dummy fork context fn fork_context() -> ForkContext { - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &ChainSpec::minimal()) + let mut chain_spec = E::default_spec(); + // Set fork_epoch to `Some` to ensure that the `ForkContext` object + // includes altair in the list of forks + chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); + ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) } pub struct Libp2pInstance(LibP2PService, exit_future::Signal); diff --git a/beacon_node/eth2_libp2p/tests/rpc_tests.rs b/beacon_node/eth2_libp2p/tests/rpc_tests.rs index 9d1faf748..1e5f1e4e1 100644 --- a/beacon_node/eth2_libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2_libp2p/tests/rpc_tests.rs @@ -138,11 +138,16 @@ fn test_blocks_by_range_chunked_rpc() { step: 0, }); - // BlocksByRange Response let spec = E::default_spec(); - let empty_block = BeaconBlock::empty(&spec); - let empty_signed = SignedBeaconBlock::from_block(empty_block, Signature::empty()); - let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed))); + + // BlocksByRange Response + let full_block = BeaconBlock::Base(BeaconBlockBase::::full(&spec)); + let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); + let rpc_response_base = Response::BlocksByRange(Some(Box::new(signed_full_block))); + + let full_block = BeaconBlock::Altair(BeaconBlockAltair::::full(&spec)); + let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); + let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block))); // keep count of the number of messages received let mut messages_received = 0; @@ -167,7 +172,11 @@ fn test_blocks_by_range_chunked_rpc() { warn!(log, "Sender received a response"); match response { Response::BlocksByRange(Some(_)) => { - assert_eq!(response, rpc_response.clone()); + if messages_received < 5 { + assert_eq!(response, rpc_response_base.clone()); + } else { + assert_eq!(response, rpc_response_altair.clone()); + } messages_received += 1; warn!(log, "Chunk received"); } @@ -197,7 +206,14 @@ fn test_blocks_by_range_chunked_rpc() { if request == rpc_request { // send the response warn!(log, "Receiver got request"); - for _ in 1..=messages_to_send { + for i in 0..messages_to_send { + // Send first half of responses as base blocks and + // second half as altair blocks. + let rpc_response = if i < 5 { + rpc_response_base.clone() + } else { + rpc_response_altair.clone() + }; receiver.swarm.behaviour_mut().send_successful_response( peer_id, id, @@ -481,7 +497,7 @@ fn test_blocks_by_root_chunked_rpc() { let log_level = Level::Debug; let enable_logging = false; - let messages_to_send = 3; + let messages_to_send = 10; let log = common::build_log(log_level, enable_logging); let spec = E::default_spec(); @@ -497,6 +513,13 @@ fn test_blocks_by_root_chunked_rpc() { Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), + Hash256::from_low_u64_be(0), ]), }); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 6dd0b58d1..0f905edfc 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -25,14 +25,16 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use types::{ - EthSpec, ForkContext, ForkName, RelativeEpoch, SubnetId, SyncCommitteeSubscription, - SyncSubnetId, Unsigned, ValidatorSubscription, + ChainSpec, EthSpec, ForkContext, ForkName, RelativeEpoch, Slot, SubnetId, + SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription, }; mod tests; /// The interval (in seconds) that various network metrics will update. const METRIC_UPDATE_INTERVAL: u64 = 1; +/// Number of slots before the fork when we should subscribe to the new fork topics. +const SUBSCRIBE_DELAY_SLOTS: u64 = 2; /// Delay after a fork where we unsubscribe from pre-fork topics. const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2; @@ -129,6 +131,8 @@ pub struct NetworkService { discovery_auto_update: bool, /// A delay that expires when a new fork takes place. next_fork_update: Pin>>, + /// A delay that expires when we need to subscribe to a new fork's topics. + next_fork_subscriptions: Pin>>, /// A delay that expires when we need to unsubscribe from old fork topics. next_unsubscribe: Pin>>, /// Subscribe to all the subnets once synced. @@ -179,6 +183,7 @@ impl NetworkService { // keep track of when our fork_id needs to be updated let next_fork_update = Box::pin(next_fork_delay(&beacon_chain).into()); + let next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&beacon_chain).into()); let next_unsubscribe = Box::pin(None.into()); let current_slot = beacon_chain @@ -192,6 +197,8 @@ impl NetworkService { &beacon_chain.spec, )); + debug!(network_log, "Current fork"; "fork_name" => ?fork_context.current_fork()); + // launch libp2p service let (network_globals, mut libp2p) = LibP2PService::new( executor.clone(), @@ -254,6 +261,7 @@ impl NetworkService { upnp_mappings: (None, None), discovery_auto_update: config.discv5_config.enr_update, next_fork_update, + next_fork_subscriptions, next_unsubscribe, subscribe_all_subnets: config.subscribe_all_subnets, shutdown_after_sync: config.shutdown_after_sync, @@ -274,12 +282,23 @@ impl NetworkService { /// digests since we should be subscribed to post fork topics before the fork. pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> { let fork_context = &self.fork_context; + let spec = &self.beacon_chain.spec; match fork_context.current_fork() { ForkName::Base => { - if fork_context.fork_exists(ForkName::Altair) { - fork_context.all_fork_digests() - } else { - vec![fork_context.genesis_context_bytes()] + // If we are SUBSCRIBE_DELAY_SLOTS before the fork slot, subscribe only to Base, + // else subscribe to Base and Altair. + let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot); + match spec.next_fork_epoch::(current_slot) { + Some((_, fork_epoch)) => { + if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS)) + >= fork_epoch.start_slot(T::EthSpec::slots_per_epoch()) + { + fork_context.all_fork_digests() + } else { + vec![fork_context.genesis_context_bytes()] + } + } + None => vec![fork_context.genesis_context_bytes()], } } ForkName::Altair => vec![fork_context @@ -619,6 +638,7 @@ fn spawn_service( } => { // Update prometheus metrics. metrics::expose_receive_metrics(&message); + match message { // attestation information gets processed in the attestation service PubsubMessage::Attestation(ref subnet_and_attestation) => { @@ -671,7 +691,7 @@ fn spawn_service( if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) { info!( service.log, - "Updating enr fork version"; + "Transitioned to new fork"; "old_fork" => ?fork_context.current_fork(), "new_fork" => ?new_fork_name, ); @@ -701,6 +721,18 @@ fn spawn_service( info!(service.log, "Unsubscribed from old fork topics"); service.next_unsubscribe = Box::pin(None.into()); } + Some(_) = &mut service.next_fork_subscriptions => { + if let Some((fork_name, _)) = service.beacon_chain.duration_to_next_fork() { + let fork_version = service.beacon_chain.spec.fork_version_for_name(fork_name); + let fork_digest = ChainSpec::compute_fork_digest(fork_version, service.beacon_chain.genesis_validators_root); + info!(service.log, "Subscribing to new fork topics"); + service.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest); + } + else { + error!(service.log, "Fork subscription scheduled but no fork scheduled"); + } + service.next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&service.beacon_chain).into()); + } } metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone()); } @@ -717,6 +749,22 @@ fn next_fork_delay( .map(|(_, until_fork)| tokio::time::sleep(until_fork)) } +/// Returns a `Sleep` that triggers `SUBSCRIBE_DELAY_SLOTS` before the next fork. +/// Returns `None` if there are no scheduled forks or we are already past `current_slot + SUBSCRIBE_DELAY_SLOTS > fork_slot`. +fn next_fork_subscriptions_delay( + beacon_chain: &BeaconChain, +) -> Option { + if let Some((_, duration_to_fork)) = beacon_chain.duration_to_next_fork() { + let duration_to_subscription = duration_to_fork.saturating_sub(Duration::from_secs( + beacon_chain.spec.seconds_per_slot * SUBSCRIBE_DELAY_SLOTS, + )); + if !duration_to_subscription.is_zero() { + return Some(tokio::time::sleep(duration_to_subscription)); + } + } + None +} + impl Drop for NetworkService { fn drop(&mut self) { // network thread is terminating diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 7c0c348fd..69bcdd735 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -1,7 +1,8 @@ use crate::*; use eth2_serde_utils::quoted_u64::MaybeQuoted; use int_to_bytes::int_to_bytes4; -use serde_derive::{Deserialize, Serialize}; +use serde::{Deserializer, Serialize, Serializer}; +use serde_derive::Deserialize; use std::fs::File; use std::path::Path; use tree_hash::TreeHash; @@ -467,7 +468,7 @@ impl ChainSpec { domain_sync_committee_selection_proof: 8, domain_contribution_and_proof: 9, altair_fork_version: [0x01, 0x00, 0x00, 0x00], - altair_fork_epoch: Some(Epoch::new(u64::MAX)), + altair_fork_epoch: None, /* * Network specific @@ -506,7 +507,7 @@ impl ChainSpec { // Altair epochs_per_sync_committee_period: Epoch::new(8), altair_fork_version: [0x01, 0x00, 0x00, 0x01], - altair_fork_epoch: Some(Epoch::new(u64::MAX)), + altair_fork_epoch: None, // Other network_id: 2, // lighthouse testnet network id deposit_chain_id: 5, @@ -544,7 +545,9 @@ pub struct Config { #[serde(with = "eth2_serde_utils::bytes_4_hex")] altair_fork_version: [u8; 4], - altair_fork_epoch: Option>, + #[serde(serialize_with = "serialize_fork_epoch")] + #[serde(deserialize_with = "deserialize_fork_epoch")] + pub altair_fork_epoch: Option>, #[serde(with = "eth2_serde_utils::quoted_u64")] seconds_per_slot: u64, @@ -582,6 +585,35 @@ impl Default for Config { } } +/// Util function to serialize a `None` fork epoch value +/// as `Epoch::max_value()`. +fn serialize_fork_epoch(val: &Option>, s: S) -> Result +where + S: Serializer, +{ + match val { + None => MaybeQuoted { + value: Epoch::max_value(), + } + .serialize(s), + Some(epoch) => epoch.serialize(s), + } +} + +/// Util function to deserialize a u64::max() fork epoch as `None`. +fn deserialize_fork_epoch<'de, D>(deserializer: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + let decoded: Option> = serde::de::Deserialize::deserialize(deserializer)?; + if let Some(fork_epoch) = decoded { + if fork_epoch.value != Epoch::max_value() { + return Ok(Some(fork_epoch)); + } + } + Ok(None) +} + impl Config { /// Maps `self` to an identifier for an `EthSpec` instance. /// @@ -606,7 +638,7 @@ impl Config { altair_fork_version: spec.altair_fork_version, altair_fork_epoch: spec .altair_fork_epoch - .map(|slot| MaybeQuoted { value: slot }), + .map(|epoch| MaybeQuoted { value: epoch }), seconds_per_slot: spec.seconds_per_slot, seconds_per_eth1_block: spec.seconds_per_eth1_block, diff --git a/consensus/types/src/fork_context.rs b/consensus/types/src/fork_context.rs index 6da188570..1d488f769 100644 --- a/consensus/types/src/fork_context.rs +++ b/consensus/types/src/fork_context.rs @@ -26,12 +26,13 @@ impl ForkContext { ChainSpec::compute_fork_digest(spec.genesis_fork_version, genesis_validators_root), )]; - // Only add Altair to list of forks if it's enabled (i.e. spec.altair_fork_epoch != None) + // Only add Altair to list of forks if it's enabled + // Note: `altair_fork_epoch == None` implies altair hasn't been activated yet on the config. if spec.altair_fork_epoch.is_some() { fork_to_digest.push(( ForkName::Altair, ChainSpec::compute_fork_digest(spec.altair_fork_version, genesis_validators_root), - )) + )); } let fork_to_digest: HashMap = fork_to_digest.into_iter().collect();