package rpc import ( "context" "errors" "math/big" "reflect" "testing" "time" "github.com/prysmaticlabs/prysm/shared/event" ptypes "github.com/gogo/protobuf/types" "github.com/golang/mock/gomock" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/internal" pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" logTest "github.com/sirupsen/logrus/hooks/test" ) type faultyPOWChainService struct { chainStartFeed *event.Feed } func (f *faultyPOWChainService) HasChainStartLogOccurred() (bool, uint64, error) { return false, uint64(time.Now().Unix()), nil } func (f *faultyPOWChainService) ChainStartFeed() *event.Feed { return f.chainStartFeed } func (f *faultyPOWChainService) LatestBlockNumber() *big.Int { return big.NewInt(0) } type mockPOWChainService struct { chainStartFeed *event.Feed latestBlockNumber *big.Int } func (m *mockPOWChainService) HasChainStartLogOccurred() (bool, uint64, error) { return true, uint64(time.Unix(0, 0).Unix()), nil } func (m *mockPOWChainService) ChainStartFeed() *event.Feed { return m.chainStartFeed } func (m *mockPOWChainService) LatestBlockNumber() *big.Int { return m.latestBlockNumber } func TestWaitForChainStart_ContextClosed(t *testing.T) { hook := logTest.NewGlobal() ctx, cancel := context.WithCancel(context.Background()) beaconServer := &BeaconServer{ ctx: ctx, powChainService: &faultyPOWChainService{ chainStartFeed: new(event.Feed), }, chainService: newMockChainService(), } exitRoutine := make(chan bool) ctrl := gomock.NewController(t) defer ctrl.Finish() mockStream := internal.NewMockBeaconService_WaitForChainStartServer(ctrl) go func(tt *testing.T) { if err := beaconServer.WaitForChainStart(&ptypes.Empty{}, mockStream); err != nil { tt.Errorf("Could not call RPC method: %v", err) } <-exitRoutine }(t) cancel() exitRoutine <- true testutil.AssertLogsContain(t, hook, "RPC context closed, exiting goroutine") } func TestWaitForChainStart_AlreadyStarted(t *testing.T) { beaconServer := &BeaconServer{ ctx: context.Background(), powChainService: &mockPOWChainService{ chainStartFeed: new(event.Feed), }, chainService: newMockChainService(), } ctrl := gomock.NewController(t) defer ctrl.Finish() mockStream := internal.NewMockBeaconService_WaitForChainStartServer(ctrl) mockStream.EXPECT().Send( &pb.ChainStartResponse{ Started: true, GenesisTime: uint64(time.Unix(0, 0).Unix()), }, ).Return(nil) if err := beaconServer.WaitForChainStart(&ptypes.Empty{}, mockStream); err != nil { t.Errorf("Could not call RPC method: %v", err) } } func TestWaitForChainStart_NotStartedThenLogFired(t *testing.T) { hook := logTest.NewGlobal() beaconServer := &BeaconServer{ ctx: context.Background(), chainStartChan: make(chan time.Time, 1), powChainService: &faultyPOWChainService{ chainStartFeed: new(event.Feed), }, chainService: newMockChainService(), } exitRoutine := make(chan bool) ctrl := gomock.NewController(t) defer ctrl.Finish() mockStream := internal.NewMockBeaconService_WaitForChainStartServer(ctrl) mockStream.EXPECT().Send( &pb.ChainStartResponse{ Started: true, GenesisTime: uint64(time.Unix(0, 0).Unix()), }, ).Return(nil) go func(tt *testing.T) { if err := beaconServer.WaitForChainStart(&ptypes.Empty{}, mockStream); err != nil { tt.Errorf("Could not call RPC method: %v", err) } <-exitRoutine }(t) beaconServer.chainStartChan <- time.Unix(0, 0) exitRoutine <- true testutil.AssertLogsContain(t, hook, "Sending ChainStart log and genesis time to connected validator clients") } func TestLatestAttestation_ContextClosed(t *testing.T) { hook := logTest.NewGlobal() mockOperationService := &mockOperationService{} ctx, cancel := context.WithCancel(context.Background()) beaconServer := &BeaconServer{ ctx: ctx, operationService: mockOperationService, chainService: newMockChainService(), } exitRoutine := make(chan bool) ctrl := gomock.NewController(t) defer ctrl.Finish() mockStream := internal.NewMockBeaconService_LatestAttestationServer(ctrl) go func(tt *testing.T) { if err := beaconServer.LatestAttestation(&ptypes.Empty{}, mockStream); err != nil { tt.Errorf("Could not call RPC method: %v", err) } <-exitRoutine }(t) cancel() exitRoutine <- true testutil.AssertLogsContain(t, hook, "RPC context closed, exiting goroutine") } func TestLatestAttestation_FaultyServer(t *testing.T) { mockOperationService := &mockOperationService{} ctx, cancel := context.WithCancel(context.Background()) beaconServer := &BeaconServer{ ctx: ctx, operationService: mockOperationService, incomingAttestation: make(chan *pbp2p.Attestation, 0), chainService: newMockChainService(), } ctrl := gomock.NewController(t) defer ctrl.Finish() exitRoutine := make(chan bool) attestation := &pbp2p.Attestation{} mockStream := internal.NewMockBeaconService_LatestAttestationServer(ctrl) mockStream.EXPECT().Send(attestation).Return(errors.New("something wrong")) // Tests a faulty stream. go func(tt *testing.T) { if err := beaconServer.LatestAttestation(&ptypes.Empty{}, mockStream); err.Error() != "something wrong" { tt.Errorf("Faulty stream should throw correct error, wanted 'something wrong', got %v", err) } <-exitRoutine }(t) beaconServer.incomingAttestation <- attestation cancel() exitRoutine <- true } func TestLatestAttestation_SendsCorrectly(t *testing.T) { hook := logTest.NewGlobal() operationService := &mockOperationService{} ctx, cancel := context.WithCancel(context.Background()) beaconServer := &BeaconServer{ ctx: ctx, operationService: operationService, incomingAttestation: make(chan *pbp2p.Attestation, 0), chainService: newMockChainService(), } ctrl := gomock.NewController(t) defer ctrl.Finish() exitRoutine := make(chan bool) attestation := &pbp2p.Attestation{} mockStream := internal.NewMockBeaconService_LatestAttestationServer(ctrl) mockStream.EXPECT().Send(attestation).Return(nil) // Tests a good stream. go func(tt *testing.T) { if err := beaconServer.LatestAttestation(&ptypes.Empty{}, mockStream); err != nil { tt.Errorf("Could not call RPC method: %v", err) } <-exitRoutine }(t) beaconServer.incomingAttestation <- attestation cancel() exitRoutine <- true testutil.AssertLogsContain(t, hook, "Sending attestation to RPC clients") } func TestPendingDeposits_ReturnsErrorOnUnknownBlockNum(t *testing.T) { p := &mockPOWChainService{ latestBlockNumber: nil, } bs := BeaconServer{powChainService: p} _, err := bs.PendingDeposits(context.Background(), nil) if err.Error() != "latest PoW block number is unknown" { t.Errorf("Received unexpected error: %v", err) } } func TestPendingDeposits_ReturnsDepositsOutsideEth1FollowWindow(t *testing.T) { p := &mockPOWChainService{ latestBlockNumber: big.NewInt(int64(10 + params.BeaconConfig().Eth1FollowDistance)), } d := &db.BeaconDB{} // Using the merkleTreeIndex as the block number for this test... readyDeposits := []*pbp2p.Deposit{ { MerkleTreeIndex: 1, }, { MerkleTreeIndex: 2, }, } recentDeposits := []*pbp2p.Deposit{ { MerkleTreeIndex: params.BeaconConfig().Eth1FollowDistance + 100, }, { MerkleTreeIndex: params.BeaconConfig().Eth1FollowDistance + 101, }, } ctx := context.Background() for _, dp := range append(recentDeposits, readyDeposits...) { d.InsertPendingDeposit(ctx, dp, big.NewInt(int64(dp.MerkleTreeIndex))) } bs := &BeaconServer{ beaconDB: d, powChainService: p, chainService: newMockChainService(), } result, err := bs.PendingDeposits(ctx, nil) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(result.PendingDeposits, readyDeposits) { t.Errorf("Received unexpected list of deposits: %+v, wanted: %+v", result, readyDeposits) } // It should also return the recent deposits after their follow window. p.latestBlockNumber = big.NewInt(0).Add(p.latestBlockNumber, big.NewInt(10000)) allResp, err := bs.PendingDeposits(ctx, nil) if err != nil { t.Fatal(err) } if len(allResp.PendingDeposits) != len(recentDeposits)+len(readyDeposits) { t.Errorf( "Received unexpected number of pending deposits: %d, wanted: %d", len(allResp.PendingDeposits), len(recentDeposits)+len(readyDeposits), ) } }