diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go
index 568370a99..c3439e5f4 100644
--- a/beacon-chain/sync/pending_blocks_queue.go
+++ b/beacon-chain/sync/pending_blocks_queue.go
@@ -55,6 +55,9 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
 	ctx, span := trace.StartSpan(ctx, "processPendingBlocks")
 	defer span.End()
 
+	// Remove expired blocks from our pending queue cache.
+	s.deleteExpiredBlocksFromCache()
+
 	// Validate pending slots before processing.
 	if err := s.validatePendingSlots(); err != nil {
 		return errors.Wrap(err, "could not validate pending slots")
@@ -432,6 +435,15 @@ func (s *Service) deleteBlockFromPendingQueue(slot primitives.Slot, b interfaces
 	return nil
 }
 
+// deleteExpiredBlocksFromCache manually removes all expired entries from our
+// cache, firing the eviction callback for each removed entry.
+func (s *Service) deleteExpiredBlocksFromCache() {
+	s.pendingQueueLock.Lock()
+	defer s.pendingQueueLock.Unlock()
+
+	s.slotToPendingBlocks.DeleteExpired()
+}
+
 // Insert block to the list in the pending queue using the slot as key.
 // Note: this helper is not thread safe.
 func (s *Service) insertBlockToPendingQueue(_ primitives.Slot, b interfaces.ReadOnlySignedBeaconBlock, r [32]byte) error {
diff --git a/beacon-chain/sync/pending_blocks_queue_test.go b/beacon-chain/sync/pending_blocks_queue_test.go
index a21aacb0f..2bde21a27 100644
--- a/beacon-chain/sync/pending_blocks_queue_test.go
+++ b/beacon-chain/sync/pending_blocks_queue_test.go
@@ -884,3 +884,61 @@ func TestAlreadySyncingBlock(t *testing.T) {
 	require.NoError(t, r.processPendingBlocks(ctx))
 	require.LogsContain(t, hook, "Skipping pending block already being processed")
 }
+
+func TestExpirationCache_PruneOldBlocksCorrectly(t *testing.T) {
+	ctx := context.Background()
+	db := dbtest.SetupDB(t)
+
+	mockChain := &mock.ChainService{
+		FinalizedCheckPoint: &ethpb.Checkpoint{
+			Epoch: 0,
+		},
+	}
+
+	p1 := p2ptest.NewTestP2P(t)
+	// Shorten the expiration time for this test and restore it on exit.
+	currExpTime := pendingBlockExpTime
+	defer func() {
+		pendingBlockExpTime = currExpTime
+	}()
+	pendingBlockExpTime = 500 * time.Millisecond
+
+	r := NewService(ctx,
+		WithStateGen(stategen.New(db, doublylinkedtree.New())),
+		WithDatabase(db),
+		WithChainService(mockChain),
+		WithP2P(p1),
+	)
+	b1 := util.NewBeaconBlock()
+	b1.Block.Slot = 1
+	b1.Block.ProposerIndex = 10
+	b1Root, err := b1.Block.HashTreeRoot()
+	require.NoError(t, err)
+	wsb, err := blocks.NewSignedBeaconBlock(b1)
+	require.NoError(t, err)
+	require.NoError(t, r.insertBlockToPendingQueue(1, wsb, b1Root))
+
+	// Add a second block with the same slot.
+	b2 := util.NewBeaconBlock()
+	b2.Block.Slot = 1
+	b2.Block.ProposerIndex = 11
+	b2Root, err := b2.Block.HashTreeRoot()
+	require.NoError(t, err)
+	wsb, err = blocks.NewSignedBeaconBlock(b2)
+	require.NoError(t, err)
+	require.NoError(t, r.insertBlockToPendingQueue(1, wsb, b2Root))
+
+	require.Equal(t, true, r.seenPendingBlocks[b1Root])
+	require.Equal(t, true, r.seenPendingBlocks[b2Root])
+	require.Equal(t, 2, len(r.pendingBlocksInCache(1)))
+
+	// Wait long enough for both cache entries to expire.
+	time.Sleep(2 * pendingBlockExpTime)
+
+	// Run the pending queue now that the blocks have expired.
+	require.NoError(t, r.processPendingBlocks(ctx))
+
+	assert.Equal(t, false, r.seenPendingBlocks[b1Root])
+	assert.Equal(t, false, r.seenPendingBlocks[b2Root])
+	assert.Equal(t, 0, len(r.pendingBlocksInCache(1)))
+}
diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go
index 8adcca900..fe3378fe7 100644
--- a/beacon-chain/sync/service.go
+++ b/beacon-chain/sync/service.go
@@ -34,11 +34,13 @@ import (
 	"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
 	lruwrpr "github.com/prysmaticlabs/prysm/v4/cache/lru"
 	"github.com/prysmaticlabs/prysm/v4/config/params"
+	"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
 	leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket"
 	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
 	"github.com/prysmaticlabs/prysm/v4/runtime"
 	prysmTime "github.com/prysmaticlabs/prysm/v4/time"
 	"github.com/prysmaticlabs/prysm/v4/time/slots"
+	"github.com/trailofbits/go-mutexasserts"
 )
 
 var _ runtime.Service = (*Service)(nil)
@@ -152,7 +154,7 @@ type Service struct {
 
 // NewService initializes new regular sync service.
 func NewService(ctx context.Context, opts ...Option) *Service {
-	c := gcache.New(pendingBlockExpTime /* exp time */, 2*pendingBlockExpTime /* prune time */)
+	c := gcache.New(pendingBlockExpTime /* exp time */, 0 /* disable janitor */)
 	ctx, cancel := context.WithCancel(ctx)
 	r := &Service{
 		ctx:    ctx,
@@ -170,6 +172,28 @@ func NewService(ctx context.Context, opts ...Option) *Service {
 			return nil
 		}
 	}
+	// When cache entries are evicted, remove those blocks from our seen
+	// pending block map. This callback assumes the mutex is already held.
+	r.slotToPendingBlocks.OnEvicted(func(s string, i interface{}) {
+		if !mutexasserts.RWMutexLocked(&r.pendingQueueLock) {
+			log.Error("Mutex is not locked during cache eviction of values")
+			// Continue on to allow elements to be properly removed.
+		}
+		blks, ok := i.([]interfaces.ReadOnlySignedBeaconBlock)
+		if !ok {
+			log.Errorf("Invalid type retrieved from the cache: %T", i)
+			return
+		}
+
+		for _, b := range blks {
+			root, err := b.Block().HashTreeRoot()
+			if err != nil {
+				log.WithError(err).Error("Could not calculate hash tree root of block")
+				continue
+			}
+			delete(r.seenPendingBlocks, root)
+		}
+	})
 	r.subHandler = newSubTopicHandler()
 	r.rateLimiter = newRateLimiter(r.cfg.p2p)
 	r.initCaches()
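
Note for reviewers: the fix hinges on the cache's janitor. With a positive cleanup interval, go-cache starts a background janitor goroutine that fires `OnEvicted` callbacks without holding `pendingQueueLock`, which would race with every other accessor of `seenPendingBlocks`. Passing `0` disables the janitor, so expired entries are only ever evicted by the explicit `DeleteExpired()` call at the top of `processPendingBlocks`, which runs under the lock. Below is a minimal, self-contained sketch of that pattern, assuming `gcache` is `github.com/patrickmn/go-cache` (as the `New(expiration, cleanupInterval)` signature suggests); the `pendingQueue` type and its `insert`/`deleteExpired` methods are hypothetical stand-ins for illustration, not Prysm code.

```go
package main

import (
	"fmt"
	"sync"
	"time"

	gcache "github.com/patrickmn/go-cache"
	"github.com/trailofbits/go-mutexasserts"
)

// pendingQueue is a hypothetical stand-in for the sync service's pending-block
// bookkeeping: an expiring cache plus a companion "seen" set that must stay
// consistent with it.
type pendingQueue struct {
	lock  sync.RWMutex
	cache *gcache.Cache
	seen  map[string]bool
}

func newPendingQueue(expTime time.Duration) *pendingQueue {
	q := &pendingQueue{
		// A cleanup interval of 0 means go-cache starts no background
		// janitor, so expired entries are only evicted by our explicit
		// DeleteExpired call below, which always runs under q.lock.
		cache: gcache.New(expTime /* exp time */, 0 /* disable janitor */),
		seen:  make(map[string]bool),
	}
	// The eviction callback mutates q.seen, so it must only run while
	// q.lock is held; with the janitor disabled, deleteExpired is the
	// sole trigger and guarantees that.
	q.cache.OnEvicted(func(key string, _ interface{}) {
		if !mutexasserts.RWMutexLocked(&q.lock) {
			fmt.Println("eviction fired without the lock held")
		}
		delete(q.seen, key)
	})
	return q
}

// insert records a value in both the cache and the seen set.
func (q *pendingQueue) insert(key string, val interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.cache.SetDefault(key, val)
	q.seen[key] = true
}

// deleteExpired mirrors deleteExpiredBlocksFromCache: take the lock, then
// manually prune expired entries, firing OnEvicted for each of them.
func (q *pendingQueue) deleteExpired() {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.cache.DeleteExpired()
}

func main() {
	q := newPendingQueue(500 * time.Millisecond)
	q.insert("block-a", struct{}{})

	time.Sleep(time.Second) // let the entry expire
	q.deleteExpired()       // evict it and clear the seen set in lockstep

	q.lock.RLock()
	defer q.lock.RUnlock()
	fmt.Println(q.seen["block-a"]) // false
}
```

The callback only asserts the lock rather than acquiring it, presumably because evictions can also fire from cache mutations made while `pendingQueueLock` is already held (e.g. explicit deletes), so taking the lock inside the callback would deadlock.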