Exit Pending Queue Properly (#7927)

* exit properly

* terence's review

* Update beacon-chain/sync/pending_blocks_queue.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* Update beacon-chain/sync/pending_blocks_queue.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* fix tests

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
This commit is contained in:
Nishant Das 2020-11-24 03:34:40 +08:00 committed by GitHub
parent b0dfc46603
commit 6f766ed583
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 4 deletions

View File

@ -134,12 +134,20 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
log.Debugf("Could not validate block from slot %d: %v", b.Block.Slot, err) log.Debugf("Could not validate block from slot %d: %v", b.Block.Slot, err)
s.setBadBlock(ctx, blkRoot) s.setBadBlock(ctx, blkRoot)
traceutil.AnnotateError(span, err) traceutil.AnnotateError(span, err)
// In the next iteration of the queue, this block will be removed from
// the pending queue as it has been marked as a 'bad' block.
span.End()
continue
} }
if err := s.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { if err := s.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
log.Debugf("Could not process block from slot %d: %v", b.Block.Slot, err) log.Debugf("Could not process block from slot %d: %v", b.Block.Slot, err)
s.setBadBlock(ctx, blkRoot) s.setBadBlock(ctx, blkRoot)
traceutil.AnnotateError(span, err) traceutil.AnnotateError(span, err)
// In the next iteration of the queue, this block will be removed from
// the pending queue as it has been marked as a 'bad' block.
span.End()
continue
} }
s.setSeenBlockIndexSlot(b.Block.Slot, b.Block.ProposerIndex) s.setSeenBlockIndexSlot(b.Block.Slot, b.Block.ProposerIndex)

View File

@ -82,7 +82,9 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) {
// Insert bad b1 in the cache to verify the good one doesn't get replaced. // Insert bad b1 in the cache to verify the good one doesn't get replaced.
require.NoError(t, r.insertBlockToPendingQueue(b1.Block.Slot, testutil.NewBeaconBlock(), [32]byte{})) require.NoError(t, r.insertBlockToPendingQueue(b1.Block.Slot, testutil.NewBeaconBlock(), [32]byte{}))
require.NoError(t, r.processPendingBlocks(context.Background())) require.NoError(t, r.processPendingBlocks(context.Background())) // Marks a block as bad
require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run
assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block") assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
} }
@ -213,14 +215,19 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin
require.NoError(t, r.insertBlockToPendingQueue(b4.Block.Slot, b4, b4Root)) require.NoError(t, r.insertBlockToPendingQueue(b4.Block.Slot, b4, b4Root))
require.NoError(t, r.insertBlockToPendingQueue(b5.Block.Slot, b5, b5Root)) require.NoError(t, r.insertBlockToPendingQueue(b5.Block.Slot, b5, b5Root))
require.NoError(t, r.processPendingBlocks(context.Background())) require.NoError(t, r.processPendingBlocks(context.Background())) // Marks a block as bad
require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run
assert.Equal(t, 2, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") assert.Equal(t, 2, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block") assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
// Add b3 to the cache // Add b3 to the cache
require.NoError(t, r.insertBlockToPendingQueue(b3.Block.Slot, b3, b3Root)) require.NoError(t, r.insertBlockToPendingQueue(b3.Block.Slot, b3, b3Root))
require.NoError(t, r.db.SaveBlock(context.Background(), b3)) require.NoError(t, r.db.SaveBlock(context.Background(), b3))
require.NoError(t, r.processPendingBlocks(context.Background()))
require.NoError(t, r.processPendingBlocks(context.Background())) // Marks a block as bad
require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run
assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
assert.Equal(t, 3, len(r.seenPendingBlocks), "Incorrect size for seen pending block") assert.Equal(t, 3, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
@ -228,7 +235,10 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin
require.NoError(t, r.insertBlockToPendingQueue(b2.Block.Slot, b2, b2Root)) require.NoError(t, r.insertBlockToPendingQueue(b2.Block.Slot, b2, b2Root))
require.NoError(t, r.db.SaveBlock(context.Background(), b2)) require.NoError(t, r.db.SaveBlock(context.Background(), b2))
require.NoError(t, r.processPendingBlocks(context.Background()))
require.NoError(t, r.processPendingBlocks(context.Background())) // Marks a block as bad
require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run
assert.Equal(t, 0, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") assert.Equal(t, 0, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block") assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
} }