From 337c2541614f3641e79b4c87cf2db79e70affa41 Mon Sep 17 00:00:00 2001
From: Potuz
Date: Wed, 26 Jul 2023 14:46:18 -0400
Subject: [PATCH] update shuffling caches at epoch boundary (#12661)

* update shuffling caches at epoch boundary

* move span

* do not advance to a past slot
---
 beacon-chain/blockchain/process_block.go      | 108 +++++++++---------
 beacon-chain/blockchain/process_block_test.go |   3 +-
 beacon-chain/blockchain/receive_block.go      |  12 +-
 beacon-chain/blockchain/service.go            |  32 +++---
 4 files changed, 80 insertions(+), 75 deletions(-)

diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go
index 36c2c8ef8..68636a32f 100644
--- a/beacon-chain/blockchain/process_block.go
+++ b/beacon-chain/blockchain/process_block.go
@@ -126,8 +126,16 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn
 	})
 	defer reportAttestationInclusion(b)
 
-	if err := s.handleEpochBoundary(ctx, postState, blockRoot[:]); err != nil {
-		return err
+	// Get the current head state (it may be different from the incoming
+	// postState) and update epoch boundary caches. We pass the postState
+	// slot instead of the headState slot below to deal with the case of an
+	// incoming non-canonical block.
+	st, err := s.HeadState(ctx)
+	if err != nil {
+		return errors.Wrap(err, "could not get headState")
+	}
+	if err := s.handleEpochBoundary(ctx, postState.Slot(), st, headRoot[:]); err != nil {
+		return errors.Wrap(err, "could not handle epoch boundary")
 	}
 	onBlockProcessingTime.Observe(float64(time.Since(startTime).Milliseconds()))
 	return nil
@@ -323,60 +331,45 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi
 	return s.saveHeadNoDB(ctx, lastB, lastBR, preState)
 }
 
-// Epoch boundary bookkeeping such as logging epoch summaries.
-func (s *Service) handleEpochBoundary(ctx context.Context, postState state.BeaconState, blockRoot []byte) error {
+func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.BeaconState) error {
+	e := coreTime.CurrentEpoch(st)
+	if err := helpers.UpdateCommitteeCache(ctx, st, e); err != nil {
+		return errors.Wrap(err, "could not update committee cache")
+	}
+	if err := helpers.UpdateProposerIndicesInCache(ctx, st, e); err != nil {
+		return errors.Wrap(err, "could not update proposer index cache")
+	}
+	go func() {
+		// Use a custom deadline here, since this method runs asynchronously.
+		// We ignore the parent method's context and instead create a new one
+		// with a custom deadline, therefore using the background context instead.
+		slotCtx, cancel := context.WithTimeout(context.Background(), slotDeadline)
+		defer cancel()
+		if err := helpers.UpdateProposerIndicesInCache(slotCtx, st, e+1); err != nil {
+			log.WithError(err).Warn("Failed to cache next epoch proposers")
+		}
+	}()
+	return nil
+}
+
+// Epoch boundary tasks: it copies the headState and updates the epoch boundary
+// caches.
+func (s *Service) handleEpochBoundary(ctx context.Context, slot primitives.Slot, headState state.BeaconState, blockRoot []byte) error {
 	ctx, span := trace.StartSpan(ctx, "blockChain.handleEpochBoundary")
 	defer span.End()
-
-	var err error
-	if postState.Slot()+1 == s.nextEpochBoundarySlot {
-		copied := postState.Copy()
-		copied, err := transition.ProcessSlotsUsingNextSlotCache(ctx, copied, blockRoot, copied.Slot()+1)
-		if err != nil {
-			return err
-		}
-		// Update caches for the next epoch at epoch boundary slot - 1.
-		if err := helpers.UpdateCommitteeCache(ctx, copied, coreTime.CurrentEpoch(copied)); err != nil {
-			return err
-		}
-		e := coreTime.CurrentEpoch(copied)
-		if err := helpers.UpdateProposerIndicesInCache(ctx, copied, e); err != nil {
-			return err
-		}
-		go func() {
-			// Use a custom deadline here, since this method runs asynchronously.
-			// We ignore the parent method's context and instead create a new one
-			// with a custom deadline, therefore using the background context instead.
-			slotCtx, cancel := context.WithTimeout(context.Background(), slotDeadline)
-			defer cancel()
-			if err := helpers.UpdateProposerIndicesInCache(slotCtx, copied, e+1); err != nil {
-				log.WithError(err).Warn("Failed to cache next epoch proposers")
-			}
-		}()
-	} else if postState.Slot() >= s.nextEpochBoundarySlot {
-		s.nextEpochBoundarySlot, err = slots.EpochStart(coreTime.NextEpoch(postState))
-		if err != nil {
-			return err
-		}
-
-		// Update caches at epoch boundary slot.
-		// The following updates have shortcut to return nil cheaply if fulfilled during boundary slot - 1.
-		if err := helpers.UpdateCommitteeCache(ctx, postState, coreTime.CurrentEpoch(postState)); err != nil {
-			return err
-		}
-		if err := helpers.UpdateProposerIndicesInCache(ctx, postState, coreTime.CurrentEpoch(postState)); err != nil {
-			return err
-		}
-
-		headSt, err := s.HeadState(ctx)
-		if err != nil {
-			return err
-		}
-		if err := reportEpochMetrics(ctx, postState, headSt); err != nil {
-			return err
-		}
+	// return early if we are advancing to a past slot
+	if slot < headState.Slot() {
+		return nil
 	}
-	return nil
+	if (slot+1)%params.BeaconConfig().SlotsPerEpoch != 0 {
+		return nil
+	}
+	copied := headState.Copy()
+	copied, err := transition.ProcessSlotsUsingNextSlotCache(ctx, copied, blockRoot, slot+1)
+	if err != nil {
+		return err
+	}
+	return s.updateEpochBoundaryCaches(ctx, copied)
 }
 
 // This feeds in the attestations included in the block to fork choice store. It's allows fork choice store
@@ -507,8 +500,9 @@ func (s *Service) runLateBlockTasks() {
 // lateBlockTasks is called 4 seconds into the slot and performs tasks
 // related to late blocks. It emits a MissedSlot state feed event.
 // It calls FCU and sets the right attributes if we are proposing next slot
-// it also updates the next slot cache to deal with skipped slots.
+// It also updates the next slot cache and the proposer index cache to deal with skipped slots.
 func (s *Service) lateBlockTasks(ctx context.Context) {
+	currentSlot := s.CurrentSlot()
 	if s.CurrentSlot() == s.HeadSlot() {
 		return
 	}
@@ -516,8 +510,10 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
 		Type: statefeed.MissedSlot,
 	})
 
+	s.headLock.RLock()
 	headRoot := s.headRoot()
 	headState := s.headState(ctx)
+	s.headLock.RUnlock()
 	lastRoot, lastState := transition.LastCachedState()
 	if lastState == nil {
 		lastRoot, lastState = headRoot[:], headState
@@ -528,7 +524,9 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
 	if err := transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
 		log.WithError(err).Debug("could not update next slot state cache")
 	}
-
+	if err := s.handleEpochBoundary(ctx, currentSlot, headState, headRoot[:]); err != nil {
+		log.WithError(err).Error("lateBlockTasks: could not update epoch boundary caches")
+	}
 	// Head root should be empty when retrieving proposer index for the next slot.
 	_, id, has := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(s.CurrentSlot()+1, [32]byte{} /* head root */)
 	// There exists proposer for next slot, but we haven't called fcu w/ payload attribute yet.
diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go
index a4c7a1f8b..ac83ecb1c 100644
--- a/beacon-chain/blockchain/process_block_test.go
+++ b/beacon-chain/blockchain/process_block_test.go
@@ -539,8 +539,7 @@ func TestHandleEpochBoundary_UpdateFirstSlot(t *testing.T) {
 	s, _ := util.DeterministicGenesisState(t, 1024)
 	service.head = &head{state: s}
 	require.NoError(t, s.SetSlot(2*params.BeaconConfig().SlotsPerEpoch))
-	require.NoError(t, service.handleEpochBoundary(ctx, s, []byte{}))
-	require.Equal(t, 3*params.BeaconConfig().SlotsPerEpoch, service.nextEpochBoundarySlot)
+	require.NoError(t, service.handleEpochBoundary(ctx, s.Slot(), s, []byte{}))
 }
 
 func TestOnBlock_CanFinalize_WithOnTick(t *testing.T) {
diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go
index d14310871..23f974308 100644
--- a/beacon-chain/blockchain/receive_block.go
+++ b/beacon-chain/blockchain/receive_block.go
@@ -8,6 +8,7 @@ import (
 	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
 	statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
 	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
+	coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
 	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
 	"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
 	"github.com/prysmaticlabs/prysm/v4/config/features"
@@ -61,6 +62,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
 	// Save current justified and finalized epochs for future use.
 	currStoreJustifiedEpoch := s.CurrentJustifiedCheckpt().Epoch
 	currStoreFinalizedEpoch := s.FinalizedCheckpt().Epoch
+	currentEpoch := coreTime.CurrentEpoch(preState)
 
 	preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
 	if err != nil {
@@ -98,7 +100,15 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
 		tracing.AnnotateError(span, err)
 		return err
 	}
-
+	if coreTime.CurrentEpoch(postState) > currentEpoch {
+		headSt, err := s.HeadState(ctx)
+		if err != nil {
+			return errors.Wrap(err, "could not get head state")
+		}
+		if err := reportEpochMetrics(ctx, postState, headSt); err != nil {
+			log.WithError(err).Error("could not report epoch metrics")
+		}
+	}
 	if err := s.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch); err != nil {
 		return errors.Wrap(err, "could not update justified checkpoint")
 	}
diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go
index cc39c5f49..ffb1ab791 100644
--- a/beacon-chain/blockchain/service.go
+++ b/beacon-chain/blockchain/service.go
@@ -35,7 +35,6 @@ import (
 	"github.com/prysmaticlabs/prysm/v4/config/params"
 	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
 	"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
-	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
 	"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
 	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
 	prysmTime "github.com/prysmaticlabs/prysm/v4/time"
@@ -46,22 +45,21 @@ import (
 // Service represents a service that handles the internal
 // logic of managing the full PoS beacon chain.
 type Service struct {
-	cfg                   *config
-	ctx                   context.Context
-	cancel                context.CancelFunc
-	genesisTime           time.Time
-	head                  *head
-	headLock              sync.RWMutex
-	originBlockRoot       [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized
-	nextEpochBoundarySlot primitives.Slot
-	boundaryRoots         [][32]byte
-	checkpointStateCache  *cache.CheckpointStateCache
-	initSyncBlocks        map[[32]byte]interfaces.ReadOnlySignedBeaconBlock
-	initSyncBlocksLock    sync.RWMutex
-	wsVerifier            *WeakSubjectivityVerifier
-	clockSetter           startup.ClockSetter
-	clockWaiter           startup.ClockWaiter
-	syncComplete          chan struct{}
+	cfg                  *config
+	ctx                  context.Context
+	cancel               context.CancelFunc
+	genesisTime          time.Time
+	head                 *head
+	headLock             sync.RWMutex
+	originBlockRoot      [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized
+	boundaryRoots        [][32]byte
+	checkpointStateCache *cache.CheckpointStateCache
+	initSyncBlocks       map[[32]byte]interfaces.ReadOnlySignedBeaconBlock
+	initSyncBlocksLock   sync.RWMutex
+	wsVerifier           *WeakSubjectivityVerifier
+	clockSetter          startup.ClockSetter
+	clockWaiter          startup.ClockWaiter
+	syncComplete         chan struct{}
 }
 
 // config options for the service.
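
For context, a minimal standalone sketch (not part of the patch) of the boundary condition the reworked handleEpochBoundary relies on: the shuffling caches are refreshed only when the next slot starts a new epoch, i.e. when (slot+1) % SlotsPerEpoch == 0. The slotsPerEpoch constant and the isLastSlotOfEpoch helper below are illustrative stand-ins; the patched code reads the value from params.BeaconConfig().SlotsPerEpoch.

package main

import "fmt"

// slotsPerEpoch stands in for params.BeaconConfig().SlotsPerEpoch (32 on mainnet).
const slotsPerEpoch = 32

// isLastSlotOfEpoch mirrors the early-return check in the new handleEpochBoundary:
// the committee and proposer-index caches are advanced only when the next slot
// begins a new epoch.
func isLastSlotOfEpoch(slot uint64) bool {
	return (slot+1)%slotsPerEpoch == 0
}

func main() {
	for _, slot := range []uint64{30, 31, 32, 63} {
		fmt.Printf("slot %d: update caches = %v\n", slot, isLastSlotOfEpoch(slot))
	}
}

Slots 31 and 63 are the last slots of epochs 0 and 1, so under this condition those are the only points at which handleEpochBoundary would advance the caches through the next-slot cache rather than returning early.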