prysm-pulse/cmd/prysmctl/p2p/client.go
Raul Jordan 100ca0ebaf
Prysmctl Command to Request Beacon Nodes for Block Ranges Over P2P (#11035)
* first

* attempt p2p connect send tool

* attempt

* stream registration

* trying to register

* attempt

* workinnnn

* begin

* p2p prysmctl tool

* ignore

* fix

* delete deprecated

* p2p smaller iface surface area

* further p2p refactor

* gaz

* better logging

* process

* all functionality

* fix up

* rhandle

* v2 req

* cmd

* send sub

* v1 handle

* show head slot

* cmd

* cmd lib

* gazelle fix

* bazel

* gaz

* work on the handshake items

* prevent dial to self

* add config awareness

* gaz

* inferring host addrs from p2p

* initialize data mappings

* add own mock

* fix up logic

* gaz

* add img

* gaz

* add images

* builds

* builds

* nishant feedback:

Co-authored-by: Nishant Das <nishdas93@gmail.com>
2022-08-17 06:38:57 +00:00

231 lines
6.9 KiB
Go

package p2p
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"net"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
corenet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper"
ecdsaprysm "github.com/prysmaticlabs/prysm/v3/crypto/ecdsa"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v3/network"
"github.com/prysmaticlabs/prysm/v3/network/forks"
pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/v3/runtime/version"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
// A minimal client for peering with beacon nodes over libp2p and sending p2p RPC requests for data.
type client struct {
host host.Host
meta metadata.Metadata
beaconClient pb.BeaconChainClient
nodeClient pb.NodeClient
}
func newClient(beaconEndpoints []string, clientPort uint) (*client, error) {
ipAdd := ipAddr()
priv, err := privKey()
if err != nil {
return nil, errors.Wrap(err, "could not set up p2p private key")
}
meta, err := readMetadata()
if err != nil {
return nil, errors.Wrap(err, "could not set up p2p metadata")
}
listen, err := p2p.MultiAddressBuilder(ipAdd.String(), clientPort)
if err != nil {
return nil, errors.Wrap(err, "could not set up listening multiaddr")
}
options := []libp2p.Option{
privKeyOption(priv),
libp2p.ListenAddrs(listen),
libp2p.UserAgent(version.BuildData()),
libp2p.Transport(tcp.NewTCPTransport),
}
options = append(options, libp2p.Security(noise.ID, noise.New))
options = append(options, libp2p.Ping(false))
h, err := libp2p.New(options...)
if err != nil {
return nil, errors.Wrap(err, "could not start libp2p")
}
h.RemoveStreamHandler(identify.IDDelta)
if len(beaconEndpoints) == 0 {
return nil, errors.New("no specified beacon API endpoints")
}
conn, err := grpc.Dial(beaconEndpoints[0], grpc.WithInsecure())
if err != nil {
return nil, err
}
beaconClient := pb.NewBeaconChainClient(conn)
nodeClient := pb.NewNodeClient(conn)
return &client{
host: h,
meta: meta,
beaconClient: beaconClient,
nodeClient: nodeClient,
}, nil
}
func (c *client) Close() {
if err := c.host.Close(); err != nil {
panic(err)
}
}
func (c *client) Encoding() encoder.NetworkEncoding {
return &encoder.SszNetworkEncoder{}
}
func (c *client) MetadataSeq() uint64 {
return c.meta.SequenceNumber()
}
// Send a request to specific peer. The returned stream may be used for reading,
// but has been closed for writing.
// When done, the caller must Close() or Reset() on the stream.
func (c *client) Send(
ctx context.Context,
message interface{},
baseTopic string,
pid peer.ID,
) (corenet.Stream, error) {
ctx, span := trace.StartSpan(ctx, "p2p.Send")
defer span.End()
topic := baseTopic + c.Encoding().ProtocolSuffix()
span.AddAttributes(trace.StringAttribute("topic", topic))
// Apply max dial timeout when opening a new stream.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
stream, err := c.host.NewStream(ctx, pid, protocol.ID(topic))
if err != nil {
tracing.AnnotateError(span, err)
return nil, errors.Wrap(err, "could not open new stream")
}
// do not encode anything if we are sending a metadata request
if baseTopic != p2p.RPCMetaDataTopicV1 && baseTopic != p2p.RPCMetaDataTopicV2 {
castedMsg, ok := message.(ssz.Marshaler)
if !ok {
return nil, errors.Errorf("%T does not support the ssz marshaller interface", message)
}
if _, err := c.Encoding().EncodeWithMaxLength(stream, castedMsg); err != nil {
tracing.AnnotateError(span, err)
_err := stream.Reset()
_ = _err
return nil, err
}
}
// Close stream for writing.
if err := stream.CloseWrite(); err != nil {
tracing.AnnotateError(span, err)
_err := stream.Reset()
_ = _err
return nil, errors.Wrap(err, "could not close write")
}
return stream, nil
}
func (c *client) retrievePeerAddressesViaRPC(ctx context.Context, beaconEndpoints []string) ([]string, error) {
if len(beaconEndpoints) == 0 {
return nil, errors.New("no beacon RPC endpoints specified")
}
peers := make([]string, 0)
for i := 0; i < len(beaconEndpoints); i++ {
conn, err := grpc.Dial(beaconEndpoints[i], grpc.WithInsecure())
if err != nil {
return nil, err
}
nodeClient := pb.NewNodeClient(conn)
hostData, err := nodeClient.GetHost(ctx, &emptypb.Empty{})
if err != nil {
return nil, err
}
if len(hostData.Addresses) == 0 {
continue
}
peers = append(peers, hostData.Addresses[0]+"/p2p/"+hostData.PeerId)
}
return peers, nil
}
func (c *client) initializeMockChainService(ctx context.Context) (*mockChain, error) {
genesisResp, err := c.nodeClient.GetGenesis(ctx, &emptypb.Empty{})
if err != nil {
return nil, err
}
currEpoch := slots.ToEpoch(slots.SinceGenesis(genesisResp.GenesisTime.AsTime()))
currFork, err := forks.Fork(currEpoch)
if err != nil {
return nil, err
}
return &mockChain{
genesisTime: genesisResp.GenesisTime.AsTime(),
currentFork: currFork,
genesisValsRoot: bytesutil.ToBytes32(genesisResp.GenesisValidatorsRoot),
}, nil
}
// Retrieves an external ipv4 address and converts into a libp2p formatted value.
func ipAddr() net.IP {
ip, err := network.ExternalIP()
if err != nil {
panic(err)
}
return net.ParseIP(ip)
}
// Determines a private key for p2p networking from the p2p service's
// configuration struct. If no key is found, it generates a new one.
func privKey() (*ecdsa.PrivateKey, error) {
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
if err != nil {
return nil, err
}
return ecdsaprysm.ConvertFromInterfacePrivKey(priv)
}
// Adds a private key to the libp2p option if the option was provided.
// If the private key file is missing or cannot be read, or if the
// private key contents cannot be marshaled, an exception is thrown.
func privKeyOption(privkey *ecdsa.PrivateKey) libp2p.Option {
return func(cfg *libp2p.Config) error {
ifaceKey, err := ecdsaprysm.ConvertToInterfacePrivkey(privkey)
if err != nil {
return err
}
return cfg.Apply(libp2p.Identity(ifaceKey))
}
}
func readMetadata() (metadata.Metadata, error) {
metaData := &pb.MetaDataV1{
SeqNumber: 0,
Attnets: bitfield.NewBitvector64(),
}
return wrapper.WrappedMetadataV1(metaData), nil
}