mirror of
https://gitlab.com/pulsechaincom/go-pulse.git
synced 2025-01-03 01:07:39 +00:00
ae42148093
rpc: fix connection tracking in Server When upgrading to mapset/v2 with generics, the set element type used in rpc.Server had to be changed to *ServerCodec because ServerCodec is not 'comparable'. While the distinction is technically correct, we know all possible ServerCodec types, and all of them are comparable. So just use a map instead.
206 lines
5.9 KiB
Go
206 lines
5.9 KiB
Go
// Copyright 2015 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package rpc
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/ethereum/go-ethereum/log"
|
|
)
|
|
|
|
const MetadataApi = "rpc"
|
|
const EngineApi = "engine"
|
|
|
|
// CodecOption specifies which type of messages a codec supports.
|
|
//
|
|
// Deprecated: this option is no longer honored by Server.
|
|
type CodecOption int
|
|
|
|
const (
|
|
// OptionMethodInvocation is an indication that the codec supports RPC method calls
|
|
OptionMethodInvocation CodecOption = 1 << iota
|
|
|
|
// OptionSubscriptions is an indication that the codec supports RPC notifications
|
|
OptionSubscriptions = 1 << iota // support pub sub
|
|
)
|
|
|
|
// Server is an RPC server.
|
|
type Server struct {
|
|
services serviceRegistry
|
|
idgen func() ID
|
|
|
|
mutex sync.Mutex
|
|
codecs map[ServerCodec]struct{}
|
|
run int32
|
|
}
|
|
|
|
// NewServer creates a new server instance with no registered handlers.
|
|
func NewServer() *Server {
|
|
server := &Server{
|
|
idgen: randomIDGenerator(),
|
|
codecs: make(map[ServerCodec]struct{}),
|
|
run: 1,
|
|
}
|
|
// Register the default service providing meta information about the RPC service such
|
|
// as the services and methods it offers.
|
|
rpcService := &RPCService{server}
|
|
server.RegisterName(MetadataApi, rpcService)
|
|
return server
|
|
}
|
|
|
|
// RegisterName creates a service for the given receiver type under the given name. When no
|
|
// methods on the given receiver match the criteria to be either a RPC method or a
|
|
// subscription an error is returned. Otherwise a new service is created and added to the
|
|
// service collection this server provides to clients.
|
|
func (s *Server) RegisterName(name string, receiver interface{}) error {
|
|
return s.services.registerName(name, receiver)
|
|
}
|
|
|
|
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
|
|
// the response back using the given codec. It will block until the codec is closed or the
|
|
// server is stopped. In either case the codec is closed.
|
|
//
|
|
// Note that codec options are no longer supported.
|
|
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
|
|
defer codec.close()
|
|
|
|
if !s.trackCodec(codec) {
|
|
return
|
|
}
|
|
defer s.untrackCodec(codec)
|
|
|
|
c := initClient(codec, s.idgen, &s.services)
|
|
<-codec.closed()
|
|
c.Close()
|
|
}
|
|
|
|
func (s *Server) trackCodec(codec ServerCodec) bool {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
if atomic.LoadInt32(&s.run) == 0 {
|
|
return false // Don't serve if server is stopped.
|
|
}
|
|
s.codecs[codec] = struct{}{}
|
|
return true
|
|
}
|
|
|
|
func (s *Server) untrackCodec(codec ServerCodec) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
delete(s.codecs, codec)
|
|
}
|
|
|
|
// serveSingleRequest reads and processes a single RPC request from the given codec. This
|
|
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
|
|
// this mode.
|
|
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
|
|
// Don't serve if server is stopped.
|
|
if atomic.LoadInt32(&s.run) == 0 {
|
|
return
|
|
}
|
|
|
|
h := newHandler(ctx, codec, s.idgen, &s.services)
|
|
h.allowSubscribe = false
|
|
defer h.close(io.EOF, nil)
|
|
|
|
reqs, batch, err := codec.readBatch()
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
|
|
}
|
|
return
|
|
}
|
|
if batch {
|
|
h.handleBatch(reqs)
|
|
} else {
|
|
h.handleMsg(reqs[0])
|
|
}
|
|
}
|
|
|
|
// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
|
|
// requests to finish, then closes all codecs which will cancel pending requests and
|
|
// subscriptions.
|
|
func (s *Server) Stop() {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
|
|
log.Debug("RPC server shutting down")
|
|
for codec := range s.codecs {
|
|
codec.close()
|
|
}
|
|
}
|
|
}
|
|
|
|
// RPCService gives meta information about the server.
|
|
// e.g. gives information about the loaded modules.
|
|
type RPCService struct {
|
|
server *Server
|
|
}
|
|
|
|
// Modules returns the list of RPC services with their version number
|
|
func (s *RPCService) Modules() map[string]string {
|
|
s.server.services.mu.Lock()
|
|
defer s.server.services.mu.Unlock()
|
|
|
|
modules := make(map[string]string)
|
|
for name := range s.server.services.services {
|
|
modules[name] = "1.0"
|
|
}
|
|
return modules
|
|
}
|
|
|
|
// PeerInfo contains information about the remote end of the network connection.
|
|
//
|
|
// This is available within RPC method handlers through the context. Call
|
|
// PeerInfoFromContext to get information about the client connection related to
|
|
// the current method call.
|
|
type PeerInfo struct {
|
|
// Transport is name of the protocol used by the client.
|
|
// This can be "http", "ws" or "ipc".
|
|
Transport string
|
|
|
|
// Address of client. This will usually contain the IP address and port.
|
|
RemoteAddr string
|
|
|
|
// Additional information for HTTP and WebSocket connections.
|
|
HTTP struct {
|
|
// Protocol version, i.e. "HTTP/1.1". This is not set for WebSocket.
|
|
Version string
|
|
// Header values sent by the client.
|
|
UserAgent string
|
|
Origin string
|
|
Host string
|
|
}
|
|
}
|
|
|
|
type peerInfoContextKey struct{}
|
|
|
|
// PeerInfoFromContext returns information about the client's network connection.
|
|
// Use this with the context passed to RPC method handler functions.
|
|
//
|
|
// The zero value is returned if no connection info is present in ctx.
|
|
func PeerInfoFromContext(ctx context.Context) PeerInfo {
|
|
info, _ := ctx.Value(peerInfoContextKey{}).(PeerInfo)
|
|
return info
|
|
}
|