From b9f9cf0b2c70c4db2c0f97dbdeff4e7835ad50e7 Mon Sep 17 00:00:00 2001 From: shayzluf Date: Wed, 18 Sep 2019 02:44:51 +0530 Subject: [PATCH] Handle blocks after chain start (#3486) --- beacon-chain/sync/metrics.go | 7 +++++++ beacon-chain/sync/service.go | 2 ++ beacon-chain/sync/subscriber.go | 18 +++++++++++++++++- beacon-chain/sync/subscriber_test.go | 3 ++- 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 36c0a6ecc..1e1f2e7a1 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -15,4 +15,11 @@ var ( }, []string{"topic"}, ) + messageReceivedBeforeChainStartCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "p2p_message_recieved_before_chain_start", + Help: "Count of messages received before chain started.", + }, + []string{"topic"}, + ) ) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 41c205c94..f3b75d8a0 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -29,6 +29,7 @@ type blockchainService interface { blockchain.HeadFetcher blockchain.FinalizationFetcher blockchain.AttestationReceiver + blockchain.ChainFeeds } // NewRegularSync service. @@ -58,6 +59,7 @@ type RegularSync struct { chain blockchainService helloTracker map[peer.ID]*pb.Hello helloTrackerLock sync.RWMutex + chainStarted bool } // Start the regular sync service. diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index adc6f8003..8f84243b2 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -9,6 +9,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/shared/roughtime" "go.opencensus.io/trace" ) @@ -38,6 +39,18 @@ func noopValidator(_ context.Context, _ proto.Message, _ p2p.Broadcaster, _ bool // Register PubSub subscribers func (r *RegularSync) registerSubscribers() { + go func() { + ch := make(chan time.Time) + sub := r.chain.StateInitializedFeed().Subscribe(ch) + defer sub.Unsubscribe() + + // Wait until chain start. + genesis := <-ch + if genesis.After(roughtime.Now()) { + time.Sleep(roughtime.Until(genesis)) + } + r.chainStarted = true + }() r.subscribe( "/eth2/beacon_block", r.validateBeaconBlockPubSub, @@ -132,7 +145,10 @@ func (r *RegularSync) subscribe(topic string, validate validator, handle subHand // TODO(3147): Mark status unhealthy. return } - + if !r.chainStarted { + messageReceivedBeforeChainStartCounter.WithLabelValues(topic + r.p2p.Encoding().ProtocolSuffix()).Inc() + continue + } // Special validation occurs on messages received from ourselves. fromSelf := msg.GetFrom() == r.p2p.PeerID() diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index 75b1c7621..d763c1b1c 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -31,6 +31,7 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) { wg.Done() return nil }) + r.chainStarted = true p2p.ReceivePubSub(topic, &pb.VoluntaryExit{Epoch: 55}) @@ -54,7 +55,7 @@ func TestSubscribe_HandlesPanic(t *testing.T) { defer wg.Done() panic("bad") }) - + r.chainStarted = true p2p.ReceivePubSub(topic, &pb.VoluntaryExit{Epoch: 55}) if testutil.WaitTimeout(&wg, time.Second) {