Update to closer match @tomaka demo

This commit is contained in:
Paul Hauner 2018-07-29 10:11:37 +10:00
parent b96bf6e3de
commit 68af2011b5
2 changed files with 105 additions and 11 deletions

View File

@ -4,6 +4,7 @@ version = "0.0.1"
authors = ["Paul Hauner <paul@paulhauner.com>"] authors = ["Paul Hauner <paul@paulhauner.com>"]
[dependencies] [dependencies]
bigint = "4.2"
blake2 = "^0.7.1" blake2 = "^0.7.1"
bls = { git = "https://github.com/sigp/bls" } bls = { git = "https://github.com/sigp/bls" }
bytes = "" bytes = ""
@ -27,6 +28,8 @@ slog-term = "^2.4.0"
slog-async = "^2.3.0" slog-async = "^2.3.0"
tokio-io = "0.1" tokio-io = "0.1"
tokio-core = "0.1" tokio-core = "0.1"
tokio-timer = "0.1"
tokio-stdin = "0.1"
[dependencies.pairing] [dependencies.pairing]
git = "https://github.com/mmaker/pairing" git = "https://github.com/mmaker/pairing"

View File

@ -1,3 +1,4 @@
extern crate bigint;
extern crate bytes; extern crate bytes;
extern crate futures; extern crate futures;
extern crate libp2p_peerstore; extern crate libp2p_peerstore;
@ -10,16 +11,19 @@ extern crate libp2p_kad;
extern crate slog; extern crate slog;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_timer;
extern crate tokio_stdin;
use self::futures::Future; use self::bigint::U512;
use self::futures::{ Future, Stream };
use self::libp2p_peerstore::PeerId; use self::libp2p_peerstore::PeerId;
use self::libp2p_core::{ Endpoint, Multiaddr, Transport, ConnectionUpgrade }; use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade };
use self::libp2p_floodsub::{ FloodSubUpgrade, FloodSubFuture }; use self::libp2p_floodsub::{ FloodSubUpgrade, FloodSubFuture };
use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture}; use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput }; use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
use self::slog::Logger; use self::slog::Logger;
use std::sync::{ Arc, RwLock }; use std::sync::{ Arc, RwLock };
use std::time::Duration; use std::time::{ Duration, Instant };
use std::ops::Deref; use std::ops::Deref;
use std::io::Error as IoError; use std::io::Error as IoError;
use libp2p_peerstore::memory_peerstore::MemoryPeerstore; use libp2p_peerstore::memory_peerstore::MemoryPeerstore;
@ -35,7 +39,7 @@ pub fn listen(peer_id: PeerId,
info!(log, "Local PeerId: {:?}", peer_id); info!(log, "Local PeerId: {:?}", peer_id);
let core = tokio_core::reactor::Core::new().expect("tokio failure."); let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
let listened_addrs = Arc::new(RwLock::new(vec![])); let listened_addrs = Arc::new(RwLock::new(vec![]));
let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
.with_upgrade(libp2p_core::upgrade::PlainTextConfig) .with_upgrade(libp2p_core::upgrade::PlainTextConfig)
@ -45,13 +49,13 @@ pub fn listen(peer_id: PeerId,
let (floodsub_upgrade, floodsub_rx) = FloodSubUpgrade::new(peer_id.clone()); let (floodsub_upgrade, floodsub_rx) = FloodSubUpgrade::new(peer_id.clone());
let transport_sockets = { let transport_sockets = {
let listened_addrs = Arc::new(RwLock::new(vec![])); let listened_addrs = listened_addrs.clone();
let listen_multiaddr = listen_multiaddr.clone(); let listen_multiaddr = listen_multiaddr.clone();
IdentifyTransport::new(transport.clone(), peer_store.clone()) IdentifyTransport::new(transport.clone(), peer_store.clone())
.map(move |out, _, _| { .map(move |out, _, _| {
if let(Some(ref observed), ref listen_multiaddr) = (out.observed_addr, listen_multiaddr) { if let(Some(ref observed), ref listen_multiaddr) = (out.observed_addr, listen_multiaddr) {
if let Some(viewed_from_outisde) = transport.nat_traversal(listen_multiaddr, observed) { if let Some(viewed_from_outside) = transport.nat_traversal(listen_multiaddr, observed) {
listened_addrs.write().unwrap().push(viewed_from_outisde); listened_addrs.write().unwrap().push(viewed_from_outside);
} }
} }
out.socket out.socket
@ -76,14 +80,15 @@ pub fn listen(peer_id: PeerId,
}; };
let swarm_listened_addrs = listened_addrs.clone(); let swarm_listened_addrs = listened_addrs.clone();
let (swarm_ctrl, swarm_future) = libp2p_core::swarm( let swarm_peer_id = peer_id.clone();
let (swarm_ctl, swarm_future) = libp2p_core::swarm(
transport_sockets.clone().with_upgrade(upgrade), transport_sockets.clone().with_upgrade(upgrade),
move |upgrade, client_addr| match upgrade { move |upgrade, client_addr| match upgrade {
FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>, FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>,
FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>, FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>,
FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send( FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send(
IdentifyInfo { IdentifyInfo {
public_key: peer_id.clone().into_bytes(), public_key: swarm_peer_id.clone().into_bytes(),
agent_version: "lighthouse/1.0.0".to_owned(), agent_version: "lighthouse/1.0.0".to_owned(),
protocol_version: "rust-libp2p/1.0.0".to_owned(), protocol_version: "rust-libp2p/1.0.0".to_owned(),
listen_addrs: swarm_listened_addrs.read().unwrap().to_vec(), listen_addrs: swarm_listened_addrs.read().unwrap().to_vec(),
@ -101,11 +106,97 @@ pub fn listen(peer_id: PeerId,
}, },
); );
let actual_addr = swarm_ctrl let actual_addr = swarm_ctl
.listen_on(listen_multiaddr) .listen_on(listen_multiaddr)
.expect("Failed to listen on multiaddr"); .expect("Failed to listen on multiaddr");
info!(log, "Listening on: {:?}", actual_addr); info!(log, "Listening on: {:?}", actual_addr);
let (kad_ctl, kad_init) = kad_ctl_proto.start(
swarm_ctl.clone(),
transport_sockets.clone().with_upgrade(kad_upgrade));
let topic = libp2p_floodsub::TopicBuilder::new("lighthouse").build();
let floodsub_ctl = libp2p_floodsub::FloodSubController::
new(&floodsub_upgrade);
floodsub_ctl.subscribe(&topic);
let floodsub_rx = floodsub_rx.for_each(|msg| {
if let Ok(msg) = String::from_utf8(msg.data) {
info!(log, "< {}", msg);
}
Ok(())
});
let kad_poll = {
let polling_peer_id = peer_id.clone();
tokio_timer::wheel()
.build()
.interval_at(Instant::now(), Duration::from_secs(30))
.map_err(|_| -> IoError { unreachable!() })
.and_then(move |()| kad_ctl.find_node(peer_id.clone()))
.for_each(move |peers| {
let local_hash = U512::from(polling_peer_id.hash());
info!(log, "Peer discovery results:");
for peer in peers {
let peer_hash = U512::from(peer.hash());
let distance = 512 - (local_hash ^ peer_hash).leading_zeros();
let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
let dial_result = swarm_ctl.dial(
peer_addr,
transport_sockets.clone().with_upgrade(
floodsub_upgrade.clone()
)
);
if let Err(err) = dial_result {
warn!(log, "Dialling {:?} failed.", err)
}
}
Ok(())
})
};
let stdin = build_stdin_future().for_each(move |msg| {
info!(log, "> {}", msg);
floodsub_ctl.publish(&topic, msg.into_bytes());
Ok(())
});
let final_future = swarm_future
.select(floodsub_rx)
.map_err(|(err, _)| err)
.map(|((), _)| ())
.select(stdin)
.map_err(|(err, _)| err)
.map(|((), _)| ())
.select(kad_poll)
.map_err(|(err, _)| err)
.map(|((), _)| ())
.select(kad_init)
.map_err(|(err, _)| err)
.and_then(|((), n)| n);
core.run(final_future).unwrap();
}
fn build_stdin_future() -> impl Stream<Item = String, Error = IoError> {
use std::mem;
let mut buffer = Vec::new();
tokio_stdin::spawn_stdin_stream_unbounded()
.map_err(|_| -> IoError { panic!() })
.filter_map(move |msg| {
if msg != b'\r' && msg != b'\n' {
buffer.push(msg);
return None;
} else if buffer.is_empty() {
return None;
}
Some(String::from_utf8(mem::replace(&mut buffer, Vec::new())).unwrap())
})
} }
#[derive(Clone)] #[derive(Clone)]