mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-08 18:51:19 +00:00
Sync: don't set block to bad due to timeout (#10470)
* Add filter error and tests * Update BUILD.bazel * Kasey's feedback
This commit is contained in:
parent
5704fb34be
commit
7cdc741b2f
@ -61,6 +61,7 @@ type ChainService struct {
|
|||||||
SyncCommitteePubkeys [][]byte
|
SyncCommitteePubkeys [][]byte
|
||||||
Genesis time.Time
|
Genesis time.Time
|
||||||
ForkChoiceStore forkchoice.ForkChoicer
|
ForkChoiceStore forkchoice.ForkChoicer
|
||||||
|
ReceiveBlockMockErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForkChoicer mocks the same method in the chain service
|
// ForkChoicer mocks the same method in the chain service
|
||||||
@ -221,6 +222,9 @@ func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []block.Signe
|
|||||||
|
|
||||||
// ReceiveBlock mocks ReceiveBlock method in chain service.
|
// ReceiveBlock mocks ReceiveBlock method in chain service.
|
||||||
func (s *ChainService) ReceiveBlock(ctx context.Context, block block.SignedBeaconBlock, _ [32]byte) error {
|
func (s *ChainService) ReceiveBlock(ctx context.Context, block block.SignedBeaconBlock, _ [32]byte) error {
|
||||||
|
if s.ReceiveBlockMockErr != nil {
|
||||||
|
return s.ReceiveBlockMockErr
|
||||||
|
}
|
||||||
if s.State == nil {
|
if s.State == nil {
|
||||||
return ErrNilState
|
return ErrNilState
|
||||||
}
|
}
|
||||||
|
@ -249,6 +249,9 @@ func handleRPCError(err error) error {
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if isTimeout(err) {
|
||||||
|
return errors.Wrapf(ErrHTTPTimeout, "%s", err)
|
||||||
|
}
|
||||||
e, ok := err.(rpc.Error)
|
e, ok := err.(rpc.Error)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.Wrap(err, "got an unexpected error")
|
return errors.Wrap(err, "got an unexpected error")
|
||||||
@ -277,3 +280,16 @@ func handleRPCError(err error) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrHTTPTimeout returns true if the error is a http.Client timeout error.
|
||||||
|
var ErrHTTPTimeout = errors.New("timeout from http.Client")
|
||||||
|
|
||||||
|
type httpTimeoutError interface {
|
||||||
|
Error() string
|
||||||
|
Timeout() bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func isTimeout(e error) bool {
|
||||||
|
t, ok := e.(httpTimeoutError)
|
||||||
|
return ok && t.Timeout()
|
||||||
|
}
|
||||||
|
@ -491,7 +491,8 @@ func TestExchangeTransitionConfiguration(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type customError struct {
|
type customError struct {
|
||||||
code int
|
code int
|
||||||
|
timeout bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *customError) ErrorCode() int {
|
func (c *customError) ErrorCode() int {
|
||||||
@ -502,6 +503,10 @@ func (*customError) Error() string {
|
|||||||
return "something went wrong"
|
return "something went wrong"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *customError) Timeout() bool {
|
||||||
|
return c.timeout
|
||||||
|
}
|
||||||
|
|
||||||
type dataError struct {
|
type dataError struct {
|
||||||
code int
|
code int
|
||||||
data interface{}
|
data interface{}
|
||||||
@ -534,6 +539,11 @@ func Test_handleRPCError(t *testing.T) {
|
|||||||
expectedContains: "got an unexpected error",
|
expectedContains: "got an unexpected error",
|
||||||
given: errors.New("foo"),
|
given: errors.New("foo"),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "HTTP times out",
|
||||||
|
expectedContains: ErrHTTPTimeout.Error(),
|
||||||
|
given: &customError{timeout: true},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "ErrParse",
|
name: "ErrParse",
|
||||||
expectedContains: ErrParse.Error(),
|
expectedContains: ErrParse.Error(),
|
||||||
|
@ -76,6 +76,7 @@ go_library(
|
|||||||
"//beacon-chain/p2p/encoder:go_default_library",
|
"//beacon-chain/p2p/encoder:go_default_library",
|
||||||
"//beacon-chain/p2p/peers:go_default_library",
|
"//beacon-chain/p2p/peers:go_default_library",
|
||||||
"//beacon-chain/p2p/types:go_default_library",
|
"//beacon-chain/p2p/types:go_default_library",
|
||||||
|
"//beacon-chain/powchain/engine-api-client/v1:go_default_library",
|
||||||
"//beacon-chain/state:go_default_library",
|
"//beacon-chain/state:go_default_library",
|
||||||
"//beacon-chain/state/stategen:go_default_library",
|
"//beacon-chain/state/stategen:go_default_library",
|
||||||
"//cache/lru:go_default_library",
|
"//cache/lru:go_default_library",
|
||||||
@ -184,6 +185,7 @@ go_test(
|
|||||||
"//beacon-chain/p2p/peers:go_default_library",
|
"//beacon-chain/p2p/peers:go_default_library",
|
||||||
"//beacon-chain/p2p/testing:go_default_library",
|
"//beacon-chain/p2p/testing:go_default_library",
|
||||||
"//beacon-chain/p2p/types:go_default_library",
|
"//beacon-chain/p2p/types:go_default_library",
|
||||||
|
"//beacon-chain/powchain/engine-api-client/v1:go_default_library",
|
||||||
"//beacon-chain/state:go_default_library",
|
"//beacon-chain/state:go_default_library",
|
||||||
"//beacon-chain/state/stategen:go_default_library",
|
"//beacon-chain/state/stategen:go_default_library",
|
||||||
"//beacon-chain/state/v1:go_default_library",
|
"//beacon-chain/state/v1:go_default_library",
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/prysmaticlabs/prysm/async"
|
"github.com/prysmaticlabs/prysm/async"
|
||||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||||
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
||||||
|
v1 "github.com/prysmaticlabs/prysm/beacon-chain/powchain/engine-api-client/v1"
|
||||||
"github.com/prysmaticlabs/prysm/config/params"
|
"github.com/prysmaticlabs/prysm/config/params"
|
||||||
"github.com/prysmaticlabs/prysm/crypto/rand"
|
"github.com/prysmaticlabs/prysm/crypto/rand"
|
||||||
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
|
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
|
||||||
@ -162,9 +163,11 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
|
if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
|
||||||
log.Debugf("Could not process block from slot %d: %v", b.Block().Slot(), err)
|
if !errors.Is(err, v1.ErrHTTPTimeout) {
|
||||||
s.setBadBlock(ctx, blkRoot)
|
log.Debugf("Could not process block from slot %d: %v", b.Block().Slot(), err)
|
||||||
tracing.AnnotateError(span, err)
|
tracing.AnnotateError(span, err)
|
||||||
|
s.setBadBlock(ctx, blkRoot)
|
||||||
|
}
|
||||||
// In the next iteration of the queue, this block will be removed from
|
// In the next iteration of the queue, this block will be removed from
|
||||||
// the pending queue as it has been marked as a 'bad' block.
|
// the pending queue as it has been marked as a 'bad' block.
|
||||||
span.End()
|
span.End()
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||||
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||||
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
||||||
|
v1 "github.com/prysmaticlabs/prysm/beacon-chain/powchain/engine-api-client/v1"
|
||||||
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
|
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
|
||||||
"github.com/prysmaticlabs/prysm/config/params"
|
"github.com/prysmaticlabs/prysm/config/params"
|
||||||
"github.com/prysmaticlabs/prysm/crypto/rand"
|
"github.com/prysmaticlabs/prysm/crypto/rand"
|
||||||
@ -110,6 +111,86 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) {
|
|||||||
assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRegularSyncBeaconBlockSubscriber_ExecutionEngineTimesOut(t *testing.T) {
|
||||||
|
db := dbtest.SetupDB(t)
|
||||||
|
|
||||||
|
p1 := p2ptest.NewTestP2P(t)
|
||||||
|
r := &Service{
|
||||||
|
cfg: &config{
|
||||||
|
p2p: p1,
|
||||||
|
beaconDB: db,
|
||||||
|
chain: &mock.ChainService{
|
||||||
|
FinalizedCheckPoint: ðpb.Checkpoint{
|
||||||
|
Epoch: 0,
|
||||||
|
},
|
||||||
|
ReceiveBlockMockErr: v1.ErrHTTPTimeout,
|
||||||
|
},
|
||||||
|
stateGen: stategen.New(db),
|
||||||
|
},
|
||||||
|
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
|
||||||
|
seenPendingBlocks: make(map[[32]byte]bool),
|
||||||
|
}
|
||||||
|
r.initCaches()
|
||||||
|
|
||||||
|
b0 := util.NewBeaconBlock()
|
||||||
|
wsb, err := wrapper.WrappedSignedBeaconBlock(b0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wsb))
|
||||||
|
b0Root, err := b0.Block.HashTreeRoot()
|
||||||
|
require.NoError(t, err)
|
||||||
|
b3 := util.NewBeaconBlock()
|
||||||
|
b3.Block.Slot = 3
|
||||||
|
b3.Block.ParentRoot = b0Root[:]
|
||||||
|
wsb, err = wrapper.WrappedSignedBeaconBlock(b3)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wsb))
|
||||||
|
// Incomplete block link
|
||||||
|
b1 := util.NewBeaconBlock()
|
||||||
|
b1.Block.Slot = 1
|
||||||
|
b1.Block.ParentRoot = b0Root[:]
|
||||||
|
b1Root, err := b1.Block.HashTreeRoot()
|
||||||
|
require.NoError(t, err)
|
||||||
|
b2 := util.NewBeaconBlock()
|
||||||
|
b2.Block.Slot = 2
|
||||||
|
b2.Block.ParentRoot = b1Root[:]
|
||||||
|
b2Root, err := b1.Block.HashTreeRoot()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Add b2 to the cache
|
||||||
|
wsb, err = wrapper.WrappedSignedBeaconBlock(b2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, r.insertBlockToPendingQueue(b2.Block.Slot, wsb, b2Root))
|
||||||
|
|
||||||
|
require.NoError(t, r.processPendingBlocks(context.Background()))
|
||||||
|
assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
|
||||||
|
assert.Equal(t, 1, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||||
|
|
||||||
|
// Add b1 to the cache
|
||||||
|
wsb, err = wrapper.WrappedSignedBeaconBlock(b1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, r.insertBlockToPendingQueue(b1.Block.Slot, wsb, b1Root))
|
||||||
|
wsb, err = wrapper.WrappedSignedBeaconBlock(b1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wsb))
|
||||||
|
|
||||||
|
nBlock := util.NewBeaconBlock()
|
||||||
|
nBlock.Block.Slot = b1.Block.Slot
|
||||||
|
nRoot, err := nBlock.Block.HashTreeRoot()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Insert bad b1 in the cache to verify the good one doesn't get replaced.
|
||||||
|
wsb, err = wrapper.WrappedSignedBeaconBlock(nBlock)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, r.insertBlockToPendingQueue(nBlock.Block.Slot, wsb, nRoot))
|
||||||
|
require.NoError(t, r.processPendingBlocks(context.Background())) // Marks a block as bad
|
||||||
|
require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run
|
||||||
|
|
||||||
|
assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
|
||||||
|
assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||||
|
require.Equal(t, 1, len(r.badBlockCache.Keys())) // Account for the bad block above
|
||||||
|
require.Equal(t, 0, len(r.seenBlockCache.Keys()))
|
||||||
|
}
|
||||||
|
|
||||||
func TestRegularSync_InsertDuplicateBlocks(t *testing.T) {
|
func TestRegularSync_InsertDuplicateBlocks(t *testing.T) {
|
||||||
db := dbtest.SetupDB(t)
|
db := dbtest.SetupDB(t)
|
||||||
|
|
||||||
|
@ -3,8 +3,10 @@ package sync
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/transition/interop"
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/transition/interop"
|
||||||
|
v1 "github.com/prysmaticlabs/prysm/beacon-chain/powchain/engine-api-client/v1"
|
||||||
"github.com/prysmaticlabs/prysm/config/features"
|
"github.com/prysmaticlabs/prysm/config/features"
|
||||||
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||||
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
|
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
|
||||||
@ -30,8 +32,10 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := s.cfg.chain.ReceiveBlock(ctx, signed, root); err != nil {
|
if err := s.cfg.chain.ReceiveBlock(ctx, signed, root); err != nil {
|
||||||
interop.WriteBlockToDisk(signed, true /*failed*/)
|
if !errors.Is(err, v1.ErrHTTPTimeout) {
|
||||||
s.setBadBlock(ctx, root)
|
interop.WriteBlockToDisk(signed, true /*failed*/)
|
||||||
|
s.setBadBlock(ctx, root)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,6 +9,8 @@ import (
|
|||||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||||
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||||
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
|
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
|
||||||
|
v1 "github.com/prysmaticlabs/prysm/beacon-chain/powchain/engine-api-client/v1"
|
||||||
|
lruwrpr "github.com/prysmaticlabs/prysm/cache/lru"
|
||||||
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||||
"github.com/prysmaticlabs/prysm/testing/assert"
|
"github.com/prysmaticlabs/prysm/testing/assert"
|
||||||
"github.com/prysmaticlabs/prysm/testing/require"
|
"github.com/prysmaticlabs/prysm/testing/require"
|
||||||
@ -108,3 +110,18 @@ func TestService_beaconBlockSubscriber(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestService_BeaconBlockSubscribe_ExecutionEngineTimesOut(t *testing.T) {
|
||||||
|
s := &Service{
|
||||||
|
cfg: &config{
|
||||||
|
chain: &chainMock.ChainService{
|
||||||
|
ReceiveBlockMockErr: v1.ErrHTTPTimeout,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
seenBlockCache: lruwrpr.New(10),
|
||||||
|
badBlockCache: lruwrpr.New(10),
|
||||||
|
}
|
||||||
|
require.ErrorIs(t, v1.ErrHTTPTimeout, s.beaconBlockSubscriber(context.Background(), util.NewBeaconBlock()))
|
||||||
|
require.Equal(t, 0, len(s.badBlockCache.Keys()))
|
||||||
|
require.Equal(t, 1, len(s.seenBlockCache.Keys()))
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user