From 979c0074c70db22b8fd8c0a05851763944bb8f5d Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Thu, 28 May 2020 13:39:40 +0800 Subject: [PATCH] Register RPC And Pubsub Handlers After Genesis is Determined (#6020) * fix * fix test --- beacon-chain/sync/service.go | 39 +++++++++++++++++++- beacon-chain/sync/service_test.go | 55 ++++++++++++++++++++++++++++ beacon-chain/sync/subscriber.go | 30 --------------- beacon-chain/sync/subscriber_test.go | 51 -------------------------- 4 files changed, 92 insertions(+), 83 deletions(-) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 508f81185..c55a79dce 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -15,6 +15,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" @@ -27,6 +28,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" "github.com/prysmaticlabs/prysm/shared" + "github.com/prysmaticlabs/prysm/shared/roughtime" "github.com/prysmaticlabs/prysm/shared/runutil" ) @@ -131,8 +133,7 @@ func NewRegularSync(cfg *Config) *Service { blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), } - r.registerRPCHandlers() - go r.registerSubscribers() + go r.registerHandlers() return r } @@ -209,6 +210,40 @@ func (r *Service) initCaches() error { return nil } +func (r *Service) registerHandlers() { + // Wait until chain start. + stateChannel := make(chan *feed.Event, 1) + stateSub := r.stateNotifier.StateFeed().Subscribe(stateChannel) + defer stateSub.Unsubscribe() + for r.chainStarted == false { + select { + case event := <-stateChannel: + if event.Type == statefeed.Initialized { + data, ok := event.Data.(*statefeed.InitializedData) + if !ok { + log.Error("Event feed data is not type *statefeed.InitializedData") + return + } + log.WithField("starttime", data.StartTime).Debug("Received state initialized event") + if data.StartTime.After(roughtime.Now()) { + stateSub.Unsubscribe() + time.Sleep(roughtime.Until(data.StartTime)) + } + r.chainStarted = true + } + case <-r.ctx.Done(): + log.Debug("Context closed, exiting goroutine") + return + case err := <-stateSub.Err(): + log.WithError(err).Error("Subscription to state notifier failed") + return + } + } + // Register respective rpc and pubsub handlers. + r.registerRPCHandlers() + r.registerSubscribers() +} + // Checker defines a struct which can verify whether a node is currently // synchronizing a chain with the rest of peers in the network. type Checker interface { diff --git a/beacon-chain/sync/service_test.go b/beacon-chain/sync/service_test.go index d9cf9c48b..485a70a16 100644 --- a/beacon-chain/sync/service_test.go +++ b/beacon-chain/sync/service_test.go @@ -1,14 +1,21 @@ package sync import ( + "context" "testing" "time" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/bls" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/testutil" ) func TestService_StatusZeroEpoch(t *testing.T) { @@ -31,3 +38,51 @@ func TestService_StatusZeroEpoch(t *testing.T) { t.Errorf("Wanted non failing status but got: %v", err) } } + +func TestSyncHandlers_WaitToSync(t *testing.T) { + p2p := p2ptest.NewTestP2P(t) + chainService := &mockChain.ChainService{ + Genesis: time.Now(), + ValidatorsRoot: [32]byte{'A'}, + } + r := Service{ + ctx: context.Background(), + p2p: p2p, + chain: chainService, + stateNotifier: chainService.StateNotifier(), + initialSync: &mockSync.Sync{IsSyncing: false}, + } + + topic := "/eth2/%x/beacon_block" + go r.registerHandlers() + time.Sleep(100 * time.Millisecond) + i := r.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{ + StartTime: time.Now(), + }, + }) + if i == 0 { + t.Fatal("didn't send genesis time to subscribers") + } + b := []byte("sk") + b32 := bytesutil.ToBytes32(b) + sk, err := bls.SecretKeyFromBytes(b32[:]) + if err != nil { + t.Fatal(err) + } + + msg := ðpb.SignedBeaconBlock{ + Block: ðpb.BeaconBlock{ + ParentRoot: testutil.Random32Bytes(t), + }, + Signature: sk.Sign([]byte("data")).Marshal(), + } + p2p.ReceivePubSub(topic, msg) + // wait for chainstart to be sent + time.Sleep(400 * time.Millisecond) + if !r.chainStarted { + t.Fatal("Did not receive chain start event.") + } + +} diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 1db859529..b8e3a3d03 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -13,13 +13,11 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" - statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/messagehandler" "github.com/prysmaticlabs/prysm/shared/p2putils" "github.com/prysmaticlabs/prysm/shared/params" - "github.com/prysmaticlabs/prysm/shared/roughtime" "github.com/prysmaticlabs/prysm/shared/sliceutil" "github.com/prysmaticlabs/prysm/shared/slotutil" "github.com/prysmaticlabs/prysm/shared/traceutil" @@ -46,34 +44,6 @@ func (r *Service) noopValidator(ctx context.Context, _ peer.ID, msg *pubsub.Mess // Register PubSub subscribers func (r *Service) registerSubscribers() { - // Wait until chain start. - stateChannel := make(chan *feed.Event, 1) - stateSub := r.stateNotifier.StateFeed().Subscribe(stateChannel) - defer stateSub.Unsubscribe() - for r.chainStarted == false { - select { - case event := <-stateChannel: - if event.Type == statefeed.Initialized { - data, ok := event.Data.(*statefeed.InitializedData) - if !ok { - log.Error("Event feed data is not type *statefeed.InitializedData") - return - } - log.WithField("starttime", data.StartTime).Debug("Received state initialized event") - if data.StartTime.After(roughtime.Now()) { - stateSub.Unsubscribe() - time.Sleep(roughtime.Until(data.StartTime)) - } - r.chainStarted = true - } - case <-r.ctx.Done(): - log.Debug("Context closed, exiting goroutine") - return - case err := <-stateSub.Err(): - log.WithError(err).Error("Subscription to state notifier failed") - return - } - } r.subscribe( "/eth2/%x/beacon_block", r.validateBeaconBlockPubSub, diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index 226267bb1..12c4ea213 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -13,14 +13,11 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" - "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" - statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing" - "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" @@ -189,54 +186,6 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) { } } -func TestSubscribe_WaitToSync(t *testing.T) { - p2p := p2ptest.NewTestP2P(t) - chainService := &mockChain.ChainService{ - Genesis: time.Now(), - ValidatorsRoot: [32]byte{'A'}, - } - r := Service{ - ctx: context.Background(), - p2p: p2p, - chain: chainService, - stateNotifier: chainService.StateNotifier(), - initialSync: &mockSync.Sync{IsSyncing: false}, - } - - topic := "/eth2/%x/beacon_block" - go r.registerSubscribers() - time.Sleep(100 * time.Millisecond) - i := r.stateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.Initialized, - Data: &statefeed.InitializedData{ - StartTime: time.Now(), - }, - }) - if i == 0 { - t.Fatal("didn't send genesis time to subscribers") - } - b := []byte("sk") - b32 := bytesutil.ToBytes32(b) - sk, err := bls.SecretKeyFromBytes(b32[:]) - if err != nil { - t.Fatal(err) - } - - msg := &pb.SignedBeaconBlock{ - Block: &pb.BeaconBlock{ - ParentRoot: testutil.Random32Bytes(t), - }, - Signature: sk.Sign([]byte("data")).Marshal(), - } - p2p.ReceivePubSub(topic, msg) - // wait for chainstart to be sent - time.Sleep(400 * time.Millisecond) - if !r.chainStarted { - t.Fatal("Did not receive chain start event.") - } - -} - func TestSubscribe_HandlesPanic(t *testing.T) { p := p2ptest.NewTestP2P(t) r := Service{