From 2cd3e3a768688748d8a55dcb91bab7f40a0c71e3 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Fri, 16 Sep 2022 08:54:03 +0000
Subject: [PATCH] Avoid duplicate committee cache loads (#3574)

## Issue Addressed

NA

## Proposed Changes

I have observed scenarios on Goerli where Lighthouse was receiving attestations which reference the same, un-cached shuffling on multiple threads at the same time. Lighthouse was then loading the same state from the database and determining the shuffling on multiple threads at the same time. This is unnecessary load on the disk and RAM.

This PR modifies the shuffling cache so that each entry can be either:

- A committee
- A promise for a committee (i.e., a `crossbeam_channel::Receiver`)

Now, in the scenario where we have thread A and thread B simultaneously requesting the same un-cached shuffling, we will have the following:

1. Thread A will take the write-lock on the shuffling cache, find that there's no cached committee and then create a "promise" (a `crossbeam_channel::Sender`) for a committee before dropping the write-lock.
2. Thread B will then be allowed to take the write-lock for the shuffling cache and find the promise created by thread A. It will block the current thread waiting for thread A to fulfill that promise.
3. Thread A will load the state from disk, obtain the shuffling, send it down the channel, insert the entry into the cache and then continue to verify the attestation.
4. Thread B will then receive the shuffling from the receiver, be un-blocked and then continue to verify the attestation.

In the case where thread A fails to generate the shuffling and drops the sender, the next time that specific shuffling is requested we will detect that the channel is disconnected and return a `None` entry for that shuffling. This will cause the shuffling to be re-calculated. (See the minimal sketches appended after the diff for an illustration of this pattern.)

## Additional Info

NA
---
 Cargo.lock | 1 +
 beacon_node/beacon_chain/Cargo.toml | 1 +
 beacon_node/beacon_chain/src/beacon_chain.rs | 30 +-
 beacon_node/beacon_chain/src/errors.rs | 2 +
 beacon_node/beacon_chain/src/metrics.rs | 4 +
 .../beacon_chain/src/shuffling_cache.rs | 325 +++++++++++++++++-
 .../beacon_chain/src/state_advance_timer.rs | 2 +-
 .../beacon_processor/worker/gossip_methods.rs | 18 +-
 consensus/types/src/beacon_state.rs | 20 ++
 9 files changed, 381 insertions(+), 22 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock index 58c9ec2a7..4ca2739d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,6 +387,7 @@ version = "0.2.0" dependencies = [ "bitvec 0.20.4", "bls", + "crossbeam-channel", "derivative", "environment", "eth1", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 092f3064d..43cbdf134 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -63,6 +63,7 @@ superstruct = "0.5.0" hex = "0.4.2" exit-future = "0.2.0" unused_port = {path = "../../common/unused_port"} +crossbeam-channel = "0.5.6" [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 077b425c0..609969a9a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2646,7 +2646,7 @@ impl BeaconChain { self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)?
- .insert(shuffling_id, committee_cache); + .insert_committee_cache(shuffling_id, committee_cache); } } @@ -4490,9 +4490,18 @@ impl BeaconChain { metrics::stop_timer(cache_wait_timer); - if let Some(committee_cache) = shuffling_cache.get(&shuffling_id) { - map_fn(committee_cache, shuffling_id.shuffling_decision_block) + if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { + let committee_cache = cache_item.wait()?; + map_fn(&committee_cache, shuffling_id.shuffling_decision_block) } else { + // Create an entry in the cache that "promises" this value will eventually be computed. + // This avoids the case where multiple threads attempt to produce the same value at the + // same time. + // + // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same + // promise from being created twice. + let sender = shuffling_cache.create_promise(shuffling_id.clone())?; + // Drop the shuffling cache to avoid holding the lock for any longer than // required. drop(shuffling_cache); @@ -4585,17 +4594,26 @@ impl BeaconChain { state.build_committee_cache(relative_epoch, &self.spec)?; - let committee_cache = state.committee_cache(relative_epoch)?; + let committee_cache = state.take_committee_cache(relative_epoch)?; + let committee_cache = Arc::new(committee_cache); let shuffling_decision_block = shuffling_id.shuffling_decision_block; self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)? - .insert(shuffling_id, committee_cache); + .insert_committee_cache(shuffling_id, &committee_cache); metrics::stop_timer(committee_building_timer); - map_fn(committee_cache, shuffling_decision_block) + if let Err(e) = sender.send(committee_cache.clone()) { + debug!( + self.log, + "Did not fulfil committee promise"; + "error" => %e + ) + } + + map_fn(&committee_cache, shuffling_decision_block) } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 604fb6bea..8b547acf0 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -202,6 +202,8 @@ pub enum BeaconChainError { }, AttestationHeadNotInForkChoice(Hash256), MissingPersistedForkChoice, + CommitteeCacheWait(crossbeam_channel::RecvError), + MaxCommitteePromises(usize), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index b454a6ff8..ead4a5402 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -254,6 +254,10 @@ lazy_static! 
{ try_create_int_counter("beacon_shuffling_cache_hits_total", "Count of times shuffling cache fulfils request"); pub static ref SHUFFLING_CACHE_MISSES: Result<IntCounter> = try_create_int_counter("beacon_shuffling_cache_misses_total", "Count of times shuffling cache fulfils request"); + pub static ref SHUFFLING_CACHE_PROMISE_HITS: Result<IntCounter> = + try_create_int_counter("beacon_shuffling_cache_promise_hits_total", "Count of times shuffling cache returns a promise to future shuffling"); + pub static ref SHUFFLING_CACHE_PROMISE_FAILS: Result<IntCounter> = + try_create_int_counter("beacon_shuffling_cache_promise_fails_total", "Count of times shuffling cache detects a failed promise"); /* * Early attester cache diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 0bbd4419b..3fc5bebdf 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,5 +1,7 @@ -use crate::metrics; +use crate::{metrics, BeaconChainError}; +use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError}; use lru::LruCache; +use std::sync::Arc; use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256}; /// The size of the LRU cache that stores committee caches for quicker verification. @@ -9,12 +11,46 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256 /// ignores a few extra bytes in the caches that should be insignificant compared to the indices). const CACHE_SIZE: usize = 16; +/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this +/// limits the number of concurrent states that can be loaded into memory for the committee cache. +/// This prevents excessive memory usage at the cost of rejecting some attestations. +/// +/// We set this value to 2 since states can be quite large and have a significant impact on memory +/// usage. A healthy network cannot have more than a few committee caches and those caches should +/// always be inserted during block import. Unstable networks with a high degree of forking might +/// see some attestations dropped due to this concurrency limit, however I propose that this is +/// better than low-resource nodes going OOM. +const MAX_CONCURRENT_PROMISES: usize = 2; + +#[derive(Clone)] +pub enum CacheItem { + /// A committee. + Committee(Arc<CommitteeCache>), + /// A promise for a future committee. + Promise(Receiver<Arc<CommitteeCache>>), +} + +impl CacheItem { + pub fn is_promise(&self) -> bool { + matches!(self, CacheItem::Promise(_)) + } + + pub fn wait(self) -> Result<Arc<CommitteeCache>, BeaconChainError> { + match self { + CacheItem::Committee(cache) => Ok(cache), + CacheItem::Promise(receiver) => receiver + .recv() + .map_err(BeaconChainError::CommitteeCacheWait), + } + } +} + /// Provides an LRU cache for `CommitteeCache`. /// /// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like /// a find/replace error. pub struct ShufflingCache { - cache: LruCache<AttestationShufflingId, CommitteeCache>, + cache: LruCache<AttestationShufflingId, CacheItem>, } impl ShufflingCache { @@ -24,27 +60,114 @@ impl ShufflingCache { } } - pub fn get(&mut self, key: &AttestationShufflingId) -> Option<&CommitteeCache> { - let opt = self.cache.get(key); - - if opt.is_some() { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - } else { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + pub fn get(&mut self, key: &AttestationShufflingId) -> Option<CacheItem> { + match self.cache.get(key) { + // The cache contained the committee cache, return it.
+ item @ Some(CacheItem::Committee(_)) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + item.cloned() + } + // The cache contains a promise for the committee cache. Check to see if the promise has + // already been resolved, without waiting for it. + item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { + // The promise has already been resolved. Replace the entry in the cache with a + // `Committee` entry and then return the committee. + Ok(committee) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + let ready = CacheItem::Committee(committee); + self.cache.put(key.clone(), ready.clone()); + Some(ready) + } + // The promise has not yet been resolved. Return the promise so the caller can await + // it. + Err(TryRecvError::Empty) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + item.cloned() + } + // The sender has been dropped without sending a committee. There was most likely an + // error computing the committee cache. Drop the key from the cache and return + // `None` so the caller can recompute the committee. + // + // It's worth noting that this is the only place where we remove unresolved + // promises from the cache. This means unresolved promises will only be removed if + // we try to access them again. This is OK, since the promises don't consume much + // memory and the nature of the LRU cache means that future, relevant entries will + // still be added to the cache. We expect that *all* promises should be resolved, + // unless there is a programming or database error. + Err(TryRecvError::Disconnected) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS); + metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + self.cache.pop(key); + None + } + }, + // The cache does not have this committee and it's not already promised to be computed. + None => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + None + } } - - opt } pub fn contains(&self, key: &AttestationShufflingId) -> bool { self.cache.contains(key) } - pub fn insert(&mut self, key: AttestationShufflingId, committee_cache: &CommitteeCache) { - if !self.cache.contains(&key) { - self.cache.put(key, committee_cache.clone()); + pub fn insert_committee_cache<T: ToArcCommitteeCache>( + &mut self, + key: AttestationShufflingId, + committee_cache: &T, + ) { + if self + .cache + .get(&key) + // Replace the committee if it's not present or if it's a promise. A bird in the hand is + // worth two in the promise-bush! + .map_or(true, CacheItem::is_promise) + { + self.cache.put( + key, + CacheItem::Committee(committee_cache.to_arc_committee_cache()), + ); } } + + pub fn create_promise( + &mut self, + key: AttestationShufflingId, + ) -> Result<Sender<Arc<CommitteeCache>>, BeaconChainError> { + let num_active_promises = self + .cache + .iter() + .filter(|(_, item)| item.is_promise()) + .count(); + if num_active_promises >= MAX_CONCURRENT_PROMISES { + return Err(BeaconChainError::MaxCommitteePromises(num_active_promises)); + } + + let (sender, receiver) = bounded(1); + self.cache.put(key, CacheItem::Promise(receiver)); + Ok(sender) + } +} + +/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache.
+pub trait ToArcCommitteeCache { + fn to_arc_committee_cache(&self) -> Arc<CommitteeCache>; +} + +impl ToArcCommitteeCache for CommitteeCache { + fn to_arc_committee_cache(&self) -> Arc<CommitteeCache> { + Arc::new(self.clone()) + } +} + +impl ToArcCommitteeCache for Arc<CommitteeCache> { + fn to_arc_committee_cache(&self) -> Arc<CommitteeCache> { + self.clone() + } } impl Default for ShufflingCache { @@ -79,3 +202,177 @@ impl BlockShufflingIds { } } } + +// Disable tests in debug since the beacon chain harness is slow unless in release. +#[cfg(not(debug_assertions))] +#[cfg(test)] +mod test { + use super::*; + use crate::test_utils::EphemeralHarnessType; + use types::*; + + type BeaconChainHarness = + crate::test_utils::BeaconChainHarness<EphemeralHarnessType<MinimalEthSpec>>; + + /// Returns two different committee caches for testing. + fn committee_caches() -> (Arc<CommitteeCache>, Arc<CommitteeCache>) { + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .default_spec() + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .build(); + let (mut state, _) = harness.get_current_state_and_root(); + state + .build_committee_cache(RelativeEpoch::Current, &harness.chain.spec) + .unwrap(); + state + .build_committee_cache(RelativeEpoch::Next, &harness.chain.spec) + .unwrap(); + let committee_a = state + .committee_cache(RelativeEpoch::Current) + .unwrap() + .clone(); + let committee_b = state.committee_cache(RelativeEpoch::Next).unwrap().clone(); + assert!(committee_a != committee_b); + (Arc::new(committee_a), Arc::new(committee_b)) + } + + /// Builds a deterministic but incoherent shuffling ID from a `u64`. + fn shuffling_id(id: u64) -> AttestationShufflingId { + AttestationShufflingId { + shuffling_epoch: id.into(), + shuffling_decision_block: Hash256::from_low_u64_be(id), + } + } + + #[test] + fn resolved_promise() { + let (committee_a, _) = committee_caches(); + let id_a = shuffling_id(1); + let mut cache = ShufflingCache::new(); + + // Create a promise. + let sender = cache.create_promise(id_a.clone()).unwrap(); + + // Retrieve the newly created promise. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "the item should be a promise" + ); + + // Resolve the promise. + sender.send(committee_a.clone()).unwrap(); + + // Ensure the promise has been resolved. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Committee(committee) if committee == committee_a), + "the promise should be resolved" + ); + assert_eq!(cache.cache.len(), 1, "the cache should have one entry"); + } + + #[test] + fn unresolved_promise() { + let id_a = shuffling_id(1); + let mut cache = ShufflingCache::new(); + + // Create a promise. + let sender = cache.create_promise(id_a.clone()).unwrap(); + + // Retrieve the newly created promise. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "the item should be a promise" + ); + + // Drop the sender without resolving the promise, simulating an error computing the + // committee. + drop(sender); + + // Ensure the key now indicates an empty slot. + assert!(cache.get(&id_a).is_none(), "the slot should be empty"); + assert!(cache.cache.is_empty(), "the cache should be empty"); + } + + #[test] + fn two_promises() { + let (committee_a, committee_b) = committee_caches(); + let (id_a, id_b) = (shuffling_id(1), shuffling_id(2)); + let mut cache = ShufflingCache::new(); + + // Create promise A. + let sender_a = cache.create_promise(id_a.clone()).unwrap(); + + // Retrieve promise A.
+ let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "item a should be a promise" + ); + + // Create promise B. + let sender_b = cache.create_promise(id_b.clone()).unwrap(); + + // Retrieve promise B. + let item = cache.get(&id_b).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "item b should be a promise" + ); + + // Resolve promise A. + sender_a.send(committee_a.clone()).unwrap(); + // Ensure promise A has been resolved. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Committee(committee) if committee == committee_a), + "promise A should be resolved" + ); + + // Resolve promise B. + sender_b.send(committee_b.clone()).unwrap(); + // Ensure promise B has been resolved. + let item = cache.get(&id_b).unwrap(); + assert!( + matches!(item, CacheItem::Committee(committee) if committee == committee_b), + "promise B should be resolved" + ); + + // Check both entries again. + assert!( + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a), + "promise A should remain resolved" + ); + assert!( + matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b), + "promise B should remain resolved" + ); + assert_eq!(cache.cache.len(), 2, "the cache should have two entries"); + } + + #[test] + fn too_many_promises() { + let mut cache = ShufflingCache::new(); + + for i in 0..MAX_CONCURRENT_PROMISES { + cache.create_promise(shuffling_id(i as u64)).unwrap(); + } + + // Ensure that the next promise returns an error. It is important for the application to + // dump his ass when he can't keep his promises, you're a queen and you deserve better. + assert!(matches!( + cache.create_promise(shuffling_id(MAX_CONCURRENT_PROMISES as u64)), + Err(BeaconChainError::MaxCommitteePromises( + MAX_CONCURRENT_PROMISES + )) + )); + assert_eq!( + cache.cache.len(), + MAX_CONCURRENT_PROMISES, + "the cache should have two entries" + ); + } +} diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 4359b6f1e..72fc973e5 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -394,7 +394,7 @@ fn advance_head( .shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(BeaconChainError::AttestationCacheLockTimeout)? 
- .insert(shuffling_id.clone(), committee_cache); + .insert_committee_cache(shuffling_id.clone(), committee_cache); debug!( log, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 93ed1b463..307b569a9 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1752,6 +1752,19 @@ impl Worker { debug!(self.log, "Attestation for finalized state"; "peer_id" => % peer_id); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } + e @ AttnError::BeaconChainError(BeaconChainError::MaxCommitteePromises(_)) => { + debug!( + self.log, + "Dropping attestation"; + "target_root" => ?failed_att.attestation().data.target.root, + "beacon_block_root" => ?beacon_block_root, + "slot" => ?failed_att.attestation().data.slot, + "type" => ?attestation_type, + "error" => ?e, + "peer_id" => % peer_id + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } AttnError::BeaconChainError(e) => { /* * Lighthouse hit an unexpected error whilst processing the attestation. It */ error!( self.log, - "Unable to validate aggregate"; + "Unable to validate attestation"; + "beacon_block_root" => ?beacon_block_root, + "slot" => ?failed_att.attestation().data.slot, + "type" => ?attestation_type, "peer_id" => %peer_id, "error" => ?e, ); diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index fca200312..a5d00cdf2 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1498,6 +1498,26 @@ impl BeaconState { } } + /// Returns the cache for some `RelativeEpoch`, replacing the existing cache with an + /// un-initialized cache. Returns an error if the existing cache has not been initialized. + pub fn take_committee_cache( + &mut self, + relative_epoch: RelativeEpoch, + ) -> Result<CommitteeCache, Error> { + let i = Self::committee_cache_index(relative_epoch); + let current_epoch = self.current_epoch(); + let cache = self + .committee_caches_mut() + .get_mut(i) + .ok_or(Error::CommitteeCachesOutOfBounds(i))?; + + if cache.is_initialized_at(relative_epoch.into_epoch(current_epoch)) { + Ok(mem::take(cache)) + } else { + Err(Error::CommitteeCacheUninitialized(Some(relative_epoch))) + } + } + /// Drops the cache, leaving it in an uninitialized state. pub fn drop_committee_cache(&mut self, relative_epoch: RelativeEpoch) -> Result<(), Error> { *self.committee_cache_at_index_mut(Self::committee_cache_index(relative_epoch))? =
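
The sketch below is an editor-added illustration of the promise pattern described in the commit message; it is not Lighthouse code. It uses a plain `HashMap` behind a `Mutex` and a `String` payload in place of Lighthouse's `LruCache` of `Arc<CommitteeCache>` entries, and the names `Entry`, `Cache` and `get_or_promise` are invented for the example. Only the `crossbeam-channel` calls (`bounded`, `send`, `recv`), from the crate added to `Cargo.toml` in this patch, mirror the ones used in the diff.

```rust
// Editor-added sketch: one thread computes a value while a second thread waits on a
// "promise" (a crossbeam_channel::Receiver) instead of recomputing it.
use crossbeam_channel::{bounded, Receiver, Sender};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Clone)]
enum Entry {
    // Stand-in for `Arc<CommitteeCache>` in the real cache.
    Value(Arc<String>),
    // A promise that some other thread will produce the value.
    Promise(Receiver<Arc<String>>),
}

type Cache = Arc<Mutex<HashMap<u64, Entry>>>;

// Returns the value (waiting on a promise if necessary), or hands back a `Sender`
// that obliges the caller to compute the value and fulfil the promise.
fn get_or_promise(cache: &Cache, key: u64) -> Result<Arc<String>, Sender<Arc<String>>> {
    let mut guard = cache.lock().unwrap();
    match guard.get(&key).cloned() {
        Some(Entry::Value(value)) => Ok(value),
        Some(Entry::Promise(receiver)) => {
            // Release the lock before blocking, which is the property the PR relies on.
            drop(guard);
            Ok(receiver.recv().expect("sender dropped; caller should recompute"))
        }
        None => {
            let (sender, receiver) = bounded(1);
            guard.insert(key, Entry::Promise(receiver));
            Err(sender)
        }
    }
}

fn main() {
    let cache: Cache = Arc::new(Mutex::new(HashMap::new()));

    // "Thread A": finds nothing cached and leaves a promise behind.
    let sender = match get_or_promise(&cache, 42) {
        Err(sender) => sender,
        Ok(_) => unreachable!("the cache starts empty"),
    };

    // "Thread B": finds the promise and blocks on it instead of re-doing the work.
    let waiter = {
        let cache = Arc::clone(&cache);
        thread::spawn(move || {
            let value = get_or_promise(&cache, 42).expect("a promise already exists");
            println!("waiter received: {value}");
        })
    };

    // "Thread A" finishes the expensive work, caches the value and fulfils the promise.
    let value = Arc::new("expensively computed shuffling".to_string());
    cache.lock().unwrap().insert(42, Entry::Value(Arc::clone(&value)));
    let _ = sender.send(value);

    waiter.join().unwrap();
}
```

The key property is the same as in the PR: the promise is created while the cache lock is held, so only one caller ever takes on the expensive computation, and waiters block on the `Receiver` rather than on the lock or on a duplicate disk read.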
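
A second editor-added sketch covers the failure path from the commit message: if the computing thread drops its `Sender` without sending, `try_recv` reports the channel as disconnected, which is how `ShufflingCache::get` decides to evict the stale promise and let the caller recompute the shuffling. The snippet only exercises the `crossbeam-channel` behaviour; the cache itself is omitted.

```rust
// Editor-added sketch of detecting a broken promise via a disconnected channel.
use crossbeam_channel::{bounded, TryRecvError};

fn main() {
    let (tx, rx) = bounded::<u64>(1);

    // Nothing sent yet: the promise is still pending, so a caller would keep the
    // `Promise` entry in the cache and wait on it.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    // The producer fails and drops the sender without fulfilling the promise.
    drop(tx);

    // The channel is now disconnected: the cache treats this as a failed promise,
    // evicts the entry and lets the caller recompute the value.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
```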