mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 19:40:37 +00:00
Fix Deadlock in StreamChainHead (#12250)
* fix it possibly * buffer it more * fix test
This commit is contained in:
parent
0325741318
commit
37182168e3
@ -346,12 +346,19 @@ func (bs *Server) StreamBlocks(req *ethpb.StreamBlocksRequest, stream ethpb.Beac
|
||||
// StreamChainHead to clients every single time the head block and state of the chain change.
|
||||
// DEPRECATED: This endpoint is superseded by the /eth/v1/events Beacon API endpoint
|
||||
func (bs *Server) StreamChainHead(_ *emptypb.Empty, stream ethpb.BeaconChain_StreamChainHeadServer) error {
|
||||
stateChannel := make(chan *feed.Event, 1)
|
||||
stateChannel := make(chan *feed.Event, 4)
|
||||
stateSub := bs.StateNotifier.StateFeed().Subscribe(stateChannel)
|
||||
defer stateSub.Unsubscribe()
|
||||
for {
|
||||
select {
|
||||
case stateEvent := <-stateChannel:
|
||||
// In the event our node is in sync mode
|
||||
// we do not send the chainhead to the caller
|
||||
// due to the possibility of deadlocks when retrieving
|
||||
// all the chain related data.
|
||||
if bs.SyncChecker.Syncing() {
|
||||
continue
|
||||
}
|
||||
if stateEvent.Type == statefeed.BlockProcessed {
|
||||
res, err := bs.chainHeadRetrieval(stream.Context())
|
||||
if err != nil {
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
|
||||
dbTest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
|
||||
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
|
||||
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/features"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
@ -287,6 +288,7 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) {
|
||||
CurrentJustifiedCheckPoint: s.CurrentJustifiedCheckpoint(),
|
||||
PreviousJustifiedCheckPoint: s.PreviousJustifiedCheckpoint()},
|
||||
OptimisticModeFetcher: &chainMock.ChainService{},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
exitRoutine := make(chan bool)
|
||||
ctrl := gomock.NewController(t)
|
||||
|
Loading…
Reference in New Issue
Block a user