Fixes race condition at genesis (#5016)

* fixes race condition at genesis
This commit is contained in:
Victor Farazdagi 2020-03-06 00:20:38 +03:00 committed by GitHub
parent 6158a648cd
commit b4aaa610a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 2 deletions

View File

@ -73,10 +73,11 @@ func (s *Service) IsValidAttestation(ctx context.Context, att *ethpb.Attestation
} }
// This processes attestations from the attestation pool to account for validator votes and fork choice. // This processes attestations from the attestation pool to account for validator votes and fork choice.
func (s *Service) processAttestation() { func (s *Service) processAttestation(subscribedToStateEvents chan struct{}) {
// Wait for state to be initialized. // Wait for state to be initialized.
stateChannel := make(chan *feed.Event, 1) stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
subscribedToStateEvents <- struct{}{}
<-stateChannel <-stateChannel
stateSub.Unsubscribe() stateSub.Unsubscribe()

View File

@ -137,6 +137,9 @@ func (s *Service) Start() {
} }
} }
// Make sure that attestation processor is subscribed and ready for state initializing event.
attestationProcessorSubscribed := make(chan struct{}, 1)
// If the chain has already been initialized, simply start the block processing routine. // If the chain has already been initialized, simply start the block processing routine.
if beaconState != nil { if beaconState != nil {
log.Info("Blockchain data already exists in DB, initializing...") log.Info("Blockchain data already exists in DB, initializing...")
@ -183,6 +186,7 @@ func (s *Service) Start() {
stateChannel := make(chan *feed.Event, 1) stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe() defer stateSub.Unsubscribe()
<-attestationProcessorSubscribed
for { for {
select { select {
case event := <-stateChannel: case event := <-stateChannel:
@ -203,7 +207,7 @@ func (s *Service) Start() {
}() }()
} }
go s.processAttestation() go s.processAttestation(attestationProcessorSubscribed)
} }
// processChainStartTime initializes a series of deposits from the ChainStart deposits in the eth1 // processChainStartTime initializes a series of deposits from the ChainStart deposits in the eth1