From 1644289a08b5e356aa66eb4fa546a8295905db4c Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Sat, 24 Oct 2020 01:05:37 +0000 Subject: [PATCH] Updates the libp2p to the second newest commit => Allow only one topic per message (#1819) As @AgeManning mentioned the newest libp2p version had some problems and got downgraded again on lighthouse master. This is an intermediate version that makes no problems and only adds a small change of allowing only one topic per message. --- Cargo.lock | 30 +++--- beacon_node/eth2_libp2p/Cargo.toml | 2 +- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 8 +- beacon_node/eth2_libp2p/src/types/pubsub.rs | 108 ++++++++----------- 4 files changed, 64 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca2397dc7..02c47002b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3037,7 +3037,7 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" [[package]] name = "libp2p" version = "0.29.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "atomic", "bytes 0.5.6", @@ -3054,7 +3054,7 @@ dependencies = [ "libp2p-tcp", "libp2p-websocket", "multihash", - "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c)", + "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669)", "parking_lot 0.11.0", "pin-project 0.4.27", "smallvec 1.4.2", @@ -3098,7 +3098,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.22.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "asn1_der", "bs58", @@ -3112,7 +3112,7 @@ dependencies = [ "log 0.4.11", "multihash", "multistream-select 0.8.3", - "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c)", + "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669)", "parking_lot 0.11.0", "pin-project 0.4.27", "prost", @@ -3131,7 +3131,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.20.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "quote", "syn", @@ -3140,7 +3140,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "futures 0.3.6", "libp2p-core 0.22.2", @@ -3150,7 +3150,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.22.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "base64 0.12.3", "byteorder", @@ -3174,7 +3174,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "futures 0.3.6", "libp2p-core 0.22.2", @@ -3189,7 +3189,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.23.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "bytes 0.5.6", "fnv", @@ -3204,7 +3204,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.24.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "bytes 0.5.6", "curve25519-dalek", @@ -3225,7 +3225,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "either", "futures 0.3.6", @@ -3240,7 +3240,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "futures 0.3.6", "futures-timer", @@ -3255,7 +3255,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.23.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "async-tls", "either", @@ -3680,7 +3680,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.8.3" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "bytes 0.5.6", "futures 0.3.6", @@ -4005,7 +4005,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.3" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "git+https://github.com/sigp/rust-libp2p?rev=8c6ce6eb1228de568568f6cd72fb134dea5f9669#8c6ce6eb1228de568568f6cd72fb134dea5f9669" dependencies = [ "arrayref", "bs58", diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index 966a1bcd3..eae5ed13e 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -42,7 +42,7 @@ regex = "1.3.9" [dependencies.libp2p] #version = "0.23.0" git = "https://github.com/sigp/rust-libp2p" -rev = "a731aa803d986977c25a77ed2b002d9578f7377c" +rev = "8c6ce6eb1228de568568f6cd72fb134dea5f9669" default-features = false features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"] diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 593b7279b..46a53c7ca 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -90,8 +90,8 @@ pub enum BehaviourEvent { id: MessageId, /// The peer from which we received this message, not the peer that published it. source: PeerId, - /// The topics that this message was sent on. - topics: Vec, + /// The topic that this message was sent on. + topic: TopicHash, /// The message itself. message: PubsubMessage, }, @@ -537,7 +537,7 @@ impl Behaviour { } => { // Note: We are keeping track here of the peer that sent us the message, not the // peer that originally published the message. - match PubsubMessage::decode(&gs_msg.topics, gs_msg.data()) { + match PubsubMessage::decode(&gs_msg.topic, gs_msg.data()) { Err(e) => { debug!(self.log, "Could not decode gossipsub message"; "error" => e); //reject the message @@ -554,7 +554,7 @@ impl Behaviour { self.add_event(BehaviourEvent::PubsubMessage { id, source: propagation_source, - topics: gs_msg.topics, + topic: gs_msg.topic, message: msg, }); } diff --git a/beacon_node/eth2_libp2p/src/types/pubsub.rs b/beacon_node/eth2_libp2p/src/types/pubsub.rs index c27d3df4f..6d9502fa4 100644 --- a/beacon_node/eth2_libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2_libp2p/src/types/pubsub.rs @@ -94,78 +94,58 @@ impl PubsubMessage { } } - /// This decodes `data` into a `PubsubMessage` given a list of topics. - /// - /// The topics are checked - /// in order and as soon as one topic matches the decoded data, we return the data. + /// This decodes `data` into a `PubsubMessage` given a topic. /* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will * need to be modified. - * - * Also note that a message can be associated with many topics. As soon as one of the topics is - * known we match. If none of the topics are known we return an unknown state. */ - pub fn decode(topics: &[TopicHash], data: &MessageData) -> Result { - let mut unknown_topics = Vec::new(); - for topic in topics { - match GossipTopic::decode(topic.as_str()) { - Err(_) => { - unknown_topics.push(topic); - continue; - } - Ok(gossip_topic) => { - let decompressed_data = match gossip_topic.encoding() { - GossipEncoding::SSZSnappy => data.decompressed.as_ref()?.as_slice(), - }; - // the ssz decoders - match gossip_topic.kind() { - GossipKind::BeaconAggregateAndProof => { - let agg_and_proof = - SignedAggregateAndProof::from_ssz_bytes(decompressed_data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::AggregateAndProofAttestation(Box::new( - agg_and_proof, - ))); - } - GossipKind::Attestation(subnet_id) => { - let attestation = Attestation::from_ssz_bytes(decompressed_data) + pub fn decode(topic: &TopicHash, data: &MessageData) -> Result { + match GossipTopic::decode(topic.as_str()) { + Err(_) => Err(format!("Unknown gossipsub topic: {:?}", topic)), + Ok(gossip_topic) => { + let decompressed_data = match gossip_topic.encoding() { + GossipEncoding::SSZSnappy => data.decompressed.as_ref()?.as_slice(), + }; + // the ssz decoders + match gossip_topic.kind() { + GossipKind::BeaconAggregateAndProof => { + let agg_and_proof = + SignedAggregateAndProof::from_ssz_bytes(decompressed_data) .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::Attestation(Box::new(( - *subnet_id, - attestation, - )))); - } - GossipKind::BeaconBlock => { - let beacon_block = SignedBeaconBlock::from_ssz_bytes(decompressed_data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))); - } - GossipKind::VoluntaryExit => { - let voluntary_exit = - SignedVoluntaryExit::from_ssz_bytes(decompressed_data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))); - } - GossipKind::ProposerSlashing => { - let proposer_slashing = - ProposerSlashing::from_ssz_bytes(decompressed_data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::ProposerSlashing(Box::new( - proposer_slashing, - ))); - } - GossipKind::AttesterSlashing => { - let attester_slashing = - AttesterSlashing::from_ssz_bytes(decompressed_data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::AttesterSlashing(Box::new( - attester_slashing, - ))); - } + Ok(PubsubMessage::AggregateAndProofAttestation(Box::new( + agg_and_proof, + ))) + } + GossipKind::Attestation(subnet_id) => { + let attestation = Attestation::from_ssz_bytes(decompressed_data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::Attestation(Box::new(( + *subnet_id, + attestation, + )))) + } + GossipKind::BeaconBlock => { + let beacon_block = SignedBeaconBlock::from_ssz_bytes(decompressed_data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))) + } + GossipKind::VoluntaryExit => { + let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(decompressed_data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))) + } + GossipKind::ProposerSlashing => { + let proposer_slashing = ProposerSlashing::from_ssz_bytes(decompressed_data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::ProposerSlashing(Box::new(proposer_slashing))) + } + GossipKind::AttesterSlashing => { + let attester_slashing = AttesterSlashing::from_ssz_bytes(decompressed_data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::AttesterSlashing(Box::new(attester_slashing))) } } } } - Err(format!("Unknown gossipsub topics: {:?}", unknown_topics)) } /// Encodes a `PubsubMessage` based on the topic encodings. The first known encoding is used. If