From 50ef0d7fbf28001021bf65362a0b9dfa54fafcda Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 20 Apr 2020 12:34:37 +1000 Subject: [PATCH] Check attestation shuffling when producing blocks (#900) Closes #845 --- beacon_node/beacon_chain/src/beacon_chain.rs | 88 ++++- beacon_node/beacon_chain/src/test_utils.rs | 4 +- .../tests/attestation_production.rs | 4 +- beacon_node/beacon_chain/tests/store_tests.rs | 305 +++++++++++++++++- eth2/operation_pool/src/lib.rs | 15 +- .../src/proto_array.rs | 37 +++ 6 files changed, 429 insertions(+), 24 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 2d1ee5c8a..8f4ef7c64 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -25,6 +25,7 @@ use state_processing::{ }; use std::borrow::Cow; use std::cmp::Ordering; +use std::collections::HashMap; use std::fs; use std::io::prelude::*; use std::sync::Arc; @@ -1112,6 +1113,76 @@ impl BeaconChain { } } + /// Check that the shuffling at `block_root` is equal to one of the shufflings of `state`. + /// + /// The `target_epoch` argument determines which shuffling to check compatibility with, it + /// should be equal to the current or previous epoch of `state`, or else `false` will be + /// returned. + /// + /// The compatibility check is designed to be fast: we check that the block that + /// determined the RANDAO mix for the `target_epoch` matches the ancestor of the block + /// identified by `block_root` (at that slot). + pub fn shuffling_is_compatible( + &self, + block_root: &Hash256, + target_epoch: Epoch, + state: &BeaconState, + ) -> bool { + let slots_per_epoch = T::EthSpec::slots_per_epoch(); + let shuffling_lookahead = 1 + self.spec.min_seed_lookahead.as_u64(); + + // Shuffling can't have changed if we're in the first few epochs + if state.current_epoch() < shuffling_lookahead { + return true; + } + + // Otherwise the shuffling is determined by the block at the end of the target epoch + // minus the shuffling lookahead (usually 2). We call this the "pivot". + let pivot_slot = + if target_epoch == state.previous_epoch() || target_epoch == state.current_epoch() { + (target_epoch - shuffling_lookahead).end_slot(slots_per_epoch) + } else { + return false; + }; + + let state_pivot_block_root = match state.get_block_root(pivot_slot) { + Ok(root) => *root, + Err(e) => { + warn!( + &self.log, + "Missing pivot block root for attestation"; + "slot" => pivot_slot, + "error" => format!("{:?}", e), + ); + return false; + } + }; + + // Use fork choice's view of the block DAG to quickly evaluate whether the attestation's + // pivot block is the same as the current state's pivot block. If it is, then the + // attestation's shuffling is the same as the current state's. + // To account for skipped slots, find the first block at *or before* the pivot slot. + let fork_choice_lock = self.fork_choice.core_proto_array(); + let pivot_block_root = fork_choice_lock + .iter_block_roots(block_root) + .find(|(_, slot)| *slot <= pivot_slot) + .map(|(block_root, _)| block_root); + drop(fork_choice_lock); + + match pivot_block_root { + Some(root) => root == state_pivot_block_root, + None => { + debug!( + &self.log, + "Discarding attestation because of missing ancestor"; + "pivot_slot" => pivot_slot.as_u64(), + "block_root" => format!("{:?}", block_root), + ); + false + } + } + } + /// Accept some exit and queue it for inclusion in an appropriate block. pub fn process_voluntary_exit( &self, @@ -1638,6 +1709,21 @@ impl BeaconChain { .deposits_for_block_inclusion(&state, ð1_data, &self.spec)? .into(); + // Map from attestation head block root to shuffling compatibility. + // Used to memoize the `attestation_shuffling_is_compatible` function. + let mut shuffling_filter_cache = HashMap::new(); + let attestation_filter = |att: &&Attestation| -> bool { + *shuffling_filter_cache + .entry((att.data.beacon_block_root, att.data.target.epoch)) + .or_insert_with(|| { + self.shuffling_is_compatible( + &att.data.beacon_block_root, + att.data.target.epoch, + &state, + ) + }) + }; + let mut block = SignedBeaconBlock { message: BeaconBlock { slot: state.slot, @@ -1652,7 +1738,7 @@ impl BeaconChain { attester_slashings: attester_slashings.into(), attestations: self .op_pool - .get_attestations(&state, &self.spec) + .get_attestations(&state, attestation_filter, &self.spec) .map_err(BlockProductionError::OpPoolError)? .into(), deposits, diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index d9d15b7b9..ad957c874 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -353,7 +353,9 @@ where .process_attestation(attestation) .expect("should not error during attestation processing") { - AttestationProcessingOutcome::Processed => (), + // PastEpoch can occur if we fork over several epochs + AttestationProcessingOutcome::Processed + | AttestationProcessingOutcome::PastEpoch { .. } => (), other => panic!("did not successfully process attestation: {:?}", other), } }); diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 3b84a78ff..5a305e7cc 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -40,8 +40,8 @@ fn produces_attestations() { let state = &harness.chain.head().expect("should get head").beacon_state; assert_eq!(state.slot, num_blocks_produced, "head should have updated"); - assert!( - state.finalized_checkpoint.epoch > 0, + assert_ne!( + state.finalized_checkpoint.epoch, 0, "head should have updated" ); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ea36a0125..c4dd0b58c 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -20,11 +20,12 @@ use types::test_utils::{SeedableRng, XorShiftRng}; use types::*; // Should ideally be divisible by 3. -pub const VALIDATOR_COUNT: usize = 24; +pub const LOW_VALIDATOR_COUNT: usize = 24; +pub const HIGH_VALIDATOR_COUNT: usize = 64; lazy_static! { /// A cached set of keys. - static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); + static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(HIGH_VALIDATOR_COUNT); } type E = MinimalEthSpec; @@ -57,7 +58,7 @@ fn full_participation_no_skips() { let num_blocks_produced = E::slots_per_epoch() * 5; let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let harness = get_harness(store.clone(), VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); harness.extend_chain( num_blocks_produced as usize, @@ -77,7 +78,7 @@ fn randomised_skips() { let mut num_blocks_produced = 0; let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let harness = get_harness(store.clone(), VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let rng = &mut XorShiftRng::from_seed([42; 16]); let mut head_slot = 0; @@ -113,14 +114,16 @@ fn randomised_skips() { fn long_skip() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let harness = get_harness(store.clone(), VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); // Number of blocks to create in the first run, intentionally not falling on an epoch // boundary in order to check that the DB hot -> cold migration is capable of reaching // back across the skip distance, and correctly migrating those extra non-finalized states. let initial_blocks = E::slots_per_epoch() * 5 + E::slots_per_epoch() / 2; let skip_slots = E::slots_per_historical_root() as u64 * 8; - let final_blocks = E::slots_per_epoch() * 4; + // Create the minimum ~2.5 epochs of extra blocks required to re-finalize the chain. + // Having this set lower ensures that we start justifying and finalizing quickly after a skip. + let final_blocks = 2 * E::slots_per_epoch() + E::slots_per_epoch() / 2; harness.extend_chain( initial_blocks as usize, @@ -223,7 +226,7 @@ fn split_slot_restore() { let split_slot = { let store = get_store(&db_path); - let harness = get_harness(store.clone(), VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let num_blocks = 4 * E::slots_per_epoch(); @@ -251,10 +254,10 @@ fn epoch_boundary_state_attestation_processing() { let num_blocks_produced = E::slots_per_epoch() * 5; let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let harness = get_harness(store.clone(), VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let late_validators = vec![0, 1]; - let timely_validators = (2..VALIDATOR_COUNT).collect::>(); + let timely_validators = (2..LOW_VALIDATOR_COUNT).collect::>(); let mut late_attestations = vec![]; @@ -333,7 +336,7 @@ fn epoch_boundary_state_attestation_processing() { fn delete_blocks_and_states() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let harness = get_harness(store.clone(), VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let unforked_blocks = 4 * E::slots_per_epoch(); @@ -345,13 +348,11 @@ fn delete_blocks_and_states() { ); // Create a fork post-finalization. - let two_thirds = (VALIDATOR_COUNT / 3) * 2; + let two_thirds = (LOW_VALIDATOR_COUNT / 3) * 2; let honest_validators: Vec = (0..two_thirds).collect(); - let faulty_validators: Vec = (two_thirds..VALIDATOR_COUNT).collect(); + let faulty_validators: Vec = (two_thirds..LOW_VALIDATOR_COUNT).collect(); - // NOTE: should remove this -1 and/or write a similar test once #845 is resolved - // https://github.com/sigp/lighthouse/issues/845 - let fork_blocks = 2 * E::slots_per_epoch() - 1; + let fork_blocks = 2 * E::slots_per_epoch(); let (honest_head, faulty_head) = harness.generate_two_forks_by_skipping_a_block( &honest_validators, @@ -425,6 +426,280 @@ fn delete_blocks_and_states() { check_chain_dump(&harness, unforked_blocks + fork_blocks + 1); } +// Check that we never produce invalid blocks when there is deep forking that changes the shuffling. +// See https://github.com/sigp/lighthouse/issues/845 +fn multi_epoch_fork_valid_blocks_test( + initial_blocks: usize, + num_fork1_blocks: usize, + num_fork2_blocks: usize, + num_fork1_validators: usize, +) -> (TempDir, TestHarness, Hash256, Hash256) { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + // Create the initial portion of the chain + if initial_blocks > 0 { + harness.extend_chain( + initial_blocks, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + } + + assert!(num_fork1_validators <= LOW_VALIDATOR_COUNT); + let fork1_validators: Vec = (0..num_fork1_validators).collect(); + let fork2_validators: Vec = (num_fork1_validators..LOW_VALIDATOR_COUNT).collect(); + + let (head1, head2) = harness.generate_two_forks_by_skipping_a_block( + &fork1_validators, + &fork2_validators, + num_fork1_blocks, + num_fork2_blocks, + ); + + (db_path, harness, head1, head2) +} + +// This is the minimal test of block production with different shufflings. +#[test] +fn block_production_different_shuffling_early() { + let slots_per_epoch = E::slots_per_epoch() as usize; + multi_epoch_fork_valid_blocks_test( + slots_per_epoch - 2, + slots_per_epoch + 3, + slots_per_epoch + 3, + LOW_VALIDATOR_COUNT / 2, + ); +} + +#[test] +fn block_production_different_shuffling_long() { + let slots_per_epoch = E::slots_per_epoch() as usize; + multi_epoch_fork_valid_blocks_test( + 2 * slots_per_epoch - 2, + 3 * slots_per_epoch, + 3 * slots_per_epoch, + LOW_VALIDATOR_COUNT / 2, + ); +} + +// Check that the op pool safely includes multiple attestations per block when necessary. +// This checks the correctness of the shuffling compatibility memoization. +#[test] +fn multiple_attestations_per_block() { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store, HIGH_VALIDATOR_COUNT); + let chain = &harness.chain; + + harness.extend_chain( + MainnetEthSpec::slots_per_epoch() as usize * 3, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + let head = chain.head().unwrap(); + let committees_per_slot = head + .beacon_state + .get_committee_count_at_slot(head.beacon_state.slot) + .unwrap(); + assert!(committees_per_slot > 1); + + for snapshot in chain.chain_dump().unwrap() { + assert_eq!( + snapshot.beacon_block.message.body.attestations.len() as u64, + if snapshot.beacon_block.slot() <= 1 { + 0 + } else { + committees_per_slot + } + ); + } +} + +#[test] +fn shuffling_compatible_linear_chain() { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + // Skip the block at the end of the first epoch. + let head_block_root = harness.extend_chain( + 4 * E::slots_per_epoch() as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + check_shuffling_compatible( + &harness, + &get_state_for_block(&harness, head_block_root), + head_block_root, + true, + true, + None, + None, + ); +} + +#[test] +fn shuffling_compatible_missing_pivot_block() { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + // Skip the block at the end of the first epoch. + harness.extend_chain( + E::slots_per_epoch() as usize - 2, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + harness.advance_slot(); + harness.advance_slot(); + let head_block_root = harness.extend_chain( + 2 * E::slots_per_epoch() as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + check_shuffling_compatible( + &harness, + &get_state_for_block(&harness, head_block_root), + head_block_root, + true, + true, + Some(E::slots_per_epoch() - 2), + Some(E::slots_per_epoch() - 2), + ); +} + +#[test] +fn shuffling_compatible_simple_fork() { + let slots_per_epoch = E::slots_per_epoch() as usize; + let (db_path, harness, head1, head2) = multi_epoch_fork_valid_blocks_test( + 2 * slots_per_epoch, + 3 * slots_per_epoch, + 3 * slots_per_epoch, + LOW_VALIDATOR_COUNT / 2, + ); + + let head1_state = get_state_for_block(&harness, head1); + let head2_state = get_state_for_block(&harness, head2); + + check_shuffling_compatible(&harness, &head1_state, head1, true, true, None, None); + check_shuffling_compatible(&harness, &head1_state, head2, false, false, None, None); + check_shuffling_compatible(&harness, &head2_state, head1, false, false, None, None); + check_shuffling_compatible(&harness, &head2_state, head2, true, true, None, None); + + drop(db_path); +} + +#[test] +fn shuffling_compatible_short_fork() { + let slots_per_epoch = E::slots_per_epoch() as usize; + let (db_path, harness, head1, head2) = multi_epoch_fork_valid_blocks_test( + 2 * slots_per_epoch - 2, + slots_per_epoch + 2, + slots_per_epoch + 2, + LOW_VALIDATOR_COUNT / 2, + ); + + let head1_state = get_state_for_block(&harness, head1); + let head2_state = get_state_for_block(&harness, head2); + + check_shuffling_compatible(&harness, &head1_state, head1, true, true, None, None); + check_shuffling_compatible(&harness, &head1_state, head2, false, true, None, None); + // NOTE: don't check this case, as block 14 from the first chain appears valid on the second + // chain due to it matching the second chain's block 15. + // check_shuffling_compatible(&harness, &head2_state, head1, false, true, None, None); + check_shuffling_compatible( + &harness, + &head2_state, + head2, + true, + true, + // Required because of the skipped slot. + Some(2 * E::slots_per_epoch() - 2), + None, + ); + + drop(db_path); +} + +fn get_state_for_block(harness: &TestHarness, block_root: Hash256) -> BeaconState { + let head_block = harness.chain.get_block(&block_root).unwrap().unwrap(); + harness + .chain + .get_state(&head_block.state_root(), Some(head_block.slot())) + .unwrap() + .unwrap() +} + +/// Check the invariants that apply to `shuffling_is_compatible`. +fn check_shuffling_compatible( + harness: &TestHarness, + head_state: &BeaconState, + head_block_root: Hash256, + current_epoch_valid: bool, + previous_epoch_valid: bool, + current_epoch_cutoff_slot: Option, + previous_epoch_cutoff_slot: Option, +) { + let shuffling_lookahead = harness.chain.spec.min_seed_lookahead.as_u64() + 1; + let current_pivot_slot = + (head_state.current_epoch() - shuffling_lookahead).end_slot(E::slots_per_epoch()); + let previous_pivot_slot = + (head_state.previous_epoch() - shuffling_lookahead).end_slot(E::slots_per_epoch()); + + for (block_root, slot) in harness + .chain + .rev_iter_block_roots_from(head_block_root) + .unwrap() + { + // Shuffling is compatible targeting the current epoch, + // iff slot is greater than or equal to the current epoch pivot block + assert_eq!( + harness.chain.shuffling_is_compatible( + &block_root, + head_state.current_epoch(), + &head_state + ), + current_epoch_valid + && slot >= current_epoch_cutoff_slot.unwrap_or(current_pivot_slot.as_u64()) + ); + // Similarly for the previous epoch + assert_eq!( + harness.chain.shuffling_is_compatible( + &block_root, + head_state.previous_epoch(), + &head_state + ), + previous_epoch_valid + && slot >= previous_epoch_cutoff_slot.unwrap_or(previous_pivot_slot.as_u64()) + ); + // Targeting the next epoch should always return false + assert_eq!( + harness.chain.shuffling_is_compatible( + &block_root, + head_state.current_epoch() + 1, + &head_state + ), + false + ); + // Targeting two epochs before the current epoch should also always return false + if head_state.current_epoch() >= 2 { + assert_eq!( + harness.chain.shuffling_is_compatible( + &block_root, + head_state.current_epoch() - 2, + &head_state + ), + false + ); + } + } +} + /// Check that the head state's slot matches `expected_slot`. fn check_slot(harness: &TestHarness, expected_slot: u64) { let state = &harness.chain.head().expect("should get head").beacon_state; diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index 7718c227a..ac2c1ed31 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -98,10 +98,14 @@ impl OperationPool { /// Get a list of attestations for inclusion in a block. /// - /// NOTE: Assumes that all attestations in the operation_pool are valid. + /// The `validity_filter` is a closure that provides extra filtering of the attestations + /// before an approximately optimal bundle is constructed. We use it to provide access + /// to the fork choice data from the `BeaconChain` struct that doesn't logically belong + /// in the operation pool. pub fn get_attestations( &self, state: &BeaconState, + validity_filter: impl FnMut(&&Attestation) -> bool, spec: &ChainSpec, ) -> Result>, OpPoolError> { // Attestations for the current fork, which may be from the current or previous epoch. @@ -143,6 +147,7 @@ impl OperationPool { ) .is_ok() }) + .filter(validity_filter) .flat_map(|att| AttMaxCover::new(att, state, total_active_balance, spec)); Ok(maximum_cover( @@ -584,7 +589,7 @@ mod release_tests { state.slot -= 1; assert_eq!( op_pool - .get_attestations(state, spec) + .get_attestations(state, |_| true, spec) .expect("should have attestations") .len(), 0 @@ -594,7 +599,7 @@ mod release_tests { state.slot += spec.min_attestation_inclusion_delay; let block_attestations = op_pool - .get_attestations(state, spec) + .get_attestations(state, |_| true, spec) .expect("Should have block attestations"); assert_eq!(block_attestations.len(), committees.len()); @@ -764,7 +769,7 @@ mod release_tests { state.slot += spec.min_attestation_inclusion_delay; let best_attestations = op_pool - .get_attestations(state, spec) + .get_attestations(state, |_| true, spec) .expect("should have best attestations"); assert_eq!(best_attestations.len(), max_attestations); @@ -839,7 +844,7 @@ mod release_tests { state.slot += spec.min_attestation_inclusion_delay; let best_attestations = op_pool - .get_attestations(state, spec) + .get_attestations(state, |_| true, spec) .expect("should have valid best attestations"); assert_eq!(best_attestations.len(), max_attestations); diff --git a/eth2/proto_array_fork_choice/src/proto_array.rs b/eth2/proto_array_fork_choice/src/proto_array.rs index ece8648bf..8516f8029 100644 --- a/eth2/proto_array_fork_choice/src/proto_array.rs +++ b/eth2/proto_array_fork_choice/src/proto_array.rs @@ -407,4 +407,41 @@ impl ProtoArray { && (node.finalized_epoch == self.finalized_epoch || self.finalized_epoch == Epoch::new(0)) } + + /// Return a reverse iterator over the nodes which comprise the chain ending at `block_root`. + pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> { + let next_node_index = self.indices.get(block_root).copied(); + Iter { + next_node_index, + proto_array: self, + } + } + + /// Return a reverse iterator over the block roots of the chain ending at `block_root`. + /// + /// Note that unlike many other iterators, this one WILL NOT yield anything at skipped slots. + pub fn iter_block_roots<'a>( + &'a self, + block_root: &Hash256, + ) -> impl Iterator + 'a { + self.iter_nodes(block_root) + .map(|node| (node.root, node.slot)) + } +} + +/// Reverse iterator over one path through a `ProtoArray`. +pub struct Iter<'a> { + next_node_index: Option, + proto_array: &'a ProtoArray, +} + +impl<'a> Iterator for Iter<'a> { + type Item = &'a ProtoNode; + + fn next(&mut self) -> Option { + let next_node_index = self.next_node_index?; + let node = self.proto_array.nodes.get(next_node_index)?; + self.next_node_index = node.parent; + Some(node) + } }