// Source file: prysm-pulse/beacon-chain/rpc/prysm/v1alpha1/validator/blocks_test.go

package validator
import (
"context"
"testing"
"github.com/golang/mock/gomock"
chainMock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/state"
dbTest "github.com/prysmaticlabs/prysm/v3/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/mock"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
)
// TestServer_StreamAltairBlocksVerified_ContextCanceled asserts that
// StreamBlocksAltair, invoked with VerifiedOnly set, returns an error
// containing "Context canceled" when the context shared by the server and the
// mocked stream is canceled.
func TestServer_StreamAltairBlocksVerified_ContextCanceled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	chainService := &chainMock.ChainService{}
	server := &Server{
		Ctx:           ctx,
		StateNotifier: chainService.StateNotifier(),
		HeadFetcher:   chainService,
	}

	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	mockStream := mock.NewMockBeaconNodeValidatorAltair_StreamBlocksServer(ctrl)
	mockStream.EXPECT().Context().Return(ctx)

	// done is unbuffered: the send at the bottom of the test only completes
	// after the goroutine has run its assertion and reached the receive.
	done := make(chan bool)
	go func() {
		req := &ethpb.StreamBlocksRequest{VerifiedOnly: true}
		err := server.StreamBlocksAltair(req, mockStream)
		assert.ErrorContains(t, "Context canceled", err)
		<-done
	}()

	cancel()
	done <- true
}
// TestServer_StreamAltairBlocks_ContextCanceled asserts that
// StreamBlocksAltair, invoked with a default (empty) request, returns an error
// containing "Context canceled" when the context shared by the server and the
// mocked stream is canceled.
func TestServer_StreamAltairBlocks_ContextCanceled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	chainService := &chainMock.ChainService{}
	server := &Server{
		Ctx:           ctx,
		BlockNotifier: chainService.BlockNotifier(),
		HeadFetcher:   chainService,
	}

	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	mockStream := mock.NewMockBeaconNodeValidatorAltair_StreamBlocksServer(ctrl)
	mockStream.EXPECT().Context().Return(ctx)

	// done is unbuffered: the send at the bottom of the test only completes
	// after the goroutine has run its assertion and reached the receive.
	done := make(chan bool)
	go func() {
		req := &ethpb.StreamBlocksRequest{}
		err := server.StreamBlocksAltair(req, mockStream)
		assert.ErrorContains(t, "Context canceled", err)
		<-done
	}()

	cancel()
	done <- true
}
func TestServer_StreamAltairBlocks_OnHeadUpdated(t *testing.T) {
params.SetupTestConfigCleanup(t)
2022-09-15 18:47:51 +00:00
params.OverrideBeaconConfig(params.BeaconConfig())
ctx := context.Background()
beaconState, privs := util.DeterministicGenesisStateAltair(t, 64)
c, err := altair.NextSyncCommittee(ctx, beaconState)
require.NoError(t, err)
require.NoError(t, beaconState.SetCurrentSyncCommittee(c))
b, err := util.GenerateFullBlockAltair(beaconState, privs, util.DefaultBlockGenConfig(), 1)
require.NoError(t, err)
chainService := &chainMock.ChainService{State: beaconState}
server := &Server{
Ctx: ctx,
BlockNotifier: chainService.BlockNotifier(),
HeadFetcher: chainService,
}
exitRoutine := make(chan bool)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mock.NewMockBeaconNodeValidatorAltair_StreamBlocksServer(ctrl)
mockStream.EXPECT().Send(&ethpb.StreamBlocksResponse{Block: &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: b}}).Do(func(arg0 interface{}) {
exitRoutine <- true
})
mockStream.EXPECT().Context().Return(ctx).AnyTimes()
go func(tt *testing.T) {
assert.NoError(tt, server.StreamBlocksAltair(&ethpb.StreamBlocksRequest{}, mockStream), "Could not call RPC method")
}(t)
wrappedBlk, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
for sent := 0; sent == 0; {
sent = server.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: wrappedBlk},
})
}
<-exitRoutine
}
// TestServer_StreamAltairBlocksVerified_OnHeadUpdated publishes a
// BlockProcessed event on the server's state feed and expects
// StreamBlocksAltair, invoked with VerifiedOnly set, to forward the generated
// Altair block to the mocked stream via Send.
func TestServer_StreamAltairBlocksVerified_OnHeadUpdated(t *testing.T) {
	db := dbTest.SetupDB(t)
	ctx := context.Background()

	// Build a genesis Altair state with a current sync committee, generate
	// one full block on top of it, and persist that block in the test DB.
	beaconState, privs := util.DeterministicGenesisStateAltair(t, 32)
	committee, err := altair.NextSyncCommittee(ctx, beaconState)
	require.NoError(t, err)
	require.NoError(t, beaconState.SetCurrentSyncCommittee(committee))
	blk, err := util.GenerateFullBlockAltair(beaconState, privs, util.DefaultBlockGenConfig(), 1)
	require.NoError(t, err)
	root, err := blk.Block.HashTreeRoot()
	require.NoError(t, err)
	wrappedBlk := util.SaveBlock(t, ctx, db, blk)

	chainService := &chainMock.ChainService{State: beaconState}
	server := &Server{
		Ctx:           ctx,
		StateNotifier: chainService.StateNotifier(),
		HeadFetcher:   chainService,
	}

	delivered := make(chan bool)
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	mockStream := mock.NewMockBeaconNodeValidatorAltair_StreamBlocksServer(ctrl)
	want := &ethpb.StreamBlocksResponse{
		Block: &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: blk},
	}
	// The expected Send call signals delivered so the test knows the block
	// reached the stream.
	mockStream.EXPECT().Send(want).Do(func(_ interface{}) {
		delivered <- true
	})
	mockStream.EXPECT().Context().Return(ctx).AnyTimes()

	go func() {
		req := &ethpb.StreamBlocksRequest{VerifiedOnly: true}
		assert.NoError(t, server.StreamBlocksAltair(req, mockStream), "Could not call RPC method")
	}()

	// Keep publishing until at least one subscriber receives the event, i.e.
	// busy wait for the service to subscribe to the state feed.
	for {
		sent := server.StateNotifier.StateFeed().Send(&feed.Event{
			Type: statefeed.BlockProcessed,
			Data: &statefeed.BlockProcessedData{Slot: blk.Block.Slot, BlockRoot: root, SignedBlock: wrappedBlk},
		})
		if sent != 0 {
			break
		}
	}
	<-delivered
}