Fix last errors stopping full chain sync

This commit is contained in:
Paul Hauner 2019-03-24 16:35:07 +11:00
parent 1ea9959632
commit 3dc5595a6f
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
6 changed files with 121 additions and 61 deletions

View File

@ -26,7 +26,10 @@ pub enum ValidBlock {
#[derive(Debug, PartialEq)]
pub enum InvalidBlock {
/// The block slot is greater than the present slot.
FutureSlot,
FutureSlot {
present_slot: Slot,
block_slot: Slot,
},
/// The block state_root does not match the generated state.
StateRootMismatch,
/// The blocks parent_root is unknown.
@ -53,7 +56,7 @@ impl BlockProcessingOutcome {
match self {
BlockProcessingOutcome::ValidBlock(_) => false,
BlockProcessingOutcome::InvalidBlock(r) => match r {
InvalidBlock::FutureSlot => true,
InvalidBlock::FutureSlot { .. } => true,
InvalidBlock::StateRootMismatch => true,
InvalidBlock::ParentUnknown => false,
InvalidBlock::SlotProcessingError(_) => false,
@ -302,6 +305,49 @@ where
self.canonical_head.read()
}
/// Updates the canonical `BeaconState` with the supplied state.
///
/// Advances the chain forward to the present slot. This method is better than just setting
/// state and calling `catchup_state` as it will not result in an old state being installed and
/// then having it iteratively updated -- in such a case it's possible for another thread to
/// find the state at an old slot.
pub fn update_state(&self, mut state: BeaconState) -> Result<(), Error> {
let latest_block_header = self.head().beacon_block.block_header();
let present_slot = match self.slot_clock.present_slot() {
Ok(Some(slot)) => slot,
_ => return Err(Error::UnableToReadSlot),
};
// If required, transition the new state to the present slot.
for _ in state.slot.as_u64()..present_slot.as_u64() {
per_slot_processing(&mut state, &latest_block_header, &self.spec)?;
}
*self.state.write() = state;
Ok(())
}
/// Ensures the current canonical `BeaconState` has been transitioned to match the `slot_clock`.
pub fn catchup_state(&self) -> Result<(), Error> {
let latest_block_header = self.head().beacon_block.block_header();
let present_slot = match self.slot_clock.present_slot() {
Ok(Some(slot)) => slot,
_ => return Err(Error::UnableToReadSlot),
};
let mut state = self.state.write();
// If required, transition the new state to the present slot.
for _ in state.slot.as_u64()..present_slot.as_u64() {
per_slot_processing(&mut *state, &latest_block_header, &self.spec)?;
}
Ok(())
}
/// Update the justified head to some new values.
pub fn update_finalized_head(
&self,
@ -325,28 +371,6 @@ where
self.finalized_head.read()
}
/// Advance the `self.state` `BeaconState` to the supplied slot.
///
/// This will perform per_slot and per_epoch processing as required.
///
/// The `previous_block_root` will be set to the root of the current head block (as determined
/// by the fork-choice rule).
///
/// It is important to note that this is _not_ the state corresponding to the canonical head
/// block, instead it is that state which may or may not have had additional per slot/epoch
/// processing applied to it.
pub fn advance_state(&self, slot: Slot) -> Result<(), SlotProcessingError> {
let state_slot = self.state.read().slot;
let latest_block_header = self.head().beacon_block.block_header();
for _ in state_slot.as_u64()..slot.as_u64() {
per_slot_processing(&mut *self.state.write(), &latest_block_header, &self.spec)?;
}
Ok(())
}
/// Returns the validator index (if any) for the given public key.
///
/// Information is retrieved from the present `beacon_state.validator_registry`.
@ -724,7 +748,10 @@ where
if block.slot > present_slot {
return Ok(BlockProcessingOutcome::InvalidBlock(
InvalidBlock::FutureSlot,
InvalidBlock::FutureSlot {
present_slot,
block_slot: block.slot,
},
));
}
@ -800,8 +827,9 @@ where
// run instead.
if self.head().beacon_block_root == parent_block_root {
self.update_canonical_head(block.clone(), block_root, state.clone(), state_root);
// Update the local state variable.
*self.state.write() = state;
// Update the canonical `BeaconState`.
self.update_state(state)?;
}
Ok(BlockProcessingOutcome::ValidBlock(ValidBlock::Processed))
@ -891,7 +919,10 @@ where
.ok_or_else(|| Error::MissingBeaconState(block.state_root))?;
let state_root = state.canonical_root();
self.update_canonical_head(block, block_root, state, state_root);
self.update_canonical_head(block, block_root, state.clone(), state_root);
// Update the canonical `BeaconState`.
self.update_state(state)?;
}
Ok(())

View File

@ -3,7 +3,7 @@ use types::{BeaconBlock, BeaconState, Hash256};
/// Represents some block and it's associated state. Generally, this will be used for tracking the
/// head, justified head and finalized head.
#[derive(Clone, Serialize)]
#[derive(Clone, Serialize, PartialEq, Debug)]
pub struct CheckPoint {
pub beacon_block: BeaconBlock,
pub beacon_block_root: Hash256,

View File

@ -1,5 +1,6 @@
use fork_choice::ForkChoiceError;
use state_processing::BlockProcessingError;
use state_processing::SlotProcessingError;
use types::*;
macro_rules! easy_from_to {
@ -16,14 +17,18 @@ macro_rules! easy_from_to {
pub enum BeaconChainError {
InsufficientValidators,
BadRecentBlockRoots,
UnableToReadSlot,
BeaconStateError(BeaconStateError),
DBInconsistent(String),
DBError(String),
ForkChoiceError(ForkChoiceError),
MissingBeaconBlock(Hash256),
MissingBeaconState(Hash256),
SlotProcessingError(SlotProcessingError),
}
easy_from_to!(SlotProcessingError, BeaconChainError);
#[derive(Debug, PartialEq)]
pub enum BlockProductionError {
UnableToGetBlockRootFromState,

View File

@ -131,7 +131,9 @@ impl BeaconChainHarness {
);
self.beacon_chain.slot_clock.set_slot(slot.as_u64());
self.beacon_chain.advance_state(slot).unwrap();
self.beacon_chain
.catchup_state()
.expect("Failed to catch state");
slot
}

View File

@ -430,7 +430,7 @@ impl SimpleSync {
let mut errored = 0;
// Loop through all of the complete blocks in the queue.
for (queue_index, block, sender) in self.import_queue.complete_blocks() {
for (block_root, block, sender) in self.import_queue.complete_blocks() {
match self.chain.process_block(block) {
Ok(outcome) => {
if outcome.is_invalid() {
@ -447,7 +447,7 @@ impl SimpleSync {
// If this results to true, the item will be removed from the queue.
if outcome.sucessfully_processed() {
successful += 1;
self.import_queue.partials.remove(queue_index);
self.import_queue.remove(block_root);
}
}
Err(e) => {
@ -457,13 +457,15 @@ impl SimpleSync {
}
}
info!(
self.log,
"ProcessBlocks";
"invalid" => invalid,
"successful" => successful,
"errored" => errored,
)
if successful > 0 {
info!(self.log, "Imported {} blocks", successful)
}
if invalid > 0 {
warn!(self.log, "Rejected {} invalid blocks", invalid)
}
if errored > 0 {
warn!(self.log, "Failed to process {} blocks", errored)
}
}
fn request_block_roots(
@ -548,33 +550,35 @@ impl ImportQueue {
}
}
/// Completes all possible partials into `BeaconBlock` and returns them, sorted by slot number.
/// Does not delete the partials from the queue, this must be done manually.
/// Completes all possible partials into `BeaconBlock` and returns them, sorted by increasing
/// slot number. Does not delete the partials from the queue, this must be done manually.
///
/// Returns `(queue_index, block, sender)`:
///
/// - `queue_index`: used to remove the entry if it is successfully processed.
/// - `block_root`: may be used to remove the entry if it is successfully processed.
/// - `block`: the completed block.
/// - `sender`: the `PeerId` the provided the `BeaconBlockBody` which completed the partial.
pub fn complete_blocks(&self) -> Vec<(usize, BeaconBlock, PeerId)> {
let mut completable: Vec<(usize, &PartialBeaconBlock)> = self
pub fn complete_blocks(&self) -> Vec<(Hash256, BeaconBlock, PeerId)> {
let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self
.partials
.iter()
.enumerate()
.filter(|(_i, partial)| partial.completable())
.filter_map(|partial| partial.clone().complete())
.collect();
// Sort the completable partials to be in ascending slot order.
completable.sort_unstable_by(|a, b| a.1.header.slot.partial_cmp(&b.1.header.slot).unwrap());
complete.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap());
completable
complete
}
/// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial
/// if it exists.
pub fn remove(&mut self, block_root: Hash256) -> Option<PartialBeaconBlock> {
let position = self
.partials
.iter()
.map(|(i, partial)| {
let (block, _root, sender) =
(*partial).clone().complete().expect("Body must be Some");
(*i, block, sender)
})
.collect()
.position(|p| p.block_root == block_root)?;
Some(self.partials.remove(position))
}
/// Flushes all stale entries from the queue.
@ -716,15 +720,11 @@ pub struct PartialBeaconBlock {
}
impl PartialBeaconBlock {
pub fn completable(&self) -> bool {
self.body.is_some()
}
/// Given a `body`, consumes `self` and returns a complete `BeaconBlock` along with its root.
pub fn complete(self) -> Option<(BeaconBlock, Hash256, PeerId)> {
pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> {
Some((
self.header.into_block(self.body?),
self.block_root,
self.header.into_block(self.body?),
self.sender,
))
}

View File

@ -541,6 +541,28 @@ fn sync_two_nodes() {
// A provides block bodies to B.
node_a.tee_block_body_response(&node_b);
std::thread::sleep(Duration::from_secs(60));
std::thread::sleep(Duration::from_secs(10));
node_b.harness.run_fork_choice();
let node_a_chain = node_a
.harness
.beacon_chain
.chain_dump()
.expect("Can't dump node a chain");
let node_b_chain = node_b
.harness
.beacon_chain
.chain_dump()
.expect("Can't dump node b chain");
assert_eq!(
node_a_chain.len(),
node_b_chain.len(),
"Chains should be equal length"
);
assert_eq!(node_a_chain, node_b_chain, "Chains should be identical");
runtime.shutdown_now();
}