mirror of
https://gitlab.com/pulsechaincom/lighthouse-pulse.git
synced 2024-12-22 19:50:37 +00:00
light client optimistic update reprocessing (#3799)
## Issue Addressed Currently there is a race between receiving blocks and receiving light client optimistic updates (in unstable), which results in processing errors. This is a continuation of PR #3693 and seeks to progress on issue #3651 ## Proposed Changes Add the parent_root to ReprocessQueueMessage::BlockImported so we can remove blocks from queue when a block arrives that has the same parent root. We use the parent root as opposed to the block_root because the LightClientOptimisticUpdate does not contain the block_root. If light_client_optimistic_update.attested_header.canonical_root() != head_block.message().parent_root() then we queue the update. Otherwise we process immediately. ## Additional Info michaelsproul came up with this idea. The code was heavily based off of the attestation reprocessing. I have not properly tested this to see if it works as intended.
This commit is contained in:
parent
2802bc9a9c
commit
a7351c00c0
@ -2,6 +2,7 @@ use crate::{
|
||||
beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
};
|
||||
use derivative::Derivative;
|
||||
use eth2::types::Hash256;
|
||||
use slot_clock::SlotClock;
|
||||
use std::time::Duration;
|
||||
use strum::AsRefStr;
|
||||
@ -36,6 +37,8 @@ pub enum Error {
|
||||
SigSlotStartIsNone,
|
||||
/// Failed to construct a LightClientOptimisticUpdate from state.
|
||||
FailedConstructingUpdate,
|
||||
/// Unknown block with parent root.
|
||||
UnknownBlockParentRoot(Hash256),
|
||||
/// Beacon chain error occured.
|
||||
BeaconChainError(BeaconChainError),
|
||||
LightClientUpdateError(LightClientUpdateError),
|
||||
@ -58,6 +61,7 @@ impl From<LightClientUpdateError> for Error {
|
||||
#[derivative(Clone(bound = "T: BeaconChainTypes"))]
|
||||
pub struct VerifiedLightClientOptimisticUpdate<T: BeaconChainTypes> {
|
||||
light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
|
||||
pub parent_root: Hash256,
|
||||
seen_timestamp: Duration,
|
||||
}
|
||||
|
||||
@ -107,6 +111,16 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
|
||||
None => return Err(Error::SigSlotStartIsNone),
|
||||
}
|
||||
|
||||
// check if we can process the optimistic update immediately
|
||||
// otherwise queue
|
||||
let canonical_root = light_client_optimistic_update
|
||||
.attested_header
|
||||
.canonical_root();
|
||||
|
||||
if canonical_root != head_block.message().parent_root() {
|
||||
return Err(Error::UnknownBlockParentRoot(canonical_root));
|
||||
}
|
||||
|
||||
let optimistic_update =
|
||||
LightClientOptimisticUpdate::new(&chain.spec, head_block, &attested_state)?;
|
||||
|
||||
@ -119,6 +133,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
|
||||
|
||||
Ok(Self {
|
||||
light_client_optimistic_update,
|
||||
parent_root: canonical_root,
|
||||
seen_timestamp,
|
||||
})
|
||||
}
|
||||
|
@ -67,7 +67,8 @@ use types::{
|
||||
SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId,
|
||||
};
|
||||
use work_reprocessing_queue::{
|
||||
spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
|
||||
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
|
||||
QueuedUnaggregate, ReadyWork,
|
||||
};
|
||||
|
||||
use worker::{Toolbox, Worker};
|
||||
@ -137,6 +138,10 @@ const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024;
|
||||
/// before we start dropping them.
|
||||
const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;
|
||||
|
||||
/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
|
||||
/// for reprocessing before we start dropping them.
|
||||
const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;
|
||||
|
||||
/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
|
||||
/// them.
|
||||
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
|
||||
@ -213,6 +218,7 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
|
||||
pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
|
||||
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
|
||||
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
|
||||
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
|
||||
|
||||
/// A simple first-in-first-out queue with a maximum length.
|
||||
struct FifoQueue<T> {
|
||||
@ -694,6 +700,21 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
|
||||
seen_timestamp,
|
||||
},
|
||||
},
|
||||
ReadyWork::LightClientUpdate(QueuedLightClientUpdate {
|
||||
peer_id,
|
||||
message_id,
|
||||
light_client_optimistic_update,
|
||||
seen_timestamp,
|
||||
..
|
||||
}) => Self {
|
||||
drop_during_sync: true,
|
||||
work: Work::UnknownLightClientOptimisticUpdate {
|
||||
message_id,
|
||||
peer_id,
|
||||
light_client_optimistic_update,
|
||||
seen_timestamp,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -733,6 +754,12 @@ pub enum Work<T: BeaconChainTypes> {
|
||||
aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
UnknownLightClientOptimisticUpdate {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
GossipAggregateBatch {
|
||||
packages: Vec<GossipAggregatePackage<T::EthSpec>>,
|
||||
},
|
||||
@ -845,6 +872,7 @@ impl<T: BeaconChainTypes> Work<T> {
|
||||
Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST,
|
||||
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
|
||||
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
|
||||
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -979,6 +1007,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
// Using a FIFO queue for light client updates to maintain sequence order.
|
||||
let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
|
||||
let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
|
||||
let mut unknown_light_client_update_queue =
|
||||
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);
|
||||
|
||||
// Using a FIFO queue since blocks need to be imported sequentially.
|
||||
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
|
||||
@ -1346,6 +1376,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
Work::UnknownBlockAggregate { .. } => {
|
||||
unknown_block_aggregate_queue.push(work)
|
||||
}
|
||||
Work::UnknownLightClientOptimisticUpdate { .. } => {
|
||||
unknown_light_client_update_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1665,6 +1698,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
message_id,
|
||||
peer_id,
|
||||
*light_client_optimistic_update,
|
||||
Some(work_reprocessing_tx),
|
||||
seen_timestamp,
|
||||
)
|
||||
}),
|
||||
@ -1787,6 +1821,20 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
seen_timestamp,
|
||||
)
|
||||
}),
|
||||
Work::UnknownLightClientOptimisticUpdate {
|
||||
message_id,
|
||||
peer_id,
|
||||
light_client_optimistic_update,
|
||||
seen_timestamp,
|
||||
} => task_spawner.spawn_blocking(move || {
|
||||
worker.process_gossip_optimistic_update(
|
||||
message_id,
|
||||
peer_id,
|
||||
*light_client_optimistic_update,
|
||||
None,
|
||||
seen_timestamp,
|
||||
)
|
||||
}),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ use futures::task::Poll;
|
||||
use futures::{Stream, StreamExt};
|
||||
use lighthouse_network::{MessageId, PeerId};
|
||||
use logging::TimeLatch;
|
||||
use slog::{crit, debug, error, warn, Logger};
|
||||
use slog::{crit, debug, error, trace, warn, Logger};
|
||||
use slot_clock::SlotClock;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::pin::Pin;
|
||||
@ -30,12 +30,16 @@ use task_executor::TaskExecutor;
|
||||
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
use tokio::time::error::Error as TimeError;
|
||||
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
|
||||
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
|
||||
use types::{
|
||||
Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof,
|
||||
SignedBeaconBlock, SubnetId,
|
||||
};
|
||||
|
||||
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
|
||||
const GOSSIP_BLOCKS: &str = "gossip_blocks";
|
||||
const RPC_BLOCKS: &str = "rpc_blocks";
|
||||
const ATTESTATIONS: &str = "attestations";
|
||||
const LIGHT_CLIENT_UPDATES: &str = "lc_updates";
|
||||
|
||||
/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
|
||||
/// This is to account for any slight drift in the system clock.
|
||||
@ -44,6 +48,9 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5);
|
||||
/// For how long to queue aggregated and unaggregated attestations for re-processing.
|
||||
pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
|
||||
|
||||
/// For how long to queue light client updates for re-processing.
|
||||
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
|
||||
|
||||
/// For how long to queue rpc blocks before sending them back for reprocessing.
|
||||
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3);
|
||||
|
||||
@ -55,6 +62,9 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16;
|
||||
/// How many attestations we keep before new ones get dropped.
|
||||
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
|
||||
|
||||
/// How many light client updates we keep before new ones get dropped.
|
||||
const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128;
|
||||
|
||||
/// Messages that the scheduler can receive.
|
||||
pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
|
||||
/// A block that has been received early and we should queue for later processing.
|
||||
@ -62,13 +72,18 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
|
||||
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
|
||||
/// hash until the gossip block is imported.
|
||||
RpcBlock(QueuedRpcBlock<T::EthSpec>),
|
||||
/// A block that was successfully processed. We use this to handle attestations for unknown
|
||||
/// blocks.
|
||||
BlockImported(Hash256),
|
||||
/// A block that was successfully processed. We use this to handle attestations and light client updates
|
||||
/// for unknown blocks.
|
||||
BlockImported {
|
||||
block_root: Hash256,
|
||||
parent_root: Hash256,
|
||||
},
|
||||
/// An unaggregated attestation that references an unknown block.
|
||||
UnknownBlockUnaggregate(QueuedUnaggregate<T::EthSpec>),
|
||||
/// An aggregated attestation that references an unknown block.
|
||||
UnknownBlockAggregate(QueuedAggregate<T::EthSpec>),
|
||||
/// A light client optimistic update that references a parent root that has not been seen as a parent.
|
||||
UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate<T::EthSpec>),
|
||||
}
|
||||
|
||||
/// Events sent by the scheduler once they are ready for re-processing.
|
||||
@ -77,6 +92,7 @@ pub enum ReadyWork<T: BeaconChainTypes> {
|
||||
RpcBlock(QueuedRpcBlock<T::EthSpec>),
|
||||
Unaggregate(QueuedUnaggregate<T::EthSpec>),
|
||||
Aggregate(QueuedAggregate<T::EthSpec>),
|
||||
LightClientUpdate(QueuedLightClientUpdate<T::EthSpec>),
|
||||
}
|
||||
|
||||
/// An Attestation for which the corresponding block was not seen while processing, queued for
|
||||
@ -99,6 +115,16 @@ pub struct QueuedAggregate<T: EthSpec> {
|
||||
pub seen_timestamp: Duration,
|
||||
}
|
||||
|
||||
/// A light client update for which the corresponding parent block was not seen while processing,
|
||||
/// queued for later.
|
||||
pub struct QueuedLightClientUpdate<T: EthSpec> {
|
||||
pub peer_id: PeerId,
|
||||
pub message_id: MessageId,
|
||||
pub light_client_optimistic_update: Box<LightClientOptimisticUpdate<T>>,
|
||||
pub parent_root: Hash256,
|
||||
pub seen_timestamp: Duration,
|
||||
}
|
||||
|
||||
/// A block that arrived early and has been queued for later import.
|
||||
pub struct QueuedGossipBlock<T: BeaconChainTypes> {
|
||||
pub peer_id: PeerId,
|
||||
@ -127,6 +153,8 @@ enum InboundEvent<T: BeaconChainTypes> {
|
||||
ReadyRpcBlock(QueuedRpcBlock<T::EthSpec>),
|
||||
/// An aggregated or unaggregated attestation is ready for re-processing.
|
||||
ReadyAttestation(QueuedAttestationId),
|
||||
/// A light client update that is ready for re-processing.
|
||||
ReadyLightClientUpdate(QueuedLightClientUpdateId),
|
||||
/// A `DelayQueue` returned an error.
|
||||
DelayQueueError(TimeError, &'static str),
|
||||
/// A message sent to the `ReprocessQueue`
|
||||
@ -147,6 +175,8 @@ struct ReprocessQueue<T: BeaconChainTypes> {
|
||||
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock<T::EthSpec>>,
|
||||
/// Queue to manage scheduled attestations.
|
||||
attestations_delay_queue: DelayQueue<QueuedAttestationId>,
|
||||
/// Queue to manage scheduled light client updates.
|
||||
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
|
||||
|
||||
/* Queued items */
|
||||
/// Queued blocks.
|
||||
@ -157,15 +187,23 @@ struct ReprocessQueue<T: BeaconChainTypes> {
|
||||
queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate<T::EthSpec>, DelayKey)>,
|
||||
/// Attestations (aggregated and unaggregated) per root.
|
||||
awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>,
|
||||
/// Queued Light Client Updates.
|
||||
queued_lc_updates: FnvHashMap<usize, (QueuedLightClientUpdate<T::EthSpec>, DelayKey)>,
|
||||
/// Light Client Updates per parent_root.
|
||||
awaiting_lc_updates_per_parent_root: HashMap<Hash256, Vec<QueuedLightClientUpdateId>>,
|
||||
|
||||
/* Aux */
|
||||
/// Next attestation id, used for both aggregated and unaggregated attestations
|
||||
next_attestation: usize,
|
||||
next_lc_update: usize,
|
||||
early_block_debounce: TimeLatch,
|
||||
rpc_block_debounce: TimeLatch,
|
||||
attestation_delay_debounce: TimeLatch,
|
||||
lc_update_delay_debounce: TimeLatch,
|
||||
}
|
||||
|
||||
pub type QueuedLightClientUpdateId = usize;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum QueuedAttestationId {
|
||||
Aggregate(usize),
|
||||
@ -235,6 +273,20 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
match self.lc_updates_delay_queue.poll_expired(cx) {
|
||||
Poll::Ready(Some(Ok(lc_id))) => {
|
||||
return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
|
||||
lc_id.into_inner(),
|
||||
)));
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue")));
|
||||
}
|
||||
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
|
||||
// will continue to get this result until something else is added into the queue.
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
// Last empty the messages channel.
|
||||
match self.work_reprocessing_rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))),
|
||||
@ -264,14 +316,19 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
|
||||
gossip_block_delay_queue: DelayQueue::new(),
|
||||
rpc_block_delay_queue: DelayQueue::new(),
|
||||
attestations_delay_queue: DelayQueue::new(),
|
||||
lc_updates_delay_queue: DelayQueue::new(),
|
||||
queued_gossip_block_roots: HashSet::new(),
|
||||
queued_lc_updates: FnvHashMap::default(),
|
||||
queued_aggregates: FnvHashMap::default(),
|
||||
queued_unaggregates: FnvHashMap::default(),
|
||||
awaiting_attestations_per_root: HashMap::new(),
|
||||
awaiting_lc_updates_per_parent_root: HashMap::new(),
|
||||
next_attestation: 0,
|
||||
next_lc_update: 0,
|
||||
early_block_debounce: TimeLatch::default(),
|
||||
rpc_block_debounce: TimeLatch::default(),
|
||||
attestation_delay_debounce: TimeLatch::default(),
|
||||
lc_update_delay_debounce: TimeLatch::default(),
|
||||
};
|
||||
|
||||
executor.spawn(
|
||||
@ -473,9 +530,49 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
|
||||
self.next_attestation += 1;
|
||||
}
|
||||
InboundEvent::Msg(BlockImported(root)) => {
|
||||
InboundEvent::Msg(UnknownLightClientOptimisticUpdate(
|
||||
queued_light_client_optimistic_update,
|
||||
)) => {
|
||||
if self.lc_updates_delay_queue.len() >= MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES {
|
||||
if self.lc_update_delay_debounce.elapsed() {
|
||||
error!(
|
||||
log,
|
||||
"Light client updates delay queue is full";
|
||||
"queue_size" => MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES,
|
||||
"msg" => "check system clock"
|
||||
);
|
||||
}
|
||||
// Drop the light client update.
|
||||
return;
|
||||
}
|
||||
|
||||
let lc_id: QueuedLightClientUpdateId = self.next_lc_update;
|
||||
|
||||
// Register the delay.
|
||||
let delay_key = self
|
||||
.lc_updates_delay_queue
|
||||
.insert(lc_id, QUEUED_LIGHT_CLIENT_UPDATE_DELAY);
|
||||
|
||||
// Register the light client update for the corresponding root.
|
||||
self.awaiting_lc_updates_per_parent_root
|
||||
.entry(queued_light_client_optimistic_update.parent_root)
|
||||
.or_default()
|
||||
.push(lc_id);
|
||||
|
||||
// Store the light client update and its info.
|
||||
self.queued_lc_updates.insert(
|
||||
self.next_lc_update,
|
||||
(queued_light_client_optimistic_update, delay_key),
|
||||
);
|
||||
|
||||
self.next_lc_update += 1;
|
||||
}
|
||||
InboundEvent::Msg(BlockImported {
|
||||
block_root,
|
||||
parent_root,
|
||||
}) => {
|
||||
// Unqueue the attestations we have for this root, if any.
|
||||
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) {
|
||||
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
|
||||
for id in queued_ids {
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS,
|
||||
@ -511,12 +608,62 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
error!(
|
||||
log,
|
||||
"Unknown queued attestation for block root";
|
||||
"block_root" => ?root,
|
||||
"block_root" => ?block_root,
|
||||
"att_id" => ?id,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Unqueue the light client optimistic updates we have for this root, if any.
|
||||
if let Some(queued_lc_id) = self
|
||||
.awaiting_lc_updates_per_parent_root
|
||||
.remove(&parent_root)
|
||||
{
|
||||
debug!(
|
||||
log,
|
||||
"Dequeuing light client optimistic updates";
|
||||
"parent_root" => %parent_root,
|
||||
"count" => queued_lc_id.len(),
|
||||
);
|
||||
|
||||
for lc_id in queued_lc_id {
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES,
|
||||
);
|
||||
if let Some((work, delay_key)) = self.queued_lc_updates.remove(&lc_id).map(
|
||||
|(light_client_optimistic_update, delay_key)| {
|
||||
(
|
||||
ReadyWork::LightClientUpdate(light_client_optimistic_update),
|
||||
delay_key,
|
||||
)
|
||||
},
|
||||
) {
|
||||
// Remove the delay
|
||||
self.lc_updates_delay_queue.remove(&delay_key);
|
||||
|
||||
// Send the work
|
||||
match self.ready_work_tx.try_send(work) {
|
||||
Ok(_) => trace!(
|
||||
log,
|
||||
"reprocessing light client update sent";
|
||||
),
|
||||
Err(_) => error!(
|
||||
log,
|
||||
"Failed to send scheduled light client update";
|
||||
),
|
||||
}
|
||||
} else {
|
||||
// There is a mismatch between the light client update ids registered for this
|
||||
// root and the queued light client updates. This should never happen.
|
||||
error!(
|
||||
log,
|
||||
"Unknown queued light client update for parent root";
|
||||
"parent_root" => ?parent_root,
|
||||
"lc_id" => ?lc_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// A block that was queued for later processing is now ready to be processed.
|
||||
InboundEvent::ReadyGossipBlock(ready_block) => {
|
||||
@ -591,6 +738,38 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
}
|
||||
}
|
||||
}
|
||||
InboundEvent::ReadyLightClientUpdate(queued_id) => {
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES,
|
||||
);
|
||||
|
||||
if let Some((parent_root, work)) = self.queued_lc_updates.remove(&queued_id).map(
|
||||
|(queued_lc_update, _delay_key)| {
|
||||
(
|
||||
queued_lc_update.parent_root,
|
||||
ReadyWork::LightClientUpdate(queued_lc_update),
|
||||
)
|
||||
},
|
||||
) {
|
||||
if self.ready_work_tx.try_send(work).is_err() {
|
||||
error!(
|
||||
log,
|
||||
"Failed to send scheduled light client optimistic update";
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(queued_lc_updates) = self
|
||||
.awaiting_lc_updates_per_parent_root
|
||||
.get_mut(&parent_root)
|
||||
{
|
||||
if let Some(index) =
|
||||
queued_lc_updates.iter().position(|&id| id == queued_id)
|
||||
{
|
||||
queued_lc_updates.swap_remove(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics::set_gauge_vec(
|
||||
@ -608,5 +787,10 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
&[ATTESTATIONS],
|
||||
self.attestations_delay_queue.len() as i64,
|
||||
);
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
|
||||
&[LIGHT_CLIENT_UPDATES],
|
||||
self.lc_updates_delay_queue.len() as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,8 @@ use types::{
|
||||
|
||||
use super::{
|
||||
super::work_reprocessing_queue::{
|
||||
QueuedAggregate, QueuedGossipBlock, QueuedUnaggregate, ReprocessQueueMessage,
|
||||
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
|
||||
ReprocessQueueMessage,
|
||||
},
|
||||
Worker,
|
||||
};
|
||||
@ -953,7 +954,10 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
|
||||
|
||||
if reprocess_tx
|
||||
.try_send(ReprocessQueueMessage::BlockImported(block_root))
|
||||
.try_send(ReprocessQueueMessage::BlockImported {
|
||||
block_root,
|
||||
parent_root: block.message().parent_root(),
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
error!(
|
||||
@ -1330,7 +1334,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
LightClientFinalityUpdateError::InvalidLightClientFinalityUpdate => {
|
||||
debug!(
|
||||
self.log,
|
||||
"LC invalid finality update";
|
||||
"Light client invalid finality update";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
);
|
||||
@ -1344,7 +1348,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
LightClientFinalityUpdateError::TooEarly => {
|
||||
debug!(
|
||||
self.log,
|
||||
"LC finality update too early";
|
||||
"Light client finality update too early";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
);
|
||||
@ -1357,7 +1361,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
}
|
||||
LightClientFinalityUpdateError::FinalityUpdateAlreadySeen => debug!(
|
||||
self.log,
|
||||
"LC finality update already seen";
|
||||
"Light client finality update already seen";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
),
|
||||
@ -1366,7 +1370,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
| LightClientFinalityUpdateError::SigSlotStartIsNone
|
||||
| LightClientFinalityUpdateError::FailedConstructingUpdate => debug!(
|
||||
self.log,
|
||||
"LC error constructing finality update";
|
||||
"Light client error constructing finality update";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
),
|
||||
@ -1381,22 +1385,77 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
|
||||
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
|
||||
seen_timestamp: Duration,
|
||||
) {
|
||||
match self
|
||||
.chain
|
||||
.verify_optimistic_update_for_gossip(light_client_optimistic_update, seen_timestamp)
|
||||
{
|
||||
Ok(_verified_light_client_optimistic_update) => {
|
||||
match self.chain.verify_optimistic_update_for_gossip(
|
||||
light_client_optimistic_update.clone(),
|
||||
seen_timestamp,
|
||||
) {
|
||||
Ok(verified_light_client_optimistic_update) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Light client successful optimistic update";
|
||||
"peer" => %peer_id,
|
||||
"parent_root" => %verified_light_client_optimistic_update.parent_root,
|
||||
);
|
||||
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
|
||||
}
|
||||
Err(e) => {
|
||||
metrics::register_optimistic_update_error(&e);
|
||||
match e {
|
||||
LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => {
|
||||
LightClientOptimisticUpdateError::UnknownBlockParentRoot(parent_root) => {
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES,
|
||||
);
|
||||
debug!(
|
||||
self.log,
|
||||
"LC invalid optimistic update";
|
||||
"Optimistic update for unknown block";
|
||||
"peer_id" => %peer_id,
|
||||
"parent_root" => ?parent_root
|
||||
);
|
||||
|
||||
if let Some(sender) = reprocess_tx {
|
||||
let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(
|
||||
QueuedLightClientUpdate {
|
||||
peer_id,
|
||||
message_id,
|
||||
light_client_optimistic_update: Box::new(
|
||||
light_client_optimistic_update,
|
||||
),
|
||||
parent_root,
|
||||
seen_timestamp,
|
||||
},
|
||||
);
|
||||
|
||||
if sender.try_send(msg).is_err() {
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to send optimistic update for re-processing";
|
||||
)
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
self.log,
|
||||
"Not sending light client update because it had been reprocessed";
|
||||
"peer_id" => %peer_id,
|
||||
"parent_root" => ?parent_root
|
||||
);
|
||||
|
||||
self.propagate_validation_result(
|
||||
message_id,
|
||||
peer_id,
|
||||
MessageAcceptance::Ignore,
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => {
|
||||
metrics::register_optimistic_update_error(&e);
|
||||
|
||||
debug!(
|
||||
self.log,
|
||||
"Light client invalid optimistic update";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
);
|
||||
@ -1408,9 +1467,10 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
)
|
||||
}
|
||||
LightClientOptimisticUpdateError::TooEarly => {
|
||||
metrics::register_optimistic_update_error(&e);
|
||||
debug!(
|
||||
self.log,
|
||||
"LC optimistic update too early";
|
||||
"Light client optimistic update too early";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
);
|
||||
@ -1421,21 +1481,29 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
"light_client_gossip_error",
|
||||
);
|
||||
}
|
||||
LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => debug!(
|
||||
self.log,
|
||||
"LC optimistic update already seen";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
),
|
||||
LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => {
|
||||
metrics::register_optimistic_update_error(&e);
|
||||
|
||||
debug!(
|
||||
self.log,
|
||||
"Light client optimistic update already seen";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
)
|
||||
}
|
||||
LightClientOptimisticUpdateError::BeaconChainError(_)
|
||||
| LightClientOptimisticUpdateError::LightClientUpdateError(_)
|
||||
| LightClientOptimisticUpdateError::SigSlotStartIsNone
|
||||
| LightClientOptimisticUpdateError::FailedConstructingUpdate => debug!(
|
||||
self.log,
|
||||
"LC error constructing optimistic update";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
),
|
||||
| LightClientOptimisticUpdateError::FailedConstructingUpdate => {
|
||||
metrics::register_optimistic_update_error(&e);
|
||||
|
||||
debug!(
|
||||
self.log,
|
||||
"Light client error constructing optimistic update";
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e,
|
||||
)
|
||||
}
|
||||
}
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
}
|
||||
|
@ -84,6 +84,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
}
|
||||
};
|
||||
let slot = block.slot();
|
||||
let parent_root = block.message().parent_root();
|
||||
let result = self
|
||||
.chain
|
||||
.process_block(
|
||||
@ -101,7 +102,10 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash);
|
||||
|
||||
// Trigger processing for work referencing this block.
|
||||
let reprocess_msg = ReprocessQueueMessage::BlockImported(hash);
|
||||
let reprocess_msg = ReprocessQueueMessage::BlockImported {
|
||||
block_root: hash,
|
||||
parent_root,
|
||||
};
|
||||
if reprocess_tx.try_send(reprocess_msg).is_err() {
|
||||
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
|
||||
};
|
||||
|
@ -370,6 +370,21 @@ lazy_static! {
|
||||
"Number of queued attestations where as matching block has been imported."
|
||||
);
|
||||
|
||||
/*
|
||||
* Light client update reprocessing queue metrics.
|
||||
*/
|
||||
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_reprocessing_queue_expired_optimistic_updates",
|
||||
"Number of queued light client optimistic updates which have expired before a matching block has been found."
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_reprocessing_queue_matched_optimistic_updates",
|
||||
"Number of queued light client optimistic updates where as matching block has been imported."
|
||||
);
|
||||
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
|
||||
"beacon_processor_reprocessing_queue_sent_optimistic_updates",
|
||||
"Number of queued light client optimistic updates where as matching block has been imported."
|
||||
);
|
||||
}
|
||||
|
||||
pub fn update_bandwidth_metrics(bandwidth: Arc<BandwidthSinks>) {
|
||||
|
Loading…
Reference in New Issue
Block a user