Register Subscribers After Node Is Synced (#7468)

* wait for synced

* fix again

* add test

* fix all

* fixes deepsource reported issue

Co-authored-by: Victor Farazdagi <simple.square@gmail.com>
This commit is contained in:
Nishant Das 2020-10-10 16:50:28 +08:00 committed by GitHub
parent 4c09e59b3b
commit 43765b5cb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 215 additions and 66 deletions

View File

@ -118,6 +118,7 @@ go_test(
"//beacon-chain/sync:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",

View File

@ -2,7 +2,6 @@ package initialsync
import (
"context"
"fmt"
"testing"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
@ -333,6 +332,7 @@ func TestService_processBlock(t *testing.T) {
Epoch: 0,
},
},
StateNotifier: &mock.MockStateNotifier{},
})
ctx := context.Background()
genesis := makeGenesisTime(32)
@ -392,6 +392,7 @@ func TestService_processBlockBatch(t *testing.T) {
Epoch: 0,
},
},
StateNotifier: &mock.MockStateNotifier{},
})
ctx := context.Background()
genesis := makeGenesisTime(32)
@ -439,8 +440,7 @@ func TestService_processBlockBatch(t *testing.T) {
ctx context.Context, blocks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error {
return nil
})
expectedErr := fmt.Sprintf("no good blocks in batch")
assert.ErrorContains(t, expectedErr, err)
assert.ErrorContains(t, "no good blocks in batch", err)
var badBatch2 []*eth.SignedBeaconBlock
for i, b := range batch2 {

View File

@ -54,13 +54,14 @@ type Service struct {
stateNotifier statefeed.Notifier
counter *ratecounter.RateCounter
lastProcessedSlot uint64
genesisChan chan time.Time
}
// NewService configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
s := &Service{
ctx: ctx,
cancel: cancel,
chain: cfg.Chain,
@ -68,13 +69,18 @@ func NewService(ctx context.Context, cfg *Config) *Service {
db: cfg.DB,
stateNotifier: cfg.StateNotifier,
counter: ratecounter.NewRateCounter(counterSeconds * time.Second),
genesisChan: make(chan time.Time),
}
go s.waitForStateInitialization()
return s
}
// Start the initial sync service.
func (s *Service) Start() {
genesis, err := s.waitForStateInitialization()
if err != nil {
// Wait for state initialized event.
genesis := <-s.genesisChan
if genesis.IsZero() {
log.Debug("Exiting Initial Sync Service")
return
}
if flags.Get().DisableSync {
@ -169,15 +175,7 @@ func (s *Service) waitForMinimumPeers() {
// waitForStateInitialization makes sure that beacon node is ready to be accessed: it is either
// already properly configured or system waits up until state initialized event is triggered.
func (s *Service) waitForStateInitialization() (time.Time, error) {
headState, err := s.chain.HeadState(s.ctx)
if err != nil {
return time.Time{}, err
}
if headState != nil {
return time.Unix(int64(headState.GenesisTime()), 0), nil
}
func (s *Service) waitForStateInitialization() {
// Wait for state to be initialized.
stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
@ -193,14 +191,19 @@ func (s *Service) waitForStateInitialization() (time.Time, error) {
continue
}
log.WithField("starttime", data.StartTime).Debug("Received state initialized event")
return data.StartTime, nil
s.genesisChan <- data.StartTime
return
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return time.Time{}, errors.New("context closed")
// Send a zero time in the event we are exiting.
s.genesisChan <- time.Time{}
return
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state notifier failed")
return time.Time{}, err
// Send a zero time in the event we are exiting.
s.genesisChan <- time.Time{}
return
}
}
}

View File

@ -14,6 +14,7 @@ import (
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
@ -32,6 +33,7 @@ func TestService_InitStartStop(t *testing.T) {
tests := []struct {
name string
assert func()
methodRuns func(fd *event.Feed)
chainService func() *mock.ChainService
}{
{
@ -45,7 +47,7 @@ func TestService_InitStartStop(t *testing.T) {
chainService: func() *mock.ChainService {
// Set to future time (genesis time hasn't arrived yet).
st := testutil.NewBeaconState()
require.NoError(t, st.SetGenesisTime(uint64(time.Unix(4113849600, 0).Unix())))
return &mock.ChainService{
State: st,
FinalizedCheckPoint: &eth.Checkpoint{
@ -53,9 +55,19 @@ func TestService_InitStartStop(t *testing.T) {
},
}
},
methodRuns: func(fd *event.Feed) {
// Send valid event.
fd.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: time.Unix(4113849600, 0),
GenesisValidatorsRoot: make([]byte, 32),
},
})
},
assert: func() {
assert.LogsContain(t, hook, "Genesis time has not arrived - not syncing")
assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
{
@ -63,7 +75,6 @@ func TestService_InitStartStop(t *testing.T) {
chainService: func() *mock.ChainService {
// Set to nearby slot.
st := testutil.NewBeaconState()
require.NoError(t, st.SetGenesisTime(uint64(time.Now().Add(-5*time.Minute).Unix())))
return &mock.ChainService{
State: st,
FinalizedCheckPoint: &eth.Checkpoint{
@ -71,10 +82,20 @@ func TestService_InitStartStop(t *testing.T) {
},
}
},
methodRuns: func(fd *event.Feed) {
// Send valid event.
fd.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: time.Now().Add(-5 * time.Minute),
GenesisValidatorsRoot: make([]byte, 32),
},
})
},
assert: func() {
assert.LogsContain(t, hook, "Chain started within the last epoch - not syncing")
assert.LogsDoNotContain(t, hook, "Genesis time has not arrived - not syncing")
assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
{
@ -83,7 +104,6 @@ func TestService_InitStartStop(t *testing.T) {
// Set to some future slot, and then make sure that current head matches it.
st := testutil.NewBeaconState()
futureSlot := uint64(27354)
require.NoError(t, st.SetGenesisTime(uint64(makeGenesisTime(futureSlot).Unix())))
require.NoError(t, st.SetSlot(futureSlot))
return &mock.ChainService{
State: st,
@ -92,19 +112,33 @@ func TestService_InitStartStop(t *testing.T) {
},
}
},
methodRuns: func(fd *event.Feed) {
futureSlot := uint64(27354)
// Send valid event.
fd.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: makeGenesisTime(futureSlot),
GenesisValidatorsRoot: make([]byte, 32),
},
})
},
assert: func() {
assert.LogsContain(t, hook, "Starting initial chain sync...")
assert.LogsContain(t, hook, "Already synced to the current chain head")
assert.LogsDoNotContain(t, hook, "Chain started within the last epoch - not syncing")
assert.LogsDoNotContain(t, hook, "Genesis time has not arrived - not syncing")
assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
}
p := p2pt.NewTestP2P(t)
connectPeers(t, p, []*peerData{}, p.Peers())
for _, tt := range tests {
for i, tt := range tests {
if i == 0 {
continue
}
t.Run(tt.name, func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
@ -114,12 +148,18 @@ func TestService_InitStartStop(t *testing.T) {
if tt.chainService != nil {
mc = tt.chainService()
}
// Initialize feed
notifier := &mock.MockStateNotifier{}
s := NewService(ctx, &Config{
P2P: p,
Chain: mc,
StateNotifier: mc.StateNotifier(),
StateNotifier: notifier,
})
time.Sleep(500 * time.Millisecond)
assert.NotNil(t, s)
if tt.methodRuns != nil {
tt.methodRuns(notifier.StateFeed())
}
wg := &sync.WaitGroup{}
wg.Add(1)
@ -127,14 +167,15 @@ func TestService_InitStartStop(t *testing.T) {
s.Start()
wg.Done()
}()
go func() {
// Allow to exit from test (on no head loop waiting for head is started).
// In most tests, this is redundant, as Start() already exited.
time.AfterFunc(500*time.Millisecond, func() {
time.AfterFunc(3*time.Second, func() {
cancel()
})
}()
if testutil.WaitTimeout(wg, time.Second*2) {
if testutil.WaitTimeout(wg, time.Second*4) {
t.Fatalf("Test should have exited by now, timed out")
}
tt.assert()
@ -153,28 +194,6 @@ func TestService_waitForStateInitialization(t *testing.T) {
return s
}
t.Run("head state exists", func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mc := &mock.ChainService{
State: testutil.NewBeaconState(),
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
}
s := newService(ctx, mc)
expectedGenesisTime := time.Unix(25000, 0)
var receivedGenesisTime time.Time
require.NoError(t, mc.State.SetGenesisTime(uint64(expectedGenesisTime.Unix())))
receivedGenesisTime, err := s.waitForStateInitialization()
assert.NoError(t, err)
assert.Equal(t, expectedGenesisTime, receivedGenesisTime)
assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized")
})
t.Run("no state and context close", func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
@ -184,8 +203,9 @@ func TestService_waitForStateInitialization(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
_, err := s.waitForStateInitialization()
assert.ErrorContains(t, "context closed", err)
go s.waitForStateInitialization()
currTime := <-s.genesisChan
assert.Equal(t, true, currTime.IsZero())
wg.Done()
}()
go func() {
@ -213,9 +233,9 @@ func TestService_waitForStateInitialization(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
var err error
receivedGenesisTime, err = s.waitForStateInitialization()
assert.NoError(t, err)
go s.waitForStateInitialization()
receivedGenesisTime = <-s.genesisChan
assert.Equal(t, false, receivedGenesisTime.IsZero())
wg.Done()
}()
go func() {
@ -245,6 +265,46 @@ func TestService_waitForStateInitialization(t *testing.T) {
assert.LogsContain(t, hook, "Received state initialized event")
assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine")
})
t.Run("no state and state init event received and service start", func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newService(ctx, &mock.ChainService{})
// Initialize mock feed
_ = s.stateNotifier.StateFeed()
expectedGenesisTime := time.Now().Add(60 * time.Second)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
s.waitForStateInitialization()
wg.Done()
}()
wg.Add(1)
go func() {
time.AfterFunc(500*time.Millisecond, func() {
// Send valid event.
s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: expectedGenesisTime,
GenesisValidatorsRoot: make([]byte, 32),
},
})
})
s.Start()
wg.Done()
}()
if testutil.WaitTimeout(wg, time.Second*3) {
t.Fatalf("Test should have exited by now, timed out")
}
assert.LogsContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Received state initialized event")
assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine")
})
}
func TestService_markSynced(t *testing.T) {

View File

@ -234,27 +234,38 @@ func (s *Service) registerHandlers() {
stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
for !s.chainStarted {
for {
select {
case event := <-stateChannel:
if event.Type == statefeed.Initialized {
switch event.Type {
case statefeed.Initialized:
data, ok := event.Data.(*statefeed.InitializedData)
if !ok {
log.Error("Event feed data is not type *statefeed.InitializedData")
return
}
log.WithField("starttime", data.StartTime).Debug("Received state initialized event")
startTime := data.StartTime
log.WithField("starttime", startTime).Debug("Received state initialized event")
// Register respective rpc and pubsub handlers at state initialized event.
// Register respective rpc handlers at state initialized event.
s.registerRPCHandlers()
s.registerSubscribers()
if data.StartTime.After(timeutils.Now()) {
stateSub.Unsubscribe()
time.Sleep(timeutils.Until(data.StartTime))
// Wait for chainstart in separate routine.
go func() {
if startTime.After(timeutils.Now()) {
time.Sleep(timeutils.Until(startTime))
}
log.WithField("starttime", startTime).Debug("Chain started in sync service")
s.chainStarted = true
}()
case statefeed.Synced:
_, ok := event.Data.(*statefeed.SyncedData)
if !ok {
log.Error("Event feed data is not type *statefeed.SyncedData")
return
}
log.WithField("starttime", data.StartTime).Debug("Chain started in sync service")
s.chainStarted = true
// Register respective pubsub handlers at state synced event.
s.registerSubscribers()
return
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")

View File

@ -2,6 +2,7 @@ package sync
import (
"context"
"sync"
"testing"
"time"
@ -74,3 +75,76 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
time.Sleep(400 * time.Millisecond)
require.Equal(t, true, r.chainStarted, "Did not receive chain start event.")
}
func TestSyncHandlers_WaitTillSynced(t *testing.T) {
p2p := p2ptest.NewTestP2P(t)
chainService := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}
r := Service{
ctx: context.Background(),
p2p: p2p,
chain: chainService,
stateNotifier: chainService.StateNotifier(),
initialSync: &mockSync.Sync{IsSyncing: false},
}
topic := "/eth2/%x/beacon_block"
go r.registerHandlers()
time.Sleep(100 * time.Millisecond)
i := r.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: time.Now(),
},
})
if i == 0 {
t.Fatal("didn't send genesis time to subscribers")
}
b := []byte("sk")
b32 := bytesutil.ToBytes32(b)
sk, err := bls.SecretKeyFromBytes(b32[:])
require.NoError(t, err)
msg := testutil.NewBeaconBlock()
msg.Block.ParentRoot = testutil.Random32Bytes(t)
msg.Signature = sk.Sign([]byte("data")).Marshal()
p2p.Digest, err = r.forkDigest()
r.blockNotifier = chainService.BlockNotifier()
blockChan := make(chan feed.Event, 1)
sub := r.blockNotifier.BlockFeed().Subscribe(blockChan)
require.NoError(t, err)
p2p.ReceivePubSub(topic, msg)
// wait for chainstart to be sent
time.Sleep(2 * time.Second)
require.Equal(t, true, r.chainStarted, "Did not receive chain start event.")
assert.Equal(t, 0, len(blockChan), "block was received by sync service despite not being fully synced")
i = r.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Synced,
Data: &statefeed.SyncedData{
StartTime: time.Now(),
},
})
if i == 0 {
t.Fatal("didn't send genesis time to sync event subscribers")
}
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
// Wait for block to be received by service.
<-blockChan
wg.Done()
sub.Unsubscribe()
}()
p2p.ReceivePubSub(topic, msg)
// wait for message to be sent
testutil.WaitTimeout(wg, 2*time.Second)
}