mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 12:57:18 +00:00
18b3203f31
* Initial pass * Add metric to measure success * Use a subnet RWLock to prevent duplicate requests, give up after 3 attempts * push latest commented code * try with non-blocking broadcast * Add feature flag, ignore parent deadline if any * Add slot as metadata * add tests * gaz Co-authored-by: nisdas <nishdas93@gmail.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
146 lines
4.1 KiB
Go
146 lines
4.1 KiB
Go
package p2p
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
|
"github.com/libp2p/go-libp2p-core/network"
|
|
"github.com/prysmaticlabs/go-bitfield"
|
|
"go.opencensus.io/trace"
|
|
|
|
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
)
|
|
|
|
var attestationSubnetCount = params.BeaconNetworkConfig().AttestationSubnetCount
|
|
|
|
var attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
|
|
|
|
// FindPeersWithSubnet performs a network search for peers
|
|
// subscribed to a particular subnet. Then we try to connect
|
|
// with those peers.
|
|
func (s *Service) FindPeersWithSubnet(ctx context.Context, index uint64) (bool, error) {
|
|
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
|
|
defer span.End()
|
|
|
|
span.AddAttributes(trace.Int64Attribute("index", int64(index)))
|
|
|
|
if s.dv5Listener == nil {
|
|
// return if discovery isn't set
|
|
return false, nil
|
|
}
|
|
iterator := s.dv5Listener.RandomNodes()
|
|
nodes := enode.ReadNodes(iterator, lookupLimit)
|
|
exists := false
|
|
for _, node := range nodes {
|
|
if err := ctx.Err(); err != nil {
|
|
return false, err
|
|
}
|
|
if node.IP() == nil {
|
|
continue
|
|
}
|
|
// do not look for nodes with no tcp port set
|
|
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
|
|
if !enr.IsNotFound(err) {
|
|
log.WithError(err).Debug("Could not retrieve tcp port")
|
|
}
|
|
continue
|
|
}
|
|
subnets, err := retrieveAttSubnets(node.Record())
|
|
if err != nil {
|
|
log.Debugf("could not retrieve subnets: %v", err)
|
|
continue
|
|
}
|
|
for _, comIdx := range subnets {
|
|
if comIdx == index {
|
|
info, multiAddr, err := convertToAddrInfo(node)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if s.peers.IsActive(info.ID) {
|
|
exists = true
|
|
continue
|
|
}
|
|
if s.host.Network().Connectedness(info.ID) == network.Connected {
|
|
exists = true
|
|
continue
|
|
}
|
|
s.peers.Add(node.Record(), info.ID, multiAddr, network.DirUnknown)
|
|
if err := s.connectWithPeer(ctx, *info); err != nil {
|
|
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
|
|
continue
|
|
}
|
|
exists = true
|
|
}
|
|
}
|
|
}
|
|
return exists, nil
|
|
}
|
|
|
|
func (s *Service) hasPeerWithSubnet(subnet uint64) bool {
|
|
return len(s.Peers().SubscribedToSubnet(subnet)) > 0
|
|
}
|
|
|
|
// Updates the service's discv5 listener record's attestation subnet
|
|
// with a new value for a bitfield of subnets tracked. It also updates
|
|
// the node's metadata by increasing the sequence number and the
|
|
// subnets tracked by the node.
|
|
func (s *Service) updateSubnetRecordWithMetadata(bitV bitfield.Bitvector64) {
|
|
entry := enr.WithEntry(attSubnetEnrKey, &bitV)
|
|
s.dv5Listener.LocalNode().Set(entry)
|
|
s.metaData = &pb.MetaData{
|
|
SeqNumber: s.metaData.SeqNumber + 1,
|
|
Attnets: bitV,
|
|
}
|
|
}
|
|
|
|
// Initializes a bitvector of attestation subnets beacon nodes is subscribed to
|
|
// and creates a new ENR entry with its default value.
|
|
func intializeAttSubnets(node *enode.LocalNode) *enode.LocalNode {
|
|
bitV := bitfield.NewBitvector64()
|
|
entry := enr.WithEntry(attSubnetEnrKey, bitV.Bytes())
|
|
node.Set(entry)
|
|
return node
|
|
}
|
|
|
|
// Reads the attestation subnets entry from a node's ENR and determines
|
|
// the committee indices of the attestation subnets the node is subscribed to.
|
|
func retrieveAttSubnets(record *enr.Record) ([]uint64, error) {
|
|
bitV, err := retrieveBitvector(record)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
committeeIdxs := []uint64{}
|
|
for i := uint64(0); i < attestationSubnetCount; i++ {
|
|
if bitV.BitAt(i) {
|
|
committeeIdxs = append(committeeIdxs, i)
|
|
}
|
|
}
|
|
return committeeIdxs, nil
|
|
}
|
|
|
|
// Parses the attestation subnets ENR entry in a node and extracts its value
|
|
// as a bitvector for further manipulation.
|
|
func retrieveBitvector(record *enr.Record) (bitfield.Bitvector64, error) {
|
|
bitV := bitfield.NewBitvector64()
|
|
entry := enr.WithEntry(attSubnetEnrKey, &bitV)
|
|
err := record.Load(entry)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return bitV, nil
|
|
}
|
|
|
|
func (s *Service) subnetLocker(i uint64) *sync.RWMutex {
|
|
s.subnetsLockLock.Lock()
|
|
defer s.subnetsLockLock.Unlock()
|
|
l, ok := s.subnetsLock[i]
|
|
if !ok {
|
|
l = &sync.RWMutex{}
|
|
s.subnetsLock[i] = l
|
|
}
|
|
return l
|
|
}
|