diff --git a/Cargo.lock b/Cargo.lock
index 58c9ec2a7..4ca2739d1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -387,6 +387,7 @@ version = "0.2.0"
 dependencies = [
  "bitvec 0.20.4",
  "bls",
+ "crossbeam-channel",
  "derivative",
  "environment",
  "eth1",
diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml
index 092f3064d..43cbdf134 100644
--- a/beacon_node/beacon_chain/Cargo.toml
+++ b/beacon_node/beacon_chain/Cargo.toml
@@ -63,6 +63,7 @@ superstruct = "0.5.0"
 hex = "0.4.2"
 exit-future = "0.2.0"
 unused_port = {path = "../../common/unused_port"}
+crossbeam-channel = "0.5.6"
 
 [[test]]
 name = "beacon_chain_tests"
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 077b425c0..609969a9a 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -2646,7 +2646,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 self.shuffling_cache
                     .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
                     .ok_or(Error::AttestationCacheLockTimeout)?
-                    .insert(shuffling_id, committee_cache);
+                    .insert_committee_cache(shuffling_id, committee_cache);
             }
         }
@@ -4490,9 +4490,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         metrics::stop_timer(cache_wait_timer);
 
-        if let Some(committee_cache) = shuffling_cache.get(&shuffling_id) {
-            map_fn(committee_cache, shuffling_id.shuffling_decision_block)
+        if let Some(cache_item) = shuffling_cache.get(&shuffling_id) {
+            let committee_cache = cache_item.wait()?;
+            map_fn(&committee_cache, shuffling_id.shuffling_decision_block)
         } else {
+            // Create an entry in the cache that "promises" this value will eventually be computed.
+            // This avoids the case where multiple threads attempt to produce the same value at the
+            // same time.
+            //
+            // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same
+            // promise from being created twice.
+            let sender = shuffling_cache.create_promise(shuffling_id.clone())?;
+
             // Drop the shuffling cache to avoid holding the lock for any longer than
             // required.
             drop(shuffling_cache);
@@ -4585,17 +4594,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             state.build_committee_cache(relative_epoch, &self.spec)?;
 
-            let committee_cache = state.committee_cache(relative_epoch)?;
+            let committee_cache = state.take_committee_cache(relative_epoch)?;
+            let committee_cache = Arc::new(committee_cache);
             let shuffling_decision_block = shuffling_id.shuffling_decision_block;
 
             self.shuffling_cache
                 .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
                 .ok_or(Error::AttestationCacheLockTimeout)?
-                .insert(shuffling_id, committee_cache);
+                .insert_committee_cache(shuffling_id, &committee_cache);
 
             metrics::stop_timer(committee_building_timer);
 
-            map_fn(committee_cache, shuffling_decision_block)
+            if let Err(e) = sender.send(committee_cache.clone()) {
+                debug!(
+                    self.log,
+                    "Did not fulfil committee promise";
+                    "error" => %e
+                )
+            }
+
+            map_fn(&committee_cache, shuffling_decision_block)
         }
     }
diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs
index 604fb6bea..8b547acf0 100644
--- a/beacon_node/beacon_chain/src/errors.rs
+++ b/beacon_node/beacon_chain/src/errors.rs
@@ -202,6 +202,8 @@ pub enum BeaconChainError {
     },
     AttestationHeadNotInForkChoice(Hash256),
     MissingPersistedForkChoice,
+    CommitteeCacheWait(crossbeam_channel::RecvError),
+    MaxCommitteePromises(usize),
 }
 
 easy_from_to!(SlotProcessingError, BeaconChainError);
diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs
index b454a6ff8..ead4a5402 100644
--- a/beacon_node/beacon_chain/src/metrics.rs
+++ b/beacon_node/beacon_chain/src/metrics.rs
@@ -254,6 +254,10 @@ lazy_static! {
         try_create_int_counter("beacon_shuffling_cache_hits_total", "Count of times shuffling cache fulfils request");
     pub static ref SHUFFLING_CACHE_MISSES: Result<IntCounter> =
         try_create_int_counter("beacon_shuffling_cache_misses_total", "Count of times shuffling cache fulfils request");
+    pub static ref SHUFFLING_CACHE_PROMISE_HITS: Result<IntCounter> =
+        try_create_int_counter("beacon_shuffling_cache_promise_hits_total", "Count of times shuffling cache returns a promise of a future shuffling");
+    pub static ref SHUFFLING_CACHE_PROMISE_FAILS: Result<IntCounter> =
+        try_create_int_counter("beacon_shuffling_cache_promise_fails_total", "Count of times shuffling cache detects a failed promise");
 
     /*
      * Early attester cache
diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs
index 0bbd4419b..3fc5bebdf 100644
--- a/beacon_node/beacon_chain/src/shuffling_cache.rs
+++ b/beacon_node/beacon_chain/src/shuffling_cache.rs
@@ -1,5 +1,7 @@
-use crate::metrics;
+use crate::{metrics, BeaconChainError};
+use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError};
 use lru::LruCache;
+use std::sync::Arc;
 use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256};
 
 /// The size of the LRU cache that stores committee caches for quicker verification.
@@ -9,12 +11,46 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256};
 /// ignores a few extra bytes in the caches that should be insignificant compared to the indices).
 const CACHE_SIZE: usize = 16;
 
+/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this
+/// limits the number of concurrent states that can be loaded into memory for the committee cache.
+/// This prevents excessive memory usage at the cost of rejecting some attestations.
+///
+/// We set this value to 2 since states can be quite large and have a significant impact on memory
+/// usage. A healthy network cannot have more than a few committee caches and those caches should
+/// always be inserted during block import. Unstable networks with a high degree of forking might
+/// see some attestations dropped due to this concurrency limit; however, I propose that this is
+/// better than low-resource nodes going OOM.
+const MAX_CONCURRENT_PROMISES: usize = 2;
+
+#[derive(Clone)]
+pub enum CacheItem {
+    /// A committee.
+    Committee(Arc<CommitteeCache>),
+    /// A promise for a future committee.
+    Promise(Receiver<Arc<CommitteeCache>>),
+}
+
+impl CacheItem {
+    pub fn is_promise(&self) -> bool {
+        matches!(self, CacheItem::Promise(_))
+    }
+
+    pub fn wait(self) -> Result<Arc<CommitteeCache>, BeaconChainError> {
+        match self {
+            CacheItem::Committee(cache) => Ok(cache),
+            CacheItem::Promise(receiver) => receiver
+                .recv()
+                .map_err(BeaconChainError::CommitteeCacheWait),
+        }
+    }
+}
+
 /// Provides an LRU cache for `CommitteeCache`.
 ///
 /// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like
 /// a find/replace error.
 pub struct ShufflingCache {
-    cache: LruCache<AttestationShufflingId, CommitteeCache>,
+    cache: LruCache<AttestationShufflingId, CacheItem>,
 }
 
 impl ShufflingCache {
@@ -24,27 +60,114 @@ impl ShufflingCache {
         }
     }
 
-    pub fn get(&mut self, key: &AttestationShufflingId) -> Option<&CommitteeCache> {
-        let opt = self.cache.get(key);
-
-        if opt.is_some() {
-            metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
-        } else {
-            metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES);
+    pub fn get(&mut self, key: &AttestationShufflingId) -> Option<CacheItem> {
+        match self.cache.get(key) {
+            // The cache contained the committee cache, return it.
+            item @ Some(CacheItem::Committee(_)) => {
+                metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
+                item.cloned()
+            }
+            // The cache contains a promise for the committee cache. Check to see if the promise has
+            // already been resolved, without waiting for it.
+            item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() {
+                // The promise has already been resolved. Replace the entry in the cache with a
+                // `Committee` entry and then return the committee.
+                Ok(committee) => {
+                    metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS);
+                    metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
+                    let ready = CacheItem::Committee(committee);
+                    self.cache.put(key.clone(), ready.clone());
+                    Some(ready)
+                }
+                // The promise has not yet been resolved. Return the promise so the caller can await
+                // it.
+                Err(TryRecvError::Empty) => {
+                    metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS);
+                    metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
+                    item.cloned()
+                }
+                // The sender has been dropped without sending a committee. There was most likely an
+                // error computing the committee cache. Drop the key from the cache and return
+                // `None` so the caller can recompute the committee.
+                //
+                // It's worth noting that this is the only place where we remove unresolved
+                // promises from the cache. This means unresolved promises will only be removed if
+                // we try to access them again. This is OK, since the promises don't consume much
+                // memory and the nature of the LRU cache means that future, relevant entries will
+                // still be added to the cache. We expect that *all* promises should be resolved,
+                // unless there is a programming or database error.
+                Err(TryRecvError::Disconnected) => {
+                    metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS);
+                    metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES);
+                    self.cache.pop(key);
+                    None
+                }
+            },
+            // The cache does not have this committee and it's not already promised to be computed.
+            None => {
+                metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES);
+                None
+            }
         }
-
-        opt
     }
 
     pub fn contains(&self, key: &AttestationShufflingId) -> bool {
         self.cache.contains(key)
     }
 
-    pub fn insert(&mut self, key: AttestationShufflingId, committee_cache: &CommitteeCache) {
-        if !self.cache.contains(&key) {
-            self.cache.put(key, committee_cache.clone());
+    pub fn insert_committee_cache<T: ToArcCommitteeCache>(
+        &mut self,
+        key: AttestationShufflingId,
+        committee_cache: &T,
+    ) {
+        if self
+            .cache
+            .get(&key)
+            // Replace the committee if it's not present or if it's a promise. A bird in the hand is
+            // worth two in the promise-bush!
+            .map_or(true, CacheItem::is_promise)
+        {
+            self.cache.put(
+                key,
+                CacheItem::Committee(committee_cache.to_arc_committee_cache()),
+            );
         }
     }
+
+    pub fn create_promise(
+        &mut self,
+        key: AttestationShufflingId,
+    ) -> Result<Sender<Arc<CommitteeCache>>, BeaconChainError> {
+        let num_active_promises = self
+            .cache
+            .iter()
+            .filter(|(_, item)| item.is_promise())
+            .count();
+        if num_active_promises >= MAX_CONCURRENT_PROMISES {
+            return Err(BeaconChainError::MaxCommitteePromises(num_active_promises));
+        }
+
+        let (sender, receiver) = bounded(1);
+        self.cache.put(key, CacheItem::Promise(receiver));
+        Ok(sender)
+    }
+}
+
+/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache.
+pub trait ToArcCommitteeCache {
+    fn to_arc_committee_cache(&self) -> Arc<CommitteeCache>;
+}
+
+impl ToArcCommitteeCache for CommitteeCache {
+    fn to_arc_committee_cache(&self) -> Arc<CommitteeCache> {
+        Arc::new(self.clone())
+    }
+}
+
+impl ToArcCommitteeCache for Arc<CommitteeCache> {
+    fn to_arc_committee_cache(&self) -> Arc<CommitteeCache> {
+        self.clone()
+    }
 }
 
 impl Default for ShufflingCache {
@@ -79,3 +202,177 @@ impl BlockShufflingIds {
         }
     }
 }
+
+// Disable tests in debug since the beacon chain harness is slow unless in release.
+#[cfg(not(debug_assertions))]
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::test_utils::EphemeralHarnessType;
+    use types::*;
+
+    type BeaconChainHarness =
+        crate::test_utils::BeaconChainHarness<EphemeralHarnessType<MinimalEthSpec>>;
+
+    /// Returns two different committee caches for testing.
+    fn committee_caches() -> (Arc<CommitteeCache>, Arc<CommitteeCache>) {
+        let harness = BeaconChainHarness::builder(MinimalEthSpec)
+            .default_spec()
+            .deterministic_keypairs(8)
+            .fresh_ephemeral_store()
+            .build();
+        let (mut state, _) = harness.get_current_state_and_root();
+        state
+            .build_committee_cache(RelativeEpoch::Current, &harness.chain.spec)
+            .unwrap();
+        state
+            .build_committee_cache(RelativeEpoch::Next, &harness.chain.spec)
+            .unwrap();
+        let committee_a = state
+            .committee_cache(RelativeEpoch::Current)
+            .unwrap()
+            .clone();
+        let committee_b = state.committee_cache(RelativeEpoch::Next).unwrap().clone();
+        assert!(committee_a != committee_b);
+        (Arc::new(committee_a), Arc::new(committee_b))
+    }
+
+    /// Builds a deterministic but incoherent shuffling ID from a `u64`.
+    fn shuffling_id(id: u64) -> AttestationShufflingId {
+        AttestationShufflingId {
+            shuffling_epoch: id.into(),
+            shuffling_decision_block: Hash256::from_low_u64_be(id),
+        }
+    }
+
+    #[test]
+    fn resolved_promise() {
+        let (committee_a, _) = committee_caches();
+        let id_a = shuffling_id(1);
+        let mut cache = ShufflingCache::new();
+
+        // Create a promise.
+        let sender = cache.create_promise(id_a.clone()).unwrap();
+
+        // Retrieve the newly created promise.
+        let item = cache.get(&id_a).unwrap();
+        assert!(
+            matches!(item, CacheItem::Promise(_)),
+            "the item should be a promise"
+        );
+
+        // Resolve the promise.
+        sender.send(committee_a.clone()).unwrap();
+
+        // Ensure the promise has been resolved.
+        let item = cache.get(&id_a).unwrap();
+        assert!(
+            matches!(item, CacheItem::Committee(committee) if committee == committee_a),
+            "the promise should be resolved"
+        );
+        assert_eq!(cache.cache.len(), 1, "the cache should have one entry");
+    }
+
+    #[test]
+    fn unresolved_promise() {
+        let id_a = shuffling_id(1);
+        let mut cache = ShufflingCache::new();
+
+        // Create a promise.
+        let sender = cache.create_promise(id_a.clone()).unwrap();
+
+        // Retrieve the newly created promise.
+        let item = cache.get(&id_a).unwrap();
+        assert!(
+            matches!(item, CacheItem::Promise(_)),
+            "the item should be a promise"
+        );
+
+        // Drop the sender without resolving the promise, simulating an error computing the
+        // committee.
+        drop(sender);
+
+        // Ensure the key now indicates an empty slot.
+        assert!(cache.get(&id_a).is_none(), "the slot should be empty");
+        assert!(cache.cache.is_empty(), "the cache should be empty");
+    }
+
+    #[test]
+    fn two_promises() {
+        let (committee_a, committee_b) = committee_caches();
+        let (id_a, id_b) = (shuffling_id(1), shuffling_id(2));
+        let mut cache = ShufflingCache::new();
+
+        // Create promise A.
+        let sender_a = cache.create_promise(id_a.clone()).unwrap();
+
+        // Retrieve promise A.
+        let item = cache.get(&id_a).unwrap();
+        assert!(
+            matches!(item, CacheItem::Promise(_)),
+            "item a should be a promise"
+        );
+
+        // Create promise B.
+        let sender_b = cache.create_promise(id_b.clone()).unwrap();
+
+        // Retrieve promise B.
+        let item = cache.get(&id_b).unwrap();
+        assert!(
+            matches!(item, CacheItem::Promise(_)),
+            "item b should be a promise"
+        );
+
+        // Resolve promise A.
+        sender_a.send(committee_a.clone()).unwrap();
+        // Ensure promise A has been resolved.
+        let item = cache.get(&id_a).unwrap();
+        assert!(
+            matches!(item, CacheItem::Committee(committee) if committee == committee_a),
+            "promise A should be resolved"
+        );
+
+        // Resolve promise B.
+        sender_b.send(committee_b.clone()).unwrap();
+        // Ensure promise B has been resolved.
+        let item = cache.get(&id_b).unwrap();
+        assert!(
+            matches!(item, CacheItem::Committee(committee) if committee == committee_b),
+            "promise B should be resolved"
+        );
+
+        // Check both entries again.
+        assert!(
+            matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a),
+            "promise A should remain resolved"
+        );
+        assert!(
+            matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b),
+            "promise B should remain resolved"
+        );
+        assert_eq!(cache.cache.len(), 2, "the cache should have two entries");
+    }
+
+    #[test]
+    fn too_many_promises() {
+        let mut cache = ShufflingCache::new();
+
+        for i in 0..MAX_CONCURRENT_PROMISES {
+            cache.create_promise(shuffling_id(i as u64)).unwrap();
+        }
+
+        // Ensure that the next promise returns an error. It is important for the application to
+        // dump his ass when he can't keep his promises, you're a queen and you deserve better.
+        assert!(matches!(
+            cache.create_promise(shuffling_id(MAX_CONCURRENT_PROMISES as u64)),
+            Err(BeaconChainError::MaxCommitteePromises(
+                MAX_CONCURRENT_PROMISES
+            ))
+        ));
+        assert_eq!(
+            cache.cache.len(),
+            MAX_CONCURRENT_PROMISES,
+            "the cache should have two entries"
+        );
+    }
+}
diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs
index 4359b6f1e..72fc973e5 100644
--- a/beacon_node/beacon_chain/src/state_advance_timer.rs
+++ b/beacon_node/beacon_chain/src/state_advance_timer.rs
@@ -394,7 +394,7 @@ fn advance_head(
         .shuffling_cache
         .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
         .ok_or(BeaconChainError::AttestationCacheLockTimeout)?
-        .insert(shuffling_id.clone(), committee_cache);
+        .insert_committee_cache(shuffling_id.clone(), committee_cache);
 
     debug!(
         log,
diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
index 93ed1b463..307b569a9 100644
--- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
@@ -1752,6 +1752,19 @@ impl<T: BeaconChainTypes> Worker<T> {
                 debug!(self.log, "Attestation for finalized state"; "peer_id" => %peer_id);
                 self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
             }
+            e @ AttnError::BeaconChainError(BeaconChainError::MaxCommitteePromises(_)) => {
+                debug!(
+                    self.log,
+                    "Dropping attestation";
+                    "target_root" => ?failed_att.attestation().data.target.root,
+                    "beacon_block_root" => ?beacon_block_root,
+                    "slot" => ?failed_att.attestation().data.slot,
+                    "type" => ?attestation_type,
+                    "error" => ?e,
+                    "peer_id" => %peer_id
+                );
+                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
+            }
             AttnError::BeaconChainError(e) => {
                 /*
                  * Lighthouse hit an unexpected error whilst processing the attestation. It
@@ -1762,7 +1775,10 @@ impl<T: BeaconChainTypes> Worker<T> {
                  */
                 error!(
                     self.log,
-                    "Unable to validate aggregate";
+                    "Unable to validate attestation";
+                    "beacon_block_root" => ?beacon_block_root,
+                    "slot" => ?failed_att.attestation().data.slot,
+                    "type" => ?attestation_type,
                     "peer_id" => %peer_id,
                     "error" => ?e,
                 );
diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs
index fca200312..a5d00cdf2 100644
--- a/consensus/types/src/beacon_state.rs
+++ b/consensus/types/src/beacon_state.rs
@@ -1498,6 +1498,26 @@ impl<T: EthSpec> BeaconState<T> {
         }
     }
 
+    /// Returns the cache for some `RelativeEpoch`, replacing the existing cache with an
+    /// un-initialized cache. Returns an error if the existing cache has not been initialized.
+    pub fn take_committee_cache(
+        &mut self,
+        relative_epoch: RelativeEpoch,
+    ) -> Result<CommitteeCache, Error> {
+        let i = Self::committee_cache_index(relative_epoch);
+        let current_epoch = self.current_epoch();
+        let cache = self
+            .committee_caches_mut()
+            .get_mut(i)
+            .ok_or(Error::CommitteeCachesOutOfBounds(i))?;
+
+        if cache.is_initialized_at(relative_epoch.into_epoch(current_epoch)) {
+            Ok(mem::take(cache))
+        } else {
+            Err(Error::CommitteeCacheUninitialized(Some(relative_epoch)))
+        }
+    }
+
     /// Drops the cache, leaving it in an uninitialized state.
     pub fn drop_committee_cache(&mut self, relative_epoch: RelativeEpoch) -> Result<(), Error> {
         *self.committee_cache_at_index_mut(Self::committee_cache_index(relative_epoch))? =
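
For readers who want to see the whole promise flow in one place, here is a small, self-contained sketch of the same single-flight caching idea. It deliberately avoids Lighthouse types: `PromiseCache`, `Item`, `Committee`, and the `u64` key are illustrative stand-ins rather than anything defined in this diff; only the `crossbeam-channel` and `lru` crates match the ones used above, and `LruCache::new` is assumed to take a plain `usize`, as it does in `shuffling_cache.rs`.

    use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError};
    use lru::LruCache;
    use std::sync::Arc;

    /// Stand-ins for the real key and committee types.
    type Key = u64;

    #[derive(Debug, PartialEq)]
    struct Committee(Vec<usize>);

    #[derive(Clone)]
    enum Item {
        /// A value that has already been computed.
        Ready(Arc<Committee>),
        /// A promise that some thread is currently computing the value.
        Promise(Receiver<Arc<Committee>>),
    }

    struct PromiseCache {
        cache: LruCache<Key, Item>,
        max_promises: usize,
    }

    impl PromiseCache {
        fn new(capacity: usize, max_promises: usize) -> Self {
            Self {
                // Assumes an `lru` release where `new` takes a plain `usize`.
                cache: LruCache::new(capacity),
                max_promises,
            }
        }

        /// Returns a ready value, an unresolved promise, or `None` (miss or failed promise).
        fn get(&mut self, key: &Key) -> Option<Item> {
            match self.cache.get(key) {
                item @ Some(Item::Ready(_)) => item.cloned(),
                item @ Some(Item::Promise(rx)) => match rx.try_recv() {
                    // The promise was already resolved: upgrade the entry in place.
                    Ok(value) => {
                        let ready = Item::Ready(value);
                        self.cache.put(*key, ready.clone());
                        Some(ready)
                    }
                    // Still being computed: hand the caller a clone of the receiver.
                    Err(TryRecvError::Empty) => item.cloned(),
                    // The computing thread dropped its sender: forget the promise.
                    Err(TryRecvError::Disconnected) => {
                        self.cache.pop(key);
                        None
                    }
                },
                None => None,
            }
        }

        fn insert(&mut self, key: Key, value: Arc<Committee>) {
            self.cache.put(key, Item::Ready(value));
        }

        /// Registers a promise, refusing if too many are already outstanding.
        fn create_promise(&mut self, key: Key) -> Result<Sender<Arc<Committee>>, String> {
            let outstanding = self
                .cache
                .iter()
                .filter(|(_, item)| matches!(item, Item::Promise(_)))
                .count();
            if outstanding >= self.max_promises {
                return Err(format!("too many concurrent promises: {}", outstanding));
            }
            let (tx, rx) = bounded(1);
            self.cache.put(key, Item::Promise(rx));
            Ok(tx)
        }
    }

    fn main() {
        let mut cache = PromiseCache::new(16, 2);
        let key = 42;

        // Caller-side pattern mirroring `with_committee_cache`: use a cached value or wait on
        // a promise if one exists, otherwise promise the value, compute it, insert it, and
        // resolve the promise for any other threads that picked it up in the meantime.
        let committee = match cache.get(&key) {
            Some(Item::Ready(value)) => value,
            Some(Item::Promise(rx)) => rx.recv().expect("the computing thread dropped its sender"),
            None => {
                let tx = cache.create_promise(key).expect("under the promise limit");
                let value = Arc::new(Committee(vec![0, 1, 2])); // expensive computation goes here
                cache.insert(key, value.clone());
                // Fails harmlessly if no other thread holds a clone of the receiver.
                let _ = tx.send(value.clone());
                value
            }
        };
        assert_eq!(*committee, Committee(vec![0, 1, 2]));
    }

As in `with_committee_cache`, the final `send` can fail once the promise entry has been replaced by the inserted value and no other thread holds a receiver clone; the diff treats that case as harmless and only logs it at debug level ("Did not fulfil committee promise").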