mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 03:30:35 +00:00
Register RPC And Pubsub Handlers After Genesis is Determined (#6020)
* fix * fix test
This commit is contained in:
parent
a139b75dc5
commit
979c0074c7
@ -15,6 +15,7 @@ import (
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
@ -27,6 +28,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
|
||||
"github.com/prysmaticlabs/prysm/shared"
|
||||
"github.com/prysmaticlabs/prysm/shared/roughtime"
|
||||
"github.com/prysmaticlabs/prysm/shared/runutil"
|
||||
)
|
||||
|
||||
@ -131,8 +133,7 @@ func NewRegularSync(cfg *Config) *Service {
|
||||
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
|
||||
}
|
||||
|
||||
r.registerRPCHandlers()
|
||||
go r.registerSubscribers()
|
||||
go r.registerHandlers()
|
||||
|
||||
return r
|
||||
}
|
||||
@ -209,6 +210,40 @@ func (r *Service) initCaches() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Service) registerHandlers() {
|
||||
// Wait until chain start.
|
||||
stateChannel := make(chan *feed.Event, 1)
|
||||
stateSub := r.stateNotifier.StateFeed().Subscribe(stateChannel)
|
||||
defer stateSub.Unsubscribe()
|
||||
for r.chainStarted == false {
|
||||
select {
|
||||
case event := <-stateChannel:
|
||||
if event.Type == statefeed.Initialized {
|
||||
data, ok := event.Data.(*statefeed.InitializedData)
|
||||
if !ok {
|
||||
log.Error("Event feed data is not type *statefeed.InitializedData")
|
||||
return
|
||||
}
|
||||
log.WithField("starttime", data.StartTime).Debug("Received state initialized event")
|
||||
if data.StartTime.After(roughtime.Now()) {
|
||||
stateSub.Unsubscribe()
|
||||
time.Sleep(roughtime.Until(data.StartTime))
|
||||
}
|
||||
r.chainStarted = true
|
||||
}
|
||||
case <-r.ctx.Done():
|
||||
log.Debug("Context closed, exiting goroutine")
|
||||
return
|
||||
case err := <-stateSub.Err():
|
||||
log.WithError(err).Error("Subscription to state notifier failed")
|
||||
return
|
||||
}
|
||||
}
|
||||
// Register respective rpc and pubsub handlers.
|
||||
r.registerRPCHandlers()
|
||||
r.registerSubscribers()
|
||||
}
|
||||
|
||||
// Checker defines a struct which can verify whether a node is currently
|
||||
// synchronizing a chain with the rest of peers in the network.
|
||||
type Checker interface {
|
||||
|
@ -1,14 +1,21 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bls"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
)
|
||||
|
||||
func TestService_StatusZeroEpoch(t *testing.T) {
|
||||
@ -31,3 +38,51 @@ func TestService_StatusZeroEpoch(t *testing.T) {
|
||||
t.Errorf("Wanted non failing status but got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncHandlers_WaitToSync(t *testing.T) {
|
||||
p2p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
}
|
||||
r := Service{
|
||||
ctx: context.Background(),
|
||||
p2p: p2p,
|
||||
chain: chainService,
|
||||
stateNotifier: chainService.StateNotifier(),
|
||||
initialSync: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
|
||||
topic := "/eth2/%x/beacon_block"
|
||||
go r.registerHandlers()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
i := r.stateNotifier.StateFeed().Send(&feed.Event{
|
||||
Type: statefeed.Initialized,
|
||||
Data: &statefeed.InitializedData{
|
||||
StartTime: time.Now(),
|
||||
},
|
||||
})
|
||||
if i == 0 {
|
||||
t.Fatal("didn't send genesis time to subscribers")
|
||||
}
|
||||
b := []byte("sk")
|
||||
b32 := bytesutil.ToBytes32(b)
|
||||
sk, err := bls.SecretKeyFromBytes(b32[:])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msg := ðpb.SignedBeaconBlock{
|
||||
Block: ðpb.BeaconBlock{
|
||||
ParentRoot: testutil.Random32Bytes(t),
|
||||
},
|
||||
Signature: sk.Sign([]byte("data")).Marshal(),
|
||||
}
|
||||
p2p.ReceivePubSub(topic, msg)
|
||||
// wait for chainstart to be sent
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
if !r.chainStarted {
|
||||
t.Fatal("Did not receive chain start event.")
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -13,13 +13,11 @@ import (
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/messagehandler"
|
||||
"github.com/prysmaticlabs/prysm/shared/p2putils"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/roughtime"
|
||||
"github.com/prysmaticlabs/prysm/shared/sliceutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/slotutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
||||
@ -46,34 +44,6 @@ func (r *Service) noopValidator(ctx context.Context, _ peer.ID, msg *pubsub.Mess
|
||||
|
||||
// Register PubSub subscribers
|
||||
func (r *Service) registerSubscribers() {
|
||||
// Wait until chain start.
|
||||
stateChannel := make(chan *feed.Event, 1)
|
||||
stateSub := r.stateNotifier.StateFeed().Subscribe(stateChannel)
|
||||
defer stateSub.Unsubscribe()
|
||||
for r.chainStarted == false {
|
||||
select {
|
||||
case event := <-stateChannel:
|
||||
if event.Type == statefeed.Initialized {
|
||||
data, ok := event.Data.(*statefeed.InitializedData)
|
||||
if !ok {
|
||||
log.Error("Event feed data is not type *statefeed.InitializedData")
|
||||
return
|
||||
}
|
||||
log.WithField("starttime", data.StartTime).Debug("Received state initialized event")
|
||||
if data.StartTime.After(roughtime.Now()) {
|
||||
stateSub.Unsubscribe()
|
||||
time.Sleep(roughtime.Until(data.StartTime))
|
||||
}
|
||||
r.chainStarted = true
|
||||
}
|
||||
case <-r.ctx.Done():
|
||||
log.Debug("Context closed, exiting goroutine")
|
||||
return
|
||||
case err := <-stateSub.Err():
|
||||
log.WithError(err).Error("Subscription to state notifier failed")
|
||||
return
|
||||
}
|
||||
}
|
||||
r.subscribe(
|
||||
"/eth2/%x/beacon_block",
|
||||
r.validateBeaconBlockPubSub,
|
||||
|
@ -13,14 +13,11 @@ import (
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||
mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
|
||||
"github.com/prysmaticlabs/prysm/shared/bls"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
@ -189,54 +186,6 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribe_WaitToSync(t *testing.T) {
|
||||
p2p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
}
|
||||
r := Service{
|
||||
ctx: context.Background(),
|
||||
p2p: p2p,
|
||||
chain: chainService,
|
||||
stateNotifier: chainService.StateNotifier(),
|
||||
initialSync: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
|
||||
topic := "/eth2/%x/beacon_block"
|
||||
go r.registerSubscribers()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
i := r.stateNotifier.StateFeed().Send(&feed.Event{
|
||||
Type: statefeed.Initialized,
|
||||
Data: &statefeed.InitializedData{
|
||||
StartTime: time.Now(),
|
||||
},
|
||||
})
|
||||
if i == 0 {
|
||||
t.Fatal("didn't send genesis time to subscribers")
|
||||
}
|
||||
b := []byte("sk")
|
||||
b32 := bytesutil.ToBytes32(b)
|
||||
sk, err := bls.SecretKeyFromBytes(b32[:])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msg := &pb.SignedBeaconBlock{
|
||||
Block: &pb.BeaconBlock{
|
||||
ParentRoot: testutil.Random32Bytes(t),
|
||||
},
|
||||
Signature: sk.Sign([]byte("data")).Marshal(),
|
||||
}
|
||||
p2p.ReceivePubSub(topic, msg)
|
||||
// wait for chainstart to be sent
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
if !r.chainStarted {
|
||||
t.Fatal("Did not receive chain start event.")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSubscribe_HandlesPanic(t *testing.T) {
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
r := Service{
|
||||
|
Loading…
Reference in New Issue
Block a user