From 3dc5595a6fbbc3fd22e09fbcd36ca978d4b4186b Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 24 Mar 2019 16:35:07 +1100 Subject: [PATCH] Fix last errors stopping full chain sync --- beacon_node/beacon_chain/src/beacon_chain.rs | 87 +++++++++++++------ beacon_node/beacon_chain/src/checkpoint.rs | 2 +- beacon_node/beacon_chain/src/errors.rs | 5 ++ .../test_harness/src/beacon_chain_harness.rs | 4 +- beacon_node/network/src/sync/simple_sync.rs | 60 ++++++------- beacon_node/network/tests/tests.rs | 24 ++++- 6 files changed, 121 insertions(+), 61 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 966e73210..745ba5155 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -26,7 +26,10 @@ pub enum ValidBlock { #[derive(Debug, PartialEq)] pub enum InvalidBlock { /// The block slot is greater than the present slot. - FutureSlot, + FutureSlot { + present_slot: Slot, + block_slot: Slot, + }, /// The block state_root does not match the generated state. StateRootMismatch, /// The blocks parent_root is unknown. @@ -53,7 +56,7 @@ impl BlockProcessingOutcome { match self { BlockProcessingOutcome::ValidBlock(_) => false, BlockProcessingOutcome::InvalidBlock(r) => match r { - InvalidBlock::FutureSlot => true, + InvalidBlock::FutureSlot { .. } => true, InvalidBlock::StateRootMismatch => true, InvalidBlock::ParentUnknown => false, InvalidBlock::SlotProcessingError(_) => false, @@ -302,6 +305,49 @@ where self.canonical_head.read() } + /// Updates the canonical `BeaconState` with the supplied state. + /// + /// Advances the chain forward to the present slot. This method is better than just setting + /// state and calling `catchup_state` as it will not result in an old state being installed and + /// then having it iteratively updated -- in such a case it's possible for another thread to + /// find the state at an old slot. 
+ pub fn update_state(&self, mut state: BeaconState) -> Result<(), Error> { + let latest_block_header = self.head().beacon_block.block_header(); + + let present_slot = match self.slot_clock.present_slot() { + Ok(Some(slot)) => slot, + _ => return Err(Error::UnableToReadSlot), + }; + + // If required, transition the new state to the present slot. + for _ in state.slot.as_u64()..present_slot.as_u64() { + per_slot_processing(&mut state, &latest_block_header, &self.spec)?; + } + + *self.state.write() = state; + + Ok(()) + } + + /// Ensures the current canonical `BeaconState` has been transitioned to match the `slot_clock`. + pub fn catchup_state(&self) -> Result<(), Error> { + let latest_block_header = self.head().beacon_block.block_header(); + + let present_slot = match self.slot_clock.present_slot() { + Ok(Some(slot)) => slot, + _ => return Err(Error::UnableToReadSlot), + }; + + let mut state = self.state.write(); + + // If required, transition the new state to the present slot. + for _ in state.slot.as_u64()..present_slot.as_u64() { + per_slot_processing(&mut *state, &latest_block_header, &self.spec)?; + } + + Ok(()) + } + /// Update the justified head to some new values. pub fn update_finalized_head( &self, @@ -325,28 +371,6 @@ where self.finalized_head.read() } - /// Advance the `self.state` `BeaconState` to the supplied slot. - /// - /// This will perform per_slot and per_epoch processing as required. - /// - /// The `previous_block_root` will be set to the root of the current head block (as determined - /// by the fork-choice rule). - /// - /// It is important to note that this is _not_ the state corresponding to the canonical head - /// block, instead it is that state which may or may not have had additional per slot/epoch - /// processing applied to it. 
- pub fn advance_state(&self, slot: Slot) -> Result<(), SlotProcessingError> { - let state_slot = self.state.read().slot; - - let latest_block_header = self.head().beacon_block.block_header(); - - for _ in state_slot.as_u64()..slot.as_u64() { - per_slot_processing(&mut *self.state.write(), &latest_block_header, &self.spec)?; - } - - Ok(()) - } - /// Returns the validator index (if any) for the given public key. /// /// Information is retrieved from the present `beacon_state.validator_registry`. @@ -724,7 +748,10 @@ where if block.slot > present_slot { return Ok(BlockProcessingOutcome::InvalidBlock( - InvalidBlock::FutureSlot, + InvalidBlock::FutureSlot { + present_slot, + block_slot: block.slot, + }, )); } @@ -800,8 +827,9 @@ where // run instead. if self.head().beacon_block_root == parent_block_root { self.update_canonical_head(block.clone(), block_root, state.clone(), state_root); - // Update the local state variable. - *self.state.write() = state; + + // Update the canonical `BeaconState`. + self.update_state(state)?; } Ok(BlockProcessingOutcome::ValidBlock(ValidBlock::Processed)) @@ -891,7 +919,10 @@ where .ok_or_else(|| Error::MissingBeaconState(block.state_root))?; let state_root = state.canonical_root(); - self.update_canonical_head(block, block_root, state, state_root); + self.update_canonical_head(block, block_root, state.clone(), state_root); + + // Update the canonical `BeaconState`. + self.update_state(state)?; } Ok(()) diff --git a/beacon_node/beacon_chain/src/checkpoint.rs b/beacon_node/beacon_chain/src/checkpoint.rs index 828e462de..78227e5c8 100644 --- a/beacon_node/beacon_chain/src/checkpoint.rs +++ b/beacon_node/beacon_chain/src/checkpoint.rs @@ -3,7 +3,7 @@ use types::{BeaconBlock, BeaconState, Hash256}; /// Represents some block and it's associated state. Generally, this will be used for tracking the /// head, justified head and finalized head. 
-#[derive(Clone, Serialize)] +#[derive(Clone, Serialize, PartialEq, Debug)] pub struct CheckPoint { pub beacon_block: BeaconBlock, pub beacon_block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index b5f17efd2..a84e4b10e 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -1,5 +1,6 @@ use fork_choice::ForkChoiceError; use state_processing::BlockProcessingError; +use state_processing::SlotProcessingError; use types::*; macro_rules! easy_from_to { @@ -16,14 +17,18 @@ macro_rules! easy_from_to { pub enum BeaconChainError { InsufficientValidators, BadRecentBlockRoots, + UnableToReadSlot, BeaconStateError(BeaconStateError), DBInconsistent(String), DBError(String), ForkChoiceError(ForkChoiceError), MissingBeaconBlock(Hash256), MissingBeaconState(Hash256), + SlotProcessingError(SlotProcessingError), } +easy_from_to!(SlotProcessingError, BeaconChainError); + #[derive(Debug, PartialEq)] pub enum BlockProductionError { UnableToGetBlockRootFromState, diff --git a/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs b/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs index 1207fcf28..1498796b1 100644 --- a/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs +++ b/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs @@ -131,7 +131,9 @@ impl BeaconChainHarness { ); self.beacon_chain.slot_clock.set_slot(slot.as_u64()); - self.beacon_chain.advance_state(slot).unwrap(); + self.beacon_chain + .catchup_state() + .expect("Failed to catch state"); slot } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 369564a5e..76d630b9a 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -430,7 +430,7 @@ impl SimpleSync { let mut errored = 0; // Loop through all of the complete blocks in the queue. 
- for (queue_index, block, sender) in self.import_queue.complete_blocks() { + for (block_root, block, sender) in self.import_queue.complete_blocks() { match self.chain.process_block(block) { Ok(outcome) => { if outcome.is_invalid() { @@ -447,7 +447,7 @@ impl SimpleSync { // If this results to true, the item will be removed from the queue. if outcome.sucessfully_processed() { successful += 1; - self.import_queue.partials.remove(queue_index); + self.import_queue.remove(block_root); } } Err(e) => { @@ -457,13 +457,15 @@ impl SimpleSync { } } - info!( - self.log, - "ProcessBlocks"; - "invalid" => invalid, - "successful" => successful, - "errored" => errored, - ) + if successful > 0 { + info!(self.log, "Imported {} blocks", successful) + } + if invalid > 0 { + warn!(self.log, "Rejected {} invalid blocks", invalid) + } + if errored > 0 { + warn!(self.log, "Failed to process {} blocks", errored) + } } fn request_block_roots( @@ -548,33 +550,35 @@ impl ImportQueue { } } - /// Completes all possible partials into `BeaconBlock` and returns them, sorted by slot number. - /// Does not delete the partials from the queue, this must be done manually. + /// Completes all possible partials into `BeaconBlock` and returns them, sorted by increasing + /// slot number. Does not delete the partials from the queue, this must be done manually. /// /// Returns `(queue_index, block, sender)`: /// - /// - `queue_index`: used to remove the entry if it is successfully processed. + /// - `block_root`: may be used to remove the entry if it is successfully processed. /// - `block`: the completed block. /// - `sender`: the `PeerId` the provided the `BeaconBlockBody` which completed the partial. 
- pub fn complete_blocks(&self) -> Vec<(usize, BeaconBlock, PeerId)> { - let mut completable: Vec<(usize, &PartialBeaconBlock)> = self + pub fn complete_blocks(&self) -> Vec<(Hash256, BeaconBlock, PeerId)> { + let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self .partials .iter() - .enumerate() - .filter(|(_i, partial)| partial.completable()) + .filter_map(|partial| partial.clone().complete()) .collect(); // Sort the completable partials to be in ascending slot order. - completable.sort_unstable_by(|a, b| a.1.header.slot.partial_cmp(&b.1.header.slot).unwrap()); + complete.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap()); - completable + complete + } + + /// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial + /// if it exists. + pub fn remove(&mut self, block_root: Hash256) -> Option<PartialBeaconBlock> { + let position = self + .partials .iter() - .map(|(i, partial)| { - let (block, _root, sender) = - (*partial).clone().complete().expect("Body must be Some"); - (*i, block, sender) - }) - .collect() + .position(|p| p.block_root == block_root)?; + Some(self.partials.remove(position)) } /// Flushes all stale entries from the queue. @@ -716,15 +720,11 @@ pub struct PartialBeaconBlock { } impl PartialBeaconBlock { - pub fn completable(&self) -> bool { - self.body.is_some() - } - /// Given a `body`, consumes `self` and returns a complete `BeaconBlock` along with its root. - pub fn complete(self) -> Option<(BeaconBlock, Hash256, PeerId)> { + pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> { Some(( - self.header.into_block(self.body?), self.block_root, + self.header.into_block(self.body?), self.sender, )) } diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index 2952e5105..e12bf2628 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -541,6 +541,28 @@ fn sync_two_nodes() { // A provides block bodies to B. 
node_a.tee_block_body_response(&node_b); - std::thread::sleep(Duration::from_secs(60)); + std::thread::sleep(Duration::from_secs(10)); + + node_b.harness.run_fork_choice(); + + let node_a_chain = node_a + .harness + .beacon_chain + .chain_dump() + .expect("Can't dump node a chain"); + + let node_b_chain = node_b + .harness + .beacon_chain + .chain_dump() + .expect("Can't dump node b chain"); + + assert_eq!( + node_a_chain.len(), + node_b_chain.len(), + "Chains should be equal length" + ); + assert_eq!(node_a_chain, node_b_chain, "Chains should be identical"); + runtime.shutdown_now(); }