mirror of
https://gitlab.com/pulsechaincom/lighthouse-pulse.git
synced 2024-12-22 03:30:38 +00:00
Add blob_sidecar
event to SSE (#4790)
* Add `blob_sidecar` event to SSE. * Return 202 if a block is published but failed blob validation when validation level is `Gossip`. * Move `BlobSidecar` event to `process_gossip_blob` and add test. * Emit `BlobSidecar` event when blobs are received over rpc. * Improve test assertions on `SseBlobSidecar`s. * Add quotes to blob index serialization in `SseBlobSidecar` Co-authored-by: realbigsean <seananderson33@GMAIL.com> --------- Co-authored-by: realbigsean <seananderson33@GMAIL.com>
This commit is contained in:
parent
4555e33048
commit
38e7172508
@ -70,7 +70,7 @@ use crate::{
|
||||
kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore,
|
||||
BeaconSnapshot, CachedHead,
|
||||
};
|
||||
use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
|
||||
use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
|
||||
use execution_layer::{
|
||||
BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition,
|
||||
PayloadAttributes, PayloadStatus,
|
||||
@ -2809,6 +2809,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
return Err(BlockError::BlockIsAlreadyKnown);
|
||||
}
|
||||
|
||||
if let Some(event_handler) = self.event_handler.as_ref() {
|
||||
if event_handler.has_blob_sidecar_subscribers() {
|
||||
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar(
|
||||
blob.as_blob(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
self.data_availability_checker
|
||||
.notify_gossip_blob(blob.as_blob().slot, block_root, &blob);
|
||||
let r = self.check_gossip_blob_availability_and_import(blob).await;
|
||||
@ -2833,6 +2841,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
return Err(BlockError::BlockIsAlreadyKnown);
|
||||
}
|
||||
|
||||
if let Some(event_handler) = self.event_handler.as_ref() {
|
||||
if event_handler.has_blob_sidecar_subscribers() {
|
||||
for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
|
||||
event_handler.register(EventKind::BlobSidecar(
|
||||
SseBlobSidecar::from_blob_sidecar(blob),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.data_availability_checker
|
||||
.notify_rpc_blobs(slot, block_root, &blobs);
|
||||
let r = self
|
||||
|
@ -10,7 +10,7 @@ use crate::block_verification::cheap_state_advance_to_obtain_committees;
|
||||
use crate::data_availability_checker::AvailabilityCheckError;
|
||||
use crate::kzg_utils::{validate_blob, validate_blobs};
|
||||
use crate::{metrics, BeaconChainError};
|
||||
use kzg::Kzg;
|
||||
use kzg::{Kzg, KzgCommitment};
|
||||
use slog::{debug, warn};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use ssz_types::VariableList;
|
||||
@ -182,6 +182,12 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
|
||||
pub fn slot(&self) -> Slot {
|
||||
self.blob.message.slot
|
||||
}
|
||||
pub fn index(&self) -> u64 {
|
||||
self.blob.message.index
|
||||
}
|
||||
pub fn kzg_commitment(&self) -> KzgCommitment {
|
||||
self.blob.message.kzg_commitment
|
||||
}
|
||||
pub fn proposer_index(&self) -> u64 {
|
||||
self.blob.message.proposer_index
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ const DEFAULT_CHANNEL_CAPACITY: usize = 16;
|
||||
pub struct ServerSentEventHandler<T: EthSpec> {
|
||||
attestation_tx: Sender<EventKind<T>>,
|
||||
block_tx: Sender<EventKind<T>>,
|
||||
blob_sidecar_tx: Sender<EventKind<T>>,
|
||||
finalized_tx: Sender<EventKind<T>>,
|
||||
head_tx: Sender<EventKind<T>>,
|
||||
exit_tx: Sender<EventKind<T>>,
|
||||
@ -31,6 +32,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
|
||||
pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
|
||||
let (attestation_tx, _) = broadcast::channel(capacity);
|
||||
let (block_tx, _) = broadcast::channel(capacity);
|
||||
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
|
||||
let (finalized_tx, _) = broadcast::channel(capacity);
|
||||
let (head_tx, _) = broadcast::channel(capacity);
|
||||
let (exit_tx, _) = broadcast::channel(capacity);
|
||||
@ -43,6 +45,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
|
||||
Self {
|
||||
attestation_tx,
|
||||
block_tx,
|
||||
blob_sidecar_tx,
|
||||
finalized_tx,
|
||||
head_tx,
|
||||
exit_tx,
|
||||
@ -73,6 +76,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
|
||||
.block_tx
|
||||
.send(kind)
|
||||
.map(|count| log_count("block", count)),
|
||||
EventKind::BlobSidecar(_) => self
|
||||
.blob_sidecar_tx
|
||||
.send(kind)
|
||||
.map(|count| log_count("blob sidecar", count)),
|
||||
EventKind::FinalizedCheckpoint(_) => self
|
||||
.finalized_tx
|
||||
.send(kind)
|
||||
@ -119,6 +126,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
|
||||
self.block_tx.subscribe()
|
||||
}
|
||||
|
||||
pub fn subscribe_blob_sidecar(&self) -> Receiver<EventKind<T>> {
|
||||
self.blob_sidecar_tx.subscribe()
|
||||
}
|
||||
|
||||
pub fn subscribe_finalized(&self) -> Receiver<EventKind<T>> {
|
||||
self.finalized_tx.subscribe()
|
||||
}
|
||||
@ -159,6 +170,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
|
||||
self.block_tx.receiver_count() > 0
|
||||
}
|
||||
|
||||
pub fn has_blob_sidecar_subscribers(&self) -> bool {
|
||||
self.blob_sidecar_tx.receiver_count() > 0
|
||||
}
|
||||
|
||||
pub fn has_finalized_subscribers(&self) -> bool {
|
||||
self.finalized_tx.receiver_count() > 0
|
||||
}
|
||||
|
99
beacon_node/beacon_chain/tests/events.rs
Normal file
99
beacon_node/beacon_chain/tests/events.rs
Normal file
@ -0,0 +1,99 @@
|
||||
use beacon_chain::blob_verification::GossipVerifiedBlob;
|
||||
use beacon_chain::test_utils::BeaconChainHarness;
|
||||
use bls::Signature;
|
||||
use eth2::types::{EventKind, SseBlobSidecar};
|
||||
use rand::rngs::StdRng;
|
||||
use rand::SeedableRng;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use types::blob_sidecar::FixedBlobSidecarList;
|
||||
use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec, SignedBlobSidecar};
|
||||
|
||||
type E = MinimalEthSpec;
|
||||
|
||||
/// Verifies that a blob event is emitted when a gossip verified blob is received via gossip or the publish block API.
|
||||
#[tokio::test]
|
||||
async fn blob_sidecar_event_on_process_gossip_blob() {
|
||||
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
|
||||
let harness = BeaconChainHarness::builder(E::default())
|
||||
.spec(spec)
|
||||
.deterministic_keypairs(8)
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.build();
|
||||
|
||||
// subscribe to blob sidecar events
|
||||
let event_handler = harness.chain.event_handler.as_ref().unwrap();
|
||||
let mut blob_event_receiver = event_handler.subscribe_blob_sidecar();
|
||||
|
||||
// build and process a gossip verified blob
|
||||
let kzg = harness.chain.kzg.as_ref().unwrap();
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
|
||||
let signed_sidecar = SignedBlobSidecar {
|
||||
message: BlobSidecar::random_valid(&mut rng, kzg)
|
||||
.map(Arc::new)
|
||||
.unwrap(),
|
||||
signature: Signature::empty(),
|
||||
_phantom: PhantomData,
|
||||
};
|
||||
let gossip_verified_blob = GossipVerifiedBlob::__assumed_valid(signed_sidecar);
|
||||
let expected_sse_blobs = SseBlobSidecar::from_blob_sidecar(gossip_verified_blob.as_blob());
|
||||
|
||||
let _ = harness
|
||||
.chain
|
||||
.process_gossip_blob(gossip_verified_blob)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let sidecar_event = blob_event_receiver.try_recv().unwrap();
|
||||
assert_eq!(sidecar_event, EventKind::BlobSidecar(expected_sse_blobs));
|
||||
}
|
||||
|
||||
/// Verifies that a blob event is emitted when blobs are received via RPC.
|
||||
#[tokio::test]
|
||||
async fn blob_sidecar_event_on_process_rpc_blobs() {
|
||||
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
|
||||
let harness = BeaconChainHarness::builder(E::default())
|
||||
.spec(spec)
|
||||
.deterministic_keypairs(8)
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.build();
|
||||
|
||||
// subscribe to blob sidecar events
|
||||
let event_handler = harness.chain.event_handler.as_ref().unwrap();
|
||||
let mut blob_event_receiver = event_handler.subscribe_blob_sidecar();
|
||||
|
||||
// build and process multiple rpc blobs
|
||||
let kzg = harness.chain.kzg.as_ref().unwrap();
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
|
||||
|
||||
let blob_1 = BlobSidecar::random_valid(&mut rng, kzg)
|
||||
.map(Arc::new)
|
||||
.unwrap();
|
||||
let blob_2 = Arc::new(BlobSidecar {
|
||||
index: 1,
|
||||
..BlobSidecar::random_valid(&mut rng, kzg).unwrap()
|
||||
});
|
||||
let blobs = FixedBlobSidecarList::from(vec![Some(blob_1.clone()), Some(blob_2.clone())]);
|
||||
let expected_sse_blobs = vec![
|
||||
SseBlobSidecar::from_blob_sidecar(blob_1.as_ref()),
|
||||
SseBlobSidecar::from_blob_sidecar(blob_2.as_ref()),
|
||||
];
|
||||
|
||||
let _ = harness
|
||||
.chain
|
||||
.process_rpc_blobs(blob_1.slot, blob_1.block_root, blobs)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut sse_blobs: Vec<SseBlobSidecar> = vec![];
|
||||
while let Ok(sidecar_event) = blob_event_receiver.try_recv() {
|
||||
if let EventKind::BlobSidecar(sse_blob_sidecar) = sidecar_event {
|
||||
sse_blobs.push(sse_blob_sidecar);
|
||||
} else {
|
||||
panic!("`BlobSidecar` event kind expected.");
|
||||
}
|
||||
}
|
||||
assert_eq!(sse_blobs, expected_sse_blobs);
|
||||
}
|
@ -2,6 +2,7 @@ mod attestation_production;
|
||||
mod attestation_verification;
|
||||
mod block_verification;
|
||||
mod capella;
|
||||
mod events;
|
||||
mod merge;
|
||||
mod op_verification;
|
||||
mod payload_invalidation;
|
||||
|
@ -4425,6 +4425,9 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
let receiver = match topic {
|
||||
api_types::EventTopic::Head => event_handler.subscribe_head(),
|
||||
api_types::EventTopic::Block => event_handler.subscribe_block(),
|
||||
api_types::EventTopic::BlobSidecar => {
|
||||
event_handler.subscribe_blob_sidecar()
|
||||
}
|
||||
api_types::EventTopic::Attestation => {
|
||||
event_handler.subscribe_attestation()
|
||||
}
|
||||
|
@ -199,9 +199,17 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
|
||||
if let Some(gossip_verified_blobs) = gossip_verified_blobs {
|
||||
for blob in gossip_verified_blobs {
|
||||
if let Err(e) = chain.process_gossip_blob(blob).await {
|
||||
return Err(warp_utils::reject::custom_bad_request(format!(
|
||||
"Invalid blob: {e}"
|
||||
)));
|
||||
let msg = format!("Invalid blob: {e}");
|
||||
return if let BroadcastValidation::Gossip = validation_level {
|
||||
Err(warp_utils::reject::broadcast_without_import(msg))
|
||||
} else {
|
||||
error!(
|
||||
log,
|
||||
"Invalid blob provided to HTTP API";
|
||||
"reason" => &msg
|
||||
);
|
||||
Err(warp_utils::reject::custom_bad_request(msg))
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -260,13 +260,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn generate_rpc_blobs_process_fn(
|
||||
self: Arc<Self>,
|
||||
block_root: Hash256,
|
||||
block: FixedBlobSidecarList<T::EthSpec>,
|
||||
blobs: FixedBlobSidecarList<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> AsyncFn {
|
||||
let process_fn = async move {
|
||||
self.clone()
|
||||
.process_rpc_blobs(block_root, block, seen_timestamp, process_type)
|
||||
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
|
||||
.await;
|
||||
};
|
||||
Box::pin(process_fn)
|
||||
|
@ -888,6 +888,28 @@ pub struct SseBlock {
|
||||
pub execution_optimistic: bool,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct SseBlobSidecar {
|
||||
pub block_root: Hash256,
|
||||
#[serde(with = "serde_utils::quoted_u64")]
|
||||
pub index: u64,
|
||||
pub slot: Slot,
|
||||
pub kzg_commitment: KzgCommitment,
|
||||
pub versioned_hash: VersionedHash,
|
||||
}
|
||||
|
||||
impl SseBlobSidecar {
|
||||
pub fn from_blob_sidecar<E: EthSpec>(blob_sidecar: &BlobSidecar<E>) -> SseBlobSidecar {
|
||||
SseBlobSidecar {
|
||||
block_root: blob_sidecar.block_root,
|
||||
index: blob_sidecar.index,
|
||||
slot: blob_sidecar.slot,
|
||||
kzg_commitment: blob_sidecar.kzg_commitment,
|
||||
versioned_hash: blob_sidecar.kzg_commitment.calculate_versioned_hash(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct SseFinalizedCheckpoint {
|
||||
pub block: Hash256,
|
||||
@ -1018,6 +1040,7 @@ impl ForkVersionDeserialize for SseExtendedPayloadAttributes {
|
||||
pub enum EventKind<T: EthSpec> {
|
||||
Attestation(Box<Attestation<T>>),
|
||||
Block(SseBlock),
|
||||
BlobSidecar(SseBlobSidecar),
|
||||
FinalizedCheckpoint(SseFinalizedCheckpoint),
|
||||
Head(SseHead),
|
||||
VoluntaryExit(SignedVoluntaryExit),
|
||||
@ -1034,6 +1057,7 @@ impl<T: EthSpec> EventKind<T> {
|
||||
match self {
|
||||
EventKind::Head(_) => "head",
|
||||
EventKind::Block(_) => "block",
|
||||
EventKind::BlobSidecar(_) => "blob_sidecar",
|
||||
EventKind::Attestation(_) => "attestation",
|
||||
EventKind::VoluntaryExit(_) => "voluntary_exit",
|
||||
EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint",
|
||||
@ -1071,6 +1095,9 @@ impl<T: EthSpec> EventKind<T> {
|
||||
"block" => Ok(EventKind::Block(serde_json::from_str(data).map_err(
|
||||
|e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)),
|
||||
)?)),
|
||||
"blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err(
|
||||
|e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)),
|
||||
)?)),
|
||||
"chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err(
|
||||
|e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)),
|
||||
)?)),
|
||||
@ -1123,6 +1150,7 @@ pub struct EventQuery {
|
||||
pub enum EventTopic {
|
||||
Head,
|
||||
Block,
|
||||
BlobSidecar,
|
||||
Attestation,
|
||||
VoluntaryExit,
|
||||
FinalizedCheckpoint,
|
||||
@ -1141,6 +1169,7 @@ impl FromStr for EventTopic {
|
||||
match s {
|
||||
"head" => Ok(EventTopic::Head),
|
||||
"block" => Ok(EventTopic::Block),
|
||||
"blob_sidecar" => Ok(EventTopic::BlobSidecar),
|
||||
"attestation" => Ok(EventTopic::Attestation),
|
||||
"voluntary_exit" => Ok(EventTopic::VoluntaryExit),
|
||||
"finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint),
|
||||
@ -1160,6 +1189,7 @@ impl fmt::Display for EventTopic {
|
||||
match self {
|
||||
EventTopic::Head => write!(f, "head"),
|
||||
EventTopic::Block => write!(f, "block"),
|
||||
EventTopic::BlobSidecar => write!(f, "blob_sidecar"),
|
||||
EventTopic::Attestation => write!(f, "attestation"),
|
||||
EventTopic::VoluntaryExit => write!(f, "voluntary_exit"),
|
||||
EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"),
|
||||
|
Loading…
Reference in New Issue
Block a user