prysm-pulse/beacon-chain/sync/initial-sync/service_test.go
Radosław Kapka 12403d249f
[Feature] - API Middleware (#8926)
* HTTP proxy server for Eth2 APIs (#8904)

* Implement API HTTP proxy server

* cleanup + more comments

* gateway will no longer be dependent on beaconv1

* handle error during ErrorJson type assertion

* simplify handling of endpoint data

* fix mux v1 route

* use URL encoding for all requests

* comment fieldProcessor

* fix failing test

* change proxy port to not interfere with e2e

* gzl

* simplify conditional expression

* Move appending custom error header to grpcutils package

* add api-middleware-port flag

* fix documentation for processField

* modify e2e port

* change field processing error message

* better error message for field processing

* simplify base64ToHexProcessor

* fix json structs

* Run several new endpoints through API middleware (#8922)

* Implement API HTTP proxy server

* cleanup + more comments

* gateway will no longer be dependent on beaconv1

* handle error during ErrorJson type assertion

* simplify handling of endpoint data

* fix mux v1 route

* use URL encoding for all requests

* comment fieldProcessor

* fix failing test

* change proxy port to not interfere with e2e

* gzl

* simplify conditional expression

* Move appending custom error header to grpcutils package

* add api-middleware-port flag

* fix documentation for processField

* modify e2e port

* change field processing error message

* better error message for field processing

* simplify base64ToHexProcessor

* fix json structs

* /eth/v1/beacon/states/{state_id}/validators

* /eth/v1/beacon/states/{state_id}/validators/{validator_id}

* /eth/v1/beacon/states/{state_id}/validator_balances

* /eth/v1/beacon/states/{state_id}/committees

* allow skipping base64-encoding for query params

* /eth/v1/beacon/pool/attestations

* replace break with continue

* Remove unused functions (#8924)

Co-authored-by: terence tsao <terence@prysmaticlabs.com>

* Process SSZ-serialized beacon state through API middleware (#8925)

* update field names

* Process SSZ-serialized beacon state through API middleware

* revert changes to go.mod and go.sum

* Revert "Merge branch '__develop' into feature/api-middleware"

This reverts commit 7c739a8fd71e2c1e3a14be85abd29a59b57ae9b5, reversing
changes made to 2d0f8e012ecb006888ed8e826b45625a3edc2eeb.

* update ethereumapis

* update validator field name

* update deps.bzl

* update json tags (#8942)

* Run `/node/syncing` through API Middleware (#8944)

* add IsSyncing field to grpc response

* run /node/syncing through the middleware

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>

* Return HTTP status codes other than 200 and 500 from node and debug endpoints (#8937)

* error codes for node endpoints

* error codes for debug endpoints

* better comment about headers

* gzl

* review comments

* comment on return value

* update fakeChecker used for fuzz tests

* fix failing tests

* Allow to pass URL params literally, without encoding to base64 (#8938)

* Allow to pass URL params literally, without encoding to base64

* fix compile error

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>

* Process SSZ-serialized beacon state through API middleware (#8925)

* update field names

* Process SSZ-serialized beacon state through API middleware

* revert changes to go.mod and go.sum

* Revert "Merge branch '__develop' into feature/api-middleware"

This reverts commit 7c739a8fd71e2c1e3a14be85abd29a59b57ae9b5, reversing
changes made to 2d0f8e012ecb006888ed8e826b45625a3edc2eeb.

* update ethereumapis

* update validator field name

* update deps.bzl

* update json tags (#8942)

* Run `/node/syncing` through API Middleware (#8944)

* add IsSyncing field to grpc response

* run /node/syncing through the middleware

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>

* Return HTTP status codes other than 200 and 500 from node and debug endpoints (#8937)

* error codes for node endpoints

* error codes for debug endpoints

* better comment about headers

* gzl

* review comments

* comment on return value

* update fakeChecker used for fuzz tests

* fix failing tests

* Allow to pass URL params literally, without encoding to base64 (#8938)

* Allow to pass URL params literally, without encoding to base64

* fix compile error

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>

* unused import

* Return correct status codes from beacon endpoints (#8960)

* Various API Middleware fixes (#8963)

* Return correct status codes from `/states` endpoints

* better error messages in debug and node

* better error messages in state

* returning correct error codes from validator endpoints

* correct error codes for getting a block header

* gzl

* fix err variable name

* fix nil block comparison

* test fixes

* make status enum test better

* fix ineffectual assignment

* make PR unstuck

* return proper status codes

* return uppercase keys from /config/spec

* return lowercase validator status

* convert requested enum values to uppercase

* validator fixes

* Implement `/beacon/headers` endpoint (#8966)

* Refactor API Middleware into more manageable code (#8984)

* move endpoint registration out of shared package

* divide main function into smaller components

* return early on error

* implement hooks

* implement custom handlers and add documentation

* fix test compile error

* restrict package visibility

* remove redundant error checking

* rename file

* API Middleware unit tests (#8998)

* move endpoint registration out of shared package

* divide main function into smaller components

* return early on error

* implement hooks

* implement custom handlers and add documentation

* fix test compile error

* restrict package visibility

* remove redundant error checking

* rename file

* api_middleware_processing

* endpoints

* gzl

* remove gazelle:ignore

* merge

* Implement SSZ version of `/blocks/{block_id}` (#8970)

* Implement SSZ version of `/blocks/{block_id}`

* add dependencies back

* fix indentation in deps.bzl

* parameterize ssz functions

* get block ssz

* update ethereumapis dependency

* gzl

* Do not reuse `Endpoint` structs between API calls (#9007)

* code refactor

* implement endpoint factory

* fix test

* fmt

* include pbs

* gaz

* test naming fixes

* remove unused code

* radek comments

* revert endpoint test

* bring back bytes test case

* move `signedBeaconBlock` to `migration` package

* change `fmt.Errorf` to `errors.Wrap`

* capitalize SSZ

* capitalize URL

* more review feedback

* rename `handleGetBlockSSZ` to `handleGetBeaconBlockSSZ`

* rename `IndexOutOfRangeError` to `ValidatorIndexOutOfRangeError`

* simplify parameter names

* test header

* more corrections

* properly allocate array capacity

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: Nishant Das <nishdas93@gmail.com>
2021-06-15 10:28:49 -05:00

464 lines
13 KiB
Go

package initialsync
import (
"context"
"sync"
"testing"
"time"
"github.com/paulbellamy/ratecounter"
types "github.com/prysmaticlabs/eth2-types"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
eth "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/abool"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/interfaces"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
logTest "github.com/sirupsen/logrus/hooks/test"
)
// TestService_Constants guards the configured limits: the product of
// MaxPeersToSync and BlockBatchLimit must not exceed 1000.
func TestService_Constants(t *testing.T) {
	maxRange := params.BeaconConfig().MaxPeersToSync * flags.Get().BlockBatchLimit
	if maxRange <= 1000 {
		return
	}
	t.Fatal("rpc rejects requests over 1000 range slots")
}
// TestService_InitStartStop is a table-driven test: each case builds a Service
// with NewService, optionally publishes a state-feed event, runs Start in a
// goroutine and then checks the captured log output.
func TestService_InitStartStop(t *testing.T) {
// hook is what the LogsContain/LogsDoNotContain assertions below inspect.
hook := logTest.NewGlobal()
tests := []struct {
name string
// assert runs after Start has returned and checks the logged messages.
assert func()
// methodRuns, when set, is called with the notifier's state feed before Start is launched.
methodRuns func(fd *event.Feed)
// chainService, when set, supplies the mock chain service; otherwise an empty one is used.
chainService func() *mock.ChainService
}{
{
name: "head is not ready",
assert: func() {
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
{
name: "future genesis",
chainService: func() *mock.ChainService {
// Set to future time (genesis time hasn't arrived yet).
st, err := testutil.NewBeaconState()
require.NoError(t, err)
return &mock.ChainService{
State: st,
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
}
},
methodRuns: func(fd *event.Feed) {
// Send valid event.
fd.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
// Unix timestamp in the year 2100, i.e. a genesis that has not happened yet.
StartTime: time.Unix(4113849600, 0),
GenesisValidatorsRoot: make([]byte, 32),
},
})
},
assert: func() {
assert.LogsContain(t, hook, "Genesis time has not arrived - not syncing")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
{
name: "zeroth epoch",
chainService: func() *mock.ChainService {
// Set to nearby slot.
st, err := testutil.NewBeaconState()
require.NoError(t, err)
return &mock.ChainService{
State: st,
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
}
},
methodRuns: func(fd *event.Feed) {
// Send valid event.
fd.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
// Genesis five minutes in the past.
StartTime: time.Now().Add(-5 * time.Minute),
GenesisValidatorsRoot: make([]byte, 32),
},
})
},
assert: func() {
assert.LogsContain(t, hook, "Chain started within the last epoch - not syncing")
assert.LogsDoNotContain(t, hook, "Genesis time has not arrived - not syncing")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
{
name: "already synced",
chainService: func() *mock.ChainService {
// Set to some future slot, and then make sure that current head matches it.
st, err := testutil.NewBeaconState()
require.NoError(t, err)
futureSlot := types.Slot(27354)
require.NoError(t, st.SetSlot(futureSlot))
return &mock.ChainService{
State: st,
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: helpers.SlotToEpoch(futureSlot),
},
}
},
methodRuns: func(fd *event.Feed) {
// Same slot value as the one set on the state in chainService above.
futureSlot := types.Slot(27354)
// Send valid event.
fd.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
// NOTE(review): presumably a genesis time that makes futureSlot the current slot — confirm against makeGenesisTime.
StartTime: makeGenesisTime(futureSlot),
GenesisValidatorsRoot: make([]byte, 32),
},
})
},
assert: func() {
assert.LogsContain(t, hook, "Starting initial chain sync...")
assert.LogsContain(t, hook, "Already synced to the current chain head")
assert.LogsDoNotContain(t, hook, "Chain started within the last epoch - not syncing")
assert.LogsDoNotContain(t, hook, "Genesis time has not arrived - not syncing")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
}
// A single test P2P instance, connected with an empty peer list, is shared by all cases.
p := p2pt.NewTestP2P(t)
connectPeers(t, p, []*peerData{}, p.Peers())
for i, tt := range tests {
// NOTE(review): the first case ("head is not ready") is skipped here; the reason is not visible in this file.
if i == 0 {
continue
}
t.Run(tt.name, func(t *testing.T) {
// Clear captured log entries so one case's output cannot satisfy another's assertions.
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mc := &mock.ChainService{}
// Allow overriding with customized chain service.
if tt.chainService != nil {
mc = tt.chainService()
}
// Initialize feed
notifier := &mock.MockStateNotifier{}
s := NewService(ctx, &Config{
P2P: p,
Chain: mc,
StateNotifier: notifier,
})
// NOTE(review): presumably gives work started by NewService time to settle before the event is sent — TODO confirm.
time.Sleep(500 * time.Millisecond)
assert.NotNil(t, s)
if tt.methodRuns != nil {
tt.methodRuns(notifier.StateFeed())
}
// wg is released once Start returns.
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
s.Start()
wg.Done()
}()
go func() {
// Allow to exit from test (on no head loop waiting for head is started).
// In most tests, this is redundant, as Start() already exited.
time.AfterFunc(3*time.Second, func() {
cancel()
})
}()
// The 4s wait is longer than the 3s cancel timer above, so the cancel fires first if Start is still running.
if testutil.WaitTimeout(wg, time.Second*4) {
t.Fatalf("Test should have exited by now, timed out")
}
tt.assert()
})
}
}
// TestService_waitForStateInitialization exercises waitForStateInitialization
// in three scenarios: the context is cancelled before any event arrives, a
// valid Initialized event arrives after an invalid one, and the wait runs
// alongside Start.
func TestService_waitForStateInitialization(t *testing.T) {
// hook is what the LogsContain/LogsDoNotContain assertions below inspect.
hook := logTest.NewGlobal()
// newService builds a Service struct directly (not through NewService) with a
// cancellable child of ctx and an unbuffered genesisChan.
newService := func(ctx context.Context, mc *mock.ChainService) *Service {
ctx, cancel := context.WithCancel(ctx)
s := &Service{
cfg: &Config{Chain: mc, StateNotifier: mc.StateNotifier()},
ctx: ctx,
cancel: cancel,
synced: abool.New(),
chainStarted: abool.New(),
counter: ratecounter.NewRateCounter(counterSeconds * time.Second),
genesisChan: make(chan time.Time),
}
return s
}
t.Run("no state and context close", func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newService(ctx, &mock.ChainService{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
go s.waitForStateInitialization()
// No event is ever sent in this subtest, so the value read from genesisChan is expected to be the zero time.
currTime := <-s.genesisChan
assert.Equal(t, true, currTime.IsZero())
wg.Done()
}()
go func() {
// Cancel the parent context after 500ms.
time.AfterFunc(500*time.Millisecond, func() {
cancel()
})
}()
if testutil.WaitTimeout(wg, time.Second*2) {
t.Fatalf("Test should have exited by now, timed out")
}
assert.LogsContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Context closed, exiting goroutine")
assert.LogsDoNotContain(t, hook, "Subscription to state notifier failed")
})
t.Run("no state and state init event received", func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newService(ctx, &mock.ChainService{})
expectedGenesisTime := time.Unix(358544700, 0)
var receivedGenesisTime time.Time
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
go s.waitForStateInitialization()
// Written here and read by the test goroutine only after the wait group has been released.
receivedGenesisTime = <-s.genesisChan
assert.Equal(t, false, receivedGenesisTime.IsZero())
wg.Done()
}()
go func() {
time.AfterFunc(500*time.Millisecond, func() {
// Send invalid event at first.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.BlockProcessedData{},
})
// Send valid event.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: expectedGenesisTime,
GenesisValidatorsRoot: make([]byte, 32),
},
})
})
}()
if testutil.WaitTimeout(wg, time.Second*2) {
t.Fatalf("Test should have exited by now, timed out")
}
assert.Equal(t, expectedGenesisTime, receivedGenesisTime)
// The mismatched-payload event is expected to be logged, after which the valid event is still processed.
assert.LogsContain(t, hook, "Event feed data is not type *statefeed.InitializedData")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Received state initialized event")
assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine")
})
t.Run("no state and state init event received and service start", func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newService(ctx, &mock.ChainService{})
// Initialize mock feed
_ = s.cfg.StateNotifier.StateFeed()
// Genesis is placed 60 seconds in the future.
expectedGenesisTime := time.Now().Add(60 * time.Second)
wg := &sync.WaitGroup{}
// First goroutine: runs waitForStateInitialization to completion.
wg.Add(1)
go func() {
s.waitForStateInitialization()
wg.Done()
}()
// Second goroutine: schedules the valid event for 500ms from now and runs Start;
// both goroutines must finish before the timeout below.
wg.Add(1)
go func() {
time.AfterFunc(500*time.Millisecond, func() {
// Send valid event.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: expectedGenesisTime,
GenesisValidatorsRoot: make([]byte, 32),
},
})
})
s.Start()
wg.Done()
}()
if testutil.WaitTimeout(wg, time.Second*5) {
t.Fatalf("Test should have exited by now, timed out")
}
assert.LogsContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Received state initialized event")
assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine")
})
}
// TestService_markSynced checks that markSynced takes the service out of the
// syncing state and that a statefeed.Synced event carrying the genesis time
// passed to markSynced is published on the state feed.
func TestService_markSynced(t *testing.T) {
	mc := &mock.ChainService{}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s := NewService(ctx, &Config{
		Chain:         mc,
		StateNotifier: mc.StateNotifier(),
	})
	require.NotNil(t, s)
	assert.Equal(t, false, s.chainStarted.IsSet())
	assert.Equal(t, false, s.synced.IsSet())
	assert.Equal(t, true, s.Syncing())
	// Before chainStarted is set, Status is expected to return no error;
	// afterwards it is expected to report "syncing".
	assert.NoError(t, s.Status())
	s.chainStarted.Set()
	assert.ErrorContains(t, "syncing", s.Status())

	expectedGenesisTime := time.Unix(358544700, 0)
	var receivedGenesisTime time.Time

	// Subscribe before calling markSynced so the event cannot be missed; the
	// channel has capacity 1 so the send does not depend on the reader being ready.
	stateChannel := make(chan *feed.Event, 1)
	stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
	defer stateSub.Unsubscribe()

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		// Deferred so the waiting test goroutine is released on every exit path.
		defer wg.Done()
		select {
		case stateEvent := <-stateChannel:
			if stateEvent.Type != statefeed.Synced {
				return
			}
			data, ok := stateEvent.Data.(*statefeed.SyncedData)
			if !ok {
				// Use t.Error instead of require here: FailNow may only be
				// called from the goroutine running the test function.
				t.Error("Event feed data is not type *statefeed.SyncedData")
				return
			}
			receivedGenesisTime = data.StartTime
		case <-s.ctx.Done():
		}
	}()
	s.markSynced(expectedGenesisTime)

	if testutil.WaitTimeout(wg, time.Second*2) {
		t.Fatalf("Test should have exited by now, timed out")
	}
	assert.Equal(t, expectedGenesisTime, receivedGenesisTime)
	assert.Equal(t, false, s.Syncing())
}
// TestService_Resync calls Service.Resync against a single connected test peer,
// once with an empty mock chain service (error expected) and once with a fully
// populated one (success expected).
func TestService_Resync(t *testing.T) {
p := p2pt.NewTestP2P(t)
// One peer advertising head slot 160 and finalized epoch 5.
// NOTE(review): makeSequence(1, 160) presumably yields slots 1..160 — confirm against its definition.
connectPeers(t, p, []*peerData{
{blocks: makeSequence(1, 160), finalizedEpoch: 5, headSlot: 160},
}, p.Peers())
// cache is declared elsewhere in the package; it is seeded with the same sequence given to the peer.
cache.initializeRootCache(makeSequence(1, 160), t)
beaconDB := dbtest.SetupDB(t)
err := beaconDB.SaveBlock(context.Background(), interfaces.WrappedPhase0SignedBeaconBlock(testutil.NewBeaconBlock()))
require.NoError(t, err)
// Read entry 0 of the root cache under its read lock; it is used as the chain service's Root below.
cache.RLock()
genesisRoot := cache.rootCache[0]
cache.RUnlock()
// hook is what the LogsContain assertion below inspects.
hook := logTest.NewGlobal()
tests := []struct {
name string
// assert, when set, runs after Resync and receives the service under test.
assert func(s *Service)
// chainService, when set, supplies the mock chain service; otherwise an empty one is used.
chainService func() *mock.ChainService
// wantedErr, when non-empty, is the substring the Resync error must contain.
wantedErr string
}{
{
name: "no head state",
wantedErr: "could not retrieve head state",
},
{
name: "resync ok",
chainService: func() *mock.ChainService {
st, err := testutil.NewBeaconState()
require.NoError(t, err)
futureSlot := types.Slot(160)
// NOTE(review): presumably a genesis time that makes slot 160 the current slot — confirm against makeGenesisTime.
require.NoError(t, st.SetGenesisTime(uint64(makeGenesisTime(futureSlot).Unix())))
return &mock.ChainService{
State: st,
Root: genesisRoot[:],
DB: beaconDB,
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: helpers.SlotToEpoch(futureSlot),
},
}
},
assert: func(s *Service) {
assert.LogsContain(t, hook, "Resync attempt complete")
assert.Equal(t, types.Slot(160), s.cfg.Chain.HeadSlot())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Clear captured log entries so one case's output cannot satisfy another's assertions.
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mc := &mock.ChainService{}
// Allow overriding with customized chain service.
if tt.chainService != nil {
mc = tt.chainService()
}
s := NewService(ctx, &Config{
DB: beaconDB,
P2P: p,
Chain: mc,
StateNotifier: mc.StateNotifier(),
})
assert.NotNil(t, s)
// The head slot is expected to be 0 before Resync runs.
assert.Equal(t, types.Slot(0), s.cfg.Chain.HeadSlot())
err := s.Resync()
if tt.wantedErr != "" {
assert.ErrorContains(t, tt.wantedErr, err)
} else {
assert.NoError(t, err)
}
if tt.assert != nil {
tt.assert(s)
}
})
}
}
func TestService_Initialized(t *testing.T) {
s := NewService(context.Background(), &Config{
StateNotifier: &mock.MockStateNotifier{},
})
s.chainStarted.Set()
assert.Equal(t, true, s.Initialized())
s.chainStarted.UnSet()
assert.Equal(t, false, s.Initialized())
}
func TestService_Synced(t *testing.T) {
s := NewService(context.Background(), &Config{
StateNotifier: &mock.MockStateNotifier{},
})
s.synced.UnSet()
assert.Equal(t, false, s.Synced())
s.synced.Set()
assert.Equal(t, true, s.Synced())
}