From ee61cfcfa776a1626fe3de7138942c1d796afdca Mon Sep 17 00:00:00 2001 From: obscuren Date: Sat, 18 Jan 2014 00:19:29 +0100 Subject: [PATCH] Added get peers and peers msg. --- peer.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++-------- server.go | 43 +++++++++++++++++++++++++++++++++---- 2 files changed, 94 insertions(+), 13 deletions(-) diff --git a/peer.go b/peer.go index 8e6a5d540..158541028 100644 --- a/peer.go +++ b/peer.go @@ -5,6 +5,7 @@ import ( "github.com/ethereum/ethwire-go" "log" "net" + "strconv" "sync/atomic" "time" ) @@ -37,6 +38,9 @@ type Peer struct { // Last received pong message lastPong int64 + // Indicates whether a MsgGetPeersTy was requested of the peer + // this to prevent receiving false peers. + requestedPeerList bool } func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer { @@ -160,8 +164,6 @@ out: log.Printf("Received %s\n", msg.Type.String()) } - // TODO Hash data and check if for existence (= ignore) - switch msg.Type { case ethwire.MsgHandshakeTy: // Version message @@ -172,20 +174,34 @@ out: log.Println(err) } case ethwire.MsgTxTy: + //p.server.blockManager.AddToTransactionPool(ethutil.NewTransactionFromData(ethutil.Encode(msg.Data))) case ethwire.MsgInvTy: case ethwire.MsgGetPeersTy: + p.requestedPeerList = true + // Peer asked for list of connected peers + p.pushPeers() case ethwire.MsgPeersTy: + // Received a list of peers (probably because MsgGetPeersTy was send) + // Only act on message if we actually requested for a peers list + if p.requestedPeerList { + data := ethutil.Conv(msg.Data) + // Create new list of possible peers for the server to process + peers := make([]string, data.Length()) + // Parse each possible peer + for i := 0; i < data.Length(); i++ { + peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint())) + } + + // Connect to the list of peers + p.server.ProcessPeerList(peers) + // Mark unrequested again + p.requestedPeerList = false + } case ethwire.MsgPingTy: // Respond back with pong - p.writeMessage(ðwire.Msg{Type: ethwire.MsgPongTy}) + p.QueueMessage(ðwire.Msg{Type: ethwire.MsgPongTy}) case ethwire.MsgPongTy: p.lastPong = time.Now().Unix() - - /* - case "blockmine": - d, _ := ethutil.Decode(msg.Data, 0) - log.Printf("block mined %s\n", d) - */ } } @@ -231,6 +247,20 @@ func (p *Peer) pushHandshake() error { return nil } +// Pushes the list of outbound peers to the client when requested +func (p *Peer) pushPeers() { + outPeers := make([]interface{}, len(p.server.OutboundPeers())) + // Serialise each peer + for i, peer := range p.server.OutboundPeers() { + outPeers[i] = peer.RlpEncode() + } + + // Send message to the peer with the known list of connected clients + msg := ethwire.NewMessage(ethwire.MsgPeersTy, ethutil.Encode(outPeers)) + + p.QueueMessage(msg) +} + func (p *Peer) handleHandshake(msg *ethwire.Msg) { c := ethutil.Conv(msg.Data) // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID] @@ -255,3 +285,19 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { } } } + +func (p *Peer) RlpEncode() []byte { + host, prt, err := net.SplitHostPort(p.conn.RemoteAddr().String()) + if err != nil { + return nil + } + + i, err := strconv.Atoi(prt) + if err != nil { + return nil + } + + port := ethutil.NumberToBytes(uint16(i), 16) + + return ethutil.Encode([]interface{}{host, port}) +} diff --git a/server.go b/server.go index 9907f3b24..7a29d1bd9 100644 --- a/server.go +++ b/server.go @@ -20,6 +20,10 @@ func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { } } +const ( + processReapingTimeout = 60 // TODO increase +) + type Server struct { // Channel for shutting down the server shutdownChan chan bool @@ -66,6 +70,13 @@ func (s *Server) AddPeer(conn net.Conn) { } } +func (s *Server) ProcessPeerList(addrs []string) { + for _, addr := range addrs { + // TODO Probably requires some sanity checks + s.ConnectToPeer(addr) + } +} + func (s *Server) ConnectToPeer(addr string) error { peer := NewOutboundPeer(addr, s) @@ -74,16 +85,40 @@ func (s *Server) ConnectToPeer(addr string) error { return nil } +func (s *Server) OutboundPeers() []*Peer { + // Create a new peer slice with at least the length of the total peers + outboundPeers := make([]*Peer, s.peers.Len()) + length := 0 + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if !p.inbound { + outboundPeers[length] = p + length++ + } + }) + + return outboundPeers[:length] +} + +func (s *Server) InboundPeers() []*Peer { + // Create a new peer slice with at least the length of the total peers + inboundPeers := make([]*Peer, s.peers.Len()) + length := 0 + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if p.inbound { + inboundPeers[length] = p + length++ + } + }) + + return inboundPeers[:length] +} + func (s *Server) Broadcast(msgType ethwire.MsgType, data []byte) { eachPeer(s.peers, func(p *Peer, e *list.Element) { p.QueueMessage(ethwire.NewMessage(msgType, data)) }) } -const ( - processReapingTimeout = 1 // TODO increase -) - func (s *Server) ReapDeadPeers() { for { eachPeer(s.peers, func(p *Peer, e *list.Element) {