RPC: Enable back json streaming for non-batch and non-websocket cases (#4647)

* enable rpc streaming

* enable rpc streaming
This commit is contained in:
Alex Sharov 2022-07-06 16:44:06 +06:00 committed by GitHub
parent f19101d33b
commit d9cb87a149
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 71 additions and 35 deletions

View File

@ -6,7 +6,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"net"
"net/http"
"os"
@ -15,6 +14,8 @@ import (
"strings"
"time"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
@ -79,7 +80,8 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketEnabled, "ws", false, "Enable Websockets")
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketCompression, "ws.compression", false, "Enable Websocket compression (RFC 7692)")
rootCmd.PersistentFlags().StringVar(&cfg.RpcAllowListFilePath, "rpc.accessList", "", "Specify granular (method-by-method) API allowlist")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, "rpc.batch.concurrency", 2, "Does limit amount of goroutines to process 1 batch request. Means 1 bach request can't overload server. 1 batch still can have unlimited amount of request")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, utils.RpcBatchConcurrencyFlag.Name, 2, utils.RpcBatchConcurrencyFlag.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.RpcStreamingDisable, utils.RpcStreamingDisableFlag.Name, false, utils.RpcStreamingDisableFlag.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.DBReadConcurrency, "db.read.concurrency", runtime.GOMAXPROCS(-1), "Does limit amount of parallel db reads")
rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines")
rootCmd.PersistentFlags().StringVar(&cfg.TxPoolApiAddr, "txpool.api.addr", "", "txpool api network address, for example: 127.0.0.1:9090 (default: use value of --private.api.addr)")
@ -439,7 +441,7 @@ func StartRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API)
httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)
fmt.Printf("TraceRequests = %t\n", cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable)
allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {
@ -613,7 +615,7 @@ func createHandler(cfg httpcfg.HttpCfg, apiList []rpc.API, httpHandler http.Hand
func createEngineListener(cfg httpcfg.HttpCfg, engineApi []rpc.API) (*http.Server, *rpc.Server, string, error) {
engineHttpEndpoint := fmt.Sprintf("%s:%d", cfg.EngineHTTPListenAddress, cfg.EnginePort)
engineSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests)
engineSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, true)
allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {

View File

@ -30,6 +30,7 @@ type HttpCfg struct {
WebsocketCompression bool
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string

View File

@ -123,16 +123,20 @@ func (api *PrivateDebugAPIImpl) TraceTransaction(ctx context.Context, hash commo
// Retrieve the transaction and assemble its EVM context
blockNum, ok, err := api.txnLookup(ctx, tx, hash)
if err != nil {
stream.WriteNil()
return err
}
if !ok {
stream.WriteNil()
return nil
}
block, err := api.blockByNumberWithSenders(tx, blockNum)
if err != nil {
stream.WriteNil()
return err
}
if block == nil {
stream.WriteNil()
return nil
}
blockHash := block.Hash()
@ -148,12 +152,13 @@ func (api *PrivateDebugAPIImpl) TraceTransaction(ctx context.Context, hash commo
if txn == nil {
var borTx *types.Transaction
borTx, _, _, _, err = rawdb.ReadBorTransaction(tx, hash)
if err != nil {
stream.WriteNil()
return err
}
if borTx != nil {
stream.WriteNil()
return nil
}
stream.WriteNil()

View File

@ -81,7 +81,8 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketEnabled, "ws", false, "Enable Websockets")
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketCompression, "ws.compression", false, "Enable Websocket compression (RFC 7692)")
rootCmd.PersistentFlags().StringVar(&cfg.RpcAllowListFilePath, "rpc.accessList", "", "Specify granular (method-by-method) API allowlist")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, "rpc.batch.concurrency", 2, "Does limit amount of goroutines to process 1 batch request. Means 1 bach request can't overload server. 1 batch still can have unlimited amount of request")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, utils.RpcBatchConcurrencyFlag.Name, 2, utils.RpcBatchConcurrencyFlag.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.RpcStreamingDisable, utils.RpcStreamingDisableFlag.Name, false, utils.RpcStreamingDisableFlag.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.DBReadConcurrency, "db.read.concurrency", runtime.GOMAXPROCS(-1), "Does limit amount of parallel db reads")
rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines")
rootCmd.PersistentFlags().StringVar(&cfg.TxPoolApiAddr, "txpool.api.addr", "", "txpool api network address, for example: 127.0.0.1:9090 (default: use value of --private.api.addr)")
@ -455,7 +456,7 @@ func StartRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API)
// register apis and create handler stack
httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable)
allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {
@ -629,7 +630,7 @@ func createHandler(cfg httpcfg.HttpCfg, apiList []rpc.API, httpHandler http.Hand
func createEngineListener(cfg httpcfg.HttpCfg, engineApi []rpc.API) (*http.Server, *rpc.Server, string, error) {
engineHttpEndpoint := fmt.Sprintf("%s:%d", cfg.EngineHTTPListenAddress, cfg.EnginePort)
engineSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests)
engineSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, true)
allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {

View File

@ -29,6 +29,7 @@ type HttpCfg struct {
WebsocketCompression bool
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string

View File

@ -118,16 +118,20 @@ func (api *PrivateDebugAPIImpl) TraceTransaction(ctx context.Context, hash commo
// Retrieve the transaction and assemble its EVM context
blockNum, ok, err := api.txnLookup(ctx, tx, hash)
if err != nil {
stream.WriteNil()
return err
}
if !ok {
stream.WriteNil()
return nil
}
block, err := api.blockByNumberWithSenders(tx, blockNum)
if err != nil {
stream.WriteNil()
return err
}
if block == nil {
stream.WriteNil()
return nil
}
blockHash := block.Hash()

View File

@ -20,6 +20,15 @@ package utils
import (
"crypto/ecdsa"
"fmt"
"io"
"math/big"
"path/filepath"
"runtime"
"strconv"
"strings"
"text/tabwriter"
"text/template"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon-lib/txpool"
@ -30,14 +39,6 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/urfave/cli"
"io"
"math/big"
"path/filepath"
"runtime"
"strconv"
"strings"
"text/tabwriter"
"text/template"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/params/networkname"
@ -363,6 +364,10 @@ var (
Usage: "Does limit amount of goroutines to process 1 batch request. Means 1 bach request can't overload server. 1 batch still can have unlimited amount of request",
Value: 2,
}
RpcStreamingDisableFlag = cli.BoolFlag{
Name: "rpc.streaming.disable",
Usage: "Erigon has enalbed json streamin for some heavy endpoints (like trace_*). It's treadoff: greatly reduce amount of RAM (in some cases from 30GB to 30mb), but it produce invalid json format if error happened in the middle of streaming (because json is not streaming-friendly format)",
}
HTTPTraceFlag = cli.BoolFlag{
Name: "http.trace",
Usage: "Trace HTTP requests with INFO level",

View File

@ -265,7 +265,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, allowList rpc.
}
// Create RPC server and handler.
srv := rpc.NewServer(50, false /* traceRequests */)
srv := rpc.NewServer(50, false /* traceRequests */, true)
srv.SetAllowList(allowList)
if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
return err
@ -298,7 +298,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, allowList rpc.All
}
// Create RPC server and handler.
srv := rpc.NewServer(50, false /* traceRequests */)
srv := rpc.NewServer(50, false /* traceRequests */, true)
srv.SetAllowList(allowList)
if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
return err

View File

@ -560,7 +560,7 @@ func (c *Client) dispatch(codec ServerCodec) {
if op.batch {
conn.handler.handleBatch(op.msgs)
} else {
conn.handler.handleMsg(op.msgs[0])
conn.handler.handleMsg(op.msgs[0], nil)
}
case err := <-c.readErr:

View File

@ -178,19 +178,26 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}
// handleMsg handles a single message.
func (h *handler) handleMsg(msg *jsonrpcMessage) {
func (h *handler) handleMsg(msg *jsonrpcMessage, stream *jsoniter.Stream) {
if ok := h.handleImmediate(msg); ok {
return
}
h.startCallProc(func(cp *callProc) {
stream := jsoniter.NewStream(jsoniter.ConfigDefault, nil, 4096)
needWriteStream := false
if stream == nil {
stream = jsoniter.NewStream(jsoniter.ConfigDefault, nil, 4096)
needWriteStream = true
}
answer := h.handleCallMsg(cp, msg, stream)
h.addSubscriptions(cp.notifiers)
if answer != nil {
h.conn.writeJSON(cp.ctx, answer)
} else {
_ = stream.Flush()
buffer, _ := json.Marshal(answer)
stream.Write(json.RawMessage(buffer))
}
if needWriteStream {
h.conn.writeJSON(cp.ctx, json.RawMessage(stream.Buffer()))
} else {
stream.Write([]byte("\n"))
}
for _, n := range cp.notifiers {
n.activate()

View File

@ -31,6 +31,7 @@ import (
"time"
"github.com/golang-jwt/jwt/v4"
jsoniter "github.com/json-iterator/go"
)
const (
@ -222,7 +223,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", contentType)
codec := newHTTPServerConn(r, w)
defer codec.close()
s.serveSingleRequest(ctx, codec)
var stream *jsoniter.Stream
if !s.disableStreaming {
stream = jsoniter.NewStream(jsoniter.ConfigDefault, w, 4096)
}
s.serveSingleRequest(ctx, codec, stream)
}
// validateRequest returns a non-zero response code and error message if the

View File

@ -104,7 +104,7 @@ func TestHTTPResponseWithEmptyGet(t *testing.T) {
func TestHTTPRespBodyUnlimited(t *testing.T) {
const respLength = maxRequestContentLength * 3
s := NewServer(50, false /* traceRequests */)
s := NewServer(50, false /* traceRequests */, true)
defer s.Stop()
if err := s.RegisterName("test", largeRespService{respLength}); err != nil {
t.Fatal(err)

View File

@ -22,6 +22,7 @@ import (
"sync/atomic"
mapset "github.com/deckarep/golang-set"
jsoniter "github.com/json-iterator/go"
"github.com/ledgerwatch/log/v3"
)
@ -49,12 +50,13 @@ type Server struct {
codecs mapset.Set
batchConcurrency uint
disableStreaming bool
traceRequests bool // Whether to print requests at INFO level
}
// NewServer creates a new server instance with no registered handlers.
func NewServer(batchConcurrency uint, traceRequests bool) *Server {
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency, traceRequests: traceRequests}
func NewServer(batchConcurrency uint, traceRequests, disableStreaming bool) *Server {
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency, disableStreaming: disableStreaming, traceRequests: traceRequests}
// Register the default service providing meta information about the RPC service such
// as the services and methods it offers.
rpcService := &RPCService{server: server}
@ -100,7 +102,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
// serveSingleRequest reads and processes a single RPC request from the given codec. This
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
// this mode.
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, stream *jsoniter.Stream) {
// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
return
@ -120,7 +122,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
if batch {
h.handleBatch(reqs)
} else {
h.handleMsg(reqs[0])
h.handleMsg(reqs[0], stream)
}
}

View File

@ -31,7 +31,7 @@ import (
)
func TestServerRegisterName(t *testing.T) {
server := NewServer(50, false /* traceRequests */)
server := NewServer(50, false /* traceRequests */, true)
service := new(testService)
if err := server.RegisterName("test", service); err != nil {

View File

@ -53,7 +53,7 @@ func TestSubscriptions(t *testing.T) {
subCount = len(namespaces)
notificationCount = 3
server = NewServer(50, false /* traceRequests */)
server = NewServer(50, false /* traceRequests */, true)
clientConn, serverConn = net.Pipe()
out = json.NewEncoder(clientConn)
in = json.NewDecoder(clientConn)

View File

@ -26,7 +26,7 @@ import (
)
func newTestServer() *Server {
server := NewServer(50, false /* traceRequests */)
server := NewServer(50, false /* traceRequests */, true)
server.idgen = sequentialIDGenerator()
if err := server.RegisterName("test", new(testService)); err != nil {
panic(err)

View File

@ -163,7 +163,7 @@ func TestClientWebsocketPing(t *testing.T) {
// This checks that the websocket transport can deal with large messages.
func TestClientWebsocketLargeMessage(t *testing.T) {
var (
srv = NewServer(50, false /* traceRequests */)
srv = NewServer(50, false /* traceRequests */, true)
httpsrv = httptest.NewServer(srv.WebsocketHandler(nil, nil, false))
wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
)

View File

@ -61,6 +61,7 @@ var DefaultFlags = []cli.Flag{
utils.HTTPTraceFlag,
utils.StateCacheFlag,
utils.RpcBatchConcurrencyFlag,
utils.RpcStreamingDisableFlag,
utils.DBReadConcurrencyFlag,
utils.RpcAccessListFlag,
utils.RpcTraceCompatFlag,

View File

@ -2,10 +2,11 @@ package cli
import (
"fmt"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"strings"
"time"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
@ -344,6 +345,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config) {
WebsocketEnabled: ctx.GlobalIsSet(utils.WSEnabledFlag.Name),
RpcBatchConcurrency: ctx.GlobalUint(utils.RpcBatchConcurrencyFlag.Name),
RpcStreamingDisable: ctx.GlobalBool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.GlobalInt(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.GlobalString(utils.RpcAccessListFlag.Name),
Gascap: ctx.GlobalUint64(utils.RpcGasCapFlag.Name),