From 74baeb4d08a958890b85a53ed93c6a382623b60b Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 2 Sep 2019 05:38:11 +1000 Subject: [PATCH] WIP - Upgrade Sync algorithm --- beacon_node/network/Cargo.toml | 1 + beacon_node/network/src/sync/manager.rs | 188 +++++++++++++++++--- beacon_node/network/src/sync/simple_sync.rs | 23 ++- 3 files changed, 180 insertions(+), 32 deletions(-) diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index dc08bd311..06fc06dde 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -19,3 +19,4 @@ futures = "0.1.25" error-chain = "0.12.0" tokio = "0.1.16" parking_lot = "0.9.0" +smallvec = "0.6.10" diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index b81da0991..9b2d780f4 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1,33 +1,110 @@ +//! The `ImportManager` facilities the block syncing logic of lighthouse. The current networking +//! specification provides two methods from which to obtain blocks from peers. The `BeaconBlocks` +//! request and the `RecentBeaconBlocks` request. The former is used to obtain a large number of +//! blocks and the latter allows for searching for blocks given a block-hash. +//! +//! These two RPC methods are designed for two type of syncing. +//! - Long range (batch) sync, when a client is out of date and needs to the latest head. +//! - Parent lookup - when a peer provides us a block whose parent is unknown to us. +//! +//! Both of these syncing strategies are built into the `ImportManager`. +//! +//! +//! Currently the long-range (batch) syncing method functions by opportunistically downloading +//! batches blocks from all peers who know about a chain that we do not. When a new peer connects +//! which has a later head that is greater than `SLOT_IMPORT_TOLERANCE` from our current head slot, +//! the manager's state becomes `Syncing` and begins a batch syncing process with this peer. If +//! further peers connect, this process is run in parallel with those peers, until our head is +//! within `SLOT_IMPORT_TOLERANCE` of all connected peers. +//! +//! Batch Syncing +//! +//! This syncing process start by requesting `MAX_BLOCKS_PER_REQUEST` blocks from a peer with an +//! unknown chain (with a greater slot height) starting from our current head slot. If the earliest +//! block returned is known to us, then the group of blocks returned form part of a known chain, +//! and we process this batch of blocks, before requesting more batches forward and processing +//! those in turn until we reach the peer's chain's head. If the first batch doesn't contain a +//! block we know of, we must iteratively request blocks backwards (until our latest finalized head +//! slot) until we find a common ancestor before we can start processing the blocks. If no common +//! ancestor is found, the peer has a chain which is not part of our finalized head slot and we +//! drop the peer and the downloaded blocks. +//! Once we are fully synced with all known peers, the state of the manager becomes `Regular` which +//! then allows for parent lookups of propagated blocks. +//! +//! A schematic version of this logic with two chain variations looks like the following. +//! +//! |----------------------|---------------------------------| +//! ^finalized head ^current local head ^remotes head +//! +//! +//! An example of the remotes chain diverging before our current head. +//! |---------------------------| +//! ^---------------------------------------------| +//! ^chain diverges |initial batch| ^remotes head +//! +//! In this example, we cannot process the initial batch as it is not on a known chain. We must +//! then backwards sync until we reach a common chain to begin forwarding batch syncing. +//! +//! +//! Parent Lookup +//! +//! When a block with an unknown parent is received and we are in `Regular` sync mode, the block is +//! queued for lookup. A round-robin approach is used to request the parent from the known list of +//! fully sync'd peers. If `PARENT_FAIL_TOLERANCE` attempts at requesting the block fails, we +//! drop the propagated block and downvote the peer that sent it to us. + use super::simple_sync::{PeerSyncInfo, FUTURE_SLOT_TOLERANCE}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use slog::{debug, info, trace, warn, Logger}; +use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; use std::ops::{Add, Sub}; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use types::{BeaconBlock, EthSpec, Hash256, Slot}; +/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch +/// is requested. Currently the value is small for testing. This will be incremented for +/// production. const MAX_BLOCKS_PER_REQUEST: u64 = 10; -/// The number of slots that we can import blocks ahead of us, before going into full Sync mode. +/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync +/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a +/// fully sync'd peer. const SLOT_IMPORT_TOLERANCE: usize = 10; +/// How many attempts we try to find a parent of a block before we give up trying . const PARENT_FAIL_TOLERANCE: usize = 3; +/// The maximum depth we will search for a parent block. In principle we should have sync'd any +/// canonical chain to its head once the peer connects. A chain should not appear where it's depth +/// is further back than the most recent head slot. const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; #[derive(PartialEq)] +/// The current state of a block or batches lookup. enum BlockRequestsState { + /// The object is queued to be downloaded from a peer but has not yet been requested. Queued, + /// The batch or parent has been requested with the `RequestId` and we are awaiting a response. Pending(RequestId), - Complete, + /// The downloaded blocks are ready to be processed by the beacon chain. For a batch process + /// this means we have found a common chain. + ReadyToProcess, + /// A failure has occurred and we will drop and downvote the peer that caused the request. Failed, } +/// `BlockRequests` keep track of the long-range (batch) sync process per peer. struct BlockRequests { + /// The peer's head slot and the target of this batch download. target_head_slot: Slot, + /// The peer's head root, used to specify which chain of blocks we are downloading from the + /// blocks. target_head_root: Hash256, + /// The blocks that we have currently downloaded from the peer that are yet to be processed. downloaded_blocks: Vec>, + /// The current state of this batch request. state: BlockRequestsState, /// Specifies whether the current state is syncing forwards or backwards. forward_sync: bool, @@ -35,16 +112,22 @@ struct BlockRequests { current_start_slot: Slot, } +/// Maintains a sequential list of parents to lookup and the lookup's current state. struct ParentRequests { + /// The blocks that have currently been downloaded. downloaded_blocks: Vec>, + /// The number of failed attempts to retrieve a parent block. If too many attempts occur, this + /// lookup is failed and rejected. failed_attempts: usize, - last_submitted_peer: PeerId, // to downvote the submitting peer. + /// The peer who last submitted a block. If the chain ends or fails, this is the peer that is + /// downvoted. + last_submitted_peer: PeerId, + /// The current state of the parent lookup. state: BlockRequestsState, } impl BlockRequests { - // gets the start slot for next batch - // last block slot downloaded plus 1 + /// Gets the next start slot for a batch and transitions the state to a Queued state. fn update_start_slot(&mut self) { if self.forward_sync { self.current_start_slot += Slot::from(MAX_BLOCKS_PER_REQUEST); @@ -56,58 +139,104 @@ impl BlockRequests { } #[derive(PartialEq, Debug, Clone)] +/// The current state of the `ImportManager`. enum ManagerState { + /// The manager is performing a long-range (batch) sync. In this mode, parent lookups are + /// disabled. Syncing, + /// The manager is up to date with all known peers and is connected to at least one + /// fully-syncing peer. In this state, parent lookups are enabled. Regular, + /// No useful peers are connected. Long-range sync's cannot proceed and we have no useful + /// peers to download parents for. More peers need to be connected before we can proceed. Stalled, } +/// The output states that can occur from driving (polling) the manager state machine. pub(crate) enum ImportManagerOutcome { + /// There is no further work to complete. The manager is waiting for further input. Idle, + /// A `BeaconBlocks` request is required. RequestBlocks { peer_id: PeerId, request_id: RequestId, request: BeaconBlocksRequest, }, + /// A `RecentBeaconBlocks` request is required. + RecentRequest(PeerId, RecentBeaconBlocksRequest), /// Updates information with peer via requesting another HELLO handshake. Hello(PeerId), - RecentRequest(PeerId, RecentBeaconBlocksRequest), + /// A peer has caused a punishable error and should be downvoted. DownvotePeer(PeerId), } +/// The primary object for handling and driving all the current syncing logic. It maintains the +/// current state of the syncing process, the number of useful peers, downloaded blocks and +/// controls the logic behind both the long-range (batch) sync and the on-going potential parent +/// look-up of blocks. pub struct ImportManager { - /// A reference to the underlying beacon chain. - chain: Arc>, + /// A weak reference to the underlying beacon chain. + chain: Weak>, + /// The current state of the import manager. state: ManagerState, + /// A collection of `BlockRequest` per peer that is currently being downloaded. Used in the + /// long-range (batch) sync process. import_queue: HashMap>, - parent_queue: Vec>, + /// A collection of parent block lookups. + parent_queue: SmallVec<[ParentRequests; 3]>, + /// The collection of known, connected, fully-sync'd peers. full_peers: HashSet, + /// The current request Id. This is used to keep track of responses to various outbound + /// requests. This is an internal accounting mechanism, request id's are never sent to any + /// peers. current_req_id: usize, + /// The logger for the import manager. log: Logger, } impl ImportManager { + /// Generates a new `ImportManager` given a logger and an Arc reference to a beacon chain. The + /// import manager keeps a weak reference to the beacon chain, which allows the chain to be + /// dropped during the syncing process. The syncing handles this termination gracefully. pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { ImportManager { - chain: beacon_chain.clone(), + chain: Arc::downgrade(&beacon_chain), state: ManagerState::Regular, import_queue: HashMap::new(), - parent_queue: Vec::new(), + parent_queue: SmallVec::new(), full_peers: HashSet::new(), current_req_id: 0, log: log.clone(), } } + /// A peer has connected which has blocks that are unknown to us. + /// + /// This function handles the logic associated with the connection of a new peer. If the peer + /// is sufficiently ahead of our current head, a long-range (batch) sync is started and + /// batches of blocks are queued to download from the peer. Batched blocks begin at our + /// current head. If the resulting downloaded blocks are part of our current chain, we + /// continue with a forward sync. If not, we download blocks (in batches) backwards until we + /// reach a common ancestor. Batches are then processed and downloaded sequentially forwards. + /// + /// If the peer is within the `SLOT_IMPORT_TOLERANCE`, then it's head is sufficiently close to + /// ours that we consider it fully sync'd with respect to our current chain. pub fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) { - // TODO: Improve comments. - // initially try to download blocks from our current head - // then backwards search all the way back to our finalized epoch until we match on a chain - // has to be done sequentially to find next slot to start the batch from + // ensure the beacon chain still exists + let chain = match self.chain.upgrade() { + Some(chain) => chain, + None => { + warn!(self.log, + "Beacon chain dropped. Peer not considered for sync"; + "peer_id" => format!("{:?}", peer_id)); + return; + } + }; - let local = PeerSyncInfo::from(&self.chain); + let local = PeerSyncInfo::from(&chain); - // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch sync + // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch sync, + // consider it a fully-sync'd peer. if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { trace!(self.log, "Ignoring full sync with peer"; "peer" => format!("{:?}", peer_id), @@ -116,34 +245,53 @@ impl ImportManager { ); // remove the peer from the queue if it exists self.import_queue.remove(&peer_id); + self.add_full_peer(peer_id); + // return; } + // Check if the peer is significantly is behind us. If within `SLOT_IMPORT_TOLERANCE` + // treat them as a fully synced peer. If not, ignore them in the sync process + if local.head_slot.sub(remote.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { + self.add_full_peer(peer_id); + } else { + debug!( + self.log, + "Out of sync peer connected"; + "peer" => format!("{:?}", peer_id), + ); + return; + } + + // Check if we are already downloading blocks from this peer, if so update, if not set up + // a new request structure if let Some(block_requests) = self.import_queue.get_mut(&peer_id) { // update the target head slot if remote.head_slot > block_requests.target_head_slot { block_requests.target_head_slot = remote.head_slot; } } else { + // not already downloading blocks from this peer let block_requests = BlockRequests { target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called target_head_root: remote.head_root, downloaded_blocks: Vec::new(), state: BlockRequestsState::Queued, forward_sync: true, - current_start_slot: self.chain.best_slot(), + current_start_slot: chain.best_slot(), }; self.import_queue.insert(peer_id, block_requests); } } + /// A `BeaconBlocks` request has received a response. This function process the response. pub fn beacon_blocks_response( &mut self, peer_id: PeerId, request_id: RequestId, mut blocks: Vec>, ) { - // find the request + // find the request associated with this response let block_requests = match self .import_queue .get_mut(&peer_id) diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 573ac9dd1..dd857d8c3 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -16,8 +16,6 @@ use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot}; /// Otherwise we queue it. pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; -/// The number of slots behind our head that we still treat a peer as a fully synced peer. -const FULL_PEER_TOLERANCE: u64 = 10; const SHOULD_FORWARD_GOSSIP_BLOCK: bool = true; const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; @@ -189,18 +187,17 @@ impl SimpleSync { .exists::>(&remote.head_root) .unwrap_or_else(|_| false) { + trace!( + self.log, "Out of date or potentially sync'd peer found"; + "peer" => format!("{:?}", peer_id), + "remote_head_slot" => remote.head_slot + "remote_latest_finalized_epoch" => remote.finalized_epoch, + ); + // If the node's best-block is already known to us and they are close to our current // head, treat them as a fully sync'd peer. - if self.chain.best_slot().sub(remote.head_slot).as_u64() < FULL_PEER_TOLERANCE { - self.manager.add_full_peer(peer_id); - self.process_sync(); - } else { - debug!( - self.log, - "Out of sync peer connected"; - "peer" => format!("{:?}", peer_id), - ); - } + self.manager.add_peer(peer_id, remote); + self.process_sync(); } else { // The remote node has an equal or great finalized epoch and we don't know it's head. // @@ -218,6 +215,8 @@ impl SimpleSync { } } + /// This function drives the `ImportManager` state machine. The outcomes it provides are + /// actioned until the `ImportManager` is idle. fn process_sync(&mut self) { loop { match self.manager.poll() {