Bellatrix p2p changes (#10072)

* Add `process_execution_payload` to `process_block`

* bring in bellatrix p2p changes

* lint

* fix testcase to use MaxChunkSize as config to change allowed size instead of config

* add left out config params

* Fix TestForkSchedule_CorrectNumberOfForks

* fixed Nishant's comments

* gofmt

* add switch case

Co-authored-by: Mohamed Zahoor <zahoor@zahoor.in>
Co-authored-by: Zahoor Mohamed <zahoor@prysmaticlabs.com>
Co-authored-by: Nishant Das <nishdas93@gmail.com>
This commit is contained in:
terence tsao 2022-01-20 06:12:15 -08:00 committed by GitHub
parent af06bb9737
commit 288b38be8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 143 additions and 30 deletions

View File

@ -16,7 +16,8 @@ import (
var _ NetworkEncoding = (*SszNetworkEncoder)(nil)
// MaxGossipSize allowed for gossip messages.
var MaxGossipSize = params.BeaconNetworkConfig().GossipMaxSize // 1 Mib
var MaxGossipSize = params.BeaconNetworkConfig().GossipMaxSize // 1 Mib.
var MaxChunkSize = params.BeaconNetworkConfig().MaxChunkSize // 1 Mib.
// This pool defines the sync pool for our buffered snappy writers, so that they
// can be constantly reused.
@ -59,11 +60,11 @@ func (_ SszNetworkEncoder) EncodeWithMaxLength(w io.Writer, msg fastssz.Marshale
if err != nil {
return 0, err
}
if uint64(len(b)) > params.BeaconNetworkConfig().MaxChunkSize {
if uint64(len(b)) > MaxChunkSize {
return 0, fmt.Errorf(
"size of encoded message is %d which is larger than the provided max limit of %d",
len(b),
params.BeaconNetworkConfig().MaxChunkSize,
MaxChunkSize,
)
}
// write varint first
@ -110,11 +111,11 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to fastssz.Unmarshal
if err != nil {
return err
}
if msgLen > params.BeaconNetworkConfig().MaxChunkSize {
if msgLen > MaxChunkSize {
return fmt.Errorf(
"remaining bytes %d goes over the provided max limit of %d",
msgLen,
params.BeaconNetworkConfig().MaxChunkSize,
MaxChunkSize,
)
}
msgMax, err := e.MaxLength(msgLen)
@ -199,3 +200,13 @@ func newBufferedWriter(w io.Writer) *snappy.Writer {
bufW.Reset(w)
return bufW
}
// SetMaxGossipSizeForBellatrix sets the MaxGossipSize to 10Mb.
func SetMaxGossipSizeForBellatrix() {
MaxGossipSize = params.BeaconNetworkConfig().GossipMaxSizeBellatrix
}
// SetMaxChunkSizeForBellatrix sets the MaxChunkSize to 10Mb.
func SetMaxChunkSizeForBellatrix() {
MaxChunkSize = params.BeaconNetworkConfig().MaxChunkSizeBellatrix
}

View File

@ -77,10 +77,10 @@ func TestSszNetworkEncoder_EncodeWithMaxLength(t *testing.T) {
e := &encoder.SszNetworkEncoder{}
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
c.MaxChunkSize = uint64(5)
encoder.MaxChunkSize = uint64(5)
params.OverrideBeaconNetworkConfig(c)
_, err := e.EncodeWithMaxLength(buf, msg)
wanted := fmt.Sprintf("which is larger than the provided max limit of %d", params.BeaconNetworkConfig().MaxChunkSize)
wanted := fmt.Sprintf("which is larger than the provided max limit of %d", encoder.MaxChunkSize)
assert.ErrorContains(t, wanted, err)
}
@ -95,7 +95,7 @@ func TestSszNetworkEncoder_DecodeWithMaxLength(t *testing.T) {
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
maxChunkSize := uint64(5)
c.MaxChunkSize = maxChunkSize
encoder.MaxChunkSize = maxChunkSize
params.OverrideBeaconNetworkConfig(c)
_, err := e.EncodeGossip(buf, msg)
require.NoError(t, err)
@ -113,7 +113,7 @@ func TestSszNetworkEncoder_DecodeWithMultipleFrames(t *testing.T) {
c := params.BeaconNetworkConfig()
// 4 * 1 Mib
maxChunkSize := uint64(1 << 22)
c.MaxChunkSize = maxChunkSize
encoder.MaxChunkSize = maxChunkSize
params.OverrideBeaconNetworkConfig(c)
_, err := e.EncodeWithMaxLength(buf, st.InnerStateUnsafe().(*ethpb.BeaconState))
require.NoError(t, err)

View File

@ -1,6 +1,7 @@
package p2p
import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/time/slots"
)
@ -14,7 +15,8 @@ func (s *Service) forkWatcher() {
select {
case currSlot := <-slotTicker.C():
currEpoch := slots.ToEpoch(currSlot)
if currEpoch == params.BeaconConfig().AltairForkEpoch {
if currEpoch == params.BeaconConfig().AltairForkEpoch ||
currEpoch == params.BeaconConfig().BellatrixForkEpoch {
// If we are in the fork epoch, we update our enr with
// the updated fork digest. These repeatedly does
// this over the epoch, which might be slightly wasteful
@ -23,6 +25,12 @@ func (s *Service) forkWatcher() {
if err != nil {
log.WithError(err).Error("Could not add fork entry")
}
// from Bellatrix Epoch, the MaxGossipSize and the MaxChunkSize is changed to 10Mb.
if currEpoch == params.BeaconConfig().BellatrixForkEpoch {
encoder.SetMaxGossipSizeForBellatrix()
encoder.SetMaxChunkSizeForBellatrix()
}
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")

View File

@ -25,8 +25,13 @@ var gossipTopicMappings = map[string]proto.Message{
// GossipTopicMappings is a function to return the assigned data type
// versioned by epoch.
func GossipTopicMappings(topic string, epoch types.Epoch) proto.Message {
if topic == BlockSubnetTopicFormat && epoch >= params.BeaconConfig().AltairForkEpoch {
return &ethpb.SignedBeaconBlockAltair{}
if topic == BlockSubnetTopicFormat {
if epoch >= params.BeaconConfig().BellatrixForkEpoch {
return &ethpb.SignedBeaconBlockMerge{}
}
if epoch >= params.BeaconConfig().AltairForkEpoch {
return &ethpb.SignedBeaconBlockAltair{}
}
}
return gossipTopicMappings[topic]
}
@ -51,4 +56,5 @@ func init() {
}
// Specially handle Altair Objects.
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockAltair{})] = BlockSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockMerge{})] = BlockSubnetTopicFormat
}

View File

@ -24,18 +24,28 @@ func TestMappingHasNoDuplicates(t *testing.T) {
func TestGossipTopicMappings_CorrectBlockType(t *testing.T) {
params.SetupTestConfigCleanup(t)
bCfg := params.BeaconConfig()
forkEpoch := eth2types.Epoch(100)
bCfg.AltairForkEpoch = forkEpoch
altairForkEpoch := eth2types.Epoch(100)
BellatrixForkEpoch := eth2types.Epoch(200)
bCfg.AltairForkEpoch = altairForkEpoch
bCfg.BellatrixForkEpoch = BellatrixForkEpoch
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = eth2types.Epoch(100)
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.BellatrixForkVersion)] = eth2types.Epoch(200)
params.OverrideBeaconConfig(bCfg)
// Before Fork
// Phase 0
pMessage := GossipTopicMappings(BlockSubnetTopicFormat, 0)
_, ok := pMessage.(*ethpb.SignedBeaconBlock)
assert.Equal(t, true, ok)
// After Fork
pMessage = GossipTopicMappings(BlockSubnetTopicFormat, forkEpoch)
// Altair Fork
pMessage = GossipTopicMappings(BlockSubnetTopicFormat, altairForkEpoch)
_, ok = pMessage.(*ethpb.SignedBeaconBlockAltair)
assert.Equal(t, true, ok)
// Bellatrix Fork
pMessage = GossipTopicMappings(BlockSubnetTopicFormat, BellatrixForkEpoch)
_, ok = pMessage.(*ethpb.SignedBeaconBlockMerge)
assert.Equal(t, true, ok)
}

View File

@ -2,6 +2,7 @@ package p2p
import (
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/crypto/hash"
@ -45,7 +46,7 @@ func MsgID(genesisValidatorsRoot []byte, pmsg *pubsub_pb.Message) string {
return string(msg)
}
if fEpoch >= params.BeaconConfig().AltairForkEpoch {
return altairMsgID(pmsg)
return postAltairMsgID(pmsg, fEpoch)
}
decodedData, err := encoder.DecodeSnappy(pmsg.Data, params.BeaconNetworkConfig().GossipMaxSize)
if err != nil {
@ -69,12 +70,18 @@ func MsgID(genesisValidatorsRoot []byte, pmsg *pubsub_pb.Message) string {
// + message.topic + snappy_decompress(message.data))[:20]. Otherwise, set message-id to the first 20 bytes of the SHA256 hash of the concatenation
// of the following data: MESSAGE_DOMAIN_INVALID_SNAPPY, the length of the topic byte string (encoded as little-endian uint64),
// the topic byte string, and the raw message data: i.e. SHA256(MESSAGE_DOMAIN_INVALID_SNAPPY + uint_to_bytes(uint64(len(message.topic))) + message.topic + message.data)[:20].
func altairMsgID(pmsg *pubsub_pb.Message) string {
func postAltairMsgID(pmsg *pubsub_pb.Message, fEpoch types.Epoch) string {
topic := *pmsg.Topic
topicLen := uint64(len(topic))
topicLenBytes := bytesutil.Uint64ToBytesLittleEndian(topicLen)
decodedData, err := encoder.DecodeSnappy(pmsg.Data, params.BeaconNetworkConfig().GossipMaxSize)
// beyond merge epoch, allow 10 Mib gossip data size
gossipPubSubSize := params.BeaconNetworkConfig().GossipMaxSize
if fEpoch >= params.BeaconConfig().BellatrixForkEpoch {
gossipPubSubSize = params.BeaconNetworkConfig().GossipMaxSizeBellatrix
}
decodedData, err := encoder.DecodeSnappy(pmsg.Data, gossipPubSubSize)
if err != nil {
totalLength := len(params.BeaconNetworkConfig().MessageDomainInvalidSnappy) + len(topicLenBytes) + int(topicLen) + len(pmsg.Data)
combinedData := make([]byte, 0, totalLength)

View File

@ -64,6 +64,35 @@ func TestMessageIDFunction_HashesCorrectlyAltair(t *testing.T) {
assert.Equal(t, msgID, p2p.MsgID(genesisValidatorsRoot, nMsg), "Got incorrect msg id")
}
func TestMessageIDFunction_HashesCorrectlyBellatrix(t *testing.T) {
genesisValidatorsRoot := bytesutil.PadTo([]byte{'A'}, 32)
d, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot)
assert.NoError(t, err)
tpc := fmt.Sprintf(p2p.BlockSubnetTopicFormat, d)
topicLen := uint64(len(tpc))
topicLenBytes := bytesutil.Uint64ToBytesLittleEndian(topicLen)
invalidSnappy := [32]byte{'J', 'U', 'N', 'K'}
pMsg := &pubsubpb.Message{Data: invalidSnappy[:], Topic: &tpc}
// Create object to hash
combinedObj := append(params.BeaconNetworkConfig().MessageDomainInvalidSnappy[:], topicLenBytes...)
combinedObj = append(combinedObj, tpc...)
combinedObj = append(combinedObj, pMsg.Data...)
hashedData := hash.Hash(combinedObj)
msgID := string(hashedData[:20])
assert.Equal(t, msgID, p2p.MsgID(genesisValidatorsRoot, pMsg), "Got incorrect msg id")
validObj := [32]byte{'v', 'a', 'l', 'i', 'd'}
enc := snappy.Encode(nil, validObj[:])
nMsg := &pubsubpb.Message{Data: enc, Topic: &tpc}
// Create object to hash
combinedObj = append(params.BeaconNetworkConfig().MessageDomainValidSnappy[:], topicLenBytes...)
combinedObj = append(combinedObj, tpc...)
combinedObj = append(combinedObj, validObj[:]...)
hashedData = hash.Hash(combinedObj)
msgID = string(hashedData[:20])
assert.Equal(t, msgID, p2p.MsgID(genesisValidatorsRoot, nMsg), "Got incorrect msg id")
}
func TestMsgID_WithNilTopic(t *testing.T) {
msg := &pubsubpb.Message{
Data: make([]byte, 32),

View File

@ -37,19 +37,30 @@ func (s *Service) CanSubscribe(topic string) bool {
if parts[1] != "eth2" {
return false
}
fd, err := s.currentForkDigest()
phase0ForkDigest, err := s.currentForkDigest()
if err != nil {
log.WithError(err).Error("Could not determine fork digest")
return false
}
digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, s.genesisValidatorsRoot)
altairForkDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, s.genesisValidatorsRoot)
if err != nil {
log.WithError(err).Error("Could not determine next fork digest")
log.WithError(err).Error("Could not determine altair fork digest")
return false
}
if parts[2] != fmt.Sprintf("%x", fd) && parts[2] != fmt.Sprintf("%x", digest) {
bellatrixForkDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().BellatrixForkEpoch, s.genesisValidatorsRoot)
if err != nil {
log.WithError(err).Error("Could not determine merge fork digest")
return false
}
switch parts[2] {
case fmt.Sprintf("%x", phase0ForkDigest):
case fmt.Sprintf("%x", altairForkDigest):
case fmt.Sprintf("%x", bellatrixForkDigest):
default:
return false
}
if parts[4] != encoder.ProtocolSuffixSSZSnappy {
return false
}

View File

@ -102,7 +102,7 @@ func TestTopicFromMessage_CorrectType(t *testing.T) {
assert.Equal(t, SchemaVersionV1, version)
}
// After Fork
// Altair Fork
for m := range messageMapping {
topic, err := TopicFromMessage(m, forkEpoch)
assert.NoError(t, err)

View File

@ -234,6 +234,10 @@ func (s *Service) Start() {
// current epoch.
s.RefreshENR()
// if the current epoch is beyond bellatrix, increase the
// MaxGossipSize and MaxChunkSize to 10Mb.
s.increaseMaxMessageSizesForBellatrix()
// Periodic functions.
async.RunEvery(s.ctx, params.BeaconNetworkConfig().TtfbTimeout, func() {
ensurePeerConnections(s.ctx, s.host, peersToWatch...)
@ -491,3 +495,14 @@ func (s *Service) connectToBootnodes() error {
func (s *Service) isInitialized() bool {
return !s.genesisTime.IsZero() && len(s.genesisValidatorsRoot) == 32
}
// increaseMaxMessageSizesForBellatrix increases the max sizes of gossip and chunk from 1 Mb to 10Mb,
// if the current epoch is or above the configured BellatrixForkEpoch.
func (s *Service) increaseMaxMessageSizesForBellatrix() {
currentSlot := slots.Since(s.genesisTime)
currentEpoch := slots.ToEpoch(currentSlot)
if currentEpoch >= params.BeaconConfig().BellatrixForkEpoch {
encoder.SetMaxGossipSizeForBellatrix()
encoder.SetMaxChunkSizeForBellatrix()
}
}

View File

@ -38,6 +38,9 @@ func InitializeDataMaps() {
bytesutil.ToBytes4(params.BeaconConfig().AltairForkVersion): func() (block.SignedBeaconBlock, error) {
return wrapper.WrappedAltairSignedBeaconBlock(&ethpb.SignedBeaconBlockAltair{Block: &ethpb.BeaconBlockAltair{}})
},
bytesutil.ToBytes4(params.BeaconConfig().BellatrixForkVersion): func() (block.SignedBeaconBlock, error) {
return wrapper.WrappedMergeSignedBeaconBlock(&ethpb.SignedBeaconBlockMerge{Block: &ethpb.BeaconBlockMerge{}})
},
}
// Reset our metadata map.
@ -48,5 +51,8 @@ func InitializeDataMaps() {
bytesutil.ToBytes4(params.BeaconConfig().AltairForkVersion): func() metadata.Metadata {
return wrapper.WrappedMetadataV1(&ethpb.MetaDataV1{})
},
bytesutil.ToBytes4(params.BeaconConfig().BellatrixForkVersion): func() metadata.Metadata {
return wrapper.WrappedMetadataV1(&ethpb.MetaDataV1{})
},
}
}

View File

@ -399,5 +399,5 @@ func TestForkSchedule_CorrectNumberOfForks(t *testing.T) {
resp, err := s.GetForkSchedule(context.Background(), &emptypb.Empty{})
require.NoError(t, err)
// Genesis and Altair.
assert.Equal(t, 2, len(resp.Data))
assert.Equal(t, 3, len(resp.Data))
}

View File

@ -195,4 +195,6 @@ func (b *BeaconChainConfig) InitializeForkSchedule() {
b.ForkVersionSchedule[bytesutil.ToBytes4(b.GenesisForkVersion)] = b.GenesisEpoch
// Set Altair fork data.
b.ForkVersionSchedule[bytesutil.ToBytes4(b.AltairForkVersion)] = b.AltairForkEpoch
// Set Bellatrix fork data.
b.ForkVersionSchedule[bytesutil.ToBytes4(b.BellatrixForkVersion)] = b.BellatrixForkEpoch
}

View File

@ -23,11 +23,15 @@ const (
genesisForkEpoch = 0
// Altair Fork Epoch for mainnet config.
mainnetAltairForkEpoch = 74240 // Oct 27, 2021, 10:56:23am UTC
// Placeholder for the merge epoch until it is decided
mainnetBellatrixForkEpoch = math.MaxUint64
)
var mainnetNetworkConfig = &NetworkConfig{
GossipMaxSize: 1 << 20, // 1 MiB
MaxChunkSize: 1 << 20, // 1 MiB
GossipMaxSize: 1 << 20, // 1 MiB
GossipMaxSizeBellatrix: 10 * 1 << 20, // 10 MiB
MaxChunkSize: 1 << 20, // 1 MiB
MaxChunkSizeBellatrix: 10 * 1 << 20, // 10 MiB
AttestationSubnetCount: 64,
AttestationPropagationSlotRange: 32,
MaxRequestBlocks: 1 << 10, // 1024
@ -199,6 +203,7 @@ var mainnetBeaconConfig = &BeaconChainConfig{
ForkVersionSchedule: map[[4]byte]types.Epoch{
{0, 0, 0, 0}: genesisForkEpoch,
{1, 0, 0, 0}: mainnetAltairForkEpoch,
{2, 0, 0, 0}: mainnetBellatrixForkEpoch,
// Any further forks must be specified here by their epoch number.
},
@ -237,6 +242,6 @@ var mainnetBeaconConfig = &BeaconChainConfig{
// Light client
MinSyncCommitteeParticipants: 1,
// Merge
// Bellatrix
TerminalBlockHashActivationEpoch: math.MaxUint64,
}

View File

@ -96,6 +96,7 @@ func MinimalSpecConfig() *BeaconChainConfig {
minimalConfig.ForkVersionSchedule = map[[4]byte]types.Epoch{
{0, 0, 0, 1}: 0,
{1, 0, 0, 1}: math.MaxUint64,
{2, 0, 0, 1}: math.MaxUint64,
}
minimalConfig.SyncCommitteeSize = 32
minimalConfig.InactivityScoreBias = 4

View File

@ -10,7 +10,9 @@ import (
// NetworkConfig defines the spec based network parameters.
type NetworkConfig struct {
GossipMaxSize uint64 `yaml:"GOSSIP_MAX_SIZE"` // GossipMaxSize is the maximum allowed size of uncompressed gossip messages.
MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE"` // MaxChunkSize is the the maximum allowed size of uncompressed req/resp chunked responses.
GossipMaxSizeBellatrix uint64 `yaml:"GOSSIP_MAX_SIZE_BELLATRIX"` // GossipMaxSizeBellatrix is the maximum allowed size of uncompressed gossip messages after the bellatrix epoch.
MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE"` // MaxChunkSize is the maximum allowed size of uncompressed req/resp chunked responses.
MaxChunkSizeBellatrix uint64 `yaml:"MAX_CHUNK_SIZE_BELLATRIX"` // MaxChunkSizeBellatrix is the maximum allowed size of uncompressed req/resp chunked responses after the bellatrix epoch.
AttestationSubnetCount uint64 `yaml:"ATTESTATION_SUBNET_COUNT"` // AttestationSubnetCount is the number of attestation subnets used in the gossipsub protocol.
AttestationPropagationSlotRange types.Slot `yaml:"ATTESTATION_PROPAGATION_SLOT_RANGE"` // AttestationPropagationSlotRange is the maximum number of slots during which an attestation can be propagated.
MaxRequestBlocks uint64 `yaml:"MAX_REQUEST_BLOCKS"` // MaxRequestBlocks is the maximum number of blocks in a single request.