erigon-pulse/cmd/devnet/services/subscriptions.go
Mark Holt 529d359ca6
Bor span testing (#7897)
An update to the devnet to introduce a local heimdall to facilitate
multiple validators without the need for an external process, and hence
validator registration/staking etc.

In this initial release only span generation is supported.  

It has the following changes:

* Introduction of a local grpc heimdall interface
* Allocation of accounts via a devnet account generator ()
* Introduction on 'Services' for the network config

"--chain bor-devnet --bor.localheimdall" will run a 2 validator network
with a local service
"--chain bor-devnet --bor.withoutheimdall" will sun a single validator
with no heimdall service as before

---------

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
2023-07-18 09:47:04 +01:00

118 lines
3.1 KiB
Go

package services
import (
"context"
"fmt"
"github.com/ledgerwatch/erigon/cmd/devnet/devnet"
"github.com/ledgerwatch/erigon/cmd/devnet/devnetutils"
"github.com/ledgerwatch/erigon/cmd/devnet/requests"
"github.com/ledgerwatch/erigon/cmd/devnet/scenarios"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/log/v3"
)
func init() {
scenarios.RegisterStepHandlers(
scenarios.StepHandler(InitSubscriptions),
)
}
var (
Subscriptions *map[requests.SubMethod]*Subscription
)
// Subscription houses the client subscription, name and channel for its delivery
type Subscription struct {
Client *rpc.Client
ClientSub *rpc.ClientSubscription
Name requests.SubMethod
SubChan chan interface{}
}
// NewSubscription returns a new Subscription instance
func NewSubscription(name requests.SubMethod) *Subscription {
return &Subscription{
Name: name,
SubChan: make(chan interface{}),
}
}
func InitSubscriptions(ctx context.Context, methods []requests.SubMethod) {
logger := devnet.Logger(ctx)
logger.Trace("CONNECTING TO WEBSOCKETS AND SUBSCRIBING TO METHODS...")
if err := subscribeAll(methods, logger); err != nil {
logger.Error("failed to subscribe to all methods", "error", err)
return
}
}
// subscribe connects to a websocket client and returns the subscription handler and a channel buffer
func subscribe(client *rpc.Client, method requests.SubMethod, args ...interface{}) (*Subscription, error) {
methodSub := NewSubscription(method)
namespace, subMethod, err := devnetutils.NamespaceAndSubMethodFromMethod(string(method))
if err != nil {
return nil, fmt.Errorf("cannot get namespace and submethod from method: %v", err)
}
arr := append([]interface{}{subMethod}, args...)
sub, err := client.Subscribe(context.Background(), namespace, methodSub.SubChan, arr...)
if err != nil {
return nil, fmt.Errorf("client failed to subscribe: %v", err)
}
methodSub.ClientSub = sub
return methodSub, nil
}
func subscribeToMethod(method requests.SubMethod, logger log.Logger) (*Subscription, error) {
client, err := rpc.DialWebsocket(context.Background(), "ws://localhost:8545", "", logger)
if err != nil {
return nil, fmt.Errorf("failed to dial websocket: %v", err)
}
sub, err := subscribe(client, method)
if err != nil {
return nil, fmt.Errorf("error subscribing to method: %v", err)
}
sub.Client = client
return sub, nil
}
// UnsubscribeAll closes all the client subscriptions and empties their global subscription channel
func UnsubscribeAll() {
if Subscriptions == nil {
return
}
for _, methodSub := range *Subscriptions {
if methodSub != nil {
methodSub.ClientSub.Unsubscribe()
for len(methodSub.SubChan) > 0 {
<-methodSub.SubChan
}
methodSub.SubChan = nil // avoid memory leak
}
}
}
// subscribeAll subscribes to the range of methods provided
func subscribeAll(methods []requests.SubMethod, logger log.Logger) error {
m := make(map[requests.SubMethod]*Subscription)
Subscriptions = &m
for _, method := range methods {
sub, err := subscribeToMethod(method, logger)
if err != nil {
return err
}
(*Subscriptions)[method] = sub
}
return nil
}