2020-08-19 11:46:20 +00:00
package cli
import (
"context"
2022-02-28 11:07:09 +00:00
"crypto/rand"
2021-04-24 15:46:29 +00:00
"encoding/binary"
2022-03-14 13:47:26 +00:00
"errors"
2020-08-19 11:46:20 +00:00
"fmt"
2021-12-06 12:03:46 +00:00
"net"
2020-09-18 10:23:35 +00:00
"net/http"
2022-02-28 11:07:09 +00:00
"os"
2022-02-12 13:33:09 +00:00
"path/filepath"
2022-02-12 11:40:19 +00:00
"strings"
2020-10-10 06:06:54 +00:00
"time"
2020-09-18 10:23:35 +00:00
2022-11-20 03:41:30 +00:00
"github.com/ledgerwatch/erigon-lib/common/datadir"
2022-10-02 07:48:26 +00:00
"github.com/ledgerwatch/erigon-lib/common/dir"
2022-08-12 09:13:14 +00:00
libstate "github.com/ledgerwatch/erigon-lib/state"
2022-11-07 13:04:31 +00:00
2022-08-19 02:35:43 +00:00
"github.com/ledgerwatch/erigon/eth/ethconfig"
2022-07-06 10:44:06 +00:00
"github.com/ledgerwatch/erigon/rpc/rpccfg"
2022-10-25 02:58:25 +00:00
"github.com/ledgerwatch/erigon/turbo/debug"
"github.com/ledgerwatch/erigon/turbo/logging"
2022-07-06 10:44:06 +00:00
2022-02-12 12:47:19 +00:00
"github.com/ledgerwatch/erigon-lib/direct"
2021-07-01 21:31:14 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces"
2021-09-03 04:19:35 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
2021-08-07 05:03:12 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
2022-02-11 10:11:59 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
2021-07-29 11:53:13 +00:00
"github.com/ledgerwatch/erigon-lib/kv"
2021-09-29 01:36:25 +00:00
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
2021-07-29 11:53:13 +00:00
kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
2021-08-17 08:52:55 +00:00
"github.com/ledgerwatch/erigon-lib/kv/remotedb"
2021-09-15 07:22:57 +00:00
"github.com/ledgerwatch/erigon-lib/kv/remotedbserver"
2022-11-07 13:04:31 +00:00
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
grpcHealth "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
2022-02-16 17:38:54 +00:00
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli/httpcfg"
2021-09-28 09:27:57 +00:00
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/health"
2022-05-26 03:31:06 +00:00
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcservices"
2021-05-20 18:25:53 +00:00
"github.com/ledgerwatch/erigon/cmd/utils"
2022-02-28 11:07:09 +00:00
"github.com/ledgerwatch/erigon/common"
2022-03-14 13:47:26 +00:00
"github.com/ledgerwatch/erigon/common/hexutil"
2021-05-20 18:25:53 +00:00
"github.com/ledgerwatch/erigon/common/paths"
2021-12-05 02:03:08 +00:00
"github.com/ledgerwatch/erigon/core/rawdb"
2021-05-20 18:25:53 +00:00
"github.com/ledgerwatch/erigon/node"
2022-05-26 05:27:44 +00:00
"github.com/ledgerwatch/erigon/node/nodecfg"
2021-12-05 02:03:08 +00:00
"github.com/ledgerwatch/erigon/params"
2021-05-20 18:25:53 +00:00
"github.com/ledgerwatch/erigon/rpc"
2022-06-10 15:18:43 +00:00
"github.com/ledgerwatch/erigon/turbo/rpchelper"
2022-05-26 03:31:06 +00:00
"github.com/ledgerwatch/erigon/turbo/services"
2021-11-14 04:08:52 +00:00
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
2022-05-10 08:54:44 +00:00
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
2020-08-19 11:46:20 +00:00
)
var rootCmd = & cobra . Command {
Use : "rpcdaemon" ,
2021-05-26 10:35:39 +00:00
Short : "rpcdaemon is JSON RPC server that connects to Erigon node for remote DB access" ,
2020-08-19 11:46:20 +00:00
}
2022-11-06 11:22:12 +00:00
var (
stateCacheStr string
)
2022-02-16 17:38:54 +00:00
func RootCommand ( ) ( * cobra . Command , * httpcfg . HttpCfg ) {
2022-10-21 05:36:17 +00:00
utils . CobraFlags ( rootCmd , debug . Flags , utils . MetricFlags , logging . Flags )
2020-08-19 11:46:20 +00:00
2022-08-05 03:15:40 +00:00
cfg := & httpcfg . HttpCfg { Enabled : true , StateCache : kvcache . DefaultCoherentConfig }
2021-09-03 04:19:35 +00:00
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . PrivateApiAddr , "private.api.addr" , "127.0.0.1:9090" , "private api network address, for example: 127.0.0.1:9090" )
2022-02-22 17:39:48 +00:00
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . DataDir , "datadir" , "" , "path to Erigon working directory" )
2022-05-26 05:27:44 +00:00
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . HttpListenAddress , "http.addr" , nodecfg . DefaultHTTPHost , "HTTP-RPC server listening interface" )
2020-09-11 20:17:37 +00:00
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . TLSCertfile , "tls.cert" , "" , "certificate for client side TLS handshake" )
2020-09-19 14:16:04 +00:00
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . TLSKeyFile , "tls.key" , "" , "key file for client side TLS handshake" )
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . TLSCACert , "tls.cacert" , "" , "CA certificate for client side TLS handshake" )
2022-05-26 05:27:44 +00:00
rootCmd . PersistentFlags ( ) . IntVar ( & cfg . HttpPort , "http.port" , nodecfg . DefaultHTTPPort , "HTTP-RPC server listening port" )
2020-08-19 11:46:20 +00:00
rootCmd . PersistentFlags ( ) . StringSliceVar ( & cfg . HttpCORSDomain , "http.corsdomain" , [ ] string { } , "Comma separated list of domains from which to accept cross origin requests (browser enforced)" )
2022-05-26 05:27:44 +00:00
rootCmd . PersistentFlags ( ) . StringSliceVar ( & cfg . HttpVirtualHost , "http.vhosts" , nodecfg . DefaultConfig . HTTPVirtualHosts , "Comma separated list of virtual hostnames from which to accept requests (server enforced). Accepts '*' wildcard." )
2021-05-31 16:18:32 +00:00
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . HttpCompression , "http.compression" , true , "Disable http compression" )
2022-09-01 15:44:37 +00:00
rootCmd . PersistentFlags ( ) . StringSliceVar ( & cfg . API , "http.api" , [ ] string { "eth" , "erigon" } , "API's offered over the HTTP-RPC interface: eth,erigon,web3,net,debug,trace,txpool,db. Supported methods: https://github.com/ledgerwatch/erigon/tree/devel/cmd/rpcdaemon" )
2021-08-23 14:30:21 +00:00
rootCmd . PersistentFlags ( ) . Uint64Var ( & cfg . Gascap , "rpc.gascap" , 50000000 , "Sets a cap on gas that can be used in eth_call/estimateGas" )
2020-08-29 15:50:24 +00:00
rootCmd . PersistentFlags ( ) . Uint64Var ( & cfg . MaxTraces , "trace.maxtraces" , 200 , "Sets a limit on traces that can be returned in trace_filter" )
2020-09-01 16:00:47 +00:00
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . WebsocketEnabled , "ws" , false , "Enable Websockets" )
2021-06-11 09:21:39 +00:00
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . WebsocketCompression , "ws.compression" , false , "Enable Websocket compression (RFC 7692)" )
2020-11-10 09:08:42 +00:00
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . RpcAllowListFilePath , "rpc.accessList" , "" , "Specify granular (method-by-method) API allowlist" )
2022-07-06 10:44:06 +00:00
rootCmd . PersistentFlags ( ) . UintVar ( & cfg . RpcBatchConcurrency , utils . RpcBatchConcurrencyFlag . Name , 2 , utils . RpcBatchConcurrencyFlag . Usage )
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . RpcStreamingDisable , utils . RpcStreamingDisableFlag . Name , false , utils . RpcStreamingDisableFlag . Usage )
2022-08-13 09:16:18 +00:00
rootCmd . PersistentFlags ( ) . IntVar ( & cfg . DBReadConcurrency , utils . DBReadConcurrencyFlag . Name , utils . DBReadConcurrencyFlag . Value , utils . DBReadConcurrencyFlag . Usage )
2021-06-16 17:24:56 +00:00
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . TraceCompatibility , "trace.compat" , false , "Bug for bug compatibility with OE for trace_ routines" )
2022-02-17 02:06:22 +00:00
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . TxPoolApiAddr , "txpool.api.addr" , "" , "txpool api network address, for example: 127.0.0.1:9090 (default: use value of --private.api.addr)" )
2022-06-05 06:33:55 +00:00
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . Sync . UseSnapshots , "snapshot" , true , utils . SnapshotFlag . Usage )
2022-11-06 11:22:12 +00:00
rootCmd . PersistentFlags ( ) . StringVar ( & stateCacheStr , "state.cache" , "0MB" , "Amount of data to store in StateCache (enabled if no --datadir set). Set 0 to disable StateCache. Defaults to 0MB RAM" )
2021-12-06 12:03:46 +00:00
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . GRPCServerEnabled , "grpc" , false , "Enable GRPC server" )
2022-05-26 05:27:44 +00:00
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . GRPCListenAddress , "grpc.addr" , nodecfg . DefaultGRPCHost , "GRPC server listening interface" )
rootCmd . PersistentFlags ( ) . IntVar ( & cfg . GRPCPort , "grpc.port" , nodecfg . DefaultGRPCPort , "GRPC server listening port" )
2021-12-06 12:03:46 +00:00
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . GRPCHealthCheckEnabled , "grpc.healthcheck" , false , "Enable GRPC health check" )
allow rpcdaemon to bind to tcp (#6184)
this pr adds CLI flag to allow the rpcdaemon to bind to a TCP port.
this is very useful if one wants to maintain a remote connection with
the rpcdaemon without using websocket. This is useful because a lot of
issues come with the websocket protocol (compression, max size, etc).
TCP socket gets around these things (it is just raw json over tcp
stream)
the rpc package already supports this, it was just a matter of adding
the bind.
try `echo
'{"jsonrpc":"2.0","method":"eth_blockNumber","id":"1","params":[""]}' |
nc localhost 8548` as a basic test
to test. Subscriptions are also working (idk how to send keepalives with
netcat)
the default rpc.(*Client).Dial method does not support TCP. I have not
included that in this PR. The code for such is as follow
```
// DialTCP create a new TCP client that connects to the given endpoint.
//
// The context is used for the initial connection establishment. It does not
// affect subsequent interactions with the client.
func DialTCP(ctx context.Context, endpoint string) (*Client, error) {
parsed, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
ans := make(chan *Client)
errc := make(chan error)
go func() {
client, err := newClient(ctx, func(ctx context.Context) (ServerCodec, error) {
conn, err := net.Dial("tcp", parsed.Host)
if err != nil {
return nil, err
}
return NewCodec(conn), nil
})
if err != nil {
errc <- err
return
}
ans <- client
}()
select {
case err := <-errc:
return nil, err
case a := <-ans:
return a, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// DialContext creates a new RPC client, just like Dial.
//
// The context is used to cancel or time out the initial connection establishment. It does
// not affect subsequent interactions with the client.
func DialContext(ctx context.Context, rawurl string) (*Client, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
switch u.Scheme {
case "http", "https":
return DialHTTP(rawurl)
case "ws", "wss":
return DialWebsocket(ctx, rawurl, "")
case "tcp":
return DialTCP(ctx, rawurl)
case "stdio":
return DialStdIO(ctx)
case "":
return DialIPC(ctx, rawurl)
default:
return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
}
}
```
let me know if you would like me to add this to the PR as well. the TCP
connection can then be established with `rpc.Dial("tcp://host:port")`
2022-12-03 07:22:47 +00:00
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . TCPServerEnabled , "tcp" , false , "Enable TCP server" )
rootCmd . PersistentFlags ( ) . StringVar ( & cfg . TCPListenAddress , "tcp.addr" , nodecfg . DefaultTCPHost , "TCP server listening interface" )
rootCmd . PersistentFlags ( ) . IntVar ( & cfg . TCPPort , "tcp.port" , nodecfg . DefaultTCPPort , "TCP server listening port" )
2022-06-19 12:40:28 +00:00
rootCmd . PersistentFlags ( ) . BoolVar ( & cfg . TraceRequests , utils . HTTPTraceFlag . Name , false , "Trace HTTP requests with INFO level" )
2022-07-04 11:07:45 +00:00
rootCmd . PersistentFlags ( ) . DurationVar ( & cfg . HTTPTimeouts . ReadTimeout , "http.timeouts.read" , rpccfg . DefaultHTTPTimeouts . ReadTimeout , "Maximum duration for reading the entire request, including the body." )
rootCmd . PersistentFlags ( ) . DurationVar ( & cfg . HTTPTimeouts . WriteTimeout , "http.timeouts.write" , rpccfg . DefaultHTTPTimeouts . WriteTimeout , "Maximum duration before timing out writes of the response. It is reset whenever a new request's header is read" )
rootCmd . PersistentFlags ( ) . DurationVar ( & cfg . HTTPTimeouts . IdleTimeout , "http.timeouts.idle" , rpccfg . DefaultHTTPTimeouts . IdleTimeout , "Maximum amount of time to wait for the next request when keep-alives are enabled. If http.timeouts.idle is zero, the value of http.timeouts.read is used" )
2022-09-17 06:25:27 +00:00
rootCmd . PersistentFlags ( ) . DurationVar ( & cfg . EvmCallTimeout , "rpc.evmtimeout" , rpccfg . DefaultEvmCallTimeout , "Maximum amount of time to wait for the answer from EVM call." )
2020-11-10 09:08:42 +00:00
if err := rootCmd . MarkPersistentFlagFilename ( "rpc.accessList" , "json" ) ; err != nil {
panic ( err )
}
2021-04-19 05:44:14 +00:00
if err := rootCmd . MarkPersistentFlagDirname ( "datadir" ) ; err != nil {
panic ( err )
}
rootCmd . PersistentPreRunE = func ( cmd * cobra . Command , args [ ] string ) error {
2022-10-21 05:36:17 +00:00
if err := debug . SetupCobra ( cmd ) ; err != nil {
2021-04-19 14:14:12 +00:00
return err
}
2022-11-06 11:22:12 +00:00
err := cfg . StateCache . CacheSize . UnmarshalText ( [ ] byte ( stateCacheStr ) )
if err != nil {
return fmt . Errorf ( "state.cache value of %v is not valid" , stateCacheStr )
}
err = cfg . StateCache . CodeCacheSize . UnmarshalText ( [ ] byte ( stateCacheStr ) )
if err != nil {
return fmt . Errorf ( "state.cache value of %v is not valid" , stateCacheStr )
}
2022-06-07 03:24:50 +00:00
cfg . WithDatadir = cfg . DataDir != ""
2022-05-10 08:54:44 +00:00
if cfg . WithDatadir {
2022-02-22 17:39:48 +00:00
if cfg . DataDir == "" {
cfg . DataDir = paths . DefaultDataDir ( )
2021-04-19 05:44:14 +00:00
}
2022-06-07 03:24:50 +00:00
cfg . Dirs = datadir . New ( cfg . DataDir )
2021-04-19 05:44:14 +00:00
}
2022-02-17 02:06:22 +00:00
if cfg . TxPoolApiAddr == "" {
cfg . TxPoolApiAddr = cfg . PrivateApiAddr
}
2021-04-19 05:44:14 +00:00
return nil
}
rootCmd . PersistentPostRunE = func ( cmd * cobra . Command , args [ ] string ) error {
2022-10-21 05:36:17 +00:00
debug . Exit ( )
2021-04-19 05:44:14 +00:00
return nil
}
2020-08-19 11:46:20 +00:00
2021-09-29 01:36:25 +00:00
cfg . StateCache . MetricsLabel = "rpc"
2020-08-19 11:46:20 +00:00
return rootCmd , cfg
}
2021-09-29 01:36:25 +00:00
type StateChangesClient interface {
StateChanges ( ctx context . Context , in * remote . StateChangeRequest , opts ... grpc . CallOption ) ( remote . KV_StateChangesClient , error )
}
func subscribeToStateChangesLoop ( ctx context . Context , client StateChangesClient , cache kvcache . Cache ) {
go func ( ) {
for {
select {
case <- ctx . Done ( ) :
return
default :
}
if err := subscribeToStateChanges ( ctx , client , cache ) ; err != nil {
2022-02-18 02:54:38 +00:00
if grpcutil . IsRetryLater ( err ) || grpcutil . IsEndOfStream ( err ) {
time . Sleep ( 3 * time . Second )
2021-09-29 01:36:25 +00:00
continue
}
log . Warn ( "[txpool.handleStateChanges]" , "err" , err )
}
}
} ( )
}
func subscribeToStateChanges ( ctx context . Context , client StateChangesClient , cache kvcache . Cache ) error {
streamCtx , cancel := context . WithCancel ( ctx )
defer cancel ( )
stream , err := client . StateChanges ( streamCtx , & remote . StateChangeRequest { WithStorage : true , WithTransactions : false } , grpc . WaitForReady ( true ) )
if err != nil {
return err
}
for req , err := stream . Recv ( ) ; ; req , err = stream . Recv ( ) {
if err != nil {
return err
}
if req == nil {
return nil
}
cache . OnNewBlock ( req )
}
}
func checkDbCompatibility ( ctx context . Context , db kv . RoDB ) error {
2021-04-24 15:46:29 +00:00
// DB schema version compatibility check
var version [ ] byte
var compatErr error
2021-07-28 02:47:38 +00:00
var compatTx kv . Tx
2021-09-29 01:36:25 +00:00
if compatTx , compatErr = db . BeginRo ( ctx ) ; compatErr != nil {
2021-04-24 15:46:29 +00:00
return fmt . Errorf ( "open Ro Tx for DB schema compability check: %w" , compatErr )
}
defer compatTx . Rollback ( )
2021-07-28 02:47:38 +00:00
if version , compatErr = compatTx . GetOne ( kv . DatabaseInfo , kv . DBSchemaVersionKey ) ; compatErr != nil {
2021-04-24 15:46:29 +00:00
return fmt . Errorf ( "read version for DB schema compability check: %w" , compatErr )
}
if len ( version ) != 12 {
2021-05-26 10:35:39 +00:00
return fmt . Errorf ( "database does not have major schema version. upgrade and restart Erigon core" )
2021-04-24 15:46:29 +00:00
}
2021-07-17 02:09:56 +00:00
major := binary . BigEndian . Uint32 ( version )
2021-04-24 15:46:29 +00:00
minor := binary . BigEndian . Uint32 ( version [ 4 : ] )
patch := binary . BigEndian . Uint32 ( version [ 8 : ] )
var compatible bool
2021-07-28 02:47:38 +00:00
dbSchemaVersion := & kv . DBSchemaVersion
2021-05-22 09:20:43 +00:00
if major != dbSchemaVersion . Major {
2021-04-24 15:46:29 +00:00
compatible = false
2021-05-22 09:20:43 +00:00
} else if minor != dbSchemaVersion . Minor {
2021-04-24 15:46:29 +00:00
compatible = false
} else {
compatible = true
}
if ! compatible {
return fmt . Errorf ( "incompatible DB Schema versions: reader %d.%d.%d, database %d.%d.%d" ,
2021-05-22 09:20:43 +00:00
dbSchemaVersion . Major , dbSchemaVersion . Minor , dbSchemaVersion . Patch ,
2021-04-24 15:46:29 +00:00
major , minor , patch )
}
2021-05-22 09:20:43 +00:00
log . Info ( "DB schemas compatible" , "reader" , fmt . Sprintf ( "%d.%d.%d" , dbSchemaVersion . Major , dbSchemaVersion . Minor , dbSchemaVersion . Patch ) ,
2021-04-24 15:46:29 +00:00
"database" , fmt . Sprintf ( "%d.%d.%d" , major , minor , patch ) )
return nil
}
2022-08-19 02:35:43 +00:00
func EmbeddedServices ( ctx context . Context ,
erigonDB kv . RoDB , stateCacheCfg kvcache . CoherentConfig ,
2022-11-07 13:04:31 +00:00
blockReader services . FullBlockReader , ethBackendServer remote . ETHBACKENDServer , txPoolServer txpool . TxpoolServer ,
miningServer txpool . MiningServer , stateDiffClient StateChangesClient ,
2022-09-18 10:41:01 +00:00
) ( eth rpchelper . ApiBackend , txPool txpool . TxpoolClient , mining txpool . MiningClient , stateCache kvcache . Cache , ff * rpchelper . Filters , err error ) {
2022-11-06 11:22:12 +00:00
if stateCacheCfg . CacheSize > 0 {
2022-08-10 10:30:02 +00:00
// notification about new blocks (state stream) doesn't work now inside erigon - because
// erigon does send this stream to privateAPI (erigon with enabled rpc, still have enabled privateAPI).
// without this state stream kvcache can't work and only slow-down things
2022-11-06 11:22:12 +00:00
// ... adding back in place to see about the above statement
stateCache = kvcache . New ( stateCacheCfg )
2022-02-12 12:47:19 +00:00
} else {
stateCache = kvcache . NewDummy ( )
}
2022-11-07 13:04:31 +00:00
2022-02-16 04:24:51 +00:00
subscribeToStateChangesLoop ( ctx , stateDiffClient , stateCache )
2022-02-12 12:47:19 +00:00
directClient := direct . NewEthBackendClientDirect ( ethBackendServer )
2022-05-26 03:31:06 +00:00
eth = rpcservices . NewRemoteBackend ( directClient , erigonDB , blockReader )
2022-02-12 12:47:19 +00:00
txPool = direct . NewTxPoolClient ( txPoolServer )
mining = direct . NewMiningClient ( miningServer )
2022-06-10 15:18:43 +00:00
ff = rpchelper . New ( ctx , eth , txPool , mining , func ( ) { } )
2022-08-12 09:13:14 +00:00
2022-02-12 12:47:19 +00:00
return
}
2022-02-11 10:11:59 +00:00
// RemoteServices - use when RPCDaemon run as independent process. Still it can use --datadir flag to enable
2022-05-10 08:54:44 +00:00
// `cfg.WithDatadir` (mode when it on 1 machine with Erigon)
2022-02-16 17:38:54 +00:00
func RemoteServices ( ctx context . Context , cfg httpcfg . HttpCfg , logger log . Logger , rootCancel context . CancelFunc ) (
2022-02-12 12:47:19 +00:00
db kv . RoDB , borDb kv . RoDB ,
2022-06-10 15:18:43 +00:00
eth rpchelper . ApiBackend , txPool txpool . TxpoolClient , mining txpool . MiningClient ,
2022-06-01 02:57:12 +00:00
stateCache kvcache . Cache , blockReader services . FullBlockReader ,
2022-09-18 10:41:01 +00:00
ff * rpchelper . Filters , agg * libstate . Aggregator22 , err error ) {
2022-05-10 08:54:44 +00:00
if ! cfg . WithDatadir && cfg . PrivateApiAddr == "" {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , fmt . Errorf ( "either remote db or local db must be specified" )
2021-04-26 05:37:48 +00:00
}
2021-11-14 04:08:52 +00:00
2020-09-18 10:23:35 +00:00
// Do not change the order of these checks. Chaindata needs to be checked first, because PrivateApiAddr has default value which is not ""
// If PrivateApiAddr is checked first, the Chaindata option will never work
2022-05-10 08:54:44 +00:00
if cfg . WithDatadir {
2022-10-10 11:40:21 +00:00
dir . MustExist ( cfg . Dirs . SnapHistory )
2021-07-28 02:47:38 +00:00
var rwKv kv . RwDB
2022-06-07 03:24:50 +00:00
log . Trace ( "Creating chain db" , "path" , cfg . Dirs . Chaindata )
2022-07-14 10:01:57 +00:00
limiter := semaphore . NewWeighted ( int64 ( cfg . DBReadConcurrency ) )
2022-06-07 03:24:50 +00:00
rwKv , err = kv2 . NewMDBX ( logger ) . RoTxsLimiter ( limiter ) . Path ( cfg . Dirs . Chaindata ) . Readonly ( ) . Open ( )
2021-06-16 10:57:58 +00:00
if err != nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , err
2020-08-19 11:46:20 +00:00
}
2021-09-29 01:36:25 +00:00
if compatErr := checkDbCompatibility ( ctx , rwKv ) ; compatErr != nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , compatErr
2021-04-24 15:46:29 +00:00
}
2021-07-28 02:47:38 +00:00
db = rwKv
2021-09-29 01:36:25 +00:00
stateCache = kvcache . NewDummy ( )
2022-02-07 21:30:46 +00:00
blockReader = snapshotsync . NewBlockReader ( )
// bor (consensus) specific db
var borKv kv . RoDB
2022-02-22 17:39:48 +00:00
borDbPath := filepath . Join ( cfg . DataDir , "bor" )
2022-02-09 02:28:06 +00:00
{
// ensure db exist
2022-02-09 02:31:12 +00:00
tmpDb , err := kv2 . NewMDBX ( logger ) . Path ( borDbPath ) . Label ( kv . ConsensusDB ) . Open ( )
2022-02-09 02:28:06 +00:00
if err != nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , err
2022-02-09 02:28:06 +00:00
}
tmpDb . Close ( )
}
2022-02-07 21:30:46 +00:00
log . Trace ( "Creating consensus db" , "path" , borDbPath )
2022-02-09 02:31:12 +00:00
borKv , err = kv2 . NewMDBX ( logger ) . Path ( borDbPath ) . Label ( kv . ConsensusDB ) . Readonly ( ) . Open ( )
2022-02-07 21:30:46 +00:00
if err != nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , err
2022-02-07 21:30:46 +00:00
}
// Skip the compatibility check, until we have a schema in erigon-lib
borDb = borKv
2021-09-30 16:40:58 +00:00
} else {
2022-11-06 11:22:12 +00:00
if cfg . StateCache . CacheSize > 0 {
stateCache = kvcache . New ( cfg . StateCache )
2021-10-12 09:27:47 +00:00
} else {
stateCache = kvcache . NewDummy ( )
}
2021-09-30 16:40:58 +00:00
log . Info ( "if you run RPCDaemon on same machine with Erigon add --datadir option" )
2021-02-12 16:47:32 +00:00
}
2022-04-25 12:57:54 +00:00
2022-03-19 16:15:12 +00:00
if db != nil {
var cc * params . ChainConfig
if err := db . View ( context . Background ( ) , func ( tx kv . Tx ) error {
genesisBlock , err := rawdb . ReadBlockByNumber ( tx , 0 )
if err != nil {
return err
}
2022-05-12 08:14:16 +00:00
if genesisBlock == nil {
return fmt . Errorf ( "genesis not found in DB. Likely Erigon was never started on this datadir" )
}
2022-03-19 16:15:12 +00:00
cc , err = rawdb . ReadChainConfig ( tx , genesisBlock . Hash ( ) )
if err != nil {
return err
}
2022-05-26 05:27:44 +00:00
cfg . Snap . Enabled , err = snap . Enabled ( tx )
2022-05-10 08:54:44 +00:00
if err != nil {
return err
}
2022-03-19 16:15:12 +00:00
return nil
} ) ; err != nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , err
2022-03-17 01:28:17 +00:00
}
2022-03-19 16:15:12 +00:00
if cc == nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , fmt . Errorf ( "chain config not found in db. Need start erigon at least once on this db" )
2022-03-17 01:28:17 +00:00
}
2022-06-05 06:33:55 +00:00
cfg . Snap . Enabled = cfg . Snap . Enabled || cfg . Sync . UseSnapshots
2022-03-17 01:28:17 +00:00
}
2021-09-29 01:36:25 +00:00
creds , err := grpcutil . TLS ( cfg . TLSCACert , cfg . TLSCertfile , cfg . TLSKeyFile )
if err != nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , fmt . Errorf ( "open tls cert: %w" , err )
2021-09-29 01:36:25 +00:00
}
conn , err := grpcutil . Connect ( creds , cfg . PrivateApiAddr )
if err != nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , fmt . Errorf ( "could not connect to execution service privateApi: %w" , err )
2021-09-29 01:36:25 +00:00
}
kvClient := remote . NewKVClient ( conn )
remoteKv , err := remotedb . NewRemote ( gointerfaces . VersionFromProto ( remotedbserver . KvServiceAPIVersion ) , logger , kvClient ) . Open ( )
if err != nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , fmt . Errorf ( "could not connect to remoteKv: %w" , err )
2021-09-29 01:36:25 +00:00
}
subscribeToStateChangesLoop ( ctx , kvClient , stateCache )
2022-07-28 09:57:38 +00:00
onNewSnapshot := func ( ) { }
if cfg . WithDatadir {
if cfg . Snap . Enabled {
2022-10-02 07:48:26 +00:00
2022-07-28 09:57:38 +00:00
allSnapshots := snapshotsync . NewRoSnapshots ( cfg . Snap , cfg . Dirs . Snap )
2022-08-23 09:28:07 +00:00
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `kvClient.Snapshots` after establish grpc connection
allSnapshots . OptimisticReopenWithDB ( db )
allSnapshots . LogStat ( )
2022-10-01 02:29:28 +00:00
2022-10-15 01:20:58 +00:00
if agg , err = libstate . NewAggregator22 ( cfg . Dirs . SnapHistory , cfg . Dirs . Tmp , ethconfig . HistoryV3AggregationStep , db ) ; err != nil {
2022-10-01 02:29:28 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , fmt . Errorf ( "create aggregator: %w" , err )
}
2022-10-13 02:46:32 +00:00
_ = agg . ReopenFiles ( )
2022-10-01 02:29:28 +00:00
db . View ( context . Background ( ) , func ( tx kv . Tx ) error {
2022-10-05 10:54:54 +00:00
agg . LogStats ( tx , func ( endTxNumMinimax uint64 ) uint64 {
2022-10-01 02:29:28 +00:00
_ , histBlockNumProgress , _ := rawdb . TxNums . FindBlockNum ( tx , endTxNumMinimax )
return histBlockNumProgress
} )
return nil
} )
2022-07-28 09:57:38 +00:00
onNewSnapshot = func ( ) {
go func ( ) { // don't block events processing by network communication
reply , err := kvClient . Snapshots ( ctx , & remote . SnapshotsRequest { } , grpc . WaitForReady ( true ) )
if err != nil {
log . Warn ( "[Snapshots] reopen" , "err" , err )
return
}
2022-11-04 09:07:46 +00:00
if err := allSnapshots . ReopenList ( reply . Files , true ) ; err != nil {
2022-07-28 09:57:38 +00:00
log . Error ( "[Snapshots] reopen" , "err" , err )
} else {
allSnapshots . LogStat ( )
}
2022-10-01 02:29:28 +00:00
if err = agg . ReopenFiles ( ) ; err != nil {
log . Error ( "[Snapshots] reopen" , "err" , err )
} else {
db . View ( context . Background ( ) , func ( tx kv . Tx ) error {
2022-10-05 10:54:54 +00:00
agg . LogStats ( tx , func ( endTxNumMinimax uint64 ) uint64 {
2022-10-01 02:29:28 +00:00
_ , histBlockNumProgress , _ := rawdb . TxNums . FindBlockNum ( tx , endTxNumMinimax )
return histBlockNumProgress
} )
return nil
} )
}
2022-07-28 09:57:38 +00:00
} ( )
}
onNewSnapshot ( )
2022-08-25 05:24:01 +00:00
// TODO: how to don't block startup on remote RPCDaemon?
// txNums = exec22.TxNumsFromDB(allSnapshots, db)
2022-07-28 09:57:38 +00:00
blockReader = snapshotsync . NewBlockReaderWithSnapshots ( allSnapshots )
} else {
log . Info ( "Use --snapshots=false" )
}
}
2022-05-10 08:54:44 +00:00
if ! cfg . WithDatadir {
2021-11-21 03:32:14 +00:00
blockReader = snapshotsync . NewRemoteBlockReader ( remote . NewETHBACKENDClient ( conn ) )
}
2022-05-26 03:31:06 +00:00
remoteEth := rpcservices . NewRemoteBackend ( remote . NewETHBACKENDClient ( conn ) , db , blockReader )
2021-11-14 04:08:52 +00:00
blockReader = remoteEth
2021-09-29 01:36:25 +00:00
txpoolConn := conn
2021-11-22 18:38:51 +00:00
if cfg . TxPoolApiAddr != cfg . PrivateApiAddr {
2021-09-29 01:36:25 +00:00
txpoolConn , err = grpcutil . Connect ( creds , cfg . TxPoolApiAddr )
2020-09-18 10:23:35 +00:00
if err != nil {
2022-09-18 10:41:01 +00:00
return nil , nil , nil , nil , nil , nil , nil , ff , nil , fmt . Errorf ( "could not connect to txpool api: %w" , err )
2021-09-03 04:19:35 +00:00
}
2021-09-29 01:36:25 +00:00
}
2021-11-22 18:38:51 +00:00
2022-02-11 10:11:59 +00:00
mining = txpool . NewMiningClient ( txpoolConn )
2022-05-26 03:31:06 +00:00
miningService := rpcservices . NewMiningService ( mining )
2022-02-11 10:11:59 +00:00
txPool = txpool . NewTxpoolClient ( txpoolConn )
2022-05-26 03:31:06 +00:00
txPoolService := rpcservices . NewTxPoolService ( txPool )
2021-09-29 01:36:25 +00:00
if db == nil {
db = remoteKv
}
eth = remoteEth
go func ( ) {
if ! remoteKv . EnsureVersionCompatibility ( ) {
rootCancel ( )
2020-09-18 10:23:35 +00:00
}
2021-09-29 01:36:25 +00:00
if ! remoteEth . EnsureVersionCompatibility ( ) {
rootCancel ( )
2021-08-07 05:03:12 +00:00
}
2022-02-11 10:11:59 +00:00
if mining != nil && ! miningService . EnsureVersionCompatibility ( ) {
2021-09-29 01:36:25 +00:00
rootCancel ( )
2021-09-03 04:19:35 +00:00
}
2022-02-11 10:11:59 +00:00
if ! txPoolService . EnsureVersionCompatibility ( ) {
2021-09-29 01:36:25 +00:00
rootCancel ( )
2021-02-12 16:47:32 +00:00
}
2021-09-29 01:36:25 +00:00
} ( )
2022-01-20 15:34:00 +00:00
2022-06-10 15:18:43 +00:00
ff = rpchelper . New ( ctx , eth , txPool , mining , onNewSnapshot )
2022-09-18 10:41:01 +00:00
return db , borDb , eth , txPool , mining , stateCache , blockReader , ff , agg , err
2020-08-19 11:46:20 +00:00
}
2022-07-31 10:16:19 +00:00
func StartRpcServer ( ctx context . Context , cfg httpcfg . HttpCfg , rpcAPI [ ] rpc . API , authAPI [ ] rpc . API ) error {
2022-08-01 15:12:35 +00:00
if len ( authAPI ) > 0 {
engineInfo , err := startAuthenticatedRpcServer ( cfg , authAPI )
if err != nil {
return err
}
go stopAuthenticatedRpcServer ( ctx , engineInfo )
2022-07-31 10:16:19 +00:00
}
if cfg . Enabled {
return startRegularRpcServer ( ctx , cfg , rpcAPI )
}
2022-01-25 16:44:35 +00:00
2022-07-31 10:16:19 +00:00
return nil
}
func startRegularRpcServer ( ctx context . Context , cfg httpcfg . HttpCfg , rpcAPI [ ] rpc . API ) error {
2020-08-19 11:46:20 +00:00
// register apis and create handler stack
httpEndpoint := fmt . Sprintf ( "%s:%d" , cfg . HttpListenAddress , cfg . HttpPort )
2020-09-02 05:56:48 +00:00
2022-07-31 10:16:19 +00:00
log . Trace ( "TraceRequests = %t\n" , cfg . TraceRequests )
2022-07-06 10:44:06 +00:00
srv := rpc . NewServer ( cfg . RpcBatchConcurrency , cfg . TraceRequests , cfg . RpcStreamingDisable )
2020-11-10 09:08:42 +00:00
allowListForRPC , err := parseAllowListForRPC ( cfg . RpcAllowListFilePath )
if err != nil {
return err
}
srv . SetAllowList ( allowListForRPC )
2022-01-25 16:44:35 +00:00
var defaultAPIList [ ] rpc . API
for _ , api := range rpcAPI {
if api . Namespace != "engine" {
defaultAPIList = append ( defaultAPIList , api )
2022-03-02 13:58:46 +00:00
}
}
2022-01-25 16:44:35 +00:00
2022-03-02 13:58:46 +00:00
var apiFlags [ ] string
2022-01-25 16:44:35 +00:00
for _ , flag := range cfg . API {
if flag != "engine" {
apiFlags = append ( apiFlags , flag )
}
}
if err := node . RegisterApisFromWhitelist ( defaultAPIList , apiFlags , srv , false ) ; err != nil {
2020-08-20 03:52:27 +00:00
return fmt . Errorf ( "could not start register RPC apis: %w" , err )
2020-08-19 11:46:20 +00:00
}
2020-09-02 05:56:48 +00:00
2021-05-31 16:18:32 +00:00
httpHandler := node . NewHTTPHandlerStack ( srv , cfg . HttpCORSDomain , cfg . HttpVirtualHost , cfg . HttpCompression )
2020-09-02 05:56:48 +00:00
var wsHandler http . Handler
2020-09-01 16:00:47 +00:00
if cfg . WebsocketEnabled {
2022-03-23 16:12:19 +00:00
wsHandler = srv . WebsocketHandler ( [ ] string { "*" } , nil , cfg . WebsocketCompression )
2020-09-02 05:56:48 +00:00
}
2022-03-23 16:12:19 +00:00
apiHandler , err := createHandler ( cfg , defaultAPIList , httpHandler , wsHandler , nil )
2022-02-28 11:07:09 +00:00
if err != nil {
return err
}
2020-09-02 05:56:48 +00:00
2022-07-04 11:07:45 +00:00
listener , _ , err := node . StartHTTPEndpoint ( httpEndpoint , cfg . HTTPTimeouts , apiHandler )
2020-09-02 05:56:48 +00:00
if err != nil {
return fmt . Errorf ( "could not start RPC api: %w" , err )
2020-08-19 11:46:20 +00:00
}
allow rpcdaemon to bind to tcp (#6184)
this pr adds CLI flag to allow the rpcdaemon to bind to a TCP port.
this is very useful if one wants to maintain a remote connection with
the rpcdaemon without using websocket. This is useful because a lot of
issues come with the websocket protocol (compression, max size, etc).
TCP socket gets around these things (it is just raw json over tcp
stream)
the rpc package already supports this, it was just a matter of adding
the bind.
try `echo
'{"jsonrpc":"2.0","method":"eth_blockNumber","id":"1","params":[""]}' |
nc localhost 8548` as a basic test
to test. Subscriptions are also working (idk how to send keepalives with
netcat)
the default rpc.(*Client).Dial method does not support TCP. I have not
included that in this PR. The code for such is as follow
```
// DialTCP create a new TCP client that connects to the given endpoint.
//
// The context is used for the initial connection establishment. It does not
// affect subsequent interactions with the client.
func DialTCP(ctx context.Context, endpoint string) (*Client, error) {
parsed, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
ans := make(chan *Client)
errc := make(chan error)
go func() {
client, err := newClient(ctx, func(ctx context.Context) (ServerCodec, error) {
conn, err := net.Dial("tcp", parsed.Host)
if err != nil {
return nil, err
}
return NewCodec(conn), nil
})
if err != nil {
errc <- err
return
}
ans <- client
}()
select {
case err := <-errc:
return nil, err
case a := <-ans:
return a, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// DialContext creates a new RPC client, just like Dial.
//
// The context is used to cancel or time out the initial connection establishment. It does
// not affect subsequent interactions with the client.
func DialContext(ctx context.Context, rawurl string) (*Client, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
switch u.Scheme {
case "http", "https":
return DialHTTP(rawurl)
case "ws", "wss":
return DialWebsocket(ctx, rawurl, "")
case "tcp":
return DialTCP(ctx, rawurl)
case "stdio":
return DialStdIO(ctx)
case "":
return DialIPC(ctx, rawurl)
default:
return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
}
}
```
let me know if you would like me to add this to the PR as well. the TCP
connection can then be established with `rpc.Dial("tcp://host:port")`
2022-12-03 07:22:47 +00:00
if cfg . TCPServerEnabled {
tcpEndpoint := fmt . Sprintf ( "%s:%d" , cfg . TCPListenAddress , cfg . TCPPort )
tcpListener , err := net . Listen ( "tcp" , tcpEndpoint )
if err != nil {
return fmt . Errorf ( "could not start TCP Listener: %w" , err )
}
go func ( ) {
defer tcpListener . Close ( )
err := srv . ServeListener ( tcpListener )
if err != nil {
log . Error ( "TCP Listener Fatal Error" , "err" , err )
}
} ( )
log . Info ( "TCP Endpoint opened" , "url" , tcpEndpoint )
}
2021-12-06 12:03:46 +00:00
info := [ ] interface { } { "url" , httpEndpoint , "ws" , cfg . WebsocketEnabled ,
"ws.compression" , cfg . WebsocketCompression , "grpc" , cfg . GRPCServerEnabled }
2022-01-25 16:44:35 +00:00
2021-12-06 12:03:46 +00:00
var (
healthServer * grpcHealth . Server
grpcServer * grpc . Server
grpcListener net . Listener
grpcEndpoint string
)
if cfg . GRPCServerEnabled {
grpcEndpoint = fmt . Sprintf ( "%s:%d" , cfg . GRPCListenAddress , cfg . GRPCPort )
if grpcListener , err = net . Listen ( "tcp" , grpcEndpoint ) ; err != nil {
return fmt . Errorf ( "could not start GRPC listener: %w" , err )
}
grpcServer = grpc . NewServer ( )
if cfg . GRPCHealthCheckEnabled {
healthServer = grpcHealth . NewServer ( )
grpc_health_v1 . RegisterHealthServer ( grpcServer , healthServer )
}
go grpcServer . Serve ( grpcListener )
info = append ( info , "grpc.port" , cfg . GRPCPort )
}
2020-09-02 05:56:48 +00:00
2021-12-06 12:03:46 +00:00
log . Info ( "HTTP endpoint opened" , info ... )
2020-08-19 11:46:20 +00:00
defer func ( ) {
2020-10-10 06:06:54 +00:00
srv . Stop ( )
shutdownCtx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
_ = listener . Shutdown ( shutdownCtx )
2020-08-19 11:46:20 +00:00
log . Info ( "HTTP endpoint closed" , "url" , httpEndpoint )
2021-12-06 12:03:46 +00:00
if cfg . GRPCServerEnabled {
if cfg . GRPCHealthCheckEnabled {
healthServer . Shutdown ( )
}
grpcServer . GracefulStop ( )
_ = grpcListener . Close ( )
log . Info ( "GRPC endpoint closed" , "url" , grpcEndpoint )
}
2020-08-19 11:46:20 +00:00
} ( )
2020-10-10 06:06:54 +00:00
<- ctx . Done ( )
log . Info ( "Exiting..." )
2020-08-20 03:52:27 +00:00
return nil
2020-08-19 11:46:20 +00:00
}
2022-01-25 16:44:35 +00:00
2022-08-01 14:54:04 +00:00
type engineInfo struct {
Srv * rpc . Server
EngineSrv * rpc . Server
EngineListener * http . Server
EngineHttpEndpoint string
}
func startAuthenticatedRpcServer ( cfg httpcfg . HttpCfg , rpcAPI [ ] rpc . API ) ( * engineInfo , error ) {
2022-07-31 10:16:19 +00:00
log . Trace ( "TraceRequests = %t\n" , cfg . TraceRequests )
srv := rpc . NewServer ( cfg . RpcBatchConcurrency , cfg . TraceRequests , cfg . RpcStreamingDisable )
2022-08-02 06:15:01 +00:00
engineListener , engineSrv , engineHttpEndpoint , err := createEngineListener ( cfg , rpcAPI )
if err != nil {
return nil , fmt . Errorf ( "could not start RPC api for engine: %w" , err )
2022-07-31 10:16:19 +00:00
}
2022-08-01 14:54:04 +00:00
return & engineInfo { Srv : srv , EngineSrv : engineSrv , EngineListener : engineListener , EngineHttpEndpoint : engineHttpEndpoint } , nil
}
2022-07-31 10:16:19 +00:00
2022-08-01 14:54:04 +00:00
func stopAuthenticatedRpcServer ( ctx context . Context , engineInfo * engineInfo ) {
2022-07-31 10:16:19 +00:00
defer func ( ) {
2022-08-01 14:54:04 +00:00
engineInfo . Srv . Stop ( )
if engineInfo . EngineSrv != nil {
engineInfo . EngineSrv . Stop ( )
2022-07-31 10:16:19 +00:00
}
shutdownCtx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
2022-08-01 14:54:04 +00:00
if engineInfo . EngineListener != nil {
_ = engineInfo . EngineListener . Shutdown ( shutdownCtx )
log . Info ( "Engine HTTP endpoint close" , "url" , engineInfo . EngineHttpEndpoint )
2022-07-31 10:16:19 +00:00
}
} ( )
<- ctx . Done ( )
2022-08-01 14:54:04 +00:00
log . Info ( "Exiting Engine..." )
2022-07-31 10:16:19 +00:00
}
2022-02-12 11:40:19 +00:00
// isWebsocket checks the header of a http request for a websocket upgrade request.
func isWebsocket ( r * http . Request ) bool {
2022-08-30 07:37:14 +00:00
return strings . EqualFold ( r . Header . Get ( "Upgrade" ) , "websocket" ) &&
2022-02-12 11:40:19 +00:00
strings . Contains ( strings . ToLower ( r . Header . Get ( "Connection" ) ) , "upgrade" )
}
2022-03-14 13:47:26 +00:00
// obtainJWTSecret loads the jwt-secret, either from the provided config,
// or from the default location. If neither of those are present, it generates
// a new secret and stores to the default location.
func obtainJWTSecret ( cfg httpcfg . HttpCfg ) ( [ ] byte , error ) {
// try reading from file
2022-03-21 01:33:10 +00:00
log . Info ( "Reading JWT secret" , "path" , cfg . JWTSecretPath )
2022-04-07 01:53:34 +00:00
// If we run the rpcdaemon and datadir is not specified we just use jwt.hex in current directory.
if len ( cfg . JWTSecretPath ) == 0 {
cfg . JWTSecretPath = "jwt.hex"
}
2022-03-21 01:33:10 +00:00
if data , err := os . ReadFile ( cfg . JWTSecretPath ) ; err == nil {
2022-03-14 13:47:26 +00:00
jwtSecret := common . FromHex ( strings . TrimSpace ( string ( data ) ) )
if len ( jwtSecret ) == 32 {
return jwtSecret , nil
2022-02-28 11:07:09 +00:00
}
2022-03-21 01:33:10 +00:00
log . Error ( "Invalid JWT secret" , "path" , cfg . JWTSecretPath , "length" , len ( jwtSecret ) )
2022-03-14 13:47:26 +00:00
return nil , errors . New ( "invalid JWT secret" )
}
// Need to generate one
jwtSecret := make ( [ ] byte , 32 )
rand . Read ( jwtSecret )
2022-03-21 01:33:10 +00:00
if err := os . WriteFile ( cfg . JWTSecretPath , [ ] byte ( hexutil . Encode ( jwtSecret ) ) , 0600 ) ; err != nil {
2022-03-14 13:47:26 +00:00
return nil , err
}
2022-03-21 01:33:10 +00:00
log . Info ( "Generated JWT secret" , "path" , cfg . JWTSecretPath )
2022-03-14 13:47:26 +00:00
return jwtSecret , nil
}
2022-03-23 16:12:19 +00:00
func createHandler ( cfg httpcfg . HttpCfg , apiList [ ] rpc . API , httpHandler http . Handler , wsHandler http . Handler , jwtSecret [ ] byte ) ( http . Handler , error ) {
2022-01-25 16:44:35 +00:00
var handler http . Handler = http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
// adding a healthcheck here
if health . ProcessHealthcheckIfNeeded ( w , r , apiList ) {
return
}
2022-02-12 11:40:19 +00:00
if cfg . WebsocketEnabled && wsHandler != nil && isWebsocket ( r ) {
2022-01-25 16:44:35 +00:00
wsHandler . ServeHTTP ( w , r )
return
}
2022-02-28 11:07:09 +00:00
2022-03-23 16:12:19 +00:00
if jwtSecret != nil && ! rpc . CheckJwtSecret ( w , r , jwtSecret ) {
return
2022-02-28 11:07:09 +00:00
}
2022-01-25 16:44:35 +00:00
httpHandler . ServeHTTP ( w , r )
} )
2022-02-28 11:07:09 +00:00
return handler , nil
2022-01-25 16:44:35 +00:00
}
2022-05-10 11:04:52 +00:00
func createEngineListener ( cfg httpcfg . HttpCfg , engineApi [ ] rpc . API ) ( * http . Server , * rpc . Server , string , error ) {
2022-08-02 06:15:01 +00:00
engineHttpEndpoint := fmt . Sprintf ( "%s:%d" , cfg . AuthRpcHTTPListenAddress , cfg . AuthRpcPort )
2022-01-25 16:44:35 +00:00
2022-07-06 10:44:06 +00:00
engineSrv := rpc . NewServer ( cfg . RpcBatchConcurrency , cfg . TraceRequests , true )
2022-01-25 16:44:35 +00:00
2022-03-02 13:58:46 +00:00
if err := node . RegisterApisFromWhitelist ( engineApi , nil , engineSrv , true ) ; err != nil {
2022-05-10 11:04:52 +00:00
return nil , nil , "" , fmt . Errorf ( "could not start register RPC engine api: %w" , err )
2022-01-25 16:44:35 +00:00
}
2022-03-23 16:12:19 +00:00
jwtSecret , err := obtainJWTSecret ( cfg )
if err != nil {
2022-05-10 11:04:52 +00:00
return nil , nil , "" , err
2022-03-23 16:12:19 +00:00
}
2022-08-02 06:15:01 +00:00
wsHandler := engineSrv . WebsocketHandler ( [ ] string { "*" } , jwtSecret , cfg . WebsocketCompression )
2022-03-23 16:12:19 +00:00
2022-08-02 06:15:01 +00:00
engineHttpHandler := node . NewHTTPHandlerStack ( engineSrv , nil /* authCors */ , cfg . AuthRpcVirtualHost , cfg . HttpCompression )
2022-02-28 11:07:09 +00:00
2022-05-10 11:04:52 +00:00
engineApiHandler , err := createHandler ( cfg , engineApi , engineHttpHandler , wsHandler , jwtSecret )
2022-02-28 11:07:09 +00:00
if err != nil {
2022-05-10 11:04:52 +00:00
return nil , nil , "" , err
2022-02-28 11:07:09 +00:00
}
2022-01-25 16:44:35 +00:00
2022-08-02 06:15:01 +00:00
engineListener , _ , err := node . StartHTTPEndpoint ( engineHttpEndpoint , cfg . AuthRpcTimeouts , engineApiHandler )
2022-01-25 16:44:35 +00:00
if err != nil {
2022-05-10 11:04:52 +00:00
return nil , nil , "" , fmt . Errorf ( "could not start RPC api: %w" , err )
2022-02-28 11:07:09 +00:00
}
2022-08-02 11:24:25 +00:00
engineInfo := [ ] interface { } { "url" , engineHttpEndpoint , "ws" , true , "ws.compression" , cfg . WebsocketCompression }
2022-05-10 11:04:52 +00:00
log . Info ( "HTTP endpoint opened for Engine API" , engineInfo ... )
2022-01-25 16:44:35 +00:00
2022-05-10 11:04:52 +00:00
return engineListener , engineSrv , engineHttpEndpoint , nil
2022-01-25 16:44:35 +00:00
}