RPC: batch - preserve order, streaming to in-mem buf (#2541)

* preserve order in batch

* fix batch order

* base fee in header json

* less logs in tests

* less logs in tests

* save

* save
This commit is contained in:
Alex Sharov 2021-08-19 09:26:06 +07:00 committed by GitHub
parent 85565dcd92
commit e7574a6d14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 42 additions and 68 deletions

View File

@ -39,6 +39,7 @@ func init() {
var r enr.Record var r enr.Record
r.Set(enr.IP{0, 0, 0, 0}) r.Set(enr.IP{0, 0, 0, 0})
nullNode = enode.SignNull(&r, enode.ID{}) nullNode = enode.SignNull(&r, enode.ID{})
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
} }
func newTestTable(t transport) (*Table, *enode.DB) { func newTestTable(t transport) (*Table, *enode.DB) {

View File

@ -55,7 +55,7 @@ func TestClientSyncTree(t *testing.T) {
wantSeq = uint(1) wantSeq = uint(1)
) )
c := NewClient(Config{Resolver: r, Logger: testlog.Logger(t, log.LvlTrace)}) c := NewClient(Config{Resolver: r, Logger: testlog.Logger(t, log.LvlInfo)})
stree, err := c.SyncTree("enrtree://AKPYQIUQIL7PSIACI32J7FGZW56E5FKHEFCCOFHILBIMW3M6LWXS2@n") stree, err := c.SyncTree("enrtree://AKPYQIUQIL7PSIACI32J7FGZW56E5FKHEFCCOFHILBIMW3M6LWXS2@n")
if err != nil { if err != nil {
t.Fatal("sync error:", err) t.Fatal("sync error:", err)
@ -89,7 +89,7 @@ func TestClientSyncTreeBadNode(t *testing.T) {
"C7HRFPF3BLGF3YR4DY5KX3SMBE.n": "enrtree://AM5FCQLWIZX2QFPNJAP7VUERCCRNGRHWZG3YYHIUV7BVDQ5FDPRT2@morenodes.example.org", "C7HRFPF3BLGF3YR4DY5KX3SMBE.n": "enrtree://AM5FCQLWIZX2QFPNJAP7VUERCCRNGRHWZG3YYHIUV7BVDQ5FDPRT2@morenodes.example.org",
"INDMVBZEEQ4ESVYAKGIYU74EAA.n": "enr:-----", "INDMVBZEEQ4ESVYAKGIYU74EAA.n": "enr:-----",
} }
c := NewClient(Config{Resolver: r, Logger: testlog.Logger(t, log.LvlTrace)}) c := NewClient(Config{Resolver: r, Logger: testlog.Logger(t, log.LvlInfo)})
_, err := c.SyncTree("enrtree://AKPYQIUQIL7PSIACI32J7FGZW56E5FKHEFCCOFHILBIMW3M6LWXS2@n") _, err := c.SyncTree("enrtree://AKPYQIUQIL7PSIACI32J7FGZW56E5FKHEFCCOFHILBIMW3M6LWXS2@n")
wantErr := nameError{name: "INDMVBZEEQ4ESVYAKGIYU74EAA.n", err: entryError{typ: "enr", err: errInvalidENR}} wantErr := nameError{name: "INDMVBZEEQ4ESVYAKGIYU74EAA.n", err: entryError{typ: "enr", err: errInvalidENR}}
if err != wantErr { if err != wantErr {
@ -104,7 +104,7 @@ func TestIterator(t *testing.T) {
r := mapResolver(tree.ToTXT("n")) r := mapResolver(tree.ToTXT("n"))
c := NewClient(Config{ c := NewClient(Config{
Resolver: r, Resolver: r,
Logger: testlog.Logger(t, log.LvlTrace), Logger: testlog.Logger(t, log.LvlInfo),
RateLimit: 500, RateLimit: 500,
}) })
it, err := c.NewIterator(url) it, err := c.NewIterator(url)
@ -145,7 +145,7 @@ func TestIteratorLinks(t *testing.T) {
tree2, url2 := makeTestTree("t2", nodes[10:], []string{url1}) tree2, url2 := makeTestTree("t2", nodes[10:], []string{url1})
c := NewClient(Config{ c := NewClient(Config{
Resolver: newMapResolver(tree1.ToTXT("t1"), tree2.ToTXT("t2")), Resolver: newMapResolver(tree1.ToTXT("t1"), tree2.ToTXT("t2")),
Logger: testlog.Logger(t, log.LvlTrace), Logger: testlog.Logger(t, log.LvlInfo),
RateLimit: 500, RateLimit: 500,
}) })
it, err := c.NewIterator(url2) it, err := c.NewIterator(url2)
@ -165,7 +165,7 @@ func TestIteratorNodeUpdates(t *testing.T) {
resolver = newMapResolver() resolver = newMapResolver()
c = NewClient(Config{ c = NewClient(Config{
Resolver: resolver, Resolver: resolver,
Logger: testlog.Logger(t, log.LvlTrace), Logger: testlog.Logger(t, log.LvlInfo),
RecheckInterval: 20 * time.Minute, RecheckInterval: 20 * time.Minute,
RateLimit: 500, RateLimit: 500,
}) })
@ -202,7 +202,7 @@ func TestIteratorRootRecheckOnFail(t *testing.T) {
resolver = newMapResolver() resolver = newMapResolver()
c = NewClient(Config{ c = NewClient(Config{
Resolver: resolver, Resolver: resolver,
Logger: testlog.Logger(t, log.LvlTrace), Logger: testlog.Logger(t, log.LvlInfo),
RecheckInterval: 20 * time.Minute, RecheckInterval: 20 * time.Minute,
RateLimit: 500, RateLimit: 500,
// Disabling the cache is required for this test because the client doesn't // Disabling the cache is required for this test because the client doesn't
@ -239,7 +239,7 @@ func TestIteratorEmptyTree(t *testing.T) {
resolver = newMapResolver() resolver = newMapResolver()
c = NewClient(Config{ c = NewClient(Config{
Resolver: resolver, Resolver: resolver,
Logger: testlog.Logger(t, log.LvlTrace), Logger: testlog.Logger(t, log.LvlInfo),
RecheckInterval: 20 * time.Minute, RecheckInterval: 20 * time.Minute,
RateLimit: 500, RateLimit: 500,
}) })
@ -300,7 +300,7 @@ func TestIteratorLinkUpdates(t *testing.T) {
resolver = newMapResolver() resolver = newMapResolver()
c = NewClient(Config{ c = NewClient(Config{
Resolver: resolver, Resolver: resolver,
Logger: testlog.Logger(t, log.LvlTrace), Logger: testlog.Logger(t, log.LvlInfo),
RecheckInterval: 20 * time.Minute, RecheckInterval: 20 * time.Minute,
RateLimit: 500, RateLimit: 500,
}) })

View File

@ -34,6 +34,9 @@ import (
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
) )
func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
}
func TestClientRequest(t *testing.T) { func TestClientRequest(t *testing.T) {
server := newTestServer() server := newTestServer()
defer server.Stop() defer server.Stop()

View File

@ -17,6 +17,7 @@
package rpc package rpc
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"reflect" "reflect"
@ -26,6 +27,7 @@ import (
"time" "time"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
) )
@ -101,12 +103,6 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
// handleBatch executes all messages in a batch and returns the responses. // handleBatch executes all messages in a batch and returns the responses.
func (h *handler) handleBatch(msgs []*jsonrpcMessage, stream *jsoniter.Stream) { func (h *handler) handleBatch(msgs []*jsonrpcMessage, stream *jsoniter.Stream) {
needWriteStream := false
if stream == nil {
stream = jsoniter.NewStream(jsoniter.ConfigDefault, nil, 4096)
needWriteStream = true
}
// Emit error response for empty batches: // Emit error response for empty batches:
if len(msgs) == 0 { if len(msgs) == 0 {
h.startCallProc(func(cp *callProc) { h.startCallProc(func(cp *callProc) {
@ -127,76 +123,43 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage, stream *jsoniter.Stream) {
} }
// Process calls on a goroutine because they may block indefinitely: // Process calls on a goroutine because they may block indefinitely:
h.startCallProc(func(cp *callProc) { h.startCallProc(func(cp *callProc) {
stream.WriteArrayStart()
firstResponse := true
// All goroutines will place results right to this array. Because requests order must match reply orders. // All goroutines will place results right to this array. Because requests order must match reply orders.
answersWithNils := make([]interface{}, len(msgs))
// Bounded parallelism pattern explanation https://blog.golang.org/pipelines#TOC_9. // Bounded parallelism pattern explanation https://blog.golang.org/pipelines#TOC_9.
boundedConcurrency := make(chan struct{}, h.maxBatchConcurrency) boundedConcurrency := make(chan struct{}, h.maxBatchConcurrency)
defer close(boundedConcurrency) defer close(boundedConcurrency)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(msgs)) wg.Add(len(msgs))
streamMutex := sync.Mutex{}
writeToStream := func(buffer []byte) {
if len(buffer) == 0 {
return
}
streamMutex.Lock()
defer streamMutex.Unlock()
if !firstResponse {
stream.WriteMore()
}
stream.Write(buffer)
firstResponse = false
}
for i := range calls { for i := range calls {
if calls[i].isSubscribe() {
// Force subscribe call to work in non-streaming mode
response := h.handleCallMsg(cp, calls[i], nil)
if response != nil {
b, _ := json.Marshal(response)
writeToStream(b)
}
}
boundedConcurrency <- struct{}{} boundedConcurrency <- struct{}{}
go func(i int) { go func(i int) {
defer func() { defer func() {
wg.Done() wg.Done()
<-boundedConcurrency <-boundedConcurrency
}() }()
cb := h.reg.callback(calls[i].Method)
var response *jsonrpcMessage buf := bytes.NewBuffer(nil)
if cb != nil && cb.streamable { // cb == nil: means no such method and this case is thread-safe stream := jsoniter.NewStream(jsoniter.ConfigDefault, buf, 4096)
batchStream := jsoniter.NewStream(jsoniter.ConfigDefault, nil, 4096) if res := h.handleCallMsg(cp, calls[i], stream); res != nil {
response = h.handleCallMsg(cp, calls[i], batchStream) answersWithNils[i] = res
if response == nil {
writeToStream(batchStream.Buffer())
}
} else {
response = h.handleCallMsg(cp, calls[i], stream)
} }
// Marshal inside goroutine (parallel) _ = stream.Flush()
if response != nil { if buf.Len() > 0 && answersWithNils[i] == nil {
buffer, _ := json.Marshal(response) answersWithNils[i] = json.RawMessage(common.CopyBytes(buf.Bytes()))
writeToStream(buffer)
} }
}(i) }(i)
} }
wg.Wait() wg.Wait()
answers := make([]interface{}, 0, len(msgs))
stream.WriteArrayEnd() for _, answer := range answersWithNils {
stream.Flush() if answer != nil {
answers = append(answers, answer)
if needWriteStream { }
h.conn.writeJSON(cp.ctx, json.RawMessage(stream.Buffer()))
} else {
stream.Write([]byte("\n"))
} }
h.addSubscriptions(cp.notifiers) h.addSubscriptions(cp.notifiers)
if len(answers) > 0 {
h.conn.writeJSON(cp.ctx, answers)
}
for _, n := range cp.notifiers { for _, n := range cp.notifiers {
n.activate() n.activate()
} }

View File

@ -149,7 +149,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback {
outs[i] = fntype.Out(i) outs[i] = fntype.Out(i)
} }
if len(outs) > 2 { if len(outs) > 2 {
log.Warn("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs)) log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs)))
return nil return nil
} }
// If an error is returned, it must be the last returned value. // If an error is returned, it must be the last returned value.
@ -158,14 +158,14 @@ func newCallback(receiver, fn reflect.Value, name string) *callback {
c.errPos = 0 c.errPos = 0
case len(outs) == 2: case len(outs) == 2:
if isErrorType(outs[0]) || !isErrorType(outs[1]) { if isErrorType(outs[0]) || !isErrorType(outs[1]) {
log.Warn("Cannot register RPC callback [%s] - error must the last return value", name) log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name))
return nil return nil
} }
c.errPos = 1 c.errPos = 1
} }
// If there is only one return value (error), and the last argument is *jsoniter.Stream, mark it as streamable // If there is only one return value (error), and the last argument is *jsoniter.Stream, mark it as streamable
if len(outs) != 1 && c.streamable { if len(outs) != 1 && c.streamable {
log.Warn("Cannot register RPC callback [%s] - streamable method may only return 1 value (error)", name) log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - streamable method may only return 1 value (error)", name))
return nil return nil
} }
return c return c

View File

@ -1,7 +1,7 @@
// There is no response for all-notification batches. // There is no response for all-notification batches.
--> [{"jsonrpc":"2.0","method":"test_echo","params":["x",99]}] --> [{"jsonrpc":"2.0","method":"test_echo","params":["x",99]}]
<-- []
// This test checks regular batch calls. // This test checks regular batch calls.
--> [{"jsonrpc":"2.0","id":2,"method":"test_echo","params":[]}, {"jsonrpc":"2.0","id": 3,"method":"test_echo","params":["x",3]}] --> [{"jsonrpc":"2.0","id":2,"method":"test_echo","params":[]}, {"jsonrpc":"2.0","id": 3,"method":"test_echo","params":["x",3]}]

View File

@ -19,9 +19,12 @@ package tests
import ( import (
"runtime" "runtime"
"testing" "testing"
"github.com/ledgerwatch/log/v3"
) )
func TestBlockchain(t *testing.T) { func TestBlockchain(t *testing.T) {
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
t.Skip("fix me on win please") // after remove ChainReader from consensus engine - this test can be changed to create less databases, then can enable on win. now timeout after 20min t.Skip("fix me on win please") // after remove ChainReader from consensus engine - this test can be changed to create less databases, then can enable on win. now timeout after 20min
} }

View File

@ -27,9 +27,11 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/log/v3"
) )
func TestState(t *testing.T) { func TestState(t *testing.T) {
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
t.Skip("fix me on win please") // it's too slow on win, need generally improve speed of this tests t.Skip("fix me on win please") // it's too slow on win, need generally improve speed of this tests
} }

View File

@ -22,9 +22,11 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/log/v3"
) )
func TestVM(t *testing.T) { func TestVM(t *testing.T) {
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
t.Parallel() t.Parallel()
vmt := new(testMatcher) vmt := new(testMatcher)
vmt.slow("^vmPerformance") vmt.slow("^vmPerformance")