erigon-pulse/cmd/lightclient/sentinel/service/start.go
Mike Neuder 4b5880e8d3
Adding VoluntaryExit, ProposerSlashing, and AttesterSlashing topics to pubsub service (#5851)
This is a follow on PR in support of
https://github.com/ledgerwatch/erigon/issues/5824. See
https://github.com/ledgerwatch/erigon/pull/5841 for the
`beacon_aggregate_and_proof` topic.

This PR adds support for the `voluntary_exit, proposer_slashing, and
attester_slashing` topics as defined in the phase 0 spec
https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#global-topics.

The CL types were already defined so this just adds the listening to the
pub sub service.
2022-10-24 17:52:49 +02:00

89 lines
2.4 KiB
Go

package service
import (
"context"
"fmt"
"net"
"time"
"github.com/ledgerwatch/erigon/cmd/lightclient/rpc/lightrpc"
"github.com/ledgerwatch/erigon/cmd/lightclient/sentinel"
"github.com/ledgerwatch/log/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// ServerConfig holds the endpoint settings of the sentinel gRPC service.
type ServerConfig struct {
	// Network is the network name handed to net.Listen when serving.
	Network string
	// Addr is the address the service listens on and the client dials.
	Addr string
}
// StartSentinelService creates and starts a sentinel, subscribes it to the
// gossip topics listed below, serves it over gRPC in a background goroutine
// and returns a client connected to srvCfg.Addr.
//
// Gossip subscription is best-effort: a topic that cannot be joined or
// listened to is logged and skipped. An error is returned when the sentinel
// cannot be created or started, when server.GetPeers does not succeed within
// five seconds, or when dialing srvCfg.Addr fails.
func StartSentinelService(cfg *sentinel.SentinelConfig, srvCfg *ServerConfig) (lightrpc.SentinelClient, error) {
	ctx := context.Background()
	sent, err := sentinel.New(ctx, cfg)
	if err != nil {
		return nil, err
	}
	if err := sent.Start(); err != nil {
		return nil, err
	}
	gossipTopics := []sentinel.GossipTopic{
		sentinel.BeaconBlockSsz,
		sentinel.BeaconAggregateAndProofSsz,
		sentinel.VoluntaryExitSsz,
		sentinel.ProposerSlashingSsz,
		sentinel.AttesterSlashingSsz,
		sentinel.LightClientFinalityUpdateSsz,
		sentinel.LightClientOptimisticUpdateSsz,
	}
	for _, topic := range gossipTopics {
		// Connect to each gossip topic separately; this joins the room.
		subscriber, err := sent.SubscribeGossip(topic)
		if err != nil {
			log.Error("failed to start sentinel", "err", err)
			// The subscriber is not usable when an error is returned, so
			// skip this topic instead of calling Listen on it.
			continue
		}
		// Actually start the subscription, i.e. listening and sending
		// packets to the sentinel recv channel.
		if err := subscriber.Listen(); err != nil {
			log.Error("failed to start sentinel", "err", err)
		}
	}
	log.Info("Sentinel started", "enr", sent.String())

	server := NewSentinelServer(ctx, sent)
	go StartServe(server, srvCfg)

	// Poll the server until it answers, giving up after five seconds. The
	// ticker keeps the wait from spinning a CPU core at full speed.
	timeOutTimer := time.NewTimer(5 * time.Second)
	defer timeOutTimer.Stop()
	pollTicker := time.NewTicker(10 * time.Millisecond)
	defer pollTicker.Stop()
WaitingLoop:
	for {
		if _, err := server.GetPeers(ctx, &lightrpc.EmptyRequest{}); err == nil {
			break WaitingLoop
		}
		select {
		case <-timeOutTimer.C:
			return nil, fmt.Errorf("[Server] timeout beginning server")
		case <-pollTicker.C:
		}
	}

	conn, err := grpc.DialContext(ctx, srvCfg.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	return lightrpc.NewSentinelClient(conn), nil
}
// StartServe listens on srvCfg.Network / srvCfg.Addr and serves the given
// sentinel server over gRPC. It blocks until the gRPC server stops serving.
// Failures are logged as warnings rather than returned.
func StartServe(server *SentinelServer, srvCfg *ServerConfig) {
	lis, err := net.Listen(srvCfg.Network, srvCfg.Addr)
	if err != nil {
		log.Warn("[Sentinel] could not serve service", "reason", err)
		// Without a listener there is nothing to serve on; passing a nil
		// listener to Serve would panic.
		return
	}
	// Create a gRPC server.
	gRPCserver := grpc.NewServer()
	go server.ListenToGossip()
	// Register our server as a gRPC server.
	lightrpc.RegisterSentinelServer(gRPCserver, server)
	if err := gRPCserver.Serve(lis); err != nil {
		log.Warn("[Sentinel] could not serve service", "reason", err)
	}
}