dvovk/tunnelwws (#8745)

- changed communication tunnel to web socket in order to connect to
remote nodes
- changed diagnostics.url flag to diagnostics.addr as now user need to
enter only address and support command will connect to it through
websocket
- changed flag debug.urls to debug.addrs in order to have ability to
change connection type between erigon and support to websocket and don't
change user API
- added auto trying to connect to connect to ws if connection with was
failed
This commit is contained in:
Dmytro 2023-11-16 17:37:29 +01:00 committed by GitHub
parent 27d8865f35
commit a6b5297b3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 72 additions and 61 deletions

View File

@ -23,8 +23,8 @@ The devnet runs as a single `go` process which can be started with the following
| metrics | N | false | Enable metrics collection and reporting from devnet nodes | | metrics | N | false | Enable metrics collection and reporting from devnet nodes |
| metrics.node | N | 0 | At the moment only one node on the network can produce metrics. This value specifies index of the node in the cluster to attach to | | metrics.node | N | 0 | At the moment only one node on the network can produce metrics. This value specifies index of the node in the cluster to attach to |
| metrics.port | N | 6060 | The network port of the node to connect to for gather ing metrics | | metrics.port | N | 6060 | The network port of the node to connect to for gather ing metrics |
| diagnostics.url | N | | URL of the diagnostics system provided by the support team, include unique session PIN, if this is specified the devnet will start a `support` tunnel and connect to the diagnostics platform to provide metrics from the specified node on the devnet | | diagnostics.addr | N | | Address of the diagnostics system provided by the support team, include unique session PIN, if this is specified the devnet will start a `support` tunnel and connect to the diagnostics platform to provide metrics from the specified node on the devnet |
| insecure | N | false | Used if `diagnostics.url` is set to allow communication with diagnostics system using self-signed TLS certificates | | insecure | N | false | Used if `diagnostics.addr` is set to allow communication with diagnostics system
## Network Configuration ## Network Configuration

View File

@ -105,8 +105,8 @@ var (
} }
DiagnosticsURLFlag = cli.StringFlag{ DiagnosticsURLFlag = cli.StringFlag{
Name: "diagnostics.url", Name: "diagnostics.addr",
Usage: "URL of the diagnostics system provided by the support team, include unique session PIN", Usage: "Address of the diagnostics system provided by the support team, include unique session PIN",
} }
insecureFlag = cli.BoolFlag{ insecureFlag = cli.BoolFlag{

View File

@ -832,8 +832,8 @@ var (
} }
DiagnosticsURLFlag = cli.StringFlag{ DiagnosticsURLFlag = cli.StringFlag{
Name: "diagnostics.url", Name: "diagnostics.addr",
Usage: "URL of the diagnostics system provided by the support team", Usage: "Address of the diagnostics system provided by the support team",
} }
DiagnosticsInsecureFlag = cli.BoolFlag{ DiagnosticsInsecureFlag = cli.BoolFlag{

View File

@ -496,7 +496,7 @@ func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
return err return err
} }
} }
err := c.writeConn.writeJSON(ctx, msg) err := c.writeConn.WriteJSON(ctx, msg)
if err != nil { if err != nil {
c.writeConn = nil c.writeConn = nil
if !retry { if !retry {
@ -629,7 +629,7 @@ func (c *Client) read(codec ServerCodec) {
for { for {
msgs, batch, err := codec.ReadBatch() msgs, batch, err := codec.ReadBatch()
if _, ok := err.(*json.SyntaxError); ok { if _, ok := err.(*json.SyntaxError); ok {
codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()})) codec.WriteJSON(context.Background(), errorMessage(&parseError{err.Error()}))
} }
if err != nil { if err != nil {
c.readErr <- err c.readErr <- err

View File

@ -142,7 +142,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
// Emit error response for empty batches: // Emit error response for empty batches:
if len(msgs) == 0 { if len(msgs) == 0 {
h.startCallProc(func(cp *callProc) { h.startCallProc(func(cp *callProc) {
h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) h.conn.WriteJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
}) })
return return
} }
@ -200,7 +200,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
} }
h.addSubscriptions(cp.notifiers) h.addSubscriptions(cp.notifiers)
if len(answers) > 0 { if len(answers) > 0 {
h.conn.writeJSON(cp.ctx, answers) h.conn.WriteJSON(cp.ctx, answers)
} }
for _, n := range cp.notifiers { for _, n := range cp.notifiers {
n.activate() n.activate()
@ -226,7 +226,7 @@ func (h *handler) handleMsg(msg *jsonrpcMessage, stream *jsoniter.Stream) {
stream.Write(buffer) stream.Write(buffer)
} }
if needWriteStream { if needWriteStream {
h.conn.writeJSON(cp.ctx, json.RawMessage(stream.Buffer())) h.conn.WriteJSON(cp.ctx, json.RawMessage(stream.Buffer()))
} else { } else {
stream.Write([]byte("\n")) stream.Write([]byte("\n"))
} }

View File

@ -55,7 +55,7 @@ type httpConn struct {
} }
// httpConn is treated specially by Client. // httpConn is treated specially by Client.
func (hc *httpConn) writeJSON(context.Context, interface{}) error { func (hc *httpConn) WriteJSON(context.Context, interface{}) error {
panic("writeJSON called on httpConn") panic("writeJSON called on httpConn")
} }

View File

@ -220,7 +220,7 @@ func (c *jsonCodec) ReadBatch() (messages []*jsonrpcMessage, batch bool, err err
return messages, batch, nil return messages, batch, nil
} }
func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error { func (c *jsonCodec) WriteJSON(ctx context.Context, v interface{}) error {
c.encMu.Lock() c.encMu.Lock()
defer c.encMu.Unlock() defer c.encMu.Unlock()

View File

@ -124,13 +124,13 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, stre
reqs, batch, err := codec.ReadBatch() reqs, batch, err := codec.ReadBatch()
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) codec.WriteJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
} }
return return
} }
if batch { if batch {
if s.batchLimit > 0 && len(reqs) > s.batchLimit { if s.batchLimit > 0 && len(reqs) > s.batchLimit {
codec.writeJSON(ctx, errorMessage(fmt.Errorf("batch limit %d exceeded (can increase by --rpc.batch.limit). Requested batch of size: %d", s.batchLimit, len(reqs)))) codec.WriteJSON(ctx, errorMessage(fmt.Errorf("batch limit %d exceeded (can increase by --rpc.batch.limit). Requested batch of size: %d", s.batchLimit, len(reqs))))
} else { } else {
h.handleBatch(reqs) h.handleBatch(reqs)
} }

View File

@ -174,7 +174,7 @@ func (n *Notifier) activate() error {
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
ctx := context.Background() ctx := context.Background()
return n.h.conn.writeJSON(ctx, &jsonrpcMessage{ return n.h.conn.WriteJSON(ctx, &jsonrpcMessage{
Version: vsn, Version: vsn,
Method: n.namespace + notificationMethodSuffix, Method: n.namespace + notificationMethodSuffix,
Params: params, Params: params,

View File

@ -20,12 +20,13 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/ledgerwatch/erigon-lib/common/hexutil"
"math" "math"
"math/big" "math/big"
"strconv" "strconv"
"strings" "strings"
"github.com/ledgerwatch/erigon-lib/common/hexutil"
libcommon "github.com/ledgerwatch/erigon-lib/common" libcommon "github.com/ledgerwatch/erigon-lib/common"
) )
@ -61,7 +62,7 @@ type ServerCodec interface {
// jsonWriter can write JSON messages to its underlying connection. // jsonWriter can write JSON messages to its underlying connection.
// Implementations must be safe for concurrent use. // Implementations must be safe for concurrent use.
type jsonWriter interface { type jsonWriter interface {
writeJSON(context.Context, interface{}) error WriteJSON(context.Context, interface{}) error
// Closed returns a channel which is closed when the connection is closed. // Closed returns a channel which is closed when the connection is closed.
closed() <-chan interface{} closed() <-chan interface{}
// RemoteAddr returns the peer address of the connection. // RemoteAddr returns the peer address of the connection.

View File

@ -63,7 +63,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string, jwtSecret []byte, com
logger.Warn("WebSocket upgrade failed", "err", err) logger.Warn("WebSocket upgrade failed", "err", err)
return return
} }
codec := newWebsocketCodec(conn) codec := NewWebsocketCodec(conn)
s.ServeCodec(codec, 0) s.ServeCodec(codec, 0)
}) })
} }
@ -205,7 +205,7 @@ func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, diale
} }
return nil, hErr return nil, hErr
} }
return newWebsocketCodec(conn), nil return NewWebsocketCodec(conn), nil
}, logger) }, logger)
} }
@ -248,7 +248,7 @@ type websocketCodec struct {
pingReset chan struct{} pingReset chan struct{}
} }
func newWebsocketCodec(conn *websocket.Conn) ServerCodec { func NewWebsocketCodec(conn *websocket.Conn) ServerCodec {
conn.SetReadLimit(wsMessageSizeLimit) conn.SetReadLimit(wsMessageSizeLimit)
wc := &websocketCodec{ wc := &websocketCodec{
jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec), jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec),
@ -265,8 +265,8 @@ func (wc *websocketCodec) Close() {
wc.wg.Wait() wc.wg.Wait()
} }
func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error { func (wc *websocketCodec) WriteJSON(ctx context.Context, v interface{}) error {
err := wc.jsonCodec.writeJSON(ctx, v) err := wc.jsonCodec.WriteJSON(ctx, v)
if err == nil { if err == nil {
// Notify pingLoop to delay the next idle ping. // Notify pingLoop to delay the next idle ping.
select { select {

View File

@ -12,25 +12,36 @@ import (
"os" "os"
"os/signal" "os/signal"
"strconv" "strconv"
"sync"
"syscall" "syscall"
"time" "time"
"github.com/gorilla/websocket"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/net/http2"
) )
const (
wsReadBuffer = 1024
wsWriteBuffer = 1024
wsPingInterval = 60 * time.Second
wsPingWriteTimeout = 5 * time.Second
wsMessageSizeLimit = 32 * 1024 * 1024
)
var wsBufferPool = new(sync.Pool)
var ( var (
diagnosticsURLFlag = cli.StringFlag{ diagnosticsURLFlag = cli.StringFlag{
Name: "diagnostics.url", Name: "diagnostics.addr",
Usage: "URL of the diagnostics system provided by the support team, include unique session PIN", Usage: "Address of the diagnostics system provided by the support team, include unique session PIN",
} }
debugURLsFlag = cli.StringSliceFlag{ debugURLsFlag = cli.StringSliceFlag{
Name: "debug.urls", Name: "debug.addrs",
Usage: "Comma separated list of URLs to the debug endpoints thats are being diagnosed", Usage: "Comma separated list of URLs to the debug endpoints thats are being diagnosed",
} }
@ -49,7 +60,7 @@ var supportCommand = cli.Command{
Action: MigrateFlags(connectDiagnostics), Action: MigrateFlags(connectDiagnostics),
Name: "support", Name: "support",
Usage: "Connect Erigon instance to a diagnostics system for support", Usage: "Connect Erigon instance to a diagnostics system for support",
ArgsUsage: "--diagnostics.url <URL for the diagnostics system> --ids <diagnostic session ids allowed to connect> --metrics.urls <http://erigon_host:metrics_port>", ArgsUsage: "--diagnostics.addr <URL for the diagnostics system> --ids <diagnostic session ids allowed to connect> --metrics.urls <http://erigon_host:metrics_port>",
Flags: []cli.Flag{ Flags: []cli.Flag{
&debugURLsFlag, &debugURLsFlag,
&diagnosticsURLFlag, &diagnosticsURLFlag,
@ -73,7 +84,11 @@ func ConnectDiagnostics(cliCtx *cli.Context, logger log.Logger) error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
debugURLs := cliCtx.StringSlice(debugURLsFlag.Name) debugURLs := []string{}
for _, debugURL := range cliCtx.StringSlice(debugURLsFlag.Name) {
debugURLs = append(debugURLs, "http://"+debugURL)
}
diagnosticsUrl := cliCtx.String(diagnosticsURLFlag.Name) + "/bridge" diagnosticsUrl := cliCtx.String(diagnosticsURLFlag.Name) + "/bridge"
@ -117,28 +132,23 @@ func (c *conn) SetWriteDeadline(time time.Time) error {
return nil return nil
} }
// tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request
// needs to be called repeatedly to implement re-connect logic
// tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request // tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request
// needs to be called repeatedly to implement re-connect logic // needs to be called repeatedly to implement re-connect logic
func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, tlsConfig *tls.Config, diagnosticsUrl string, sessionIds []string, debugURLs []string, logger log.Logger) error { func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, tlsConfig *tls.Config, diagnosticsUrl string, sessionIds []string, debugURLs []string, logger log.Logger) error {
diagnosticsClient := &http.Client{Transport: &http2.Transport{TLSClientConfig: tlsConfig}}
defer diagnosticsClient.CloseIdleConnections()
metricsClient := &http.Client{} metricsClient := &http.Client{}
defer metricsClient.CloseIdleConnections() defer metricsClient.CloseIdleConnections()
ctx1, cancel1 := context.WithCancel(ctx) ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1() defer cancel1()
// Create a request object to send to the server
reader, writer := io.Pipe()
go func() { go func() {
select { select {
case <-sigs: case <-sigs:
cancel() cancel()
case <-ctx1.Done(): case <-ctx1.Done():
} }
reader.Close()
writer.Close()
}() }()
type enode struct { type enode struct {
@ -206,21 +216,23 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
} }
} }
req, err := http.NewRequestWithContext(ctx1, http.MethodPost, diagnosticsUrl, reader) dialer := websocket.Dialer{
ReadBufferSize: wsReadBuffer,
if err != nil { WriteBufferSize: wsWriteBuffer,
return err WriteBufferPool: wsBufferPool,
} }
// Create a connection conn, resp, err := dialer.DialContext(ctx1, "wss://"+diagnosticsUrl, nil)
// Apply given context to the sent request
resp, err := diagnosticsClient.Do(req)
if err != nil { if err != nil {
return err conn, resp, err = dialer.DialContext(ctx1, "ws://"+diagnosticsUrl, nil)
if err != nil {
return err
}
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusSwitchingProtocols {
return fmt.Errorf("support request to %s failed: %s", diagnosticsUrl, resp.Status) return fmt.Errorf("support request to %s failed: %s", diagnosticsUrl, resp.Status)
} }
@ -230,7 +242,10 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
Nodes []*info `json:"nodes"` Nodes []*info `json:"nodes"`
} }
err = json.NewEncoder(writer).Encode(&connectionInfo{ codec := rpc.NewWebsocketCodec(conn)
defer codec.Close()
err = codec.WriteJSON(ctx1, &connectionInfo{
Version: Version, Version: Version,
Sessions: sessionIds, Sessions: sessionIds,
Nodes: func() (replies []*info) { Nodes: func() (replies []*info) {
@ -248,12 +263,6 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
logger.Info("Connected") logger.Info("Connected")
codec := rpc.NewCodec(&conn{
ReadCloser: resp.Body,
PipeWriter: writer,
})
defer codec.Close()
for { for {
requests, _, err := codec.ReadBatch() requests, _, err := codec.ReadBatch()
@ -306,11 +315,10 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
} }
debugURL := node.debugURL + "/debug/" + requests[0].Method + queryString debugURL := node.debugURL + "/debug/" + requests[0].Method + queryString
debugResponse, err := metricsClient.Get(debugURL) debugResponse, err := metricsClient.Get(debugURL)
if err != nil { if err != nil {
return json.NewEncoder(writer).Encode(&nodeResponse{ return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId, Id: requestId,
Error: &responseError{ Error: &responseError{
Code: http.StatusFailedDependency, Code: http.StatusFailedDependency,
@ -322,9 +330,10 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
defer debugResponse.Body.Close() defer debugResponse.Body.Close()
if resp.StatusCode != http.StatusOK { //Websocket ok message
if resp.StatusCode != http.StatusSwitchingProtocols {
body, _ := io.ReadAll(debugResponse.Body) body, _ := io.ReadAll(debugResponse.Body)
return json.NewEncoder(writer).Encode(&nodeResponse{ return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId, Id: requestId,
Error: &responseError{ Error: &responseError{
Code: int64(resp.StatusCode), Code: int64(resp.StatusCode),
@ -339,7 +348,7 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
switch debugResponse.Header.Get("Content-Type") { switch debugResponse.Header.Get("Content-Type") {
case "application/json": case "application/json":
if _, err := io.Copy(buffer, debugResponse.Body); err != nil { if _, err := io.Copy(buffer, debugResponse.Body); err != nil {
return json.NewEncoder(writer).Encode(&nodeResponse{ return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId, Id: requestId,
Error: &responseError{ Error: &responseError{
Code: http.StatusInternalServerError, Code: http.StatusInternalServerError,
@ -347,10 +356,11 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
}, },
Last: true, Last: true,
}) })
} }
case "application/octet-stream": case "application/octet-stream":
if _, err := io.Copy(buffer, debugResponse.Body); err != nil { if _, err := io.Copy(buffer, debugResponse.Body); err != nil {
return json.NewEncoder(writer).Encode(&nodeResponse{ return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId, Id: requestId,
Error: &responseError{ Error: &responseError{
Code: int64(http.StatusInternalServerError), Code: int64(http.StatusInternalServerError),
@ -376,7 +386,7 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
buffer = bytes.NewBuffer(data) buffer = bytes.NewBuffer(data)
if err != nil { if err != nil {
return json.NewEncoder(writer).Encode(&nodeResponse{ return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId, Id: requestId,
Error: &responseError{ Error: &responseError{
Code: int64(http.StatusInternalServerError), Code: int64(http.StatusInternalServerError),
@ -387,7 +397,7 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
} }
default: default:
return json.NewEncoder(writer).Encode(&nodeResponse{ return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId, Id: requestId,
Error: &responseError{ Error: &responseError{
Code: int64(http.StatusInternalServerError), Code: int64(http.StatusInternalServerError),
@ -397,7 +407,7 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
}) })
} }
return json.NewEncoder(writer).Encode(&nodeResponse{ return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId, Id: requestId,
Result: json.RawMessage(buffer.Bytes()), Result: json.RawMessage(buffer.Bytes()),
Last: true, Last: true,