From ffbe5656ff2cba43c813f46f743fde4d1ab2dd58 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 13:18:10 +0200 Subject: [PATCH] support for large requests/responses --- rpc/codec/json.go | 48 ++++++++++++++++++++++++++++--------------- rpc/comms/ipc.go | 1 + rpc/comms/ipc_unix.go | 2 +- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 380b4cba7..1de649c21 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -2,28 +2,30 @@ package codec import ( "encoding/json" + "fmt" "net" + "time" "github.com/ethereum/go-ethereum/rpc/shared" ) const ( - MAX_REQUEST_SIZE = 1024 * 1024 + MAX_REQUEST_SIZE = 1024 * 1024 MAX_RESPONSE_SIZE = 1024 * 1024 ) // Json serialization support type JsonCodec struct { - c net.Conn - buffer []byte + c net.Conn + buffer []byte bytesInBuffer int } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ - c: conn, - buffer: make([]byte, MAX_REQUEST_SIZE), + c: conn, + buffer: make([]byte, MAX_REQUEST_SIZE), bytesInBuffer: 0, } } @@ -58,28 +60,40 @@ func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, } func (self *JsonCodec) ReadResponse() (interface{}, error) { - var err error + bytesInBuffer := 0 buf := make([]byte, MAX_RESPONSE_SIZE) - n, _ := self.c.Read(buf) - var failure shared.ErrorResponse - if err = json.Unmarshal(buf[:n], &failure); err == nil && failure.Error != nil { - return failure, nil + deadline := time.Now().Add(15 * time.Second) + self.c.SetDeadline(deadline) + + for { + n, err := self.c.Read(buf[bytesInBuffer:]) + if err != nil { + return nil, err + } + bytesInBuffer += n + + var success shared.SuccessResponse + if err = json.Unmarshal(buf[:bytesInBuffer], &success); err == nil { + return success, nil + } + + var failure shared.ErrorResponse + if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil { + return failure, nil + } } - var success shared.SuccessResponse - if err = json.Unmarshal(buf[:n], &success); err == nil { - return success, nil - } - - return nil, err + self.c.Close() + return nil, fmt.Errorf("Unable to read response") } -// Encode response to encoded form in underlying stream +// Decode data func (self *JsonCodec) Decode(data []byte, msg interface{}) error { return json.Unmarshal(data, msg) } +// Encode message func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) { return json.Marshal(msg) } diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index 068a1288f..3cfcbf3cf 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -16,6 +16,7 @@ type IpcConfig struct { type ipcClient struct { endpoint string + c net.Conn codec codec.Codec coder codec.ApiCoder } diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index 295eb916b..5724231f4 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -18,7 +18,7 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { return nil, err } - return &ipcClient{cfg.Endpoint, codec, codec.New(c)}, nil + return &ipcClient{cfg.Endpoint, c, codec, codec.New(c)}, nil } func (self *ipcClient) reconnect() error {