diff --git a/p2p/peer.go b/p2p/peer.go index ebf7490c6..1d2b726e8 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -32,10 +32,12 @@ import ( ) const ( - baseProtocolVersion = 4 + baseProtocolVersion = 5 baseProtocolLength = uint64(16) baseProtocolMaxMsgSize = 2 * 1024 + snappyProtocolVersion = 5 + pingInterval = 15 * time.Second ) diff --git a/p2p/rlpx.go b/p2p/rlpx.go index b2775cacd..24037ecc1 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -29,6 +29,7 @@ import ( "fmt" "hash" "io" + "io/ioutil" mrand "math/rand" "net" "sync" @@ -40,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rlp" + "github.com/golang/snappy" ) const ( @@ -68,6 +70,10 @@ const ( discWriteTimeout = 1 * time.Second ) +// errPlainMessageTooLarge is returned if a decompressed message length exceeds +// the allowed 24 bits (i.e. length >= 16MB). +var errPlainMessageTooLarge = errors.New("message length >= 16MB") + // rlpx is the transport protocol used by actual (non-test) connections. // It wraps the frame encoder with locks and read/write deadlines. type rlpx struct { @@ -127,6 +133,9 @@ func (t *rlpx) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err if err := <-werr; err != nil { return nil, fmt.Errorf("write error: %v", err) } + // If the protocol version supports Snappy encoding, upgrade immediately + t.rw.snappy = their.Version >= snappyProtocolVersion + return their, nil } @@ -556,6 +565,8 @@ type rlpxFrameRW struct { macCipher cipher.Block egressMAC hash.Hash ingressMAC hash.Hash + + snappy bool } func newRLPXFrameRW(conn io.ReadWriter, s secrets) *rlpxFrameRW { @@ -583,6 +594,17 @@ func newRLPXFrameRW(conn io.ReadWriter, s secrets) *rlpxFrameRW { func (rw *rlpxFrameRW) WriteMsg(msg Msg) error { ptype, _ := rlp.EncodeToBytes(msg.Code) + // if snappy is enabled, compress message now + if rw.snappy { + if msg.Size > maxUint24 { + return errPlainMessageTooLarge + } + payload, _ := ioutil.ReadAll(msg.Payload) + payload = snappy.Encode(nil, payload) + + msg.Payload = bytes.NewReader(payload) + msg.Size = uint32(len(payload)) + } // write header headbuf := make([]byte, 32) fsize := uint32(len(ptype)) + msg.Size @@ -668,6 +690,26 @@ func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) { } msg.Size = uint32(content.Len()) msg.Payload = content + + // if snappy is enabled, verify and decompress message + if rw.snappy { + payload, err := ioutil.ReadAll(msg.Payload) + if err != nil { + return msg, err + } + size, err := snappy.DecodedLen(payload) + if err != nil { + return msg, err + } + if size > int(maxUint24) { + return msg, errPlainMessageTooLarge + } + payload, err = snappy.Decode(nil, payload) + if err != nil { + return msg, err + } + msg.Size, msg.Payload = uint32(size), bytes.NewReader(payload) + } return msg, nil }