diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index 155d157ab..f2000471f 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -61,6 +61,7 @@ type ChainService struct { SyncCommitteePubkeys [][]byte Genesis time.Time ForkChoiceStore forkchoice.ForkChoicer + ReceiveBlockMockErr error } // ForkChoicer mocks the same method in the chain service @@ -221,6 +222,9 @@ func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []block.Signe // ReceiveBlock mocks ReceiveBlock method in chain service. func (s *ChainService) ReceiveBlock(ctx context.Context, block block.SignedBeaconBlock, _ [32]byte) error { + if s.ReceiveBlockMockErr != nil { + return s.ReceiveBlockMockErr + } if s.State == nil { return ErrNilState } diff --git a/beacon-chain/powchain/engine-api-client/v1/client.go b/beacon-chain/powchain/engine-api-client/v1/client.go index 0bcf21797..9ac2377e5 100644 --- a/beacon-chain/powchain/engine-api-client/v1/client.go +++ b/beacon-chain/powchain/engine-api-client/v1/client.go @@ -249,6 +249,9 @@ func handleRPCError(err error) error { if err == nil { return nil } + if isTimeout(err) { + return errors.Wrapf(ErrHTTPTimeout, "%s", err) + } e, ok := err.(rpc.Error) if !ok { return errors.Wrap(err, "got an unexpected error") @@ -277,3 +280,16 @@ func handleRPCError(err error) error { return err } } + +// ErrHTTPTimeout returns true if the error is a http.Client timeout error. +var ErrHTTPTimeout = errors.New("timeout from http.Client") + +type httpTimeoutError interface { + Error() string + Timeout() bool +} + +func isTimeout(e error) bool { + t, ok := e.(httpTimeoutError) + return ok && t.Timeout() +} diff --git a/beacon-chain/powchain/engine-api-client/v1/client_test.go b/beacon-chain/powchain/engine-api-client/v1/client_test.go index 3198828e2..405bef5f9 100644 --- a/beacon-chain/powchain/engine-api-client/v1/client_test.go +++ b/beacon-chain/powchain/engine-api-client/v1/client_test.go @@ -491,7 +491,8 @@ func TestExchangeTransitionConfiguration(t *testing.T) { } type customError struct { - code int + code int + timeout bool } func (c *customError) ErrorCode() int { @@ -502,6 +503,10 @@ func (*customError) Error() string { return "something went wrong" } +func (c *customError) Timeout() bool { + return c.timeout +} + type dataError struct { code int data interface{} @@ -534,6 +539,11 @@ func Test_handleRPCError(t *testing.T) { expectedContains: "got an unexpected error", given: errors.New("foo"), }, + { + name: "HTTP times out", + expectedContains: ErrHTTPTimeout.Error(), + given: &customError{timeout: true}, + }, { name: "ErrParse", expectedContains: ErrParse.Error(), diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 2e42694ce..e292e094b 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -76,6 +76,7 @@ go_library( "//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/p2p/types:go_default_library", + "//beacon-chain/powchain/engine-api-client/v1:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/stategen:go_default_library", "//cache/lru:go_default_library", @@ -184,6 +185,7 @@ go_test( "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/p2p/types:go_default_library", + "//beacon-chain/powchain/engine-api-client/v1:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/stategen:go_default_library", "//beacon-chain/state/v1:go_default_library", diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 088935c48..458d68777 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/async" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" + v1 "github.com/prysmaticlabs/prysm/beacon-chain/powchain/engine-api-client/v1" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/crypto/rand" "github.com/prysmaticlabs/prysm/encoding/bytesutil" @@ -162,9 +163,11 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { } if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { - log.Debugf("Could not process block from slot %d: %v", b.Block().Slot(), err) - s.setBadBlock(ctx, blkRoot) - tracing.AnnotateError(span, err) + if !errors.Is(err, v1.ErrHTTPTimeout) { + log.Debugf("Could not process block from slot %d: %v", b.Block().Slot(), err) + tracing.AnnotateError(span, err) + s.setBadBlock(ctx, blkRoot) + } // In the next iteration of the queue, this block will be removed from // the pending queue as it has been marked as a 'bad' block. span.End() diff --git a/beacon-chain/sync/pending_blocks_queue_test.go b/beacon-chain/sync/pending_blocks_queue_test.go index 9d56c4969..686321627 100644 --- a/beacon-chain/sync/pending_blocks_queue_test.go +++ b/beacon-chain/sync/pending_blocks_queue_test.go @@ -19,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" + v1 "github.com/prysmaticlabs/prysm/beacon-chain/powchain/engine-api-client/v1" "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/crypto/rand" @@ -110,6 +111,86 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) { assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block") } +func TestRegularSyncBeaconBlockSubscriber_ExecutionEngineTimesOut(t *testing.T) { + db := dbtest.SetupDB(t) + + p1 := p2ptest.NewTestP2P(t) + r := &Service{ + cfg: &config{ + p2p: p1, + beaconDB: db, + chain: &mock.ChainService{ + FinalizedCheckPoint: ðpb.Checkpoint{ + Epoch: 0, + }, + ReceiveBlockMockErr: v1.ErrHTTPTimeout, + }, + stateGen: stategen.New(db), + }, + slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), + seenPendingBlocks: make(map[[32]byte]bool), + } + r.initCaches() + + b0 := util.NewBeaconBlock() + wsb, err := wrapper.WrappedSignedBeaconBlock(b0) + require.NoError(t, err) + require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wsb)) + b0Root, err := b0.Block.HashTreeRoot() + require.NoError(t, err) + b3 := util.NewBeaconBlock() + b3.Block.Slot = 3 + b3.Block.ParentRoot = b0Root[:] + wsb, err = wrapper.WrappedSignedBeaconBlock(b3) + require.NoError(t, err) + require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wsb)) + // Incomplete block link + b1 := util.NewBeaconBlock() + b1.Block.Slot = 1 + b1.Block.ParentRoot = b0Root[:] + b1Root, err := b1.Block.HashTreeRoot() + require.NoError(t, err) + b2 := util.NewBeaconBlock() + b2.Block.Slot = 2 + b2.Block.ParentRoot = b1Root[:] + b2Root, err := b1.Block.HashTreeRoot() + require.NoError(t, err) + + // Add b2 to the cache + wsb, err = wrapper.WrappedSignedBeaconBlock(b2) + require.NoError(t, err) + require.NoError(t, r.insertBlockToPendingQueue(b2.Block.Slot, wsb, b2Root)) + + require.NoError(t, r.processPendingBlocks(context.Background())) + assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") + assert.Equal(t, 1, len(r.seenPendingBlocks), "Incorrect size for seen pending block") + + // Add b1 to the cache + wsb, err = wrapper.WrappedSignedBeaconBlock(b1) + require.NoError(t, err) + require.NoError(t, r.insertBlockToPendingQueue(b1.Block.Slot, wsb, b1Root)) + wsb, err = wrapper.WrappedSignedBeaconBlock(b1) + require.NoError(t, err) + require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wsb)) + + nBlock := util.NewBeaconBlock() + nBlock.Block.Slot = b1.Block.Slot + nRoot, err := nBlock.Block.HashTreeRoot() + require.NoError(t, err) + + // Insert bad b1 in the cache to verify the good one doesn't get replaced. + wsb, err = wrapper.WrappedSignedBeaconBlock(nBlock) + require.NoError(t, err) + require.NoError(t, r.insertBlockToPendingQueue(nBlock.Block.Slot, wsb, nRoot)) + require.NoError(t, r.processPendingBlocks(context.Background())) // Marks a block as bad + require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run + + assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") + assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block") + require.Equal(t, 1, len(r.badBlockCache.Keys())) // Account for the bad block above + require.Equal(t, 0, len(r.seenBlockCache.Keys())) +} + func TestRegularSync_InsertDuplicateBlocks(t *testing.T) { db := dbtest.SetupDB(t) diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index a77557ac3..acef027db 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -3,8 +3,10 @@ package sync import ( "context" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/transition/interop" + v1 "github.com/prysmaticlabs/prysm/beacon-chain/powchain/engine-api-client/v1" "github.com/prysmaticlabs/prysm/config/features" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper" @@ -30,8 +32,10 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) } if err := s.cfg.chain.ReceiveBlock(ctx, signed, root); err != nil { - interop.WriteBlockToDisk(signed, true /*failed*/) - s.setBadBlock(ctx, root) + if !errors.Is(err, v1.ErrHTTPTimeout) { + interop.WriteBlockToDisk(signed, true /*failed*/) + s.setBadBlock(ctx, root) + } return err } diff --git a/beacon-chain/sync/subscriber_beacon_blocks_test.go b/beacon-chain/sync/subscriber_beacon_blocks_test.go index c7e50940e..e7f088687 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks_test.go +++ b/beacon-chain/sync/subscriber_beacon_blocks_test.go @@ -9,6 +9,8 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" + v1 "github.com/prysmaticlabs/prysm/beacon-chain/powchain/engine-api-client/v1" + lruwrpr "github.com/prysmaticlabs/prysm/cache/lru" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/testing/assert" "github.com/prysmaticlabs/prysm/testing/require" @@ -108,3 +110,18 @@ func TestService_beaconBlockSubscriber(t *testing.T) { }) } } + +func TestService_BeaconBlockSubscribe_ExecutionEngineTimesOut(t *testing.T) { + s := &Service{ + cfg: &config{ + chain: &chainMock.ChainService{ + ReceiveBlockMockErr: v1.ErrHTTPTimeout, + }, + }, + seenBlockCache: lruwrpr.New(10), + badBlockCache: lruwrpr.New(10), + } + require.ErrorIs(t, v1.ErrHTTPTimeout, s.beaconBlockSubscriber(context.Background(), util.NewBeaconBlock())) + require.Equal(t, 0, len(s.badBlockCache.Keys())) + require.Equal(t, 1, len(s.seenBlockCache.Keys())) +}