WIP - Upgrade Sync algorithm

This commit is contained in:
Age Manning 2019-09-02 05:38:11 +10:00
parent 192380cb58
commit 74baeb4d08
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
3 changed files with 180 additions and 32 deletions

View File

@ -19,3 +19,4 @@ futures = "0.1.25"
error-chain = "0.12.0" error-chain = "0.12.0"
tokio = "0.1.16" tokio = "0.1.16"
parking_lot = "0.9.0" parking_lot = "0.9.0"
smallvec = "0.6.10"

View File

@ -1,33 +1,110 @@
//! The `ImportManager` facilities the block syncing logic of lighthouse. The current networking
//! specification provides two methods from which to obtain blocks from peers. The `BeaconBlocks`
//! request and the `RecentBeaconBlocks` request. The former is used to obtain a large number of
//! blocks and the latter allows for searching for blocks given a block-hash.
//!
//! These two RPC methods are designed for two type of syncing.
//! - Long range (batch) sync, when a client is out of date and needs to the latest head.
//! - Parent lookup - when a peer provides us a block whose parent is unknown to us.
//!
//! Both of these syncing strategies are built into the `ImportManager`.
//!
//!
//! Currently the long-range (batch) syncing method functions by opportunistically downloading
//! batches blocks from all peers who know about a chain that we do not. When a new peer connects
//! which has a later head that is greater than `SLOT_IMPORT_TOLERANCE` from our current head slot,
//! the manager's state becomes `Syncing` and begins a batch syncing process with this peer. If
//! further peers connect, this process is run in parallel with those peers, until our head is
//! within `SLOT_IMPORT_TOLERANCE` of all connected peers.
//!
//! Batch Syncing
//!
//! This syncing process start by requesting `MAX_BLOCKS_PER_REQUEST` blocks from a peer with an
//! unknown chain (with a greater slot height) starting from our current head slot. If the earliest
//! block returned is known to us, then the group of blocks returned form part of a known chain,
//! and we process this batch of blocks, before requesting more batches forward and processing
//! those in turn until we reach the peer's chain's head. If the first batch doesn't contain a
//! block we know of, we must iteratively request blocks backwards (until our latest finalized head
//! slot) until we find a common ancestor before we can start processing the blocks. If no common
//! ancestor is found, the peer has a chain which is not part of our finalized head slot and we
//! drop the peer and the downloaded blocks.
//! Once we are fully synced with all known peers, the state of the manager becomes `Regular` which
//! then allows for parent lookups of propagated blocks.
//!
//! A schematic version of this logic with two chain variations looks like the following.
//!
//! |----------------------|---------------------------------|
//! ^finalized head ^current local head ^remotes head
//!
//!
//! An example of the remotes chain diverging before our current head.
//! |---------------------------|
//! ^---------------------------------------------|
//! ^chain diverges |initial batch| ^remotes head
//!
//! In this example, we cannot process the initial batch as it is not on a known chain. We must
//! then backwards sync until we reach a common chain to begin forwarding batch syncing.
//!
//!
//! Parent Lookup
//!
//! When a block with an unknown parent is received and we are in `Regular` sync mode, the block is
//! queued for lookup. A round-robin approach is used to request the parent from the known list of
//! fully sync'd peers. If `PARENT_FAIL_TOLERANCE` attempts at requesting the block fails, we
//! drop the propagated block and downvote the peer that sent it to us.
use super::simple_sync::{PeerSyncInfo, FUTURE_SLOT_TOLERANCE}; use super::simple_sync::{PeerSyncInfo, FUTURE_SLOT_TOLERANCE};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::RequestId; use eth2_libp2p::rpc::RequestId;
use eth2_libp2p::PeerId; use eth2_libp2p::PeerId;
use slog::{debug, info, trace, warn, Logger}; use slog::{debug, info, trace, warn, Logger};
use smallvec::SmallVec;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::ops::{Add, Sub}; use std::ops::{Add, Sub};
use std::sync::Arc; use std::sync::{Arc, Weak};
use types::{BeaconBlock, EthSpec, Hash256, Slot}; use types::{BeaconBlock, EthSpec, Hash256, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch
/// is requested. Currently the value is small for testing. This will be incremented for
/// production.
const MAX_BLOCKS_PER_REQUEST: u64 = 10; const MAX_BLOCKS_PER_REQUEST: u64 = 10;
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode. /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
/// fully sync'd peer.
const SLOT_IMPORT_TOLERANCE: usize = 10; const SLOT_IMPORT_TOLERANCE: usize = 10;
/// How many attempts we try to find a parent of a block before we give up trying .
const PARENT_FAIL_TOLERANCE: usize = 3; const PARENT_FAIL_TOLERANCE: usize = 3;
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
/// canonical chain to its head once the peer connects. A chain should not appear where it's depth
/// is further back than the most recent head slot.
const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
#[derive(PartialEq)] #[derive(PartialEq)]
/// The current state of a block or batches lookup.
enum BlockRequestsState { enum BlockRequestsState {
/// The object is queued to be downloaded from a peer but has not yet been requested.
Queued, Queued,
/// The batch or parent has been requested with the `RequestId` and we are awaiting a response.
Pending(RequestId), Pending(RequestId),
Complete, /// The downloaded blocks are ready to be processed by the beacon chain. For a batch process
/// this means we have found a common chain.
ReadyToProcess,
/// A failure has occurred and we will drop and downvote the peer that caused the request.
Failed, Failed,
} }
/// `BlockRequests` keep track of the long-range (batch) sync process per peer.
struct BlockRequests<T: EthSpec> { struct BlockRequests<T: EthSpec> {
/// The peer's head slot and the target of this batch download.
target_head_slot: Slot, target_head_slot: Slot,
/// The peer's head root, used to specify which chain of blocks we are downloading from the
/// blocks.
target_head_root: Hash256, target_head_root: Hash256,
/// The blocks that we have currently downloaded from the peer that are yet to be processed.
downloaded_blocks: Vec<BeaconBlock<T>>, downloaded_blocks: Vec<BeaconBlock<T>>,
/// The current state of this batch request.
state: BlockRequestsState, state: BlockRequestsState,
/// Specifies whether the current state is syncing forwards or backwards. /// Specifies whether the current state is syncing forwards or backwards.
forward_sync: bool, forward_sync: bool,
@ -35,16 +112,22 @@ struct BlockRequests<T: EthSpec> {
current_start_slot: Slot, current_start_slot: Slot,
} }
/// Maintains a sequential list of parents to lookup and the lookup's current state.
struct ParentRequests<T: EthSpec> { struct ParentRequests<T: EthSpec> {
/// The blocks that have currently been downloaded.
downloaded_blocks: Vec<BeaconBlock<T>>, downloaded_blocks: Vec<BeaconBlock<T>>,
/// The number of failed attempts to retrieve a parent block. If too many attempts occur, this
/// lookup is failed and rejected.
failed_attempts: usize, failed_attempts: usize,
last_submitted_peer: PeerId, // to downvote the submitting peer. /// The peer who last submitted a block. If the chain ends or fails, this is the peer that is
/// downvoted.
last_submitted_peer: PeerId,
/// The current state of the parent lookup.
state: BlockRequestsState, state: BlockRequestsState,
} }
impl<T: EthSpec> BlockRequests<T> { impl<T: EthSpec> BlockRequests<T> {
// gets the start slot for next batch /// Gets the next start slot for a batch and transitions the state to a Queued state.
// last block slot downloaded plus 1
fn update_start_slot(&mut self) { fn update_start_slot(&mut self) {
if self.forward_sync { if self.forward_sync {
self.current_start_slot += Slot::from(MAX_BLOCKS_PER_REQUEST); self.current_start_slot += Slot::from(MAX_BLOCKS_PER_REQUEST);
@ -56,58 +139,104 @@ impl<T: EthSpec> BlockRequests<T> {
} }
#[derive(PartialEq, Debug, Clone)] #[derive(PartialEq, Debug, Clone)]
/// The current state of the `ImportManager`.
enum ManagerState { enum ManagerState {
/// The manager is performing a long-range (batch) sync. In this mode, parent lookups are
/// disabled.
Syncing, Syncing,
/// The manager is up to date with all known peers and is connected to at least one
/// fully-syncing peer. In this state, parent lookups are enabled.
Regular, Regular,
/// No useful peers are connected. Long-range sync's cannot proceed and we have no useful
/// peers to download parents for. More peers need to be connected before we can proceed.
Stalled, Stalled,
} }
/// The output states that can occur from driving (polling) the manager state machine.
pub(crate) enum ImportManagerOutcome { pub(crate) enum ImportManagerOutcome {
/// There is no further work to complete. The manager is waiting for further input.
Idle, Idle,
/// A `BeaconBlocks` request is required.
RequestBlocks { RequestBlocks {
peer_id: PeerId, peer_id: PeerId,
request_id: RequestId, request_id: RequestId,
request: BeaconBlocksRequest, request: BeaconBlocksRequest,
}, },
/// A `RecentBeaconBlocks` request is required.
RecentRequest(PeerId, RecentBeaconBlocksRequest),
/// Updates information with peer via requesting another HELLO handshake. /// Updates information with peer via requesting another HELLO handshake.
Hello(PeerId), Hello(PeerId),
RecentRequest(PeerId, RecentBeaconBlocksRequest), /// A peer has caused a punishable error and should be downvoted.
DownvotePeer(PeerId), DownvotePeer(PeerId),
} }
/// The primary object for handling and driving all the current syncing logic. It maintains the
/// current state of the syncing process, the number of useful peers, downloaded blocks and
/// controls the logic behind both the long-range (batch) sync and the on-going potential parent
/// look-up of blocks.
pub struct ImportManager<T: BeaconChainTypes> { pub struct ImportManager<T: BeaconChainTypes> {
/// A reference to the underlying beacon chain. /// A weak reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>, chain: Weak<BeaconChain<T>>,
/// The current state of the import manager.
state: ManagerState, state: ManagerState,
/// A collection of `BlockRequest` per peer that is currently being downloaded. Used in the
/// long-range (batch) sync process.
import_queue: HashMap<PeerId, BlockRequests<T::EthSpec>>, import_queue: HashMap<PeerId, BlockRequests<T::EthSpec>>,
parent_queue: Vec<ParentRequests<T::EthSpec>>, /// A collection of parent block lookups.
parent_queue: SmallVec<[ParentRequests<T::EthSpec>; 3]>,
/// The collection of known, connected, fully-sync'd peers.
full_peers: HashSet<PeerId>, full_peers: HashSet<PeerId>,
/// The current request Id. This is used to keep track of responses to various outbound
/// requests. This is an internal accounting mechanism, request id's are never sent to any
/// peers.
current_req_id: usize, current_req_id: usize,
/// The logger for the import manager.
log: Logger, log: Logger,
} }
impl<T: BeaconChainTypes> ImportManager<T> { impl<T: BeaconChainTypes> ImportManager<T> {
/// Generates a new `ImportManager` given a logger and an Arc reference to a beacon chain. The
/// import manager keeps a weak reference to the beacon chain, which allows the chain to be
/// dropped during the syncing process. The syncing handles this termination gracefully.
pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: &slog::Logger) -> Self { pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: &slog::Logger) -> Self {
ImportManager { ImportManager {
chain: beacon_chain.clone(), chain: Arc::downgrade(&beacon_chain),
state: ManagerState::Regular, state: ManagerState::Regular,
import_queue: HashMap::new(), import_queue: HashMap::new(),
parent_queue: Vec::new(), parent_queue: SmallVec::new(),
full_peers: HashSet::new(), full_peers: HashSet::new(),
current_req_id: 0, current_req_id: 0,
log: log.clone(), log: log.clone(),
} }
} }
/// A peer has connected which has blocks that are unknown to us.
///
/// This function handles the logic associated with the connection of a new peer. If the peer
/// is sufficiently ahead of our current head, a long-range (batch) sync is started and
/// batches of blocks are queued to download from the peer. Batched blocks begin at our
/// current head. If the resulting downloaded blocks are part of our current chain, we
/// continue with a forward sync. If not, we download blocks (in batches) backwards until we
/// reach a common ancestor. Batches are then processed and downloaded sequentially forwards.
///
/// If the peer is within the `SLOT_IMPORT_TOLERANCE`, then it's head is sufficiently close to
/// ours that we consider it fully sync'd with respect to our current chain.
pub fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) { pub fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) {
// TODO: Improve comments. // ensure the beacon chain still exists
// initially try to download blocks from our current head let chain = match self.chain.upgrade() {
// then backwards search all the way back to our finalized epoch until we match on a chain Some(chain) => chain,
// has to be done sequentially to find next slot to start the batch from None => {
warn!(self.log,
"Beacon chain dropped. Peer not considered for sync";
"peer_id" => format!("{:?}", peer_id));
return;
}
};
let local = PeerSyncInfo::from(&self.chain); let local = PeerSyncInfo::from(&chain);
// If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch sync // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch sync,
// consider it a fully-sync'd peer.
if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE {
trace!(self.log, "Ignoring full sync with peer"; trace!(self.log, "Ignoring full sync with peer";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
@ -116,34 +245,53 @@ impl<T: BeaconChainTypes> ImportManager<T> {
); );
// remove the peer from the queue if it exists // remove the peer from the queue if it exists
self.import_queue.remove(&peer_id); self.import_queue.remove(&peer_id);
self.add_full_peer(peer_id);
//
return; return;
} }
// Check if the peer is significantly is behind us. If within `SLOT_IMPORT_TOLERANCE`
// treat them as a fully synced peer. If not, ignore them in the sync process
if local.head_slot.sub(remote.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE {
self.add_full_peer(peer_id);
} else {
debug!(
self.log,
"Out of sync peer connected";
"peer" => format!("{:?}", peer_id),
);
return;
}
// Check if we are already downloading blocks from this peer, if so update, if not set up
// a new request structure
if let Some(block_requests) = self.import_queue.get_mut(&peer_id) { if let Some(block_requests) = self.import_queue.get_mut(&peer_id) {
// update the target head slot // update the target head slot
if remote.head_slot > block_requests.target_head_slot { if remote.head_slot > block_requests.target_head_slot {
block_requests.target_head_slot = remote.head_slot; block_requests.target_head_slot = remote.head_slot;
} }
} else { } else {
// not already downloading blocks from this peer
let block_requests = BlockRequests { let block_requests = BlockRequests {
target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called
target_head_root: remote.head_root, target_head_root: remote.head_root,
downloaded_blocks: Vec::new(), downloaded_blocks: Vec::new(),
state: BlockRequestsState::Queued, state: BlockRequestsState::Queued,
forward_sync: true, forward_sync: true,
current_start_slot: self.chain.best_slot(), current_start_slot: chain.best_slot(),
}; };
self.import_queue.insert(peer_id, block_requests); self.import_queue.insert(peer_id, block_requests);
} }
} }
/// A `BeaconBlocks` request has received a response. This function process the response.
pub fn beacon_blocks_response( pub fn beacon_blocks_response(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request_id: RequestId, request_id: RequestId,
mut blocks: Vec<BeaconBlock<T::EthSpec>>, mut blocks: Vec<BeaconBlock<T::EthSpec>>,
) { ) {
// find the request // find the request associated with this response
let block_requests = match self let block_requests = match self
.import_queue .import_queue
.get_mut(&peer_id) .get_mut(&peer_id)

View File

@ -16,8 +16,6 @@ use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot};
/// Otherwise we queue it. /// Otherwise we queue it.
pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;
/// The number of slots behind our head that we still treat a peer as a fully synced peer.
const FULL_PEER_TOLERANCE: u64 = 10;
const SHOULD_FORWARD_GOSSIP_BLOCK: bool = true; const SHOULD_FORWARD_GOSSIP_BLOCK: bool = true;
const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false;
@ -189,18 +187,17 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
.exists::<BeaconBlock<T::EthSpec>>(&remote.head_root) .exists::<BeaconBlock<T::EthSpec>>(&remote.head_root)
.unwrap_or_else(|_| false) .unwrap_or_else(|_| false)
{ {
trace!(
self.log, "Out of date or potentially sync'd peer found";
"peer" => format!("{:?}", peer_id),
"remote_head_slot" => remote.head_slot
"remote_latest_finalized_epoch" => remote.finalized_epoch,
);
// If the node's best-block is already known to us and they are close to our current // If the node's best-block is already known to us and they are close to our current
// head, treat them as a fully sync'd peer. // head, treat them as a fully sync'd peer.
if self.chain.best_slot().sub(remote.head_slot).as_u64() < FULL_PEER_TOLERANCE { self.manager.add_peer(peer_id, remote);
self.manager.add_full_peer(peer_id); self.process_sync();
self.process_sync();
} else {
debug!(
self.log,
"Out of sync peer connected";
"peer" => format!("{:?}", peer_id),
);
}
} else { } else {
// The remote node has an equal or great finalized epoch and we don't know it's head. // The remote node has an equal or great finalized epoch and we don't know it's head.
// //
@ -218,6 +215,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
} }
} }
/// This function drives the `ImportManager` state machine. The outcomes it provides are
/// actioned until the `ImportManager` is idle.
fn process_sync(&mut self) { fn process_sync(&mut self) {
loop { loop {
match self.manager.poll() { match self.manager.poll() {