mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 19:50:36 +00:00
Gossip lightclient (#5549)
* added subscription to gossip * added all handlers with logs * disconnecting from peers with goodbye message received * added time out for stream
This commit is contained in:
parent
c2655ad6ef
commit
123d7379cc
@ -28,6 +28,8 @@ var (
|
||||
MainnetNetwork NetworkType = 0
|
||||
GoerliNetwork NetworkType = 1
|
||||
SepoliaNetwork NetworkType = 2
|
||||
TtfbTimeout time.Duration = 5 * time.Second
|
||||
RespTimeout time.Duration = 10 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -14,18 +14,41 @@
|
||||
package sentinel
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/erigon/cmd/lightclient/clparams"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
)
|
||||
|
||||
var (
|
||||
ProtocolPrefix = "/eth2/beacon_chain/req"
|
||||
reservedBytes = 128
|
||||
)
|
||||
|
||||
var handlers map[protocol.ID]network.StreamHandler = map[protocol.ID]network.StreamHandler{
|
||||
protocol.ID("/eth2/beacon_chain/req/ping/1/ssz_snappy"): pingHandler,
|
||||
protocol.ID(ProtocolPrefix + "/ping/1/ssz_snappy"): pingHandler,
|
||||
protocol.ID(ProtocolPrefix + "/status/1/ssz_snappy"): statusHandler,
|
||||
protocol.ID(ProtocolPrefix + "/goodbye/1/ssz_snappy"): goodbyeHandler,
|
||||
protocol.ID(ProtocolPrefix + "/beacon_blocks_by_range/1/ssz_snappy"): blocksByRangeHandler,
|
||||
protocol.ID(ProtocolPrefix + "/beacon_blocks_by_root/1/ssz_snappy"): beaconBlocksByRootHandler,
|
||||
protocol.ID(ProtocolPrefix + "/metadata/1/ssz_snappy"): metadataHandler,
|
||||
}
|
||||
|
||||
func setDeadLines(stream network.Stream) {
|
||||
if err := stream.SetReadDeadline(time.Now().Add(clparams.TtfbTimeout)); err != nil {
|
||||
log.Error("failed to set stream read dead line", "err", err)
|
||||
}
|
||||
if err := stream.SetWriteDeadline(time.Now().Add(clparams.RespTimeout)); err != nil {
|
||||
log.Error("failed to set stream write dead line", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func pingHandler(stream network.Stream) {
|
||||
pingBytes := make([]byte, 8)
|
||||
setDeadLines(stream)
|
||||
n, err := stream.Read(pingBytes)
|
||||
if err != nil {
|
||||
log.Warn("handler crashed", "err", err)
|
||||
@ -47,6 +70,56 @@ func pingHandler(stream network.Stream) {
|
||||
}
|
||||
}
|
||||
|
||||
func statusHandler(stream network.Stream) {
|
||||
setDeadLines(stream)
|
||||
|
||||
log.Info("Got status request")
|
||||
}
|
||||
|
||||
func goodbyeHandler(stream network.Stream) {
|
||||
goodByeBytes := make([]byte, 8)
|
||||
setDeadLines(stream)
|
||||
n, err := stream.Read(goodByeBytes)
|
||||
if err != nil {
|
||||
log.Warn("Goodbye handler crashed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if n != 8 {
|
||||
log.Warn("Invalid goodbye message received")
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("[Lightclient] Received", "goodbye", ssz.UnmarshallUint64(goodByeBytes))
|
||||
n, err = stream.Write(goodByeBytes)
|
||||
if err != nil {
|
||||
log.Warn("Goodbye handler crashed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if n != 8 {
|
||||
log.Warn("Could not send Goodbye")
|
||||
}
|
||||
|
||||
peerId := stream.Conn().RemotePeer()
|
||||
disconnectPeerCh <- peerId
|
||||
}
|
||||
|
||||
func blocksByRangeHandler(stream network.Stream) {
|
||||
setDeadLines(stream)
|
||||
log.Info("Got block by range handler call")
|
||||
}
|
||||
|
||||
func beaconBlocksByRootHandler(stream network.Stream) {
|
||||
setDeadLines(stream)
|
||||
log.Info("Got beacon block by root handler call")
|
||||
}
|
||||
|
||||
func metadataHandler(stream network.Stream) {
|
||||
setDeadLines(stream)
|
||||
log.Info("Got metadata handler call")
|
||||
}
|
||||
|
||||
func (s *Sentinel) setupHandlers() {
|
||||
for id, handler := range handlers {
|
||||
s.host.SetStreamHandler(id, handler)
|
||||
|
@ -69,7 +69,12 @@ func (p *Peers) Penalize(pid peer.ID) {
|
||||
}
|
||||
|
||||
func (p *Peers) banBadPeer(pid peer.ID) {
|
||||
p.host.Peerstore().RemovePeer(pid)
|
||||
p.DisconnectPeer(pid)
|
||||
p.badPeers.Add(pid, []byte{0})
|
||||
log.Warn("[Peers] bad peers has been banned", "peer-id", pid)
|
||||
}
|
||||
|
||||
func (p *Peers) DisconnectPeer(pid peer.ID) {
|
||||
log.Info("[Peers] disconnecting from peer", "peer-id", pid)
|
||||
p.host.Peerstore().RemovePeer(pid)
|
||||
}
|
||||
|
@ -25,11 +25,15 @@ import (
|
||||
"github.com/ledgerwatch/erigon/p2p/enr"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var disconnectPeerCh = make(chan peer.ID, 0)
|
||||
|
||||
type Sentinel struct {
|
||||
started bool
|
||||
listener *discover.UDPv5 // this is us in the network.
|
||||
@ -37,6 +41,7 @@ type Sentinel struct {
|
||||
host host.Host
|
||||
cfg SentinelConfig
|
||||
peers *peers.Peers
|
||||
pubsub *pubsub.PubSub
|
||||
}
|
||||
|
||||
func (s *Sentinel) createLocalNode(
|
||||
@ -114,9 +119,36 @@ func (s *Sentinel) createListener() (*discover.UDPv5, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
close(disconnectPeerCh)
|
||||
break
|
||||
case pid := <-disconnectPeerCh:
|
||||
s.peers.DisconnectPeer(pid)
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return net, err
|
||||
}
|
||||
|
||||
func (s *Sentinel) pubsubOptions() []pubsub.Option {
|
||||
pubsubQueueSize := 600
|
||||
psOpts := []pubsub.Option{
|
||||
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
|
||||
pubsub.WithNoAuthor(),
|
||||
pubsub.WithSubscriptionFilter(nil),
|
||||
pubsub.WithPeerOutboundQueueSize(pubsubQueueSize),
|
||||
pubsub.WithMaxMessageSize(int(s.cfg.NetworkConfig.GossipMaxSize)),
|
||||
pubsub.WithValidateQueueSize(pubsubQueueSize),
|
||||
pubsub.WithGossipSubParams(pubsub.DefaultGossipSubParams()),
|
||||
}
|
||||
return psOpts
|
||||
}
|
||||
|
||||
// This is just one of the examples from the libp2p repository.
|
||||
func New(ctx context.Context, cfg SentinelConfig) (*Sentinel, error) {
|
||||
s := &Sentinel{
|
||||
@ -137,6 +169,7 @@ func New(ctx context.Context, cfg SentinelConfig) (*Sentinel, error) {
|
||||
host.RemoveStreamHandler(identify.IDDelta)
|
||||
s.host = host
|
||||
s.peers = peers.New(s.host)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
4
go.mod
4
go.mod
@ -102,6 +102,7 @@ require (
|
||||
github.com/golang/mock v1.6.0 // indirect
|
||||
github.com/google/gopacket v1.1.19 // indirect
|
||||
github.com/ipfs/go-cid v0.3.2 // indirect
|
||||
github.com/ipfs/go-log v1.0.5 // indirect
|
||||
github.com/ipfs/go-log/v2 v2.5.1 // indirect
|
||||
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
||||
github.com/klauspost/compress v1.15.10 // indirect
|
||||
@ -111,6 +112,7 @@ require (
|
||||
github.com/libp2p/go-cidranger v1.1.0 // indirect
|
||||
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
|
||||
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
|
||||
github.com/libp2p/go-libp2p-pubsub v0.8.1 // indirect
|
||||
github.com/libp2p/go-mplex v0.7.0 // indirect
|
||||
github.com/libp2p/go-msgio v0.2.0 // indirect
|
||||
github.com/libp2p/go-nat v0.1.0 // indirect
|
||||
@ -141,6 +143,7 @@ require (
|
||||
github.com/multiformats/go-varint v0.0.6 // indirect
|
||||
github.com/onsi/ginkgo v1.16.5 // indirect
|
||||
github.com/opencontainers/runtime-spec v1.0.2 // indirect
|
||||
github.com/opentracing/opentracing-go v1.2.0 // indirect
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
|
||||
github.com/prometheus/client_golang v1.13.0 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
@ -149,6 +152,7 @@ require (
|
||||
github.com/prysmaticlabs/gohashtree v0.0.0-20220517220438-192ee5ae6982 // indirect
|
||||
github.com/raulk/go-watchdog v1.3.0 // indirect
|
||||
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
|
||||
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
|
||||
go.opentelemetry.io/otel v1.8.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.8.0 // indirect
|
||||
go.uber.org/multierr v1.8.0 // indirect
|
||||
|
12
go.sum
12
go.sum
@ -494,6 +494,9 @@ github.com/ipfs/go-cid v0.3.2 h1:OGgOd+JCFM+y1DjWPmVH+2/4POtpDzwcr7VgnB7mZXc=
|
||||
github.com/ipfs/go-cid v0.3.2/go.mod h1:gQ8pKqT/sUxGY+tIwy1RPpAojYu7jAyCp5Tz1svoupw=
|
||||
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
|
||||
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
|
||||
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
|
||||
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
|
||||
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
|
||||
github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
|
||||
github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
|
||||
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
|
||||
@ -578,6 +581,8 @@ github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/
|
||||
github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI=
|
||||
github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw=
|
||||
github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.8.1 h1:hSw09NauFUaA0FLgQPBJp6QOy0a2n+HSkb8IeOx8OnY=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.8.1/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw=
|
||||
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
|
||||
github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY=
|
||||
github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU=
|
||||
@ -727,6 +732,8 @@ github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go
|
||||
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
|
||||
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
|
||||
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
|
||||
github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxSfWAKL3wpBW7V8scJMt8N8gnaMCS9E/cA=
|
||||
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
|
||||
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
|
||||
@ -962,6 +969,8 @@ github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OL
|
||||
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
|
||||
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
|
||||
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
|
||||
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
|
||||
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
|
||||
github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
|
||||
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
|
||||
@ -992,6 +1001,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
|
||||
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
|
||||
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||
@ -999,12 +1009,14 @@ go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpK
|
||||
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
|
||||
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
|
||||
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
|
||||
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
|
||||
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
|
||||
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
|
||||
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
|
||||
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
|
||||
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
|
||||
go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
|
||||
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
|
||||
|
Loading…
Reference in New Issue
Block a user