diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0becbf2c9..fed48036d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -54,6 +54,12 @@ pub enum BlockProcessingOutcome { PerBlockProcessingError(BlockProcessingError), } +#[derive(Debug, PartialEq)] +pub enum AttestationProcessingOutcome { + Processed, + UnknownHeadBlock { beacon_block_root: Hash256 }, +} + pub trait BeaconChainTypes { type Store: store::Store; type SlotClock: slot_clock::SlotClock; @@ -511,28 +517,114 @@ impl BeaconChain { /// /// If valid, the attestation is added to the `op_pool` and aggregated with another attestation /// if possible. - pub fn process_attestation(&self, attestation: Attestation) -> Result<(), Error> { + pub fn process_attestation( + &self, + attestation: Attestation, + ) -> Result { + // From the store, load the attestation's "head block". + // + // An honest validator would have set this block to be the head of the chain (i.e., the + // result of running fork choice). + if let Some(attestation_head_block) = self + .store + .get::>(&attestation.data.beacon_block_root)? + { + // Attempt to process the attestation using the `self.head()` state. + // + // This is purely an effort to avoid loading a `BeaconState` unnecessarily from the DB. + let outcome: Option> = { + // Take a read lock on the head beacon state. + // + // The purpose of this whole `let processed ...` block is to ensure that the read + // lock is dropped if we don't end up using the head beacon state. + let state = &self.head().beacon_state; + + // If it turns out that the attestation was made using the head state, then there + // is no need to load a state from the database to process the attestation. + if state.current_epoch() == attestation_head_block.epoch() + && state + .get_block_root(attestation_head_block.slot) + .map(|root| *root == attestation.data.beacon_block_root) + .unwrap_or_else(|_| false) + { + // The head state is able to be used to validate this attestation. No need to load + // anything from the database. + Some(self.process_attestation_for_state_and_block( + attestation.clone(), + state, + &attestation_head_block, + )) + } else { + None + } + }; + + // TODO: we could try and see if the "speculative state" (e.g., self.state) can support + // this, without needing to load it from the db. + + if let Some(result) = outcome { + result + } else { + // The state required to verify this attestation must be loaded from the database. + let mut state: BeaconState = self + .store + .get(&attestation_head_block.state_root)? + .ok_or_else(|| Error::MissingBeaconState(attestation_head_block.state_root))?; + + // Ensure the state loaded from the database matches the state of the attestation + // head block. + for _ in state.slot.as_u64()..attestation_head_block.slot.as_u64() { + per_slot_processing(&mut state, &self.spec)?; + } + + self.process_attestation_for_state_and_block( + attestation, + &state, + &attestation_head_block, + ) + } + } else { + // Reject any block where we have not processed `attestation.data.beacon_block_root`. + // + // This is likely overly restrictive, we could store the attestation for later + // processing. + warn!( + self.log, + "Dropping attestation for unknown block"; + "block" => format!("{}", attestation.data.beacon_block_root) + ); + Ok(AttestationProcessingOutcome::UnknownHeadBlock { + beacon_block_root: attestation.data.beacon_block_root, + }) + } + } + + fn process_attestation_for_state_and_block( + &self, + attestation: Attestation, + state: &BeaconState, + head_block: &BeaconBlock, + ) -> Result { self.metrics.attestation_processing_requests.inc(); let timer = self.metrics.attestation_processing_times.start_timer(); - if let Some(state) = self.get_attestation_state(&attestation) { - if self - .fork_choice - .should_process_attestation(&state, &attestation)? - { - let indexed_attestation = common::get_indexed_attestation(&state, &attestation)?; - per_block_processing::is_valid_indexed_attestation( - &state, - &indexed_attestation, - &self.spec, - )?; - self.fork_choice.process_attestation(&state, &attestation)?; - } + if self + .fork_choice + .should_process_attestation(state, &attestation)? + { + // TODO: check validation. + let indexed_attestation = common::get_indexed_attestation(state, &attestation)?; + per_block_processing::is_valid_indexed_attestation( + state, + &indexed_attestation, + &self.spec, + )?; + self.fork_choice.process_attestation(&state, &attestation)?; } let result = self .op_pool - .insert_attestation(attestation, &*self.state.read(), &self.spec); + .insert_attestation(attestation, state, &self.spec); timer.observe_duration(); @@ -540,14 +632,32 @@ impl BeaconChain { self.metrics.attestation_processing_successes.inc(); } - result.map_err(|e| BeaconChainError::AttestationValidationError(e)) + result + .map(|_| AttestationProcessingOutcome::Processed) + .map_err(|e| Error::AttestationValidationError(e)) } + fn state_can_process_attestation( + state: &BeaconState, + data: &AttestationData, + head_block: &BeaconBlock, + ) -> bool { + (state.current_epoch() - 1 <= data.target.epoch) + && (data.target.epoch <= state.current_epoch() + 1) + && state + .get_block_root(head_block.slot) + .map(|root| *root == data.beacon_block_root) + .unwrap_or_else(|_| false) + } + + /* /// Retrieves the `BeaconState` used to create the attestation. fn get_attestation_state( &self, attestation: &Attestation, ) -> Option> { + let state = &self.head().beacon_state; + // Current state is used if the attestation targets a historic block and a slot within an // equal or adjacent epoch. let slots_per_epoch = T::EthSpec::slots_per_epoch(); @@ -580,6 +690,7 @@ impl BeaconChain { _ => None, } } + */ /// Accept some deposit and queue it for inclusion in an appropriate block. pub fn process_deposit( diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 266c598ac..0b8fae7bf 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -34,6 +34,9 @@ pub enum BeaconChainError { MissingBeaconState(Hash256), SlotProcessingError(SlotProcessingError), MetricsError(String), + NoStateForAttestation { + beacon_block_root: Hash256, + }, AttestationValidationError(AttestationValidationError), IndexedAttestationValidationError(IndexedAttestationValidationError), } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index ac001415c..13e9203dd 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -630,7 +630,12 @@ impl SimpleSync { _network: &mut NetworkContext, ) { match self.chain.process_attestation(msg) { - Ok(()) => info!(self.log, "ImportedAttestation"; "source" => "gossip"), + Ok(outcome) => info!( + self.log, + "Processed attestation"; + "source" => "gossip", + "outcome" => format!("{:?}", outcome) + ), Err(e) => { warn!(self.log, "InvalidAttestation"; "source" => "gossip", "error" => format!("{:?}", e)) } diff --git a/eth2/types/src/beacon_block.rs b/eth2/types/src/beacon_block.rs index 772ef0c46..ecf879799 100644 --- a/eth2/types/src/beacon_block.rs +++ b/eth2/types/src/beacon_block.rs @@ -62,6 +62,11 @@ impl BeaconBlock { } } + /// Returns the epoch corresponding to `self.slot`. + pub fn epoch(&self) -> Epoch { + self.slot.epoch(T::slots_per_epoch()) + } + /// Returns the `signed_root` of the block. /// /// Spec v0.8.1