Nishant Das 661cbc45ae
Vendor Leaky Bucket Implementation (#11560)
* add changes

* fix tests

* change to minute

* remove dep

* remove

* fix tests

* add test for period

* improve

* linter

* build files

* ci

* make it stricter

* fix tests

* fix

* Update beacon-chain/sync/rate_limiter.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

Co-authored-by: terencechain <terence@prysmaticlabs.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
2022-10-20 16:40:13 -05:00

495 lines
15 KiB
Go

// Package p2p defines the network protocol implementation for Ethereum consensus
// used by beacon nodes, including peer discovery using discv5, gossip-sub
// using libp2p, and handing peer lifecycles + handshakes.
package p2p
import (
"context"
"crypto/ecdsa"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/async"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v3/config/params"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
prysmnetwork "github.com/prysmaticlabs/prysm/v3/network"
"github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/v3/runtime"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
var _ runtime.Service = (*Service)(nil)
// In the event that we are at our peer limit, we
// stop looking for new peers and instead poll
// for the current peer limit status for the time period
// defined below.
var pollingPeriod = 6 * time.Second
// Refresh rate of ENR set at twice per slot.
var refreshRate = slots.DivideSlotBy(2)
// maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it.
const maxBadResponses = 5
// pubsubQueueSize is the size that we assign to our validation queue and outbound message queue for
// gossipsub.
const pubsubQueueSize = 600
// maxDialTimeout is the timeout for a single peer dial.
var maxDialTimeout = params.BeaconNetworkConfig().RespTimeout
// Service for managing peer to peer (p2p) networking.
type Service struct {
started bool
isPreGenesis bool
pingMethod func(ctx context.Context, id peer.ID) error
cancel context.CancelFunc
cfg *Config
peers *peers.Status
addrFilter *multiaddr.Filters
ipLimiter *leakybucket.Collector
privKey *ecdsa.PrivateKey
metaData metadata.Metadata
pubsub *pubsub.PubSub
joinedTopics map[string]*pubsub.Topic
joinedTopicsLock sync.Mutex
subnetsLock map[uint64]*sync.RWMutex
subnetsLockLock sync.Mutex // Lock access to subnetsLock
initializationLock sync.Mutex
dv5Listener Listener
startupErr error
stateNotifier statefeed.Notifier
ctx context.Context
host host.Host
genesisTime time.Time
genesisValidatorsRoot []byte
activeValidatorCount uint64
}
// NewService initializes a new p2p service compatible with shared.Service interface. No
// connections are made until the Start function is called during the service registry startup.
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
var err error
ctx, cancel := context.WithCancel(ctx)
_ = cancel // govet fix for lost cancel. Cancel is handled in service.Stop().
s := &Service{
ctx: ctx,
stateNotifier: cfg.StateNotifier,
cancel: cancel,
cfg: cfg,
isPreGenesis: true,
joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
subnetsLock: make(map[uint64]*sync.RWMutex),
}
dv5Nodes := parseBootStrapAddrs(s.cfg.BootstrapNodeAddr)
cfg.Discv5BootStrapAddr = dv5Nodes
ipAddr := prysmnetwork.IPAddr()
s.privKey, err = privKey(s.cfg)
if err != nil {
log.WithError(err).Error("Failed to generate p2p private key")
return nil, err
}
s.metaData, err = metaDataFromConfig(s.cfg)
if err != nil {
log.WithError(err).Error("Failed to create peer metadata")
return nil, err
}
s.addrFilter, err = configureFilter(s.cfg)
if err != nil {
log.WithError(err).Error("Failed to create address filter")
return nil, err
}
s.ipLimiter = leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */)
opts := s.buildOptions(ipAddr, s.privKey)
h, err := libp2p.New(opts...)
if err != nil {
log.WithError(err).Error("Failed to create p2p host")
return nil, err
}
s.host = h
s.host.RemoveStreamHandler(identify.IDDelta)
// Gossipsub registration is done before we add in any new peers
// due to libp2p's gossipsub implementation not taking into
// account previously added peers when creating the gossipsub
// object.
psOpts := s.pubsubOptions()
// Set the pubsub global parameters that we require.
setPubSubParameters()
// Reinitialize them in the event we are running a custom config.
attestationSubnetCount = params.BeaconNetworkConfig().AttestationSubnetCount
syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount
gs, err := pubsub.NewGossipSub(s.ctx, s.host, psOpts...)
if err != nil {
log.WithError(err).Error("Failed to start pubsub")
return nil, err
}
s.pubsub = gs
s.peers = peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: int(s.cfg.MaxPeers),
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
DecayInterval: time.Hour,
},
},
})
// Initialize Data maps.
types.InitializeDataMaps()
return s, nil
}
// Start the p2p service.
func (s *Service) Start() {
if s.started {
log.Error("Attempted to start p2p service when it was already started")
return
}
// Waits until the state is initialized via an event feed.
// Used for fork-related data when connecting peers.
s.awaitStateInitialized()
s.isPreGenesis = false
var peersToWatch []string
if s.cfg.RelayNodeAddr != "" {
peersToWatch = append(peersToWatch, s.cfg.RelayNodeAddr)
if err := dialRelayNode(s.ctx, s.host, s.cfg.RelayNodeAddr); err != nil {
log.WithError(err).Errorf("Could not dial relay node")
}
}
if !s.cfg.NoDiscovery {
ipAddr := prysmnetwork.IPAddr()
listener, err := s.startDiscoveryV5(
ipAddr,
s.privKey,
)
if err != nil {
log.WithError(err).Fatal("Failed to start discovery")
s.startupErr = err
return
}
err = s.connectToBootnodes()
if err != nil {
log.WithError(err).Error("Could not add bootnode to the exclusion list")
s.startupErr = err
return
}
s.dv5Listener = listener
go s.listenForNewNodes()
}
s.started = true
if len(s.cfg.StaticPeers) > 0 {
addrs, err := PeersFromStringAddrs(s.cfg.StaticPeers)
if err != nil {
log.WithError(err).Error("Could not connect to static peer")
}
s.connectWithAllPeers(addrs)
}
// Initialize metadata according to the
// current epoch.
s.RefreshENR()
// if the current epoch is beyond bellatrix, increase the
// MaxGossipSize and MaxChunkSize to 10Mb.
s.increaseMaxMessageSizesForBellatrix()
// Periodic functions.
async.RunEvery(s.ctx, params.BeaconNetworkConfig().TtfbTimeout, func() {
ensurePeerConnections(s.ctx, s.host, peersToWatch...)
})
async.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune)
async.RunEvery(s.ctx, params.BeaconNetworkConfig().RespTimeout, s.updateMetrics)
async.RunEvery(s.ctx, refreshRate, s.RefreshENR)
async.RunEvery(s.ctx, 1*time.Minute, func() {
log.WithFields(logrus.Fields{
"inbound": len(s.peers.InboundConnected()),
"outbound": len(s.peers.OutboundConnected()),
"activePeers": len(s.peers.Active()),
}).Info("Peer summary")
})
multiAddrs := s.host.Network().ListenAddresses()
logIPAddr(s.host.ID(), multiAddrs...)
p2pHostAddress := s.cfg.HostAddress
p2pTCPPort := s.cfg.TCPPort
if p2pHostAddress != "" {
logExternalIPAddr(s.host.ID(), p2pHostAddress, p2pTCPPort)
verifyConnectivity(p2pHostAddress, p2pTCPPort, "tcp")
}
p2pHostDNS := s.cfg.HostDNS
if p2pHostDNS != "" {
logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
}
go s.forkWatcher()
}
// Stop the p2p service and terminate all peer connections.
func (s *Service) Stop() error {
defer s.cancel()
s.started = false
if s.dv5Listener != nil {
s.dv5Listener.Close()
}
return nil
}
// Status of the p2p service. Will return an error if the service is considered unhealthy to
// indicate that this node should not serve traffic until the issue has been resolved.
func (s *Service) Status() error {
if s.isPreGenesis {
return nil
}
if !s.started {
return errors.New("not running")
}
if s.startupErr != nil {
return s.startupErr
}
if s.genesisTime.IsZero() {
return errors.New("no genesis time set")
}
return nil
}
// Started returns true if the p2p service has successfully started.
func (s *Service) Started() bool {
return s.started
}
// Encoding returns the configured networking encoding.
func (_ *Service) Encoding() encoder.NetworkEncoding {
return &encoder.SszNetworkEncoder{}
}
// PubSub returns the p2p pubsub framework.
func (s *Service) PubSub() *pubsub.PubSub {
return s.pubsub
}
// Host returns the currently running libp2p
// host of the service.
func (s *Service) Host() host.Host {
return s.host
}
// SetStreamHandler sets the protocol handler on the p2p host multiplexer.
// This method is a pass through to libp2pcore.Host.SetStreamHandler.
func (s *Service) SetStreamHandler(topic string, handler network.StreamHandler) {
s.host.SetStreamHandler(protocol.ID(topic), handler)
}
// PeerID returns the Peer ID of the local peer.
func (s *Service) PeerID() peer.ID {
return s.host.ID()
}
// Disconnect from a peer.
func (s *Service) Disconnect(pid peer.ID) error {
return s.host.Network().ClosePeer(pid)
}
// Connect to a specific peer.
func (s *Service) Connect(pi peer.AddrInfo) error {
return s.host.Connect(s.ctx, pi)
}
// Peers returns the peer status interface.
func (s *Service) Peers() *peers.Status {
return s.peers
}
// ENR returns the local node's current ENR.
func (s *Service) ENR() *enr.Record {
if s.dv5Listener == nil {
return nil
}
return s.dv5Listener.Self().Record()
}
// DiscoveryAddresses represents our enr addresses as multiaddresses.
func (s *Service) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
if s.dv5Listener == nil {
return nil, nil
}
return convertToUdpMultiAddr(s.dv5Listener.Self())
}
// Metadata returns a copy of the peer's metadata.
func (s *Service) Metadata() metadata.Metadata {
return s.metaData.Copy()
}
// MetadataSeq returns the metadata sequence number.
func (s *Service) MetadataSeq() uint64 {
return s.metaData.SequenceNumber()
}
// AddPingMethod adds the metadata ping rpc method to the p2p service, so that it can
// be used to refresh ENR.
func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) {
s.pingMethod = reqFunc
}
func (s *Service) pingPeers() {
if s.pingMethod == nil {
return
}
for _, pid := range s.peers.Connected() {
go func(id peer.ID) {
if err := s.pingMethod(s.ctx, id); err != nil {
log.WithField("peer", id).WithError(err).Debug("Failed to ping peer")
}
}(pid)
}
}
// Waits for the beacon state to be initialized, important
// for initializing the p2p service as p2p needs to be aware
// of genesis information for peering.
func (s *Service) awaitStateInitialized() {
s.initializationLock.Lock()
defer s.initializationLock.Unlock()
if s.isInitialized() {
return
}
stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
cleanup := stateSub.Unsubscribe
defer cleanup()
for {
select {
case event := <-stateChannel:
if event.Type == statefeed.Initialized {
data, ok := event.Data.(*statefeed.InitializedData)
if !ok {
// log.Fatalf will prevent defer from being called
cleanup()
log.Fatalf("Received wrong data over state initialized feed: %v", data)
}
s.genesisTime = data.StartTime
s.genesisValidatorsRoot = data.GenesisValidatorsRoot
_, err := s.currentForkDigest() // initialize fork digest cache
if err != nil {
log.WithError(err).Error("Could not initialize fork digest")
}
return
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
}
}
}
func (s *Service) connectWithAllPeers(multiAddrs []multiaddr.Multiaddr) {
addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
if err != nil {
log.WithError(err).Error("Could not convert to peer address info's from multiaddresses")
return
}
for _, info := range addrInfos {
// make each dial non-blocking
go func(info peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
}(info)
}
}
func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error {
ctx, span := trace.StartSpan(ctx, "p2p.connectWithPeer")
defer span.End()
if info.ID == s.host.ID() {
return nil
}
if s.Peers().IsBad(info.ID) {
return errors.New("refused to connect to bad peer")
}
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
defer cancel()
if err := s.host.Connect(ctx, info); err != nil {
s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
return err
}
return nil
}
func (s *Service) connectToBootnodes() error {
nodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddr))
for _, addr := range s.cfg.Discv5BootStrapAddr {
bootNode, err := enode.Parse(enode.ValidSchemes, addr)
if err != nil {
return err
}
// do not dial bootnodes with their tcp ports not set
if err := bootNode.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
if !enr.IsNotFound(err) {
log.WithError(err).Error("Could not retrieve tcp port")
}
continue
}
nodes = append(nodes, bootNode)
}
multiAddresses := convertToMultiAddr(nodes)
s.connectWithAllPeers(multiAddresses)
return nil
}
// Returns true if the service is aware of the genesis time and genesis validators root. This is
// required for discovery and pubsub validation.
func (s *Service) isInitialized() bool {
return !s.genesisTime.IsZero() && len(s.genesisValidatorsRoot) == 32
}
// increaseMaxMessageSizesForBellatrix increases the max sizes of gossip and chunk from 1 Mb to 10Mb,
// if the current epoch is or above the configured BellatrixForkEpoch.
func (s *Service) increaseMaxMessageSizesForBellatrix() {
currentSlot := slots.Since(s.genesisTime)
currentEpoch := slots.ToEpoch(currentSlot)
if currentEpoch >= params.BeaconConfig().BellatrixForkEpoch {
encoder.SetMaxGossipSizeForBellatrix()
encoder.SetMaxChunkSizeForBellatrix()
}
}