diff --git a/Cargo.toml b/Cargo.toml index 58f426b25..2be246a9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.0.1" authors = ["Paul Hauner "] [dependencies] +bigint = "4.2" blake2 = "^0.7.1" bls = { git = "https://github.com/sigp/bls" } bytes = "" @@ -27,6 +28,8 @@ slog-term = "^2.4.0" slog-async = "^2.3.0" tokio-io = "0.1" tokio-core = "0.1" +tokio-timer = "0.1" +tokio-stdin = "0.1" [dependencies.pairing] git = "https://github.com/mmaker/pairing" diff --git a/src/p2p/floodsub.rs b/src/p2p/floodsub.rs index 8ed788aba..3bc14ab70 100644 --- a/src/p2p/floodsub.rs +++ b/src/p2p/floodsub.rs @@ -1,3 +1,4 @@ +extern crate bigint; extern crate bytes; extern crate futures; extern crate libp2p_peerstore; @@ -10,16 +11,19 @@ extern crate libp2p_kad; extern crate slog; extern crate tokio_core; extern crate tokio_io; +extern crate tokio_timer; +extern crate tokio_stdin; -use self::futures::Future; +use self::bigint::U512; +use self::futures::{ Future, Stream }; use self::libp2p_peerstore::PeerId; -use self::libp2p_core::{ Endpoint, Multiaddr, Transport, ConnectionUpgrade }; +use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade }; use self::libp2p_floodsub::{ FloodSubUpgrade, FloodSubFuture }; use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture}; use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput }; use self::slog::Logger; use std::sync::{ Arc, RwLock }; -use std::time::Duration; +use std::time::{ Duration, Instant }; use std::ops::Deref; use std::io::Error as IoError; use libp2p_peerstore::memory_peerstore::MemoryPeerstore; @@ -35,23 +39,23 @@ pub fn listen(peer_id: PeerId, info!(log, "Local PeerId: {:?}", peer_id); - let core = tokio_core::reactor::Core::new().expect("tokio failure."); + let mut core = tokio_core::reactor::Core::new().expect("tokio failure."); let listened_addrs = Arc::new(RwLock::new(vec![])); let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) .with_upgrade(libp2p_core::upgrade::PlainTextConfig) .with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new()) .into_connection_reuse(); - + let (floodsub_upgrade, floodsub_rx) = FloodSubUpgrade::new(peer_id.clone()); let transport_sockets = { - let listened_addrs = Arc::new(RwLock::new(vec![])); + let listened_addrs = listened_addrs.clone(); let listen_multiaddr = listen_multiaddr.clone(); IdentifyTransport::new(transport.clone(), peer_store.clone()) .map(move |out, _, _| { if let(Some(ref observed), ref listen_multiaddr) = (out.observed_addr, listen_multiaddr) { - if let Some(viewed_from_outisde) = transport.nat_traversal(listen_multiaddr, observed) { - listened_addrs.write().unwrap().push(viewed_from_outisde); + if let Some(viewed_from_outside) = transport.nat_traversal(listen_multiaddr, observed) { + listened_addrs.write().unwrap().push(viewed_from_outside); } } out.socket @@ -76,14 +80,15 @@ pub fn listen(peer_id: PeerId, }; let swarm_listened_addrs = listened_addrs.clone(); - let (swarm_ctrl, swarm_future) = libp2p_core::swarm( + let swarm_peer_id = peer_id.clone(); + let (swarm_ctl, swarm_future) = libp2p_core::swarm( transport_sockets.clone().with_upgrade(upgrade), move |upgrade, client_addr| match upgrade { FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>, FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>, FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send( IdentifyInfo { - public_key: peer_id.clone().into_bytes(), + public_key: swarm_peer_id.clone().into_bytes(), agent_version: "lighthouse/1.0.0".to_owned(), protocol_version: "rust-libp2p/1.0.0".to_owned(), listen_addrs: swarm_listened_addrs.read().unwrap().to_vec(), @@ -101,11 +106,97 @@ pub fn listen(peer_id: PeerId, }, ); - let actual_addr = swarm_ctrl + let actual_addr = swarm_ctl .listen_on(listen_multiaddr) .expect("Failed to listen on multiaddr"); info!(log, "Listening on: {:?}", actual_addr); + + let (kad_ctl, kad_init) = kad_ctl_proto.start( + swarm_ctl.clone(), + transport_sockets.clone().with_upgrade(kad_upgrade)); + + let topic = libp2p_floodsub::TopicBuilder::new("lighthouse").build(); + + let floodsub_ctl = libp2p_floodsub::FloodSubController:: + new(&floodsub_upgrade); + + floodsub_ctl.subscribe(&topic); + + let floodsub_rx = floodsub_rx.for_each(|msg| { + if let Ok(msg) = String::from_utf8(msg.data) { + info!(log, "< {}", msg); + } + Ok(()) + }); + + let kad_poll = { + let polling_peer_id = peer_id.clone(); + tokio_timer::wheel() + .build() + .interval_at(Instant::now(), Duration::from_secs(30)) + .map_err(|_| -> IoError { unreachable!() }) + .and_then(move |()| kad_ctl.find_node(peer_id.clone())) + .for_each(move |peers| { + let local_hash = U512::from(polling_peer_id.hash()); + info!(log, "Peer discovery results:"); + for peer in peers { + let peer_hash = U512::from(peer.hash()); + let distance = 512 - (local_hash ^ peer_hash).leading_zeros(); + let peer_addr = AddrComponent::P2P(peer.into_bytes()).into(); + let dial_result = swarm_ctl.dial( + peer_addr, + transport_sockets.clone().with_upgrade( + floodsub_upgrade.clone() + ) + ); + if let Err(err) = dial_result { + warn!(log, "Dialling {:?} failed.", err) + } + } + Ok(()) + }) + }; + + let stdin = build_stdin_future().for_each(move |msg| { + info!(log, "> {}", msg); + floodsub_ctl.publish(&topic, msg.into_bytes()); + Ok(()) + }); + + let final_future = swarm_future + .select(floodsub_rx) + .map_err(|(err, _)| err) + .map(|((), _)| ()) + .select(stdin) + .map_err(|(err, _)| err) + .map(|((), _)| ()) + .select(kad_poll) + .map_err(|(err, _)| err) + .map(|((), _)| ()) + .select(kad_init) + .map_err(|(err, _)| err) + .and_then(|((), n)| n); + + core.run(final_future).unwrap(); +} + +fn build_stdin_future() -> impl Stream { + use std::mem; + + let mut buffer = Vec::new(); + tokio_stdin::spawn_stdin_stream_unbounded() + .map_err(|_| -> IoError { panic!() }) + .filter_map(move |msg| { + if msg != b'\r' && msg != b'\n' { + buffer.push(msg); + return None; + } else if buffer.is_empty() { + return None; + } + + Some(String::from_utf8(mem::replace(&mut buffer, Vec::new())).unwrap()) + }) } #[derive(Clone)]