From 755a09d164f7790cd74fda8efd3ae8d033579a88 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 24 Mar 2019 18:34:44 +1100 Subject: [PATCH] Move ImportQueue into own file --- beacon_node/network/src/sync/import_queue.rs | 232 +++++++++++++++++++ beacon_node/network/src/sync/mod.rs | 1 + beacon_node/network/src/sync/simple_sync.rs | 230 +----------------- 3 files changed, 236 insertions(+), 227 deletions(-) create mode 100644 beacon_node/network/src/sync/import_queue.rs diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs new file mode 100644 index 000000000..6508af89e --- /dev/null +++ b/beacon_node/network/src/sync/import_queue.rs @@ -0,0 +1,232 @@ +use crate::beacon_chain::BeaconChain; +use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::PeerId; +use slog::{debug, error}; +use ssz::TreeHash; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256}; + +/// Provides a queue for fully and partially built `BeaconBlock`s. +/// +/// The queue is fundamentally a `Vec` where no two items have the same +/// `item.block_root`. This struct it backed by a `Vec` not a `HashMap` for the following two +/// reasons: +/// +/// - When we receive a `BeaconBlockBody`, the only way we can find it's matching +/// `BeaconBlockHeader` is to find a header such that `header.beacon_block_body == +/// hash_tree_root(body)`. Therefore, if we used a `HashMap` we would need to use the root of +/// `BeaconBlockBody` as the key. +/// - It is possible for multiple distinct blocks to have identical `BeaconBlockBodies`. Therefore +/// we cannot use a `HashMap` keyed by the root of `BeaconBlockBody`. +pub struct ImportQueue { + pub chain: Arc, + /// Partially imported blocks, keyed by the root of `BeaconBlockBody`. + pub partials: Vec, + /// Time before a queue entry is considered state. + pub stale_time: Duration, + /// Logging + log: slog::Logger, +} + +impl ImportQueue { + /// Return a new, empty queue. + pub fn new(chain: Arc, stale_time: Duration, log: slog::Logger) -> Self { + Self { + chain, + partials: vec![], + stale_time, + log, + } + } + + /// Completes all possible partials into `BeaconBlock` and returns them, sorted by increasing + /// slot number. Does not delete the partials from the queue, this must be done manually. + /// + /// Returns `(queue_index, block, sender)`: + /// + /// - `block_root`: may be used to remove the entry if it is successfully processed. + /// - `block`: the completed block. + /// - `sender`: the `PeerId` the provided the `BeaconBlockBody` which completed the partial. + pub fn complete_blocks(&self) -> Vec<(Hash256, BeaconBlock, PeerId)> { + let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self + .partials + .iter() + .filter_map(|partial| partial.clone().complete()) + .collect(); + + // Sort the completable partials to be in ascending slot order. + complete.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap()); + + complete + } + + /// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial + /// if it exists. + pub fn remove(&mut self, block_root: Hash256) -> Option { + let position = self + .partials + .iter() + .position(|p| p.block_root == block_root)?; + Some(self.partials.remove(position)) + } + + /// Flushes all stale entries from the queue. + /// + /// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the + /// past. + pub fn remove_stale(&mut self) { + let stale_indices: Vec = self + .partials + .iter() + .enumerate() + .filter_map(|(i, partial)| { + if partial.inserted + self.stale_time <= Instant::now() { + Some(i) + } else { + None + } + }) + .collect(); + + if !stale_indices.is_empty() { + debug!( + self.log, + "ImportQueue removing stale entries"; + "stale_items" => stale_indices.len(), + "stale_time_seconds" => self.stale_time.as_secs() + ); + } + + stale_indices.iter().for_each(|&i| { + self.partials.remove(i); + }); + } + + /// Returns `true` if `self.chain` has not yet processed this block. + fn is_new_block(&self, block_root: &Hash256) -> bool { + self.chain + .is_new_block_root(&block_root) + .unwrap_or_else(|_| { + error!(self.log, "Unable to determine if block is new."); + true + }) + } + + /// Returns the index of the first new root in the list of block roots. + pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option { + roots + .iter() + .position(|brs| self.is_new_block(&brs.block_root)) + } + + /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for + /// which we should use to request `BeaconBlockBodies`. + /// + /// If a `header` is not in the queue and has not been processed by the chain it is added to + /// the queue and it's block root is included in the output. + /// + /// If a `header` is already in the queue, but not yet processed by the chain the block root is + /// included in the output and the `inserted` time for the partial record is set to + /// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale. + /// + /// Presently the queue enforces that a `BeaconBlockHeader` _must_ be received before its + /// `BeaconBlockBody`. This is not a natural requirement and we could enhance the queue to lift + /// this restraint. + pub fn enqueue_headers( + &mut self, + headers: Vec, + sender: PeerId, + ) -> Vec { + let mut required_bodies: Vec = vec![]; + + for header in headers { + let block_root = Hash256::from_slice(&header.hash_tree_root()[..]); + + if self.is_new_block(&block_root) { + self.insert_header(block_root, header, sender.clone()); + required_bodies.push(block_root) + } + } + + required_bodies + } + + /// If there is a matching `header` for this `body`, adds it to the queue. + /// + /// If there is no `header` for the `body`, the body is simply discarded. + pub fn enqueue_bodies(&mut self, bodies: Vec, sender: PeerId) { + for body in bodies { + self.insert_body(body, sender.clone()); + } + } + + /// Inserts a header to the queue. + /// + /// If the header already exists, the `inserted` time is set to `now` and not other + /// modifications are made. + fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) { + if let Some(i) = self + .partials + .iter() + .position(|p| p.block_root == block_root) + { + self.partials[i].inserted = Instant::now(); + } else { + self.partials.push(PartialBeaconBlock { + block_root, + header, + body: None, + inserted: Instant::now(), + sender, + }) + } + } + + /// Updates an existing partial with the `body`. + /// + /// If there is no header for the `body`, the body is simply discarded. + /// + /// If the body already existed, the `inserted` time is set to `now`. + fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) { + let body_root = Hash256::from_slice(&body.hash_tree_root()[..]); + + self.partials.iter_mut().for_each(|mut p| { + if body_root == p.header.block_body_root { + p.inserted = Instant::now(); + + if p.body.is_none() { + p.body = Some(body.clone()); + p.sender = sender.clone(); + } + } + }); + } +} + +/// Individual components of a `BeaconBlock`, potentially all that are required to form a full +/// `BeaconBlock`. +#[derive(Clone, Debug)] +pub struct PartialBeaconBlock { + /// `BeaconBlock` root. + pub block_root: Hash256, + pub header: BeaconBlockHeader, + pub body: Option, + /// The instant at which this record was created or last meaningfully modified. Used to + /// determine if an entry is stale and should be removed. + pub inserted: Instant, + /// The `PeerId` that last meaningfully contributed to this item. + pub sender: PeerId, +} + +impl PartialBeaconBlock { + /// Consumes `self` and returns a full built `BeaconBlock`, it's root and the `sender` + /// `PeerId`, if enough information exists to complete the block. Otherwise, returns `None`. + pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> { + Some(( + self.block_root, + self.header.into_block(self.body?), + self.sender, + )) + } +} diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 8f5216b85..fac1b46eb 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -1,3 +1,4 @@ +mod import_queue; /// Syncing for lighthouse. /// /// Stores the various syncing methods for the beacon chain. diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index d6b9e63ae..c23a6ec56 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,14 +1,14 @@ +use super::import_queue::ImportQueue; use crate::beacon_chain::BeaconChain; use crate::message_handler::NetworkContext; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse}; use eth2_libp2p::PeerId; use slog::{debug, error, info, o, warn}; -use ssz::TreeHash; use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; -use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; +use std::time::Duration; +use types::{Epoch, Hash256, Slot}; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; @@ -578,227 +578,3 @@ impl SimpleSync { self.chain.hello_message() } } - -/// Provides a queue for fully and partially built `BeaconBlock`s. -/// -/// The queue is fundamentally a `Vec` where no two items have the same -/// `item.block_root`. This struct it backed by a `Vec` not a `HashMap` for the following two -/// reasons: -/// -/// - When we receive a `BeaconBlockBody`, the only way we can find it's matching -/// `BeaconBlockHeader` is to find a header such that `header.beacon_block_body == -/// hash_tree_root(body)`. Therefore, if we used a `HashMap` we would need to use the root of -/// `BeaconBlockBody` as the key. -/// - It is possible for multiple distinct blocks to have identical `BeaconBlockBodies`. Therefore -/// we cannot use a `HashMap` keyed by the root of `BeaconBlockBody`. -pub struct ImportQueue { - pub chain: Arc, - /// Partially imported blocks, keyed by the root of `BeaconBlockBody`. - pub partials: Vec, - /// Time before a queue entry is considered state. - pub stale_time: Duration, - /// Logging - log: slog::Logger, -} - -impl ImportQueue { - /// Return a new, empty queue. - pub fn new(chain: Arc, stale_time: Duration, log: slog::Logger) -> Self { - Self { - chain, - partials: vec![], - stale_time, - log, - } - } - - /// Completes all possible partials into `BeaconBlock` and returns them, sorted by increasing - /// slot number. Does not delete the partials from the queue, this must be done manually. - /// - /// Returns `(queue_index, block, sender)`: - /// - /// - `block_root`: may be used to remove the entry if it is successfully processed. - /// - `block`: the completed block. - /// - `sender`: the `PeerId` the provided the `BeaconBlockBody` which completed the partial. - pub fn complete_blocks(&self) -> Vec<(Hash256, BeaconBlock, PeerId)> { - let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self - .partials - .iter() - .filter_map(|partial| partial.clone().complete()) - .collect(); - - // Sort the completable partials to be in ascending slot order. - complete.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap()); - - complete - } - - /// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial - /// if it exists. - pub fn remove(&mut self, block_root: Hash256) -> Option { - let position = self - .partials - .iter() - .position(|p| p.block_root == block_root)?; - Some(self.partials.remove(position)) - } - - /// Flushes all stale entries from the queue. - /// - /// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the - /// past. - pub fn remove_stale(&mut self) { - let stale_indices: Vec = self - .partials - .iter() - .enumerate() - .filter_map(|(i, partial)| { - if partial.inserted + self.stale_time <= Instant::now() { - Some(i) - } else { - None - } - }) - .collect(); - - if !stale_indices.is_empty() { - debug!( - self.log, - "ImportQueue removing stale entries"; - "stale_items" => stale_indices.len(), - "stale_time_seconds" => self.stale_time.as_secs() - ); - } - - stale_indices.iter().for_each(|&i| { - self.partials.remove(i); - }); - } - - /// Returns `true` if `self.chain` has not yet processed this block. - fn is_new_block(&self, block_root: &Hash256) -> bool { - self.chain - .is_new_block_root(&block_root) - .unwrap_or_else(|_| { - error!(self.log, "Unable to determine if block is new."); - true - }) - } - - /// Returns the index of the first new root in the list of block roots. - pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option { - roots - .iter() - .position(|brs| self.is_new_block(&brs.block_root)) - } - - /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for - /// which we should use to request `BeaconBlockBodies`. - /// - /// If a `header` is not in the queue and has not been processed by the chain it is added to - /// the queue and it's block root is included in the output. - /// - /// If a `header` is already in the queue, but not yet processed by the chain the block root is - /// included in the output and the `inserted` time for the partial record is set to - /// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale. - /// - /// Presently the queue enforces that a `BeaconBlockHeader` _must_ be received before its - /// `BeaconBlockBody`. This is not a natural requirement and we could enhance the queue to lift - /// this restraint. - pub fn enqueue_headers( - &mut self, - headers: Vec, - sender: PeerId, - ) -> Vec { - let mut required_bodies: Vec = vec![]; - - for header in headers { - let block_root = Hash256::from_slice(&header.hash_tree_root()[..]); - - if self.is_new_block(&block_root) { - self.insert_header(block_root, header, sender.clone()); - required_bodies.push(block_root) - } - } - - required_bodies - } - - /// If there is a matching `header` for this `body`, adds it to the queue. - /// - /// If there is no `header` for the `body`, the body is simply discarded. - pub fn enqueue_bodies(&mut self, bodies: Vec, sender: PeerId) { - for body in bodies { - self.insert_body(body, sender.clone()); - } - } - - /// Inserts a header to the queue. - /// - /// If the header already exists, the `inserted` time is set to `now` and not other - /// modifications are made. - fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) { - if let Some(i) = self - .partials - .iter() - .position(|p| p.block_root == block_root) - { - self.partials[i].inserted = Instant::now(); - } else { - self.partials.push(PartialBeaconBlock { - block_root, - header, - body: None, - inserted: Instant::now(), - sender, - }) - } - } - - /// Updates an existing partial with the `body`. - /// - /// If there is no header for the `body`, the body is simply discarded. - /// - /// If the body already existed, the `inserted` time is set to `now`. - fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) { - let body_root = Hash256::from_slice(&body.hash_tree_root()[..]); - - self.partials.iter_mut().for_each(|mut p| { - if body_root == p.header.block_body_root { - p.inserted = Instant::now(); - - if p.body.is_none() { - p.body = Some(body.clone()); - p.sender = sender.clone(); - } - } - }); - } -} - -/// Individual components of a `BeaconBlock`, potentially all that are required to form a full -/// `BeaconBlock`. -#[derive(Clone, Debug)] -pub struct PartialBeaconBlock { - /// `BeaconBlock` root. - pub block_root: Hash256, - pub header: BeaconBlockHeader, - pub body: Option, - /// The instant at which this record was created or last meaningfully modified. Used to - /// determine if an entry is stale and should be removed. - pub inserted: Instant, - /// The `PeerId` that last meaningfully contributed to this item. - pub sender: PeerId, -} - -impl PartialBeaconBlock { - /// Consumes `self` and returns a full built `BeaconBlock`, it's root and the `sender` - /// `PeerId`, if enough information exists to complete the block. Otherwise, returns `None`. - pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> { - Some(( - self.block_root, - self.header.into_block(self.body?), - self.sender, - )) - } -}