mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-11 04:00:05 +00:00
Make validator stable when beacon node goes offline (#8278)
* Make validator stable POC * fix feedback raul and nishant * fix wait till first iteration * fix imports * retry tests * fix init * test retry receive blocks * remove redundant return statement * terence feedback * terence feedback * remove log * to check for context after fist call * remove fatal Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
This commit is contained in:
parent
d53fdcf781
commit
4595789ac8
@ -15,12 +15,8 @@ var _ Validator = (*FakeValidator)(nil)
|
||||
type FakeValidator struct {
|
||||
DoneCalled bool
|
||||
WaitForWalletInitializationCalled bool
|
||||
WaitForActivationCalled bool
|
||||
WaitForChainStartCalled bool
|
||||
WaitForSyncCalled bool
|
||||
SlasherReadyCalled bool
|
||||
NextSlotCalled bool
|
||||
CanonicalHeadSlotCalled bool
|
||||
UpdateDutiesCalled bool
|
||||
UpdateProtectionsCalled bool
|
||||
RoleAtCalled bool
|
||||
@ -30,6 +26,12 @@ type FakeValidator struct {
|
||||
SaveProtectionsCalled bool
|
||||
DeleteProtectionCalled bool
|
||||
SlotDeadlineCalled bool
|
||||
WaitForChainStartCalled int
|
||||
WaitForSyncCalled int
|
||||
WaitForActivationCalled int
|
||||
CanonicalHeadSlotCalled int
|
||||
ReceiveBlocksCalled int
|
||||
RetryTillSuccess int
|
||||
ProposeBlockArg1 uint64
|
||||
AttestToBlockHeadArg1 uint64
|
||||
RoleAtArg1 uint64
|
||||
@ -62,19 +64,28 @@ func (fv *FakeValidator) WaitForWalletInitialization(_ context.Context) error {
|
||||
|
||||
// WaitForChainStart for mocking.
|
||||
func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
|
||||
fv.WaitForChainStartCalled = true
|
||||
fv.WaitForChainStartCalled++
|
||||
if fv.RetryTillSuccess >= fv.WaitForChainStartCalled {
|
||||
return errConnectionIssue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForActivation for mocking.
|
||||
func (fv *FakeValidator) WaitForActivation(_ context.Context, _ chan struct{}) error {
|
||||
fv.WaitForActivationCalled = true
|
||||
fv.WaitForActivationCalled++
|
||||
if fv.RetryTillSuccess >= fv.WaitForActivationCalled {
|
||||
return errConnectionIssue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForSync for mocking.
|
||||
func (fv *FakeValidator) WaitForSync(_ context.Context) error {
|
||||
fv.WaitForSyncCalled = true
|
||||
fv.WaitForSyncCalled++
|
||||
if fv.RetryTillSuccess >= fv.WaitForSyncCalled {
|
||||
return errConnectionIssue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -86,7 +97,10 @@ func (fv *FakeValidator) SlasherReady(_ context.Context) error {
|
||||
|
||||
// CanonicalHeadSlot for mocking.
|
||||
func (fv *FakeValidator) CanonicalHeadSlot(_ context.Context) (uint64, error) {
|
||||
fv.CanonicalHeadSlotCalled = true
|
||||
fv.CanonicalHeadSlotCalled++
|
||||
if fv.RetryTillSuccess > fv.CanonicalHeadSlotCalled {
|
||||
return 0, errConnectionIssue
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
@ -195,4 +209,9 @@ func (fv *FakeValidator) GetKeymanager() keymanager.IKeymanager {
|
||||
}
|
||||
|
||||
// ReceiveBlocks for mocking
|
||||
func (fv *FakeValidator) ReceiveBlocks(ctx context.Context) {}
|
||||
func (fv *FakeValidator) ReceiveBlocks(ctx context.Context, connectionErrorChannel chan error) {
|
||||
fv.ReceiveBlocksCalled++
|
||||
if fv.RetryTillSuccess > fv.ReceiveBlocksCalled {
|
||||
connectionErrorChannel <- errConnectionIssue
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
@ -16,6 +17,10 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// time to wait before trying to reconnect with beacon node.
|
||||
var backOffPeriod = 10 * time.Second
|
||||
var errConnectionIssue = errors.New("could not connect")
|
||||
|
||||
// Validator interface defines the primary methods of a validator client.
|
||||
type Validator interface {
|
||||
Done()
|
||||
@ -38,7 +43,7 @@ type Validator interface {
|
||||
WaitForWalletInitialization(ctx context.Context) error
|
||||
AllValidatorsAreExited(ctx context.Context) (bool, error)
|
||||
GetKeymanager() keymanager.IKeymanager
|
||||
ReceiveBlocks(ctx context.Context)
|
||||
ReceiveBlocks(ctx context.Context, connectionErrorChannel chan error)
|
||||
}
|
||||
|
||||
// Run the main validator routine. This routine exits if the context is
|
||||
@ -64,37 +69,79 @@ func run(ctx context.Context, v Validator) {
|
||||
log.Fatalf("Slasher is not ready: %v", err)
|
||||
}
|
||||
}
|
||||
if err := v.WaitForChainStart(ctx); err != nil {
|
||||
log.Fatalf("Could not determine if beacon chain started: %v", err)
|
||||
}
|
||||
if err := v.WaitForSync(ctx); err != nil {
|
||||
log.Fatalf("Could not determine if beacon node synced: %v", err)
|
||||
}
|
||||
ticker := time.NewTicker(backOffPeriod)
|
||||
defer ticker.Stop()
|
||||
|
||||
var headSlot uint64
|
||||
firstTime := true
|
||||
accountsChangedChan := make(chan struct{}, 1)
|
||||
for {
|
||||
if !firstTime {
|
||||
if ctx.Err() != nil {
|
||||
log.Info("Context canceled, stopping validator")
|
||||
return // Exit if context is canceled.
|
||||
}
|
||||
<-ticker.C
|
||||
} else {
|
||||
firstTime = false
|
||||
}
|
||||
err := v.WaitForChainStart(ctx)
|
||||
if isConnectionError(err) {
|
||||
log.Warnf("Could not determine if beacon chain started: %v", err)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatalf("Could not determine if beacon chain started: %v", err)
|
||||
}
|
||||
err = v.WaitForSync(ctx)
|
||||
if isConnectionError(err) {
|
||||
log.Warnf("Could not determine if beacon chain started: %v", err)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatalf("Could not determine if beacon node synced: %v", err)
|
||||
}
|
||||
err = v.WaitForActivation(ctx, accountsChangedChan)
|
||||
if isConnectionError(err) {
|
||||
log.Warnf("Could not wait for validator activation: %v", err)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatalf("Could not wait for validator activation: %v", err)
|
||||
}
|
||||
headSlot, err = v.CanonicalHeadSlot(ctx)
|
||||
if isConnectionError(err) {
|
||||
log.Warnf("Could not get current canonical head slot: %v", err)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatalf("Could not get current canonical head slot: %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
go handleAccountsChanged(ctx, v, accountsChangedChan)
|
||||
if err := v.WaitForActivation(ctx, accountsChangedChan); err != nil {
|
||||
log.Fatalf("Could not wait for validator activation: %v", err)
|
||||
}
|
||||
|
||||
go v.ReceiveBlocks(ctx)
|
||||
|
||||
headSlot, err := v.CanonicalHeadSlot(ctx)
|
||||
if err != nil {
|
||||
log.Fatalf("Could not get current canonical head slot: %v", err)
|
||||
}
|
||||
connectionErrorChannel := make(chan error, 1)
|
||||
go v.ReceiveBlocks(ctx, connectionErrorChannel)
|
||||
if err := v.UpdateDuties(ctx, headSlot); err != nil {
|
||||
handleAssignmentError(err, headSlot)
|
||||
}
|
||||
|
||||
for {
|
||||
slotCtx, cancel := context.WithCancel(ctx)
|
||||
ctx, span := trace.StartSpan(ctx, "validator.processSlot")
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("Context canceled, stopping validator")
|
||||
span.End()
|
||||
cancel()
|
||||
return // Exit if context is canceled.
|
||||
case blocksError := <-connectionErrorChannel:
|
||||
if blocksError != nil {
|
||||
log.WithError(blocksError).Warn("block stream interrupted")
|
||||
go v.ReceiveBlocks(ctx, connectionErrorChannel)
|
||||
continue
|
||||
}
|
||||
case slot := <-v.NextSlot():
|
||||
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))
|
||||
|
||||
@ -108,7 +155,7 @@ func run(ctx context.Context, v Validator) {
|
||||
}
|
||||
|
||||
deadline := v.SlotDeadline(slot)
|
||||
slotCtx, cancel := context.WithDeadline(ctx, deadline)
|
||||
slotCtx, cancel = context.WithDeadline(ctx, deadline)
|
||||
log := log.WithField("slot", slot)
|
||||
log.WithField("deadline", deadline).Debug("Set deadline for proposals and attestations")
|
||||
|
||||
@ -172,6 +219,10 @@ func run(ctx context.Context, v Validator) {
|
||||
}
|
||||
}
|
||||
|
||||
func isConnectionError(err error) bool {
|
||||
return err != nil && errors.Is(err, errConnectionIssue)
|
||||
}
|
||||
|
||||
func handleAssignmentError(err error, slot uint64) {
|
||||
if errCode, ok := status.FromError(err); ok && errCode.Code() == codes.NotFound {
|
||||
log.WithField(
|
||||
|
@ -30,13 +30,34 @@ func TestCancelledContext_CleansUpValidator(t *testing.T) {
|
||||
func TestCancelledContext_WaitsForChainStart(t *testing.T) {
|
||||
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
run(cancelledContext(), v)
|
||||
assert.Equal(t, true, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
|
||||
assert.Equal(t, 1, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
|
||||
}
|
||||
|
||||
func TestRetry_On_ConnectionError(t *testing.T) {
|
||||
retry := 10
|
||||
v := &FakeValidator{
|
||||
Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
|
||||
RetryTillSuccess: retry,
|
||||
}
|
||||
backOffPeriod = 10 * time.Millisecond
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go run(ctx, v)
|
||||
// each step will fail (retry times)=10 this sleep times will wait more then
|
||||
// the time it takes for all steps to succeed before main loop.
|
||||
time.Sleep(time.Duration(retry*6) * backOffPeriod)
|
||||
cancel()
|
||||
// every call will fail retry=10 times so first one will be called 4 * retry=10.
|
||||
assert.Equal(t, retry*4, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
|
||||
assert.Equal(t, retry*3, v.WaitForSyncCalled, "Expected WaitForSync() to be called")
|
||||
assert.Equal(t, retry*2, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
|
||||
assert.Equal(t, retry, v.CanonicalHeadSlotCalled, "Expected WaitForActivation() to be called")
|
||||
assert.Equal(t, retry, v.ReceiveBlocksCalled, "Expected WaitForActivation() to be called")
|
||||
}
|
||||
|
||||
func TestCancelledContext_WaitsForActivation(t *testing.T) {
|
||||
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
run(cancelledContext(), v)
|
||||
assert.Equal(t, true, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
|
||||
assert.Equal(t, 1, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
|
||||
}
|
||||
|
||||
func TestCancelledContext_ChecksSlasherReady(t *testing.T) {
|
||||
|
@ -138,7 +138,7 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
|
||||
// First, check if the beacon chain has started.
|
||||
stream, err := v.validatorClient.WaitForChainStart(ctx, &ptypes.Empty{})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not setup beacon chain ChainStart streaming client")
|
||||
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not setup beacon chain ChainStart streaming client").Error())
|
||||
}
|
||||
|
||||
log.Info("Waiting for beacon chain start log from the ETH 1.0 deposit contract")
|
||||
@ -148,7 +148,7 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
|
||||
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not receive ChainStart from stream")
|
||||
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not receive ChainStart from stream").Error())
|
||||
}
|
||||
v.genesisTime = chainStartRes.GenesisTime
|
||||
curGenValRoot, err := v.db.GenesisValidatorsRoot(ctx)
|
||||
@ -172,6 +172,8 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
|
||||
)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return errConnectionIssue
|
||||
}
|
||||
|
||||
// Once the ChainStart log is received, we update the genesis time of the validator client
|
||||
@ -188,7 +190,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {
|
||||
|
||||
s, err := v.node.GetSyncStatus(ctx, &ptypes.Empty{})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get sync status")
|
||||
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
|
||||
}
|
||||
if !s.Syncing {
|
||||
return nil
|
||||
@ -200,7 +202,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {
|
||||
case <-time.After(slotutil.DivideSlotBy(2 /* twice per slot */)):
|
||||
s, err := v.node.GetSyncStatus(ctx, &ptypes.Empty{})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get sync status")
|
||||
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
|
||||
}
|
||||
if !s.Syncing {
|
||||
return nil
|
||||
@ -246,10 +248,11 @@ func (v *validator) SlasherReady(ctx context.Context) error {
|
||||
// ReceiveBlocks starts a gRPC client stream listener to obtain
|
||||
// blocks from the beacon node. Upon receiving a block, the service
|
||||
// broadcasts it to a feed for other usages to subscribe to.
|
||||
func (v *validator) ReceiveBlocks(ctx context.Context) {
|
||||
func (v *validator) ReceiveBlocks(ctx context.Context, connectionErrorChannel chan error) {
|
||||
stream, err := v.beaconClient.StreamBlocks(ctx, ðpb.StreamBlocksRequest{VerifiedOnly: true})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to retrieve blocks stream")
|
||||
log.WithError(err).Error("Failed to retrieve blocks stream, " + errConnectionIssue.Error())
|
||||
connectionErrorChannel <- errors.Wrap(errConnectionIssue, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@ -260,7 +263,8 @@ func (v *validator) ReceiveBlocks(ctx context.Context) {
|
||||
}
|
||||
res, err := stream.Recv()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not receive blocks from beacon node")
|
||||
log.WithError(err).Error("Could not receive blocks from beacon node, " + errConnectionIssue.Error())
|
||||
connectionErrorChannel <- errors.Wrap(errConnectionIssue, err.Error())
|
||||
return
|
||||
}
|
||||
if res == nil || res.Block == nil {
|
||||
@ -331,7 +335,7 @@ func (v *validator) CanonicalHeadSlot(ctx context.Context) (uint64, error) {
|
||||
defer span.End()
|
||||
head, err := v.beaconClient.GetChainHead(ctx, &ptypes.Empty{})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, errors.Wrap(errConnectionIssue, err.Error())
|
||||
}
|
||||
return head.HeadSlot, nil
|
||||
}
|
||||
|
@ -912,7 +912,8 @@ func TestService_ReceiveBlocks_NilBlock(t *testing.T) {
|
||||
).Do(func() {
|
||||
cancel()
|
||||
})
|
||||
v.ReceiveBlocks(ctx)
|
||||
connectionErrorChannel := make(chan error)
|
||||
v.ReceiveBlocks(ctx, connectionErrorChannel)
|
||||
require.Equal(t, uint64(0), v.highestValidSlot)
|
||||
}
|
||||
|
||||
@ -939,6 +940,7 @@ func TestService_ReceiveBlocks_SetHighest(t *testing.T) {
|
||||
).Do(func() {
|
||||
cancel()
|
||||
})
|
||||
v.ReceiveBlocks(ctx)
|
||||
connectionErrorChannel := make(chan error)
|
||||
v.ReceiveBlocks(ctx, connectionErrorChannel)
|
||||
require.Equal(t, slot, v.highestValidSlot)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user