mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-10 19:51:20 +00:00
918129cf36
* refactor initialization to blocking startup method * require genesisSetter in blockchain, fix tests * work-around gazelle weirdness * fix dep gazelle ignores * only call SetGenesis once * fix typo * validator test setup and fix to return right error * move waitForChainStart to Start * wire up sync Service.genesisWaiter * fix p2p genesisWaiter plumbing * remove extra clock type, integrate into genesis and rename * use time.Now when no Nower is specified * remove unused ClockSetter * simplify rpc context checking * fix typo * use clock everywhere in sync; [32]byte val root * don't use DeepEqual to compare [32]byte and []byte * don't use clock in init sync, not wired up yet * use clock waiter in blockchain as well * use cancelable contexts in tests with goroutines * missed a reference to WithClockSetter * Update beacon-chain/startup/genesis.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/blockchain/service_test.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * more clear docs * doc for NewClock * move clock typedef to more logical file name * adding documentation * gaz * fixes for capella * reducing test raciness * fix races in committee cache tests * lint * add tests on Duration slot math helper * startup package test coverage * fix bad merge * set non-zero genesis time in tests that call Start * happy deepsource, happy me-epsource * replace Synced event with channel * remove unused error * remove accidental wip commit * gaz! * remove unused event constants * remove sync statefeed subscription to fix deadlock * remove state notifier * fix build --------- Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com> Co-authored-by: Radosław Kapka <rkapka@wp.pl> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> Co-authored-by: nisdas <nishdas93@gmail.com>
72 lines
2.2 KiB
Go
72 lines
2.2 KiB
Go
package sync
|
|
|
|
import (
|
|
"reflect"
|
|
"strings"
|
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"github.com/pkg/errors"
|
|
ssz "github.com/prysmaticlabs/fastssz"
|
|
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
|
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// Sentinel errors returned while decoding gossip messages.
var (
	// errNilPubsubMessage is returned when a pubsub message, or its topic, is
	// nil or empty.
	errNilPubsubMessage = errors.New("nil pubsub message")
	// errInvalidTopic is returned when a topic does not split into the
	// expected number of "/"-separated parts.
	errInvalidTopic = errors.New("invalid topic format")
)
|
|
|
|
func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, error) {
|
|
if msg == nil || msg.Topic == nil || *msg.Topic == "" {
|
|
return nil, errNilPubsubMessage
|
|
}
|
|
topic := *msg.Topic
|
|
fDigest, err := p2p.ExtractGossipDigest(topic)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "extraction failed for topic: %s", topic)
|
|
}
|
|
topic = strings.TrimSuffix(topic, s.cfg.p2p.Encoding().ProtocolSuffix())
|
|
topic, err = s.replaceForkDigest(topic)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Specially handle subnet messages.
|
|
switch {
|
|
case strings.Contains(topic, p2p.GossipAttestationMessage):
|
|
topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})]
|
|
// Given that both sync message related subnets have the same message name, we have to
|
|
// differentiate them below.
|
|
case strings.Contains(topic, p2p.GossipSyncCommitteeMessage) && !strings.Contains(topic, p2p.SyncContributionAndProofSubnetTopicFormat):
|
|
topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})]
|
|
}
|
|
|
|
base := p2p.GossipTopicMappings(topic, 0)
|
|
if base == nil {
|
|
return nil, p2p.ErrMessageNotMapped
|
|
}
|
|
m, ok := proto.Clone(base).(ssz.Unmarshaler)
|
|
if !ok {
|
|
return nil, errors.Errorf("message of %T does not support marshaller interface", base)
|
|
}
|
|
// Handle different message types across forks.
|
|
if topic == p2p.BlockSubnetTopicFormat {
|
|
m, err = extractBlockDataType(fDigest[:], s.cfg.clock)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if err := s.cfg.p2p.Encoding().DecodeGossip(msg.Data, m); err != nil {
|
|
return nil, err
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
// Replaces our fork digest with the formatter.
|
|
func (_ *Service) replaceForkDigest(topic string) (string, error) {
|
|
subStrings := strings.Split(topic, "/")
|
|
if len(subStrings) != 4 {
|
|
return "", errInvalidTopic
|
|
}
|
|
subStrings[2] = "%x"
|
|
return strings.Join(subStrings, "/"), nil
|
|
}
|