Handle blocks after chain start (#3486)

This commit is contained in:
shayzluf 2019-09-18 02:44:51 +05:30 committed by terence tsao
parent b1b76ac87c
commit b9f9cf0b2c
4 changed files with 28 additions and 2 deletions

View File

@ -15,4 +15,11 @@ var (
},
[]string{"topic"},
)
messageReceivedBeforeChainStartCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "p2p_message_recieved_before_chain_start",
Help: "Count of messages received before chain started.",
},
[]string{"topic"},
)
)

View File

@ -29,6 +29,7 @@ type blockchainService interface {
blockchain.HeadFetcher
blockchain.FinalizationFetcher
blockchain.AttestationReceiver
blockchain.ChainFeeds
}
// NewRegularSync service.
@ -58,6 +59,7 @@ type RegularSync struct {
chain blockchainService
helloTracker map[peer.ID]*pb.Hello
helloTrackerLock sync.RWMutex
chainStarted bool
}
// Start the regular sync service.

View File

@ -9,6 +9,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"go.opencensus.io/trace"
)
@ -38,6 +39,18 @@ func noopValidator(_ context.Context, _ proto.Message, _ p2p.Broadcaster, _ bool
// Register PubSub subscribers
func (r *RegularSync) registerSubscribers() {
go func() {
ch := make(chan time.Time)
sub := r.chain.StateInitializedFeed().Subscribe(ch)
defer sub.Unsubscribe()
// Wait until chain start.
genesis := <-ch
if genesis.After(roughtime.Now()) {
time.Sleep(roughtime.Until(genesis))
}
r.chainStarted = true
}()
r.subscribe(
"/eth2/beacon_block",
r.validateBeaconBlockPubSub,
@ -132,7 +145,10 @@ func (r *RegularSync) subscribe(topic string, validate validator, handle subHand
// TODO(3147): Mark status unhealthy.
return
}
if !r.chainStarted {
messageReceivedBeforeChainStartCounter.WithLabelValues(topic + r.p2p.Encoding().ProtocolSuffix()).Inc()
continue
}
// Special validation occurs on messages received from ourselves.
fromSelf := msg.GetFrom() == r.p2p.PeerID()

View File

@ -31,6 +31,7 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
wg.Done()
return nil
})
r.chainStarted = true
p2p.ReceivePubSub(topic, &pb.VoluntaryExit{Epoch: 55})
@ -54,7 +55,7 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
defer wg.Done()
panic("bad")
})
r.chainStarted = true
p2p.ReceivePubSub(topic, &pb.VoluntaryExit{Epoch: 55})
if testutil.WaitTimeout(&wg, time.Second) {