diff --git a/eth2/operation_pool/Cargo.toml b/eth2/operation_pool/Cargo.toml index e68d318b5..07cb61864 100644 --- a/eth2/operation_pool/Cargo.toml +++ b/eth2/operation_pool/Cargo.toml @@ -5,5 +5,8 @@ authors = ["Michael Sproul "] edition = "2018" [dependencies] +int_to_bytes = { path = "../utils/int_to_bytes" } +itertools = "0.8" types = { path = "../types" } -state_processing = { path = "../../eth2/state_processing" } +state_processing = { path = "../state_processing" } +ssz = { path = "../utils/ssz" } diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index 7c2525099..7a647450c 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -1,11 +1,15 @@ -use std::collections::{btree_map::Entry, BTreeMap, HashSet}; - +use int_to_bytes::int_to_bytes8; +use itertools::Itertools; +use ssz::ssz_encode; use state_processing::per_block_processing::{ - verify_deposit_merkle_proof, verify_exit, verify_proposer_slashing, verify_transfer, - verify_transfer_partial, + validate_attestation, verify_deposit_merkle_proof, verify_exit, verify_proposer_slashing, + verify_transfer, verify_transfer_partial, }; +use std::collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet}; +use types::chain_spec::Domain; use types::{ - AttesterSlashing, BeaconState, ChainSpec, Deposit, ProposerSlashing, Transfer, VoluntaryExit, + Attestation, AttestationData, AttesterSlashing, BeaconState, ChainSpec, Deposit, Epoch, + ProposerSlashing, Transfer, VoluntaryExit, }; #[cfg(test)] @@ -15,6 +19,8 @@ const VERIFY_DEPOSIT_PROOFS: bool = true; #[derive(Default)] pub struct OperationPool { + /// Map from attestation ID (see below) to vectors of attestations. + attestations: HashMap>, /// Map from deposit index to deposit data. // NOTE: We assume that there is only one deposit per index // because the Eth1 data is updated (at most) once per epoch, @@ -31,6 +37,54 @@ pub struct OperationPool { transfers: HashSet, } +/// Serialized `AttestationData` augmented with a domain to encode the fork info. +#[derive(PartialEq, Eq, Clone, Hash, Debug)] +struct AttestationId(Vec); + +/// Number of domain bytes that the end of an attestation ID is padded with. +const DOMAIN_BYTES_LEN: usize = 8; + +impl AttestationId { + fn from_data(attestation: &AttestationData, state: &BeaconState, spec: &ChainSpec) -> Self { + let mut bytes = ssz_encode(attestation); + let epoch = attestation.slot.epoch(spec.slots_per_epoch); + bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, state, spec)); + AttestationId(bytes) + } + + fn compute_domain_bytes(epoch: Epoch, state: &BeaconState, spec: &ChainSpec) -> Vec { + int_to_bytes8(spec.get_domain(epoch, Domain::Attestation, &state.fork)) + } + + fn domain_bytes_match(&self, domain_bytes: &[u8]) -> bool { + &self.0[self.0.len() - DOMAIN_BYTES_LEN..] == domain_bytes + } +} + +/// Compute a fitness score for an attestation. +/// +/// The score is calculated by determining the number of *new* attestations that +/// the aggregate attestation introduces, and is proportional to the size of the reward we will +/// receive for including it in a block. +// TODO: this could be optimised with a map from validator index to whether that validator has +// attested in the *current* epoch. Alternatively, we could cache an index that allows us to +// quickly look up the attestations in the current epoch for a given shard. +fn attestation_score(attestation: &Attestation, state: &BeaconState) -> usize { + // Bitfield of validators whose attestations are new/fresh. + let mut new_validators = attestation.aggregation_bitfield.clone(); + + state + .current_epoch_attestations + .iter() + .filter(|current_attestation| current_attestation.data.shard == attestation.data.shard) + .for_each(|current_attestation| { + // Remove the validators who have signed the existing attestation (they are not new) + new_validators.difference_inplace(¤t_attestation.aggregation_bitfield); + }); + + new_validators.num_set_bits() +} + #[derive(Debug, PartialEq, Clone)] pub enum DepositInsertStatus { /// The deposit was not already in the pool. @@ -47,6 +101,70 @@ impl OperationPool { Self::default() } + /// Insert an attestation into the pool, aggregating it with existing attestations if possible. + pub fn insert_attestation( + &mut self, + attestation: Attestation, + state: &BeaconState, + spec: &ChainSpec, + ) -> Result<(), ()> { + // Check that attestation signatures are valid. + // FIXME: should disable the time-dependent checks. + validate_attestation(state, &attestation, spec).map_err(|_| ())?; + + let id = AttestationId::from_data(&attestation.data, state, spec); + + let existing_attestations = match self.attestations.entry(id) { + hash_map::Entry::Vacant(entry) => { + entry.insert(vec![attestation]); + return Ok(()); + } + hash_map::Entry::Occupied(entry) => entry.into_mut(), + }; + + let mut aggregated = false; + for existing_attestation in existing_attestations.iter_mut() { + if existing_attestation.signers_disjoint_from(&attestation) { + existing_attestation.aggregate(&attestation); + aggregated = true; + } else if *existing_attestation == attestation { + aggregated = true; + } + } + + if !aggregated { + existing_attestations.push(attestation); + } + + Ok(()) + } + + /// Get a list of attestations for inclusion in a block. + pub fn get_attestations(&self, state: &BeaconState, spec: &ChainSpec) -> Vec { + // Attestations for the current fork... + // TODO: should we also check domain bytes for the previous epoch? + let current_epoch = state.slot.epoch(spec.slots_per_epoch); + let domain_bytes = AttestationId::compute_domain_bytes(current_epoch, state, spec); + self.attestations + .iter() + .filter(|(key, _)| key.domain_bytes_match(&domain_bytes)) + .flat_map(|(_, attestations)| attestations) + // That are valid... + .filter(|attestation| validate_attestation(state, attestation, spec).is_ok()) + // Scored by the number of new attestations they introduce (descending) + .map(|att| (att, attestation_score(att, state))) + .sorted_by_key(|&(_, score)| std::cmp::Reverse(score)) + // Limited to the maximum number of attestations per block + .take(spec.max_attestations as usize) + .map(|(att, _)| att) + .cloned() + .collect() + } + + pub fn prune_attestations(&self, _finalized_state: &BeaconState, _spec: &ChainSpec) { + // TODO + } + /// Add a deposit to the pool. /// /// No two distinct deposits should be added with the same index. diff --git a/eth2/types/src/attestation.rs b/eth2/types/src/attestation.rs index 0b660466e..6c572c852 100644 --- a/eth2/types/src/attestation.rs +++ b/eth2/types/src/attestation.rs @@ -28,6 +28,25 @@ pub struct Attestation { pub aggregate_signature: AggregateSignature, } +impl Attestation { + /// Are the aggregation bitfields of these attestations disjoint? + pub fn signers_disjoint_from(&self, other: &Attestation) -> bool { + self.aggregation_bitfield.intersection(&other.aggregation_bitfield).is_zero() + } + + /// Aggregate another Attestation into this one. + /// + /// The aggregation bitfields must be disjoint, and the data must be the same. + pub fn aggregate(&mut self, other: &Attestation) { + debug_assert_eq!(self.data, other.data); + debug_assert!(self.signers_disjoint_from(other)); + + self.aggregation_bitfield.union_inplace(&other.aggregation_bitfield); + self.custody_bitfield.union_inplace(&other.custody_bitfield); + // FIXME: signature aggregation once our BLS library wraps it + } +} + #[cfg(test)] mod tests { use super::*;