Correct a race condition when dialing peers (#4056)

There is a race condition which occurs when multiple discovery queries return at almost the exact same time and they independently contain a useful peer we would like to connect to.

The condition can occur that we can add the same peer to the dial queue, before we get a chance to process the queue. 
This ends up displaying an error to the user: 
```
ERRO Dialing an already dialing peer
```
Although this error is harmless it's not ideal. 

There are two solutions to resolving this:
1. As we decide to dial the peer, we change the state in the peer-db to dialing (before we add it to the queue) which would prevent other requests from adding to the queue. 
2. We prevent duplicates in the dial queue

This PR has opted for 2. because 1. will complicate the code in that we are changing states in non-intuitive places. Although this technically adds a very slight performance cost, its probably a cleaner solution as we can keep the state-changing logic in one place.
This commit is contained in:
Age Manning 2023-03-16 05:44:54 +00:00
parent 1ec3041673
commit 3d99ce25f8
8 changed files with 41 additions and 12 deletions

17
Cargo.lock generated
View File

@ -1614,6 +1614,16 @@ dependencies = [
"tokio-util 0.6.10", "tokio-util 0.6.10",
] ]
[[package]]
name = "delay_map"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4355c25cbf99edcb6b4a0e906f6bdc6956eda149e84455bea49696429b2f8e8"
dependencies = [
"futures",
"tokio-util 0.7.7",
]
[[package]] [[package]]
name = "deposit_contract" name = "deposit_contract"
version = "0.2.0" version = "0.2.0"
@ -1812,7 +1822,7 @@ dependencies = [
"aes 0.7.5", "aes 0.7.5",
"aes-gcm 0.9.4", "aes-gcm 0.9.4",
"arrayvec", "arrayvec",
"delay_map", "delay_map 0.1.2",
"enr", "enr",
"fnv", "fnv",
"futures", "futures",
@ -4404,7 +4414,7 @@ dependencies = [
name = "lighthouse_network" name = "lighthouse_network"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"delay_map", "delay_map 0.3.0",
"directory", "directory",
"dirs", "dirs",
"discv5", "discv5",
@ -5021,7 +5031,7 @@ name = "network"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"beacon_chain", "beacon_chain",
"delay_map", "delay_map 0.3.0",
"derivative", "derivative",
"environment", "environment",
"error-chain", "error-chain",
@ -7985,6 +7995,7 @@ dependencies = [
"futures-io", "futures-io",
"futures-sink", "futures-sink",
"pin-project-lite 0.2.9", "pin-project-lite 0.2.9",
"slab",
"tokio", "tokio",
"tracing", "tracing",
] ]

View File

@ -1,4 +1,4 @@
FROM rust:1.65.0-bullseye AS builder FROM rust:1.66.0-bullseye AS builder
RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev protobuf-compiler RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev protobuf-compiler
COPY . lighthouse COPY . lighthouse
ARG FEATURES ARG FEATURES

View File

@ -42,7 +42,7 @@ strum = { version = "0.24.0", features = ["derive"] }
superstruct = "0.5.0" superstruct = "0.5.0"
prometheus-client = "0.18.0" prometheus-client = "0.18.0"
unused_port = { path = "../../common/unused_port" } unused_port = { path = "../../common/unused_port" }
delay_map = "0.1.1" delay_map = "0.3.0"
void = "1" void = "1"
[dependencies.libp2p] [dependencies.libp2p]

View File

@ -13,7 +13,7 @@ use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use slog::{debug, error, trace, warn}; use slog::{debug, error, trace, warn};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::VecDeque; use std::collections::BTreeMap;
use std::{ use std::{
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
@ -77,7 +77,7 @@ pub struct PeerManager<TSpec: EthSpec> {
/// The target number of peers we would like to connect to. /// The target number of peers we would like to connect to.
target_peers: usize, target_peers: usize,
/// Peers queued to be dialed. /// Peers queued to be dialed.
peers_to_dial: VecDeque<(PeerId, Option<Enr>)>, peers_to_dial: BTreeMap<PeerId, Option<Enr>>,
/// The number of temporarily banned peers. This is used to prevent instantaneous /// The number of temporarily banned peers. This is used to prevent instantaneous
/// reconnection. /// reconnection.
// NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A // NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A
@ -308,7 +308,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// proves resource constraining, we should switch to multiaddr dialling here. /// proves resource constraining, we should switch to multiaddr dialling here.
#[allow(clippy::mutable_key_type)] #[allow(clippy::mutable_key_type)]
pub fn peers_discovered(&mut self, results: HashMap<PeerId, Option<Instant>>) -> Vec<PeerId> { pub fn peers_discovered(&mut self, results: HashMap<PeerId, Option<Instant>>) -> Vec<PeerId> {
let mut to_dial_peers = Vec::new(); let mut to_dial_peers = Vec::with_capacity(4);
let connected_or_dialing = self.network_globals.connected_or_dialing_peers(); let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
for (peer_id, min_ttl) in results { for (peer_id, min_ttl) in results {
@ -398,7 +398,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// A peer is being dialed. // A peer is being dialed.
pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option<Enr>) { pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option<Enr>) {
self.peers_to_dial.push_back((*peer_id, enr)); self.peers_to_dial.insert(*peer_id, enr);
} }
/// Reports if a peer is banned or not. /// Reports if a peer is banned or not.
@ -1185,6 +1185,18 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// Unban any peers that have served their temporary ban timeout // Unban any peers that have served their temporary ban timeout
self.unban_temporary_banned_peers(); self.unban_temporary_banned_peers();
// Maintains memory by shrinking mappings
self.shrink_mappings();
}
// Reduce memory footprint by routinely shrinking associating mappings.
fn shrink_mappings(&mut self) {
self.inbound_ping_peers.shrink_to(5);
self.outbound_ping_peers.shrink_to(5);
self.status_peers.shrink_to(5);
self.temporary_banned_peers.shrink_to_fit();
self.sync_committee_subnets.shrink_to_fit();
} }
// Update metrics related to peer scoring. // Update metrics related to peer scoring.

View File

@ -89,7 +89,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
self.events.shrink_to_fit(); self.events.shrink_to_fit();
} }
if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_front() { if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_first() {
self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr); self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr);
let handler = self.new_handler(); let handler = self.new_handler();
return Poll::Ready(NetworkBehaviourAction::Dial { return Poll::Ready(NetworkBehaviourAction::Dial {

View File

@ -43,7 +43,7 @@ if-addrs = "0.6.4"
strum = "0.24.0" strum = "0.24.0"
tokio-util = { version = "0.6.3", features = ["time"] } tokio-util = { version = "0.6.3", features = ["time"] }
derivative = "2.2.0" derivative = "2.2.0"
delay_map = "0.1.1" delay_map = "0.3.0"
ethereum-types = { version = "0.14.1", optional = true } ethereum-types = { version = "0.14.1", optional = true }
operation_pool = { path = "../operation_pool" } operation_pool = { path = "../operation_pool" }
execution_layer = { path = "../execution_layer" } execution_layer = { path = "../execution_layer" }

View File

@ -160,6 +160,12 @@ where
self.map.contains(key) self.map.contains(key)
} }
/// Shrink the mappings to fit the current size.
pub fn shrink_to_fit(&mut self) {
self.map.shrink_to_fit();
self.list.shrink_to_fit();
}
#[cfg(test)] #[cfg(test)]
#[track_caller] #[track_caller]
fn check_invariant(&self) { fn check_invariant(&self) {

View File

@ -4,7 +4,7 @@ version = "3.5.1"
authors = ["Sigma Prime <contact@sigmaprime.io>"] authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2021" edition = "2021"
autotests = false autotests = false
rust-version = "1.65" rust-version = "1.66"
[features] [features]
default = ["slasher-mdbx"] default = ["slasher-mdbx"]