prysm-pulse/cmd/prysmctl/p2p/handler.go
Raul Jordan 100ca0ebaf
Prysmctl Command to Request Beacon Nodes for Block Ranges Over P2P (#11035)
* first

* attempt p2p connect send tool

* attempt

* stream registration

* trying to register

* attempt

* workinnnn

* begin

* p2p prysmctl tool

* ignore

* fix

* delete deprecated

* p2p smaller iface surface area

* further p2p refactor

* gaz

* better logging

* process

* all functionality

* fix up

* rhandle

* v2 req

* cmd

* send sub

* v1 handle

* show head slot

* cmd

* cmd lib

* gazelle fix

* bazel

* gaz

* work on the handshake items

* prevent dial to self

* add config awareness

* gaz

* inferring host addrs from p2p

* initialize data mappings

* add own mock

* fix up logic

* gaz

* add img

* gaz

* add images

* builds

* builds

* nishant feedback:

Co-authored-by: Nishant Das <nishdas93@gmail.com>
2022-08-17 06:38:57 +00:00

103 lines
3.4 KiB
Go

package p2p
import (
"context"
"reflect"
"runtime/debug"
"strings"
libp2pcore "github.com/libp2p/go-libp2p-core"
corenet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
)
type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
// registerRPC for a given topic with an expected protobuf message type.
func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
topic := baseTopic + c.Encoding().ProtocolSuffix()
c.host.SetStreamHandler(protocol.ID(topic), func(stream corenet.Stream) {
defer func() {
if r := recover(); r != nil {
log.WithField("error", r).Error("Panic occurred")
log.Errorf("%s", debug.Stack())
}
}()
// Resetting after closing is a no-op so defer a reset in case something goes wrong.
// It's up to the handler to Close the stream (send an EOF) if
// it successfully writes a response. We don't blindly call
// Close here because we may have only written a partial
// response.
defer func() {
_err := stream.Reset()
_ = _err
}()
log.WithField("peer", stream.Conn().RemotePeer().Pretty()).WithField("topic", string(stream.Protocol()))
base, ok := p2p.RPCTopicMappings[baseTopic]
if !ok {
log.Errorf("Could not retrieve base message for topic %s", baseTopic)
return
}
t := reflect.TypeOf(base)
// Copy Base
base = reflect.New(t)
// since metadata requests do not have any data in the payload, we
// do not decode anything.
if baseTopic == p2p.RPCMetaDataTopicV1 || baseTopic == p2p.RPCMetaDataTopicV2 {
if err := handle(context.Background(), base, stream); err != nil {
if err != p2ptypes.ErrWrongForkDigestVersion {
log.WithError(err).Debug("Could not handle p2p RPC")
}
}
return
}
// Given we have an input argument that can be pointer or the actual object, this gives us
// a way to check for its reflect.Kind and based on the result, we can decode
// accordingly.
if t.Kind() == reflect.Ptr {
msg, ok := reflect.New(t.Elem()).Interface().(ssz.Unmarshaler)
if !ok {
log.Errorf("message of %T does not support marshaller interface", msg)
return
}
if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
// Debug logs for goodbye/status errors
if strings.Contains(topic, p2p.RPCGoodByeTopicV1) || strings.Contains(topic, p2p.RPCStatusTopicV1) {
log.WithError(err).Debug("Could not decode goodbye stream message")
return
}
log.WithError(err).Debug("Could not decode stream message")
return
}
if err := handle(context.Background(), msg, stream); err != nil {
if err != p2ptypes.ErrWrongForkDigestVersion {
log.WithError(err).Debug("Could not handle p2p RPC")
}
}
} else {
nTyp := reflect.New(t)
msg, ok := nTyp.Interface().(ssz.Unmarshaler)
if !ok {
log.Errorf("message of %T does not support marshaller interface", msg)
return
}
if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
log.WithError(err).Debug("Could not decode stream message")
return
}
if err := handle(context.Background(), nTyp.Elem().Interface(), stream); err != nil {
if err != p2ptypes.ErrWrongForkDigestVersion {
log.WithError(err).Debug("Could not handle p2p RPC")
}
}
}
})
}