From e7574a6d14a64c5af58574d909b8c6a014bd2daf Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 19 Aug 2021 09:26:06 +0700 Subject: [PATCH] RPC: batch - preserve order, streaming to in-mem buf (#2541) * preserve order in batch * fix batch order * base fee in header json * less logs in tests * less logs in tests * save * save --- p2p/discover/table_util_test.go | 1 + p2p/dnsdisc/client_test.go | 16 +++---- rpc/client_test.go | 3 ++ rpc/handler.go | 75 +++++++++------------------------ rpc/service.go | 6 +-- rpc/testdata/reqresp-batch.js | 2 +- tests/block_test.go | 3 ++ tests/state_test.go | 2 + tests/vm_test.go | 2 + 9 files changed, 42 insertions(+), 68 deletions(-) diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 26097aea8..c6c9da205 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -39,6 +39,7 @@ func init() { var r enr.Record r.Set(enr.IP{0, 0, 0, 0}) nullNode = enode.SignNull(&r, enode.ID{}) + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) } func newTestTable(t transport) (*Table, *enode.DB) { diff --git a/p2p/dnsdisc/client_test.go b/p2p/dnsdisc/client_test.go index d44198d01..d77dc3291 100644 --- a/p2p/dnsdisc/client_test.go +++ b/p2p/dnsdisc/client_test.go @@ -55,7 +55,7 @@ func TestClientSyncTree(t *testing.T) { wantSeq = uint(1) ) - c := NewClient(Config{Resolver: r, Logger: testlog.Logger(t, log.LvlTrace)}) + c := NewClient(Config{Resolver: r, Logger: testlog.Logger(t, log.LvlInfo)}) stree, err := c.SyncTree("enrtree://AKPYQIUQIL7PSIACI32J7FGZW56E5FKHEFCCOFHILBIMW3M6LWXS2@n") if err != nil { t.Fatal("sync error:", err) @@ -89,7 +89,7 @@ func TestClientSyncTreeBadNode(t *testing.T) { "C7HRFPF3BLGF3YR4DY5KX3SMBE.n": "enrtree://AM5FCQLWIZX2QFPNJAP7VUERCCRNGRHWZG3YYHIUV7BVDQ5FDPRT2@morenodes.example.org", "INDMVBZEEQ4ESVYAKGIYU74EAA.n": "enr:-----", } - c := NewClient(Config{Resolver: r, Logger: testlog.Logger(t, log.LvlTrace)}) + c := NewClient(Config{Resolver: r, Logger: testlog.Logger(t, log.LvlInfo)}) _, err := c.SyncTree("enrtree://AKPYQIUQIL7PSIACI32J7FGZW56E5FKHEFCCOFHILBIMW3M6LWXS2@n") wantErr := nameError{name: "INDMVBZEEQ4ESVYAKGIYU74EAA.n", err: entryError{typ: "enr", err: errInvalidENR}} if err != wantErr { @@ -104,7 +104,7 @@ func TestIterator(t *testing.T) { r := mapResolver(tree.ToTXT("n")) c := NewClient(Config{ Resolver: r, - Logger: testlog.Logger(t, log.LvlTrace), + Logger: testlog.Logger(t, log.LvlInfo), RateLimit: 500, }) it, err := c.NewIterator(url) @@ -145,7 +145,7 @@ func TestIteratorLinks(t *testing.T) { tree2, url2 := makeTestTree("t2", nodes[10:], []string{url1}) c := NewClient(Config{ Resolver: newMapResolver(tree1.ToTXT("t1"), tree2.ToTXT("t2")), - Logger: testlog.Logger(t, log.LvlTrace), + Logger: testlog.Logger(t, log.LvlInfo), RateLimit: 500, }) it, err := c.NewIterator(url2) @@ -165,7 +165,7 @@ func TestIteratorNodeUpdates(t *testing.T) { resolver = newMapResolver() c = NewClient(Config{ Resolver: resolver, - Logger: testlog.Logger(t, log.LvlTrace), + Logger: testlog.Logger(t, log.LvlInfo), RecheckInterval: 20 * time.Minute, RateLimit: 500, }) @@ -202,7 +202,7 @@ func TestIteratorRootRecheckOnFail(t *testing.T) { resolver = newMapResolver() c = NewClient(Config{ Resolver: resolver, - Logger: testlog.Logger(t, log.LvlTrace), + Logger: testlog.Logger(t, log.LvlInfo), RecheckInterval: 20 * time.Minute, RateLimit: 500, // Disabling the cache is required for this test because the client doesn't @@ -239,7 +239,7 @@ func TestIteratorEmptyTree(t *testing.T) { resolver = newMapResolver() c = NewClient(Config{ Resolver: resolver, - Logger: testlog.Logger(t, log.LvlTrace), + Logger: testlog.Logger(t, log.LvlInfo), RecheckInterval: 20 * time.Minute, RateLimit: 500, }) @@ -300,7 +300,7 @@ func TestIteratorLinkUpdates(t *testing.T) { resolver = newMapResolver() c = NewClient(Config{ Resolver: resolver, - Logger: testlog.Logger(t, log.LvlTrace), + Logger: testlog.Logger(t, log.LvlInfo), RecheckInterval: 20 * time.Minute, RateLimit: 500, }) diff --git a/rpc/client_test.go b/rpc/client_test.go index 64fe5470b..0d4234737 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -34,6 +34,9 @@ import ( "github.com/ledgerwatch/log/v3" ) +func init() { + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) +} func TestClientRequest(t *testing.T) { server := newTestServer() defer server.Stop() diff --git a/rpc/handler.go b/rpc/handler.go index 43efbdcf7..b54864749 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -17,6 +17,7 @@ package rpc import ( + "bytes" "context" "encoding/json" "reflect" @@ -26,6 +27,7 @@ import ( "time" jsoniter "github.com/json-iterator/go" + "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/log/v3" ) @@ -101,12 +103,6 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * // handleBatch executes all messages in a batch and returns the responses. func (h *handler) handleBatch(msgs []*jsonrpcMessage, stream *jsoniter.Stream) { - needWriteStream := false - if stream == nil { - stream = jsoniter.NewStream(jsoniter.ConfigDefault, nil, 4096) - needWriteStream = true - } - // Emit error response for empty batches: if len(msgs) == 0 { h.startCallProc(func(cp *callProc) { @@ -127,76 +123,43 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage, stream *jsoniter.Stream) { } // Process calls on a goroutine because they may block indefinitely: h.startCallProc(func(cp *callProc) { - stream.WriteArrayStart() - firstResponse := true // All goroutines will place results right to this array. Because requests order must match reply orders. + answersWithNils := make([]interface{}, len(msgs)) // Bounded parallelism pattern explanation https://blog.golang.org/pipelines#TOC_9. boundedConcurrency := make(chan struct{}, h.maxBatchConcurrency) defer close(boundedConcurrency) wg := sync.WaitGroup{} wg.Add(len(msgs)) - streamMutex := sync.Mutex{} - - writeToStream := func(buffer []byte) { - if len(buffer) == 0 { - return - } - - streamMutex.Lock() - defer streamMutex.Unlock() - - if !firstResponse { - stream.WriteMore() - } - stream.Write(buffer) - firstResponse = false - } - for i := range calls { - if calls[i].isSubscribe() { - // Force subscribe call to work in non-streaming mode - response := h.handleCallMsg(cp, calls[i], nil) - if response != nil { - b, _ := json.Marshal(response) - writeToStream(b) - } - } boundedConcurrency <- struct{}{} go func(i int) { defer func() { wg.Done() <-boundedConcurrency }() - cb := h.reg.callback(calls[i].Method) - var response *jsonrpcMessage - if cb != nil && cb.streamable { // cb == nil: means no such method and this case is thread-safe - batchStream := jsoniter.NewStream(jsoniter.ConfigDefault, nil, 4096) - response = h.handleCallMsg(cp, calls[i], batchStream) - if response == nil { - writeToStream(batchStream.Buffer()) - } - } else { - response = h.handleCallMsg(cp, calls[i], stream) + + buf := bytes.NewBuffer(nil) + stream := jsoniter.NewStream(jsoniter.ConfigDefault, buf, 4096) + if res := h.handleCallMsg(cp, calls[i], stream); res != nil { + answersWithNils[i] = res } - // Marshal inside goroutine (parallel) - if response != nil { - buffer, _ := json.Marshal(response) - writeToStream(buffer) + _ = stream.Flush() + if buf.Len() > 0 && answersWithNils[i] == nil { + answersWithNils[i] = json.RawMessage(common.CopyBytes(buf.Bytes())) } }(i) } wg.Wait() - - stream.WriteArrayEnd() - stream.Flush() - - if needWriteStream { - h.conn.writeJSON(cp.ctx, json.RawMessage(stream.Buffer())) - } else { - stream.Write([]byte("\n")) + answers := make([]interface{}, 0, len(msgs)) + for _, answer := range answersWithNils { + if answer != nil { + answers = append(answers, answer) + } } - h.addSubscriptions(cp.notifiers) + if len(answers) > 0 { + h.conn.writeJSON(cp.ctx, answers) + } for _, n := range cp.notifiers { n.activate() } diff --git a/rpc/service.go b/rpc/service.go index 6ed5468a2..b0b0e08f3 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -149,7 +149,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback { outs[i] = fntype.Out(i) } if len(outs) > 2 { - log.Warn("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs)) + log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs))) return nil } // If an error is returned, it must be the last returned value. @@ -158,14 +158,14 @@ func newCallback(receiver, fn reflect.Value, name string) *callback { c.errPos = 0 case len(outs) == 2: if isErrorType(outs[0]) || !isErrorType(outs[1]) { - log.Warn("Cannot register RPC callback [%s] - error must the last return value", name) + log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name)) return nil } c.errPos = 1 } // If there is only one return value (error), and the last argument is *jsoniter.Stream, mark it as streamable if len(outs) != 1 && c.streamable { - log.Warn("Cannot register RPC callback [%s] - streamable method may only return 1 value (error)", name) + log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - streamable method may only return 1 value (error)", name)) return nil } return c diff --git a/rpc/testdata/reqresp-batch.js b/rpc/testdata/reqresp-batch.js index 964bc8084..977af7663 100644 --- a/rpc/testdata/reqresp-batch.js +++ b/rpc/testdata/reqresp-batch.js @@ -1,7 +1,7 @@ // There is no response for all-notification batches. --> [{"jsonrpc":"2.0","method":"test_echo","params":["x",99]}] -<-- [] + // This test checks regular batch calls. --> [{"jsonrpc":"2.0","id":2,"method":"test_echo","params":[]}, {"jsonrpc":"2.0","id": 3,"method":"test_echo","params":["x",3]}] diff --git a/tests/block_test.go b/tests/block_test.go index 77d14ee82..16bcf4d11 100644 --- a/tests/block_test.go +++ b/tests/block_test.go @@ -19,9 +19,12 @@ package tests import ( "runtime" "testing" + + "github.com/ledgerwatch/log/v3" ) func TestBlockchain(t *testing.T) { + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) if runtime.GOOS == "windows" { t.Skip("fix me on win please") // after remove ChainReader from consensus engine - this test can be changed to create less databases, then can enable on win. now timeout after 20min } diff --git a/tests/state_test.go b/tests/state_test.go index 949f6d30c..78aec0e3a 100644 --- a/tests/state_test.go +++ b/tests/state_test.go @@ -27,9 +27,11 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/log/v3" ) func TestState(t *testing.T) { + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) if runtime.GOOS == "windows" { t.Skip("fix me on win please") // it's too slow on win, need generally improve speed of this tests } diff --git a/tests/vm_test.go b/tests/vm_test.go index 9f153ac30..688c081b3 100644 --- a/tests/vm_test.go +++ b/tests/vm_test.go @@ -22,9 +22,11 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/log/v3" ) func TestVM(t *testing.T) { + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) t.Parallel() vmt := new(testMatcher) vmt.slow("^vmPerformance")