rpc: fix error stream (#4860)

This commit is contained in:
Alex Sharov 2022-07-29 10:01:13 +07:00 committed by GitHub
parent 454cea4811
commit 2681ee392a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 52 deletions

View File

@ -7,12 +7,12 @@ Service to seed/download historical data (snapshots, immutable .seg files) by Bi
As many other Erigon components (txpool, sentry, rpc daemon) it may be built-into Erigon or run as separated process.
```shell
# 1. Downloader by default run inside Erigon, by `--snapshots=true` flag:
erigon --snapshots=true --datadir=<your_datadir>
# 1. Downloader by default run inside Erigon, by `--snapshots` flag:
erigon --snapshots --datadir=<your_datadir>
```
```shell
# 2. It's possible to start Downloader as independent process, by `--snapshots=true --downloader.api.addr=127.0.0.1:9093` flags:
# 2. It's possible to start Downloader as independent process, by `--snapshots --downloader.api.addr=127.0.0.1:9093` flags:
make erigon downloader
# Start downloader (can limit network usage by 512mb/sec: --torrent.download.rate=512mb --torrent.upload.rate=512mb)
@ -21,7 +21,7 @@ downloader --downloader.api.addr=127.0.0.1:9093 --torrent.port=42068 --datadir=<
# --torrent.port=42068 - is for public BitTorrent protocol listen
# Erigon on startup does send list of .torrent files to Downloader and wait for 100% download accomplishment
erigon --snapshots=true --downloader.api.addr=127.0.0.1:9093 --datadir=<your_datadir>
erigon --snapshots --downloader.api.addr=127.0.0.1:9093 --datadir=<your_datadir>
```
Use `--snap.keepblocks=true` to don't delete retired blocks from DB
@ -32,7 +32,7 @@ Any network/chain can start with snapshot sync:
- node will move old blocks from DB to snapshots of 1K blocks size, then merge snapshots to bigger range, until
snapshots of 500K blocks, then automatically start seeding new snapshot
Flag `--snapshots=true` is compatible with `--prune` flag
Flag `--snapshots` is compatible with `--prune` flag
## How to create new network or bootnode
@ -50,7 +50,7 @@ downloader torrent_hashes --rebuild --datadir=<your_datadir>
# Start downloader (seeds automatically)
downloader --downloader.api.addr=127.0.0.1:9093 --datadir=<your_datadir>
# Erigon is not required for snapshots seeding. But Erigon with --snapshots=true also does seeding.
# Erigon is not required for snapshots seeding. But Erigon with --snapshots also does seeding.
```
Additional info:

View File

@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
@ -458,58 +459,57 @@ func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage, stream *jso
// runMethod runs the Go callback for an RPC method.
func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value, stream *jsoniter.Stream) *jsonrpcMessage {
if callb.streamable {
stream.WriteObjectStart()
stream.WriteObjectField("jsonrpc")
stream.WriteString("2.0")
stream.WriteMore()
if msg.ID != nil {
stream.WriteObjectField("id")
stream.Write(msg.ID)
stream.WriteMore()
}
stream.WriteObjectField("result")
_, err := callb.call(ctx, msg.Method, args, stream)
if err != nil {
return msg.errorResponse(err)
/*
stream.WriteMore()
stream.WriteObjectField("error")
stream.WriteObjectStart()
stream.WriteObjectField("code")
ec, ok := err.(Error)
if ok {
stream.WriteInt(ec.ErrorCode())
} else {
stream.WriteInt(defaultErrorCode)
}
stream.WriteMore()
stream.WriteObjectField("message")
stream.WriteString(fmt.Sprintf("%v", err))
de, ok := err.(DataError)
if ok {
stream.WriteMore()
stream.WriteObjectField("data")
data, derr := json.Marshal(de.ErrorData())
if derr == nil {
stream.Write(data)
} else {
stream.WriteString(fmt.Sprintf("%v", derr))
}
}
stream.WriteObjectEnd()
*/
}
stream.WriteObjectEnd()
stream.Flush()
return nil
} else {
if !callb.streamable {
result, err := callb.call(ctx, msg.Method, args, stream)
if err != nil {
return msg.errorResponse(err)
}
return msg.response(result)
}
stream.WriteObjectStart()
stream.WriteObjectField("jsonrpc")
stream.WriteString("2.0")
stream.WriteMore()
if msg.ID != nil {
stream.WriteObjectField("id")
stream.Write(msg.ID)
stream.WriteMore()
}
stream.WriteObjectField("result")
_, err := callb.call(ctx, msg.Method, args, stream)
if err != nil {
//return msg.errorResponse(err)
stream.WriteMore()
stream.WriteObjectField("error")
stream.WriteObjectStart()
stream.WriteObjectField("code")
ec, ok := err.(Error)
if ok {
stream.WriteInt(ec.ErrorCode())
} else {
stream.WriteInt(defaultErrorCode)
}
stream.WriteMore()
stream.WriteObjectField("message")
stream.WriteString(fmt.Sprintf("%v", err))
de, ok := err.(DataError)
if ok {
stream.WriteMore()
stream.WriteObjectField("data")
data, derr := json.Marshal(de.ErrorData())
if derr == nil {
stream.Write(data)
} else {
stream.WriteString(fmt.Sprintf("%v", derr))
}
}
stream.WriteObjectEnd()
}
stream.WriteObjectEnd()
stream.Flush()
return nil
}
// unsubscribe is the callback function for all *_unsubscribe calls.