From 123d7379cc1ab2640c37bbf76f24fb347440b0ed Mon Sep 17 00:00:00 2001 From: Enrique Jose Avila Asapche Date: Tue, 27 Sep 2022 21:34:06 +0100 Subject: [PATCH] Gossip lightclient (#5549) * added subscription to gossip * added all handlers with logs * disconnecting from peers with goodbye message received * added time out for stream --- cmd/lightclient/clparams/config.go | 8 ++- cmd/lightclient/sentinel/handlers.go | 75 ++++++++++++++++++++++++- cmd/lightclient/sentinel/peers/peers.go | 7 ++- cmd/lightclient/sentinel/sentinel.go | 33 +++++++++++ go.mod | 4 ++ go.sum | 12 ++++ 6 files changed, 134 insertions(+), 5 deletions(-) diff --git a/cmd/lightclient/clparams/config.go b/cmd/lightclient/clparams/config.go index f7fa33894..aa73bbd08 100644 --- a/cmd/lightclient/clparams/config.go +++ b/cmd/lightclient/clparams/config.go @@ -25,9 +25,11 @@ import ( type NetworkType int var ( - MainnetNetwork NetworkType = 0 - GoerliNetwork NetworkType = 1 - SepoliaNetwork NetworkType = 2 + MainnetNetwork NetworkType = 0 + GoerliNetwork NetworkType = 1 + SepoliaNetwork NetworkType = 2 + TtfbTimeout time.Duration = 5 * time.Second + RespTimeout time.Duration = 10 * time.Second ) var ( diff --git a/cmd/lightclient/sentinel/handlers.go b/cmd/lightclient/sentinel/handlers.go index e6aaca386..f97b9a685 100644 --- a/cmd/lightclient/sentinel/handlers.go +++ b/cmd/lightclient/sentinel/handlers.go @@ -14,18 +14,41 @@ package sentinel import ( + "time" + + "github.com/ledgerwatch/erigon/cmd/lightclient/clparams" "github.com/ledgerwatch/log/v3" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" ssz "github.com/prysmaticlabs/fastssz" ) +var ( + ProtocolPrefix = "/eth2/beacon_chain/req" + reservedBytes = 128 +) + var handlers map[protocol.ID]network.StreamHandler = map[protocol.ID]network.StreamHandler{ - protocol.ID("/eth2/beacon_chain/req/ping/1/ssz_snappy"): pingHandler, + protocol.ID(ProtocolPrefix + "/ping/1/ssz_snappy"): pingHandler, + protocol.ID(ProtocolPrefix + "/status/1/ssz_snappy"): statusHandler, + protocol.ID(ProtocolPrefix + "/goodbye/1/ssz_snappy"): goodbyeHandler, + protocol.ID(ProtocolPrefix + "/beacon_blocks_by_range/1/ssz_snappy"): blocksByRangeHandler, + protocol.ID(ProtocolPrefix + "/beacon_blocks_by_root/1/ssz_snappy"): beaconBlocksByRootHandler, + protocol.ID(ProtocolPrefix + "/metadata/1/ssz_snappy"): metadataHandler, +} + +func setDeadLines(stream network.Stream) { + if err := stream.SetReadDeadline(time.Now().Add(clparams.TtfbTimeout)); err != nil { + log.Error("failed to set stream read dead line", "err", err) + } + if err := stream.SetWriteDeadline(time.Now().Add(clparams.RespTimeout)); err != nil { + log.Error("failed to set stream write dead line", "err", err) + } } func pingHandler(stream network.Stream) { pingBytes := make([]byte, 8) + setDeadLines(stream) n, err := stream.Read(pingBytes) if err != nil { log.Warn("handler crashed", "err", err) @@ -47,6 +70,56 @@ func pingHandler(stream network.Stream) { } } +func statusHandler(stream network.Stream) { + setDeadLines(stream) + + log.Info("Got status request") +} + +func goodbyeHandler(stream network.Stream) { + goodByeBytes := make([]byte, 8) + setDeadLines(stream) + n, err := stream.Read(goodByeBytes) + if err != nil { + log.Warn("Goodbye handler crashed", "err", err) + return + } + + if n != 8 { + log.Warn("Invalid goodbye message received") + return + } + + log.Info("[Lightclient] Received", "goodbye", ssz.UnmarshallUint64(goodByeBytes)) + n, err = stream.Write(goodByeBytes) + if err != nil { + log.Warn("Goodbye handler crashed", "err", err) + return + } + + if n != 8 { + log.Warn("Could not send Goodbye") + } + + peerId := stream.Conn().RemotePeer() + disconnectPeerCh <- peerId +} + +func blocksByRangeHandler(stream network.Stream) { + setDeadLines(stream) + log.Info("Got block by range handler call") +} + +func beaconBlocksByRootHandler(stream network.Stream) { + setDeadLines(stream) + log.Info("Got beacon block by root handler call") +} + +func metadataHandler(stream network.Stream) { + setDeadLines(stream) + log.Info("Got metadata handler call") +} + func (s *Sentinel) setupHandlers() { for id, handler := range handlers { s.host.SetStreamHandler(id, handler) diff --git a/cmd/lightclient/sentinel/peers/peers.go b/cmd/lightclient/sentinel/peers/peers.go index 033c34b30..50fcd2ac8 100644 --- a/cmd/lightclient/sentinel/peers/peers.go +++ b/cmd/lightclient/sentinel/peers/peers.go @@ -69,7 +69,12 @@ func (p *Peers) Penalize(pid peer.ID) { } func (p *Peers) banBadPeer(pid peer.ID) { - p.host.Peerstore().RemovePeer(pid) + p.DisconnectPeer(pid) p.badPeers.Add(pid, []byte{0}) log.Warn("[Peers] bad peers has been banned", "peer-id", pid) } + +func (p *Peers) DisconnectPeer(pid peer.ID) { + log.Info("[Peers] disconnecting from peer", "peer-id", pid) + p.host.Peerstore().RemovePeer(pid) +} diff --git a/cmd/lightclient/sentinel/sentinel.go b/cmd/lightclient/sentinel/sentinel.go index 95f364c1b..af78cd488 100644 --- a/cmd/lightclient/sentinel/sentinel.go +++ b/cmd/lightclient/sentinel/sentinel.go @@ -25,11 +25,15 @@ import ( "github.com/ledgerwatch/erigon/p2p/enr" "github.com/ledgerwatch/log/v3" "github.com/libp2p/go-libp2p" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/pkg/errors" ) +var disconnectPeerCh = make(chan peer.ID, 0) + type Sentinel struct { started bool listener *discover.UDPv5 // this is us in the network. @@ -37,6 +41,7 @@ type Sentinel struct { host host.Host cfg SentinelConfig peers *peers.Peers + pubsub *pubsub.PubSub } func (s *Sentinel) createLocalNode( @@ -114,9 +119,36 @@ func (s *Sentinel) createListener() (*discover.UDPv5, error) { return nil, err } + go func() { + for { + select { + case <-s.ctx.Done(): + close(disconnectPeerCh) + break + case pid := <-disconnectPeerCh: + s.peers.DisconnectPeer(pid) + default: + } + } + }() + return net, err } +func (s *Sentinel) pubsubOptions() []pubsub.Option { + pubsubQueueSize := 600 + psOpts := []pubsub.Option{ + pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), + pubsub.WithNoAuthor(), + pubsub.WithSubscriptionFilter(nil), + pubsub.WithPeerOutboundQueueSize(pubsubQueueSize), + pubsub.WithMaxMessageSize(int(s.cfg.NetworkConfig.GossipMaxSize)), + pubsub.WithValidateQueueSize(pubsubQueueSize), + pubsub.WithGossipSubParams(pubsub.DefaultGossipSubParams()), + } + return psOpts +} + // This is just one of the examples from the libp2p repository. func New(ctx context.Context, cfg SentinelConfig) (*Sentinel, error) { s := &Sentinel{ @@ -137,6 +169,7 @@ func New(ctx context.Context, cfg SentinelConfig) (*Sentinel, error) { host.RemoveStreamHandler(identify.IDDelta) s.host = host s.peers = peers.New(s.host) + return s, nil } diff --git a/go.mod b/go.mod index c6aa8cc15..71eb49c24 100644 --- a/go.mod +++ b/go.mod @@ -102,6 +102,7 @@ require ( github.com/golang/mock v1.6.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/ipfs/go-cid v0.3.2 // indirect + github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/klauspost/compress v1.15.10 // indirect @@ -111,6 +112,7 @@ require ( github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect + github.com/libp2p/go-libp2p-pubsub v0.8.1 // indirect github.com/libp2p/go-mplex v0.7.0 // indirect github.com/libp2p/go-msgio v0.2.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect @@ -141,6 +143,7 @@ require ( github.com/multiformats/go-varint v0.0.6 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect + github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/prometheus/client_golang v1.13.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect @@ -149,6 +152,7 @@ require ( github.com/prysmaticlabs/gohashtree v0.0.0-20220517220438-192ee5ae6982 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect + github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect go.opentelemetry.io/otel v1.8.0 // indirect go.opentelemetry.io/otel/trace v1.8.0 // indirect go.uber.org/multierr v1.8.0 // indirect diff --git a/go.sum b/go.sum index 79574701f..1b801bed1 100644 --- a/go.sum +++ b/go.sum @@ -494,6 +494,9 @@ github.com/ipfs/go-cid v0.3.2 h1:OGgOd+JCFM+y1DjWPmVH+2/4POtpDzwcr7VgnB7mZXc= github.com/ipfs/go-cid v0.3.2/go.mod h1:gQ8pKqT/sUxGY+tIwy1RPpAojYu7jAyCp5Tz1svoupw= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= +github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= +github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= +github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -578,6 +581,8 @@ github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/ github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI= github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw= github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY= +github.com/libp2p/go-libp2p-pubsub v0.8.1 h1:hSw09NauFUaA0FLgQPBJp6QOy0a2n+HSkb8IeOx8OnY= +github.com/libp2p/go-libp2p-pubsub v0.8.1/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY= github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU= @@ -727,6 +732,8 @@ github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxSfWAKL3wpBW7V8scJMt8N8gnaMCS9E/cA= github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= @@ -962,6 +969,8 @@ github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OL github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= +github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= +github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= @@ -992,6 +1001,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -999,12 +1009,14 @@ go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpK go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=