Added get peers and peers msg.

This commit is contained in:
obscuren 2014-01-18 00:19:29 +01:00
parent 5e5f386108
commit ee61cfcfa7
2 changed files with 94 additions and 13 deletions

64
peer.go
View File

@ -5,6 +5,7 @@ import (
"github.com/ethereum/ethwire-go" "github.com/ethereum/ethwire-go"
"log" "log"
"net" "net"
"strconv"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@ -37,6 +38,9 @@ type Peer struct {
// Last received pong message // Last received pong message
lastPong int64 lastPong int64
// Indicates whether a MsgGetPeersTy was requested of the peer
// this to prevent receiving false peers.
requestedPeerList bool
} }
func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer { func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer {
@ -160,8 +164,6 @@ out:
log.Printf("Received %s\n", msg.Type.String()) log.Printf("Received %s\n", msg.Type.String())
} }
// TODO Hash data and check if for existence (= ignore)
switch msg.Type { switch msg.Type {
case ethwire.MsgHandshakeTy: case ethwire.MsgHandshakeTy:
// Version message // Version message
@ -172,20 +174,34 @@ out:
log.Println(err) log.Println(err)
} }
case ethwire.MsgTxTy: case ethwire.MsgTxTy:
//p.server.blockManager.AddToTransactionPool(ethutil.NewTransactionFromData(ethutil.Encode(msg.Data)))
case ethwire.MsgInvTy: case ethwire.MsgInvTy:
case ethwire.MsgGetPeersTy: case ethwire.MsgGetPeersTy:
p.requestedPeerList = true
// Peer asked for list of connected peers
p.pushPeers()
case ethwire.MsgPeersTy: case ethwire.MsgPeersTy:
// Received a list of peers (probably because MsgGetPeersTy was send)
// Only act on message if we actually requested for a peers list
if p.requestedPeerList {
data := ethutil.Conv(msg.Data)
// Create new list of possible peers for the server to process
peers := make([]string, data.Length())
// Parse each possible peer
for i := 0; i < data.Length(); i++ {
peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint()))
}
// Connect to the list of peers
p.server.ProcessPeerList(peers)
// Mark unrequested again
p.requestedPeerList = false
}
case ethwire.MsgPingTy: case ethwire.MsgPingTy:
// Respond back with pong // Respond back with pong
p.writeMessage(&ethwire.Msg{Type: ethwire.MsgPongTy}) p.QueueMessage(&ethwire.Msg{Type: ethwire.MsgPongTy})
case ethwire.MsgPongTy: case ethwire.MsgPongTy:
p.lastPong = time.Now().Unix() p.lastPong = time.Now().Unix()
/*
case "blockmine":
d, _ := ethutil.Decode(msg.Data, 0)
log.Printf("block mined %s\n", d)
*/
} }
} }
@ -231,6 +247,20 @@ func (p *Peer) pushHandshake() error {
return nil return nil
} }
// Pushes the list of outbound peers to the client when requested
func (p *Peer) pushPeers() {
outPeers := make([]interface{}, len(p.server.OutboundPeers()))
// Serialise each peer
for i, peer := range p.server.OutboundPeers() {
outPeers[i] = peer.RlpEncode()
}
// Send message to the peer with the known list of connected clients
msg := ethwire.NewMessage(ethwire.MsgPeersTy, ethutil.Encode(outPeers))
p.QueueMessage(msg)
}
func (p *Peer) handleHandshake(msg *ethwire.Msg) { func (p *Peer) handleHandshake(msg *ethwire.Msg) {
c := ethutil.Conv(msg.Data) c := ethutil.Conv(msg.Data)
// [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID] // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID]
@ -255,3 +285,19 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
} }
} }
} }
func (p *Peer) RlpEncode() []byte {
host, prt, err := net.SplitHostPort(p.conn.RemoteAddr().String())
if err != nil {
return nil
}
i, err := strconv.Atoi(prt)
if err != nil {
return nil
}
port := ethutil.NumberToBytes(uint16(i), 16)
return ethutil.Encode([]interface{}{host, port})
}

View File

@ -20,6 +20,10 @@ func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
} }
} }
const (
processReapingTimeout = 60 // TODO increase
)
type Server struct { type Server struct {
// Channel for shutting down the server // Channel for shutting down the server
shutdownChan chan bool shutdownChan chan bool
@ -66,6 +70,13 @@ func (s *Server) AddPeer(conn net.Conn) {
} }
} }
func (s *Server) ProcessPeerList(addrs []string) {
for _, addr := range addrs {
// TODO Probably requires some sanity checks
s.ConnectToPeer(addr)
}
}
func (s *Server) ConnectToPeer(addr string) error { func (s *Server) ConnectToPeer(addr string) error {
peer := NewOutboundPeer(addr, s) peer := NewOutboundPeer(addr, s)
@ -74,16 +85,40 @@ func (s *Server) ConnectToPeer(addr string) error {
return nil return nil
} }
func (s *Server) OutboundPeers() []*Peer {
// Create a new peer slice with at least the length of the total peers
outboundPeers := make([]*Peer, s.peers.Len())
length := 0
eachPeer(s.peers, func(p *Peer, e *list.Element) {
if !p.inbound {
outboundPeers[length] = p
length++
}
})
return outboundPeers[:length]
}
func (s *Server) InboundPeers() []*Peer {
// Create a new peer slice with at least the length of the total peers
inboundPeers := make([]*Peer, s.peers.Len())
length := 0
eachPeer(s.peers, func(p *Peer, e *list.Element) {
if p.inbound {
inboundPeers[length] = p
length++
}
})
return inboundPeers[:length]
}
func (s *Server) Broadcast(msgType ethwire.MsgType, data []byte) { func (s *Server) Broadcast(msgType ethwire.MsgType, data []byte) {
eachPeer(s.peers, func(p *Peer, e *list.Element) { eachPeer(s.peers, func(p *Peer, e *list.Element) {
p.QueueMessage(ethwire.NewMessage(msgType, data)) p.QueueMessage(ethwire.NewMessage(msgType, data))
}) })
} }
const (
processReapingTimeout = 1 // TODO increase
)
func (s *Server) ReapDeadPeers() { func (s *Server) ReapDeadPeers() {
for { for {
eachPeer(s.peers, func(p *Peer, e *list.Element) { eachPeer(s.peers, func(p *Peer, e *list.Element) {