mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-05 10:32:19 +00:00
d6f144ceb0
* rpc: remove 'exported or builtin' restriction for parameters There is no technical reason for this restriction because package reflect can create values of any type. Requiring parameters and return values to be exported causes a lot of noise in package exports. * rpc: fix staticcheck warnings
328 lines
9.2 KiB
Go
328 lines
9.2 KiB
Go
// Copyright 2016 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package rpc
|
|
|
|
import (
|
|
"bufio"
|
|
"container/list"
|
|
"context"
|
|
crand "crypto/rand"
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"math/rand"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
var (
	// ErrNotificationsUnsupported is returned when the connection doesn't support notifications.
	ErrNotificationsUnsupported = errors.New("notifications not supported")
	// ErrSubscriptionNotFound is returned when the subscription for the given id is not found.
	ErrSubscriptionNotFound = errors.New("subscription not found")
)
|
|
|
|
// globalGen is the package-wide ID generator that backs NewID.
var globalGen = randomIDGenerator()
|
|
|
|
// ID is a pseudo-random identifier for an RPC subscription. IDs produced by
// NewID are "0x"-prefixed hexadecimal strings without leading zeros.
type ID string
|
|
|
|
// NewID returns a new, random ID.
|
|
func NewID() ID {
|
|
return globalGen()
|
|
}
|
|
|
|
// randomIDGenerator returns a function generates a random IDs.
|
|
func randomIDGenerator() func() ID {
|
|
seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader))
|
|
if err != nil {
|
|
seed = int64(time.Now().Nanosecond())
|
|
}
|
|
var (
|
|
mu sync.Mutex
|
|
rng = rand.New(rand.NewSource(seed))
|
|
)
|
|
return func() ID {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
id := make([]byte, 16)
|
|
rng.Read(id)
|
|
return encodeID(id)
|
|
}
|
|
}
|
|
|
|
func encodeID(b []byte) ID {
|
|
id := hex.EncodeToString(b)
|
|
id = strings.TrimLeft(id, "0")
|
|
if id == "" {
|
|
id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0.
|
|
}
|
|
return ID("0x" + id)
|
|
}
|
|
|
|
// notifierKey is the context key type under which NotifierFromContext looks up
// the *Notifier stored in a context.
type notifierKey struct{}
|
|
|
|
// NotifierFromContext returns the Notifier value stored in ctx, if any.
|
|
func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
|
|
n, ok := ctx.Value(notifierKey{}).(*Notifier)
|
|
return n, ok
|
|
}
|
|
|
|
// Notifier is tied to a RPC connection that supports subscriptions.
// Server callbacks use the notifier to send notifications.
type Notifier struct {
	h         *handler // supplies the connection to write to and the subscription ID generator
	namespace string   // prefix of the notification method name

	mu           sync.Mutex        // held by the methods that read or write the fields below
	sub          *Subscription     // the single subscription created through this notifier, if any
	buffer       []json.RawMessage // notifications queued by Notify before activation
	callReturned bool              // set by takeSubscription; CreateSubscription panics afterwards
	activated    bool              // set by activate; Notify sends directly once true
}
|
|
|
|
// CreateSubscription returns a new subscription that is coupled to the
|
|
// RPC connection. By default subscriptions are inactive and notifications
|
|
// are dropped until the subscription is marked as active. This is done
|
|
// by the RPC server after the subscription ID is send to the client.
|
|
func (n *Notifier) CreateSubscription() *Subscription {
|
|
n.mu.Lock()
|
|
defer n.mu.Unlock()
|
|
|
|
if n.sub != nil {
|
|
panic("can't create multiple subscriptions with Notifier")
|
|
} else if n.callReturned {
|
|
panic("can't create subscription after subscribe call has returned")
|
|
}
|
|
n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)}
|
|
return n.sub
|
|
}
|
|
|
|
// Notify sends a notification to the client with the given data as payload.
|
|
// If an error occurs the RPC connection is closed and the error is returned.
|
|
func (n *Notifier) Notify(id ID, data interface{}) error {
|
|
enc, err := json.Marshal(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
n.mu.Lock()
|
|
defer n.mu.Unlock()
|
|
|
|
if n.sub == nil {
|
|
panic("can't Notify before subscription is created")
|
|
} else if n.sub.ID != id {
|
|
panic("Notify with wrong ID")
|
|
}
|
|
if n.activated {
|
|
return n.send(n.sub, enc)
|
|
}
|
|
n.buffer = append(n.buffer, enc)
|
|
return nil
|
|
}
|
|
|
|
// Closed returns a channel that is closed when the RPC connection is closed.
|
|
// Deprecated: use subscription error channel
|
|
func (n *Notifier) Closed() <-chan interface{} {
|
|
return n.h.conn.closed()
|
|
}
|
|
|
|
// takeSubscription returns the subscription (if one has been created). No subscription can
|
|
// be created after this call.
|
|
func (n *Notifier) takeSubscription() *Subscription {
|
|
n.mu.Lock()
|
|
defer n.mu.Unlock()
|
|
n.callReturned = true
|
|
return n.sub
|
|
}
|
|
|
|
// acticate is called after the subscription ID was sent to client. Notifications are
|
|
// buffered before activation. This prevents notifications being sent to the client before
|
|
// the subscription ID is sent to the client.
|
|
func (n *Notifier) activate() error {
|
|
n.mu.Lock()
|
|
defer n.mu.Unlock()
|
|
|
|
for _, data := range n.buffer {
|
|
if err := n.send(n.sub, data); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
n.activated = true
|
|
return nil
|
|
}
|
|
|
|
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
|
|
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
|
|
ctx := context.Background()
|
|
return n.h.conn.writeJSON(ctx, &jsonrpcMessage{
|
|
Version: vsn,
|
|
Method: n.namespace + notificationMethodSuffix,
|
|
Params: params,
|
|
})
|
|
}
|
|
|
|
// A Subscription is created by a notifier and tied to that notifier. The client can use
// this subscription to wait for an unsubscribe request from the client, see Err().
type Subscription struct {
	ID        ID
	namespace string
	err       chan error // closed on unsubscribe
}
|
|
|
|
// Err returns a channel that is closed when the client send an unsubscribe request.
|
|
func (s *Subscription) Err() <-chan error {
|
|
return s.err
|
|
}
|
|
|
|
// MarshalJSON marshals a subscription as its ID.
|
|
func (s *Subscription) MarshalJSON() ([]byte, error) {
|
|
return json.Marshal(s.ID)
|
|
}
|
|
|
|
// ClientSubscription is a subscription established through the Client's Subscribe or
// EthSubscribe methods.
type ClientSubscription struct {
	client    *Client              // used to issue the unsubscribe call
	etype     reflect.Type         // element type of channel; notifications are decoded into it
	channel   reflect.Value        // channel that decoded notifications are sent on
	namespace string               // prefix of the unsubscribe method name
	subid     string               // passed as the argument of the unsubscribe call
	in        chan json.RawMessage // raw notifications handed from deliver to forward

	quitOnce sync.Once     // ensures quit is closed once
	quit     chan struct{} // quit is closed when the subscription exits
	errOnce  sync.Once     // ensures err is closed once
	err      chan error
}
|
|
|
|
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
|
|
sub := &ClientSubscription{
|
|
client: c,
|
|
namespace: namespace,
|
|
etype: channel.Type().Elem(),
|
|
channel: channel,
|
|
quit: make(chan struct{}),
|
|
err: make(chan error, 1),
|
|
in: make(chan json.RawMessage),
|
|
}
|
|
return sub
|
|
}
|
|
|
|
// Err returns the subscription error channel. The intended use of Err is to schedule
|
|
// resubscription when the client connection is closed unexpectedly.
|
|
//
|
|
// The error channel receives a value when the subscription has ended due
|
|
// to an error. The received error is nil if Close has been called
|
|
// on the underlying client and no other error has occurred.
|
|
//
|
|
// The error channel is closed when Unsubscribe is called on the subscription.
|
|
func (sub *ClientSubscription) Err() <-chan error {
|
|
return sub.err
|
|
}
|
|
|
|
// Unsubscribe unsubscribes the notification and closes the error channel.
|
|
// It can safely be called more than once.
|
|
func (sub *ClientSubscription) Unsubscribe() {
|
|
sub.quitWithError(true, nil)
|
|
sub.errOnce.Do(func() { close(sub.err) })
|
|
}
|
|
|
|
func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) {
|
|
sub.quitOnce.Do(func() {
|
|
// The dispatch loop won't be able to execute the unsubscribe call
|
|
// if it is blocked on deliver. Close sub.quit first because it
|
|
// unblocks deliver.
|
|
close(sub.quit)
|
|
if unsubscribeServer {
|
|
sub.requestUnsubscribe()
|
|
}
|
|
if err != nil {
|
|
if err == ErrClientQuit {
|
|
err = nil // Adhere to subscription semantics.
|
|
}
|
|
sub.err <- err
|
|
}
|
|
})
|
|
}
|
|
|
|
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
|
|
select {
|
|
case sub.in <- result:
|
|
return true
|
|
case <-sub.quit:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (sub *ClientSubscription) start() {
|
|
sub.quitWithError(sub.forward())
|
|
}
|
|
|
|
func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
|
|
cases := []reflect.SelectCase{
|
|
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
|
|
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
|
|
{Dir: reflect.SelectSend, Chan: sub.channel},
|
|
}
|
|
buffer := list.New()
|
|
defer buffer.Init()
|
|
for {
|
|
var chosen int
|
|
var recv reflect.Value
|
|
if buffer.Len() == 0 {
|
|
// Idle, omit send case.
|
|
chosen, recv, _ = reflect.Select(cases[:2])
|
|
} else {
|
|
// Non-empty buffer, send the first queued item.
|
|
cases[2].Send = reflect.ValueOf(buffer.Front().Value)
|
|
chosen, recv, _ = reflect.Select(cases)
|
|
}
|
|
|
|
switch chosen {
|
|
case 0: // <-sub.quit
|
|
return false, nil
|
|
case 1: // <-sub.in
|
|
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
if buffer.Len() == maxClientSubscriptionBuffer {
|
|
return true, ErrSubscriptionQueueOverflow
|
|
}
|
|
buffer.PushBack(val)
|
|
case 2: // sub.channel<-
|
|
cases[2].Send = reflect.Value{} // Don't hold onto the value.
|
|
buffer.Remove(buffer.Front())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
|
|
val := reflect.New(sub.etype)
|
|
err := json.Unmarshal(result, val.Interface())
|
|
return val.Elem().Interface(), err
|
|
}
|
|
|
|
func (sub *ClientSubscription) requestUnsubscribe() error {
|
|
var result interface{}
|
|
return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
|
|
}
|