From 67dccc5e43967c26686f238e1796dcc906b05e68 Mon Sep 17 00:00:00 2001 From: Potuz Date: Fri, 5 Jan 2024 23:29:07 -0300 Subject: [PATCH] Break out several helpers from `postBlockProcess` (#13419) * Break out several helpers from `postBlockProcess` In addition fix a bug found by @terencechain where we should use a slot context instead of the parent context in the second FCU call. * Remove calls for tracked proposer getPayloadAttribute already takes care of this Also compute correctly the time into voting window * call with attributes only when incoming block is canonical * check for empty payload instead of only nil * add unit tests * move log for non-canonical block * return early if the incoming block does not change head * Pass fcuArgs as arguments * lint --- .../blockchain/execution_engine_test.go | 32 ++++ .../blockchain/forkchoice_update_execution.go | 35 ++++ beacon-chain/blockchain/process_block.go | 166 +++++------------- .../blockchain/process_block_helpers.go | 127 +++++++++++++- beacon-chain/blockchain/process_block_test.go | 91 ++++++---- .../blockchain/receive_attestation.go | 9 +- .../blockchain/receive_attestation_test.go | 4 +- beacon-chain/blockchain/receive_block.go | 9 +- time/slots/slottime.go | 4 +- time/slots/slottime_test.go | 4 +- 10 files changed, 311 insertions(+), 170 deletions(-) diff --git a/beacon-chain/blockchain/execution_engine_test.go b/beacon-chain/blockchain/execution_engine_test.go index 3c76db73b..46872771c 100644 --- a/beacon-chain/blockchain/execution_engine_test.go +++ b/beacon-chain/blockchain/execution_engine_test.go @@ -1096,3 +1096,35 @@ func TestKZGCommitmentToVersionedHashes(t *testing.T) { require.Equal(t, vhs[0].String(), vh0) require.Equal(t, vhs[1].String(), vh1) } + +func TestComputePayloadAttribute(t *testing.T) { + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) + ctx := tr.ctx + + st, _ := util.DeterministicGenesisStateBellatrix(t, 1) + + service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, Index: 0}) + // Cache hit, advance state, no fee recipient + slot := primitives.Slot(1) + service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{}) + cfg := &postBlockProcessConfig{ + ctx: ctx, + blockRoot: [32]byte{'a'}, + } + fcu := &fcuConfig{ + headState: st, + proposingSlot: slot, + headRoot: [32]byte{}, + } + require.NoError(t, service.computePayloadAttributes(cfg, fcu)) + require.Equal(t, false, fcu.attributes.IsEmpty()) + require.Equal(t, params.BeaconConfig().EthBurnAddressHex, common.BytesToAddress(fcu.attributes.SuggestedFeeRecipient()).String()) + + // Cache hit, advance state, has fee recipient + suggestedAddr := common.HexToAddress("123") + service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, FeeRecipient: primitives.ExecutionAddress(suggestedAddr), Index: 0}) + service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{}) + require.NoError(t, service.computePayloadAttributes(cfg, fcu)) + require.Equal(t, false, fcu.attributes.IsEmpty()) + require.Equal(t, suggestedAddr, common.BytesToAddress(fcu.attributes.SuggestedFeeRecipient())) +} diff --git a/beacon-chain/blockchain/forkchoice_update_execution.go b/beacon-chain/blockchain/forkchoice_update_execution.go index 4f51175b1..bb334dae2 100644 --- a/beacon-chain/blockchain/forkchoice_update_execution.go +++ b/beacon-chain/blockchain/forkchoice_update_execution.go @@ -52,6 +52,41 @@ type fcuConfig struct { attributes payloadattribute.Attributer } +// sendFCU handles the logic to notify the engine of a 
forkchoice update
+// for the first time when processing an incoming block during regular sync. It
+// always updates the shuffling caches and handles epoch transitions when the
+// incoming block is late, preparing payload attributes in that case, while it
+// only sends a message with empty attributes for early blocks.
+func (s *Service) sendFCU(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
+	if !s.isNewHead(cfg.headRoot) {
+		return nil
+	}
+	if fcuArgs.attributes != nil && !fcuArgs.attributes.IsEmpty() && s.shouldOverrideFCU(cfg.headRoot, s.CurrentSlot()+1) {
+		return nil
+	}
+	return s.forkchoiceUpdateWithExecution(cfg.ctx, fcuArgs)
+}
+
+// sendFCUWithAttributes computes the payload attributes and sends an FCU message
+// to the engine if needed.
+func (s *Service) sendFCUWithAttributes(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) {
+	slotCtx, cancel := context.WithTimeout(context.Background(), slotDeadline)
+	defer cancel()
+	cfg.ctx = slotCtx
+	if err := s.computePayloadAttributes(cfg, fcuArgs); err != nil {
+		log.WithError(err).Error("could not compute payload attributes")
+		return
+	}
+	if fcuArgs.attributes.IsEmpty() {
+		return
+	}
+	s.cfg.ForkChoiceStore.RLock()
+	defer s.cfg.ForkChoiceStore.RUnlock()
+	if _, err := s.notifyForkchoiceUpdate(cfg.ctx, fcuArgs); err != nil {
+		log.WithError(err).Error("could not update forkchoice with payload attributes for proposal")
+	}
+}
+
 // fockchoiceUpdateWithExecution is a wrapper around notifyForkchoiceUpdate. It decides whether a new call to FCU should be made.
 func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuConfig) error {
 	_, span := trace.StartSpan(ctx, "beacon-chain.blockchain.forkchoiceUpdateWithExecution")
diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go
index 5a4238f30..09f244ad3 100644
--- a/beacon-chain/blockchain/process_block.go
+++ b/beacon-chain/blockchain/process_block.go
@@ -28,7 +28,6 @@ import (
 	"github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation"
 	"github.com/prysmaticlabs/prysm/v4/runtime/version"
 	"github.com/prysmaticlabs/prysm/v4/time/slots"
-	"github.com/sirupsen/logrus"
 	"go.opencensus.io/trace"
 )
 
@@ -41,149 +40,67 @@ const depositDeadline = 20 * time.Second
 // This defines size of the upper bound for initial sync block cache.
 var initialSyncBlockCacheSize = uint64(2 * params.BeaconConfig().SlotsPerEpoch)
 
+// postBlockProcessConfig is a structure that contains the data needed to
+// process the beacon block after validating the state transition function.
+type postBlockProcessConfig struct {
+	ctx            context.Context
+	signed         interfaces.ReadOnlySignedBeaconBlock
+	blockRoot      [32]byte
+	headRoot       [32]byte
+	postState      state.BeaconState
+	isValidPayload bool
+}
+
 // postBlockProcess is called when a gossip block is received. This function performs
 // several duties most importantly informing the engine if head was updated,
 // saving the new head information to the blockchain package and
 // handling attestations, slashings and similar included in the block.
-func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, postState state.BeaconState, isValidPayload bool) error {
-	ctx, span := trace.StartSpan(ctx, "blockChain.onBlock")
+func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
+	ctx, span := trace.StartSpan(cfg.ctx, "blockChain.onBlock")
 	defer span.End()
-	if err := consensusblocks.BeaconBlockIsNil(signed); err != nil {
+	cfg.ctx = ctx
+	if err := consensusblocks.BeaconBlockIsNil(cfg.signed); err != nil {
 		return invalidBlock{error: err}
 	}
 	startTime := time.Now()
-	b := signed.Block()
+	fcuArgs := &fcuConfig{}
 
-	if err := s.cfg.ForkChoiceStore.InsertNode(ctx, postState, blockRoot); err != nil {
-		return errors.Wrapf(err, "could not insert block %d to fork choice store", signed.Block().Slot())
+	defer s.handleSecondFCUCall(cfg, fcuArgs)
+	defer s.sendStateFeedOnBlock(cfg)
+	defer reportProcessingTime(startTime)
+	defer reportAttestationInclusion(cfg.signed.Block())
+
+	err := s.cfg.ForkChoiceStore.InsertNode(ctx, cfg.postState, cfg.blockRoot)
+	if err != nil {
+		return errors.Wrapf(err, "could not insert block %d to fork choice store", cfg.signed.Block().Slot())
 	}
-	if err := s.handleBlockAttestations(ctx, signed.Block(), postState); err != nil {
+	if err := s.handleBlockAttestations(ctx, cfg.signed.Block(), cfg.postState); err != nil {
 		return errors.Wrap(err, "could not handle block's attestations")
 	}
-	s.InsertSlashingsToForkChoiceStore(ctx, signed.Block().Body().AttesterSlashings())
-	if isValidPayload {
-		if err := s.cfg.ForkChoiceStore.SetOptimisticToValid(ctx, blockRoot); err != nil {
+	s.InsertSlashingsToForkChoiceStore(ctx, cfg.signed.Block().Body().AttesterSlashings())
+	if cfg.isValidPayload {
+		if err := s.cfg.ForkChoiceStore.SetOptimisticToValid(ctx, cfg.blockRoot); err != nil {
 			return errors.Wrap(err, "could not set optimistic block to valid")
 		}
 	}
 	start := time.Now()
-	headRoot, err := s.cfg.ForkChoiceStore.Head(ctx)
+	cfg.headRoot, err = s.cfg.ForkChoiceStore.Head(ctx)
 	if err != nil {
 		log.WithError(err).Warn("Could not update head")
 	}
 	newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
-	proposingSlot := s.CurrentSlot() + 1
-	var fcuArgs *fcuConfig
-	if blockRoot != headRoot {
-		receivedWeight, err := s.cfg.ForkChoiceStore.Weight(blockRoot)
-		if err != nil {
-			log.WithField("root", fmt.Sprintf("%#x", blockRoot)).Warn("could not determine node weight")
-		}
-		headWeight, err := s.cfg.ForkChoiceStore.Weight(headRoot)
-		if err != nil {
-			log.WithField("root", fmt.Sprintf("%#x", headRoot)).Warn("could not determine node weight")
-		}
-		log.WithFields(logrus.Fields{
-			"receivedRoot":   fmt.Sprintf("%#x", blockRoot),
-			"receivedWeight": receivedWeight,
-			"headRoot":       fmt.Sprintf("%#x", headRoot),
-			"headWeight":     headWeight,
-		}).Debug("Head block is not the received block")
-		headState, headBlock, err := s.getStateAndBlock(ctx, headRoot)
-		if err != nil {
-			log.WithError(err).Error("Could not get forkchoice update argument")
-			return nil
-		}
-		fcuArgs = &fcuConfig{
-			headState:     headState,
-			headBlock:     headBlock,
-			headRoot:      headRoot,
-			proposingSlot: proposingSlot,
-		}
-	} else {
-		fcuArgs = &fcuConfig{
-			headState:     postState,
-			headBlock:     signed,
-			headRoot:      headRoot,
-			proposingSlot: proposingSlot,
-		}
+	if cfg.headRoot != cfg.blockRoot {
+		s.logNonCanonicalBlockReceived(cfg.blockRoot, cfg.headRoot)
+		return nil
 	}
-	isEarly := slots.WithinVotingWindow(uint64(s.genesisTime.Unix()))
-	shouldOverrideFCU := false
-	slot := 
postState.Slot() - if s.isNewHead(headRoot) { - // if the block is early send FCU without any payload attributes - if isEarly { - if err := s.forkchoiceUpdateWithExecution(ctx, fcuArgs); err != nil { - return err - } - } else { - // if the block is late lock and update the caches - if blockRoot == headRoot { - if err := transition.UpdateNextSlotCache(ctx, blockRoot[:], postState); err != nil { - return errors.Wrap(err, "could not update next slot state cache") - } - if slots.IsEpochEnd(slot) { - if err := s.handleEpochBoundary(ctx, slot, postState, blockRoot[:]); err != nil { - return errors.Wrap(err, "could not handle epoch boundary") - } - } - } - _, tracked := s.trackedProposer(fcuArgs.headState, proposingSlot) - if tracked { - shouldOverrideFCU = s.shouldOverrideFCU(headRoot, proposingSlot) - fcuArgs.attributes = s.getPayloadAttribute(ctx, fcuArgs.headState, proposingSlot, headRoot[:]) - } - if !shouldOverrideFCU { - if err := s.forkchoiceUpdateWithExecution(ctx, fcuArgs); err != nil { - return err - } - } - } + if err := s.getFCUArgs(cfg, fcuArgs); err != nil { + log.WithError(err).Error("Could not get forkchoice update argument") + return nil } - optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(blockRoot) - if err != nil { - log.WithError(err).Debug("Could not check if block is optimistic") - optimistic = true + if err := s.sendFCU(cfg, fcuArgs); err != nil { + return errors.Wrap(err, "could not send FCU to engine") } - // Send notification of the processed block to the state feed. - s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.BlockProcessed, - Data: &statefeed.BlockProcessedData{ - Slot: signed.Block().Slot(), - BlockRoot: blockRoot, - SignedBlock: signed, - Verified: true, - Optimistic: optimistic, - }, - }) - if blockRoot == headRoot && isEarly { - go func() { - slotCtx, cancel := context.WithTimeout(context.Background(), slotDeadline) - defer cancel() - if err := transition.UpdateNextSlotCache(slotCtx, blockRoot[:], postState); err != nil { - log.WithError(err).Error("could not update next slot state cache") - } - if slots.IsEpochEnd(slot) { - if err := s.handleEpochBoundary(ctx, slot, postState, blockRoot[:]); err != nil { - log.WithError(err).Error("could not handle epoch boundary") - } - } - if _, tracked := s.trackedProposer(fcuArgs.headState, proposingSlot); !tracked { - return - } - fcuArgs.attributes = s.getPayloadAttribute(ctx, fcuArgs.headState, proposingSlot, headRoot[:]) - s.cfg.ForkChoiceStore.RLock() - defer s.cfg.ForkChoiceStore.RUnlock() - if _, err := s.notifyForkchoiceUpdate(ctx, fcuArgs); err != nil { - log.WithError(err).Error("could not update forkchoice with payload attributes for proposal") - } - }() - } - defer reportAttestationInclusion(b) - onBlockProcessingTime.Observe(float64(time.Since(startTime).Milliseconds())) return nil } @@ -714,11 +631,6 @@ func (s *Service) lateBlockTasks(ctx context.Context) { log.WithError(err).Error("lateBlockTasks: could not update epoch boundary caches") } s.cfg.ForkChoiceStore.RUnlock() - _, tracked := s.trackedProposer(headState, s.CurrentSlot()+1) - // return early if we are not proposing next slot. 
-	if !tracked {
-		return
-	}
 	// return early if we already started building a block for the current
 	// head root
 	_, has := s.cfg.PayloadIDCache.PayloadID(s.CurrentSlot()+1, headRoot)
@@ -740,6 +652,10 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
 		headBlock:     headBlock,
 	}
 	fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:])
+	// return early if we are not proposing next slot
+	if fcuArgs.attributes.IsEmpty() {
+		return
+	}
 	_, err = s.notifyForkchoiceUpdate(ctx, fcuArgs)
 	s.cfg.ForkChoiceStore.RUnlock()
 	if err != nil {
diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go
index 54e773543..ad214fbc7 100644
--- a/beacon-chain/blockchain/process_block_helpers.go
+++ b/beacon-chain/blockchain/process_block_helpers.go
@@ -3,9 +3,13 @@ package blockchain
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/pkg/errors"
+	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
+	statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
+	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
 	doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree"
 	forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
 	"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
@@ -15,8 +19,8 @@ import (
 	"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
 	mathutil "github.com/prysmaticlabs/prysm/v4/math"
 	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
-	"github.com/prysmaticlabs/prysm/v4/time"
 	"github.com/prysmaticlabs/prysm/v4/time/slots"
+	"github.com/sirupsen/logrus"
 	"go.opencensus.io/trace"
 )
 
@@ -25,6 +29,127 @@ func (s *Service) CurrentSlot() primitives.Slot {
 	return slots.CurrentSlot(uint64(s.genesisTime.Unix()))
 }
 
+// getFCUArgs returns the arguments to call forkchoice update.
+func (s *Service) getFCUArgs(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
+	if err := s.getFCUArgsEarlyBlock(cfg, fcuArgs); err != nil {
+		return err
+	}
+	slot := cfg.signed.Block().Slot()
+	if slots.WithinVotingWindow(uint64(s.genesisTime.Unix()), slot) {
+		return nil
+	}
+	return s.computePayloadAttributes(cfg, fcuArgs)
+}
+
+func (s *Service) getFCUArgsEarlyBlock(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
+	if cfg.blockRoot == cfg.headRoot {
+		fcuArgs.headState = cfg.postState
+		fcuArgs.headBlock = cfg.signed
+		fcuArgs.headRoot = cfg.headRoot
+		fcuArgs.proposingSlot = s.CurrentSlot() + 1
+		return nil
+	}
+	return s.fcuArgsNonCanonicalBlock(cfg, fcuArgs)
+}
+
+// logNonCanonicalBlockReceived prints a message informing that the received
+// block is not the head of the chain. It requires that the caller holds a lock on
+// forkchoice.
+func (s *Service) logNonCanonicalBlockReceived(blockRoot [32]byte, headRoot [32]byte) { + receivedWeight, err := s.cfg.ForkChoiceStore.Weight(blockRoot) + if err != nil { + log.WithField("root", fmt.Sprintf("%#x", blockRoot)).Warn("could not determine node weight") + } + headWeight, err := s.cfg.ForkChoiceStore.Weight(headRoot) + if err != nil { + log.WithField("root", fmt.Sprintf("%#x", headRoot)).Warn("could not determine node weight") + } + log.WithFields(logrus.Fields{ + "receivedRoot": fmt.Sprintf("%#x", blockRoot), + "receivedWeight": receivedWeight, + "headRoot": fmt.Sprintf("%#x", headRoot), + "headWeight": headWeight, + }).Debug("Head block is not the received block") +} + +// fcuArgsNonCanonicalBlock returns the arguments to the FCU call when the +// incoming block is non-canonical, that is, based on the head root. +func (s *Service) fcuArgsNonCanonicalBlock(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error { + headState, headBlock, err := s.getStateAndBlock(cfg.ctx, cfg.headRoot) + if err != nil { + return err + } + fcuArgs.headState = headState + fcuArgs.headBlock = headBlock + fcuArgs.headRoot = cfg.headRoot + fcuArgs.proposingSlot = s.CurrentSlot() + 1 + return nil +} + +// sendStateFeedOnBlock sends an event that a new block has been synced +func (s *Service) sendStateFeedOnBlock(cfg *postBlockProcessConfig) { + optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(cfg.blockRoot) + if err != nil { + log.WithError(err).Debug("Could not check if block is optimistic") + optimistic = true + } + // Send notification of the processed block to the state feed. + s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.BlockProcessed, + Data: &statefeed.BlockProcessedData{ + Slot: cfg.signed.Block().Slot(), + BlockRoot: cfg.blockRoot, + SignedBlock: cfg.signed, + Verified: true, + Optimistic: optimistic, + }, + }) +} + +// updateCachesPostBlockProcessing updates the next slot cache and handles the epoch +// boundary in order to compute the right proposer indices after processing +// state transition. This function is called on late blocks while still locked, +// before sending FCU to the engine. +func (s *Service) updateCachesPostBlockProcessing(cfg *postBlockProcessConfig) error { + slot := cfg.postState.Slot() + if err := transition.UpdateNextSlotCache(cfg.ctx, cfg.blockRoot[:], cfg.postState); err != nil { + return errors.Wrap(err, "could not update next slot state cache") + } + if !slots.IsEpochEnd(slot) { + return nil + } + return s.handleEpochBoundary(cfg.ctx, slot, cfg.postState, cfg.blockRoot[:]) +} + +// handleSecondFCUCall handles a second call to FCU when syncing a new block. +// This is useful when proposing in the next block and we want to defer the +// computation of the next slot shuffling. +func (s *Service) handleSecondFCUCall(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) { + if (fcuArgs.attributes == nil || fcuArgs.attributes.IsEmpty()) && cfg.headRoot == cfg.blockRoot { + go s.sendFCUWithAttributes(cfg, fcuArgs) + } +} + +// reportProcessingTime reports the metric of how long it took to process the +// current block +func reportProcessingTime(startTime time.Time) { + onBlockProcessingTime.Observe(float64(time.Since(startTime).Milliseconds())) +} + +// computePayloadAttributes modifies the passed FCU arguments to +// contain the right payload attributes with the tracked proposer. It gets +// called on blocks that arrive after the attestation voting window, or in a +// background routine after syncing early blocks. 
+func (s *Service) computePayloadAttributes(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error { + if cfg.blockRoot == cfg.headRoot { + if err := s.updateCachesPostBlockProcessing(cfg); err != nil { + return err + } + } + fcuArgs.attributes = s.getPayloadAttribute(cfg.ctx, fcuArgs.headState, fcuArgs.proposingSlot, cfg.headRoot[:]) + return nil +} + // getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block // to retrieve the state in DB. It verifies the pre state's validity and the incoming block // is in the correct time window. diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index fc53ce923..1d67a364d 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -567,7 +567,7 @@ func TestOnBlock_CanFinalize_WithOnTick(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, r, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, r, postState, true)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, r, [32]byte{}, postState, true})) require.NoError(t, service.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch)) _, err = service.updateFinalizationOnBlock(ctx, preState, postState, currStoreFinalizedEpoch) require.NoError(t, err) @@ -615,7 +615,7 @@ func TestOnBlock_CanFinalize(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, r, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, r, postState, true)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, r, [32]byte{}, postState, true})) require.NoError(t, service.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch)) _, err = service.updateFinalizationOnBlock(ctx, preState, postState, currStoreFinalizedEpoch) require.NoError(t, err) @@ -641,7 +641,7 @@ func TestOnBlock_CanFinalize(t *testing.T) { func TestOnBlock_NilBlock(t *testing.T) { service, tr := minimalTestService(t) - err := service.postBlockProcess(tr.ctx, nil, [32]byte{}, nil, true) + err := service.postBlockProcess(&postBlockProcessConfig{tr.ctx, nil, [32]byte{}, [32]byte{}, nil, true}) require.Equal(t, true, IsInvalidBlock(err)) } @@ -689,7 +689,7 @@ func TestOnBlock_CallNewPayloadAndForkchoiceUpdated(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, r, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, r, postState, false)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, r, [32]byte{}, postState, false})) testState, err = service.cfg.StateGen.StateByRoot(ctx, r) require.NoError(t, err) } @@ -1111,7 +1111,7 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb1) require.NoError(t, err) lock.Lock() - require.NoError(t, service.postBlockProcess(ctx, wsb1, r1, postState, true)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb1, r1, [32]byte{}, postState, true})) lock.Unlock() wg.Done() }() @@ -1121,7 +1121,7 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) { postState, err := 
service.validateStateTransition(ctx, preState, wsb2) require.NoError(t, err) lock.Lock() - require.NoError(t, service.postBlockProcess(ctx, wsb2, r2, postState, true)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb2, r2, [32]byte{}, postState, true})) lock.Unlock() wg.Done() }() @@ -1131,7 +1131,7 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb3) require.NoError(t, err) lock.Lock() - require.NoError(t, service.postBlockProcess(ctx, wsb3, r3, postState, true)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb3, r3, [32]byte{}, postState, true})) lock.Unlock() wg.Done() }() @@ -1141,7 +1141,7 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb4) require.NoError(t, err) lock.Lock() - require.NoError(t, service.postBlockProcess(ctx, wsb4, r4, postState, true)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb4, r4, [32]byte{}, postState, true})) lock.Unlock() wg.Done() }() @@ -1216,7 +1216,7 @@ func TestStore_NoViableHead_FCU(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, root, postState, false)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false})) } for i := 6; i < 12; i++ { @@ -1234,7 +1234,7 @@ func TestStore_NoViableHead_FCU(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false}) require.NoError(t, err) } @@ -1253,7 +1253,7 @@ func TestStore_NoViableHead_FCU(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false}) require.NoError(t, err) } // Check that we haven't justified the second epoch yet @@ -1275,7 +1275,7 @@ func TestStore_NoViableHead_FCU(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, firstInvalidRoot, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, firstInvalidRoot, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, firstInvalidRoot, [32]byte{}, postState, false}) require.NoError(t, err) jc = service.cfg.ForkChoiceStore.JustifiedCheckpoint() require.Equal(t, primitives.Epoch(2), jc.Epoch) @@ -1303,7 +1303,7 @@ func TestStore_NoViableHead_FCU(t *testing.T) { postState, err = service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false}) require.ErrorContains(t, "received an INVALID payload from 
execution engine", err) // Check that forkchoice's head is the last invalid block imported. The // store's headroot is the previous head (since the invalid block did @@ -1332,7 +1332,7 @@ func TestStore_NoViableHead_FCU(t *testing.T) { postState, err = service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, true) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, true}) require.NoError(t, err) // Check the newly imported block is head, it justified the right // checkpoint and the node is no longer optimistic @@ -1394,7 +1394,7 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, root, postState, false)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false})) } for i := 6; i < 12; i++ { @@ -1412,7 +1412,7 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false}) require.NoError(t, err) } @@ -1432,7 +1432,7 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false}) require.NoError(t, err) } // Check that we haven't justified the second epoch yet @@ -1454,7 +1454,7 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, firstInvalidRoot, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, firstInvalidRoot, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, firstInvalidRoot, [32]byte{}, postState, false}) require.NoError(t, err) jc = service.cfg.ForkChoiceStore.JustifiedCheckpoint() require.Equal(t, primitives.Epoch(2), jc.Epoch) @@ -1510,7 +1510,7 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) { postState, err = service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, true) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, true}) require.NoError(t, err) // Check the newly imported block is head, it justified the right // checkpoint and the node is no longer optimistic @@ -1574,7 +1574,7 @@ func TestStore_NoViableHead_Liveness(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, 
root, postState, false)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false})) } for i := 6; i < 12; i++ { @@ -1593,7 +1593,7 @@ func TestStore_NoViableHead_Liveness(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false}) require.NoError(t, err) } @@ -1612,7 +1612,7 @@ func TestStore_NoViableHead_Liveness(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, lastValidRoot, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, lastValidRoot, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, lastValidRoot, [32]byte{}, postState, false}) require.NoError(t, err) // save the post state and the payload Hash of this block since it will // be the LVH @@ -1639,7 +1639,7 @@ func TestStore_NoViableHead_Liveness(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, invalidRoots[i-13], wsb, postState)) - err = service.postBlockProcess(ctx, wsb, invalidRoots[i-13], postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, invalidRoots[i-13], [32]byte{}, postState, false}) require.NoError(t, err) } // Check that we have justified the second epoch @@ -1704,7 +1704,7 @@ func TestStore_NoViableHead_Liveness(t *testing.T) { postState, err = service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, root, postState, true)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, true})) // Check that the head is still INVALID and the node is still optimistic require.Equal(t, invalidHeadRoot, service.cfg.ForkChoiceStore.CachedHeadRoot()) optimistic, err = service.IsOptimistic(ctx) @@ -1727,7 +1727,7 @@ func TestStore_NoViableHead_Liveness(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, true) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, true}) require.NoError(t, err) st, err = service.cfg.StateGen.StateByRoot(ctx, root) require.NoError(t, err) @@ -1753,7 +1753,7 @@ func TestStore_NoViableHead_Liveness(t *testing.T) { postState, err = service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, true) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, true}) require.NoError(t, err) require.Equal(t, root, service.cfg.ForkChoiceStore.CachedHeadRoot()) sjc = service.CurrentJustifiedCheckpt() @@ -1809,7 +1809,7 @@ func TestNoViableHead_Reboot(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) 
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, root, postState, false)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false})) } for i := 6; i < 12; i++ { @@ -1827,7 +1827,7 @@ func TestNoViableHead_Reboot(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, root, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false}) require.NoError(t, err) } @@ -1846,7 +1846,7 @@ func TestNoViableHead_Reboot(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, lastValidRoot, wsb, postState)) - err = service.postBlockProcess(ctx, wsb, lastValidRoot, postState, false) + err = service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, lastValidRoot, [32]byte{}, postState, false}) require.NoError(t, err) // save the post state and the payload Hash of this block since it will // be the LVH @@ -1875,7 +1875,7 @@ func TestNoViableHead_Reboot(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, root, postState, false)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false})) require.NoError(t, service.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch)) _, err = service.updateFinalizationOnBlock(ctx, preState, postState, currStoreFinalizedEpoch) require.NoError(t, err) @@ -1990,7 +1990,7 @@ func TestOnBlock_HandleBlockAttestations(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, root, postState, false)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, root, [32]byte{}, postState, false})) st, err = service.HeadState(ctx) require.NoError(t, err) @@ -2197,6 +2197,35 @@ func TestMissingIndices(t *testing.T) { } } +func Test_getFCUArgs(t *testing.T) { + s, tr := minimalTestService(t) + ctx := tr.ctx + st, keys := util.DeterministicGenesisState(t, 64) + b, err := util.GenerateFullBlock(st, keys, util.DefaultBlockGenConfig(), 1) + require.NoError(t, err) + wsb, err := consensusblocks.NewSignedBeaconBlock(b) + require.NoError(t, err) + + cfg := &postBlockProcessConfig{ + ctx: ctx, + signed: wsb, + blockRoot: [32]byte{'a'}, + postState: st, + isValidPayload: true, + } + // error branch + fcuArgs := &fcuConfig{} + err = s.getFCUArgs(cfg, fcuArgs) + require.ErrorContains(t, "block does not exist", err) + + // canonical branch + cfg.headRoot = cfg.blockRoot + fcuArgs = &fcuConfig{} + err = s.getFCUArgs(cfg, fcuArgs) + require.NoError(t, err) + require.Equal(t, cfg.blockRoot, fcuArgs.headRoot) +} + func fakeCommitments(n int) [][]byte { f := make([][]byte, n) for i := range f { diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index c8ba5bcca..bdfd666ca 100644 --- 
a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -150,12 +150,9 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot) headBlock: headBlock, proposingSlot: proposingSlot, } - _, tracked := s.trackedProposer(headState, proposingSlot) - if tracked { - if s.shouldOverrideFCU(newHeadRoot, proposingSlot) { - return - } - fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, proposingSlot, newHeadRoot[:]) + fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, proposingSlot, newHeadRoot[:]) + if fcuArgs.attributes != nil && s.shouldOverrideFCU(newHeadRoot, proposingSlot) { + return } if err := s.forkchoiceUpdateWithExecution(s.ctx, fcuArgs); err != nil { log.WithError(err).Error("could not update forkchoice") diff --git a/beacon-chain/blockchain/receive_attestation_test.go b/beacon-chain/blockchain/receive_attestation_test.go index 7ebe332ef..57d19a12f 100644 --- a/beacon-chain/blockchain/receive_attestation_test.go +++ b/beacon-chain/blockchain/receive_attestation_test.go @@ -112,7 +112,7 @@ func TestService_ProcessAttestationsAndUpdateHead(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, tRoot, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, tRoot, postState, false)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, tRoot, [32]byte{}, postState, false})) copied, err = service.cfg.StateGen.StateByRoot(ctx, tRoot) require.NoError(t, err) require.Equal(t, 2, fcs.NodeCount()) @@ -168,7 +168,7 @@ func TestService_UpdateHead_NoAtts(t *testing.T) { postState, err := service.validateStateTransition(ctx, preState, wsb) require.NoError(t, err) require.NoError(t, service.savePostStateInfo(ctx, tRoot, wsb, postState)) - require.NoError(t, service.postBlockProcess(ctx, wsb, tRoot, postState, false)) + require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, wsb, tRoot, [32]byte{}, postState, false})) require.Equal(t, 2, fcs.NodeCount()) require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb)) require.Equal(t, tRoot, service.head.root) diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 7e2c494c7..0ebb483c6 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -119,7 +119,14 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig if err := s.savePostStateInfo(ctx, blockRoot, blockCopy, postState); err != nil { return errors.Wrap(err, "could not save post state info") } - if err := s.postBlockProcess(ctx, blockCopy, blockRoot, postState, isValidPayload); err != nil { + args := &postBlockProcessConfig{ + ctx: ctx, + signed: blockCopy, + blockRoot: blockRoot, + postState: postState, + isValidPayload: isValidPayload, + } + if err := s.postBlockProcess(args); err != nil { err := errors.Wrap(err, "could not process block") tracing.AnnotateError(span, err) return err diff --git a/time/slots/slottime.go b/time/slots/slottime.go index 1e2f5cf28..587398f85 100644 --- a/time/slots/slottime.go +++ b/time/slots/slottime.go @@ -267,7 +267,7 @@ func TimeIntoSlot(genesisTime uint64) time.Duration { // WithinVotingWindow returns whether the current time is within the voting window // (eg. 4 seconds on mainnet) of the current slot. 
-func WithinVotingWindow(genesisTime uint64) bool { +func WithinVotingWindow(genesisTime uint64, slot primitives.Slot) bool { votingWindow := params.BeaconConfig().SecondsPerSlot / params.BeaconConfig().IntervalsPerSlot - return TimeIntoSlot(genesisTime) < time.Duration(votingWindow)*time.Second + return time.Since(StartTime(genesisTime, slot)) < time.Duration(votingWindow)*time.Second } diff --git a/time/slots/slottime_test.go b/time/slots/slottime_test.go index 42bc79466..04ed844d8 100644 --- a/time/slots/slottime_test.go +++ b/time/slots/slottime_test.go @@ -603,7 +603,7 @@ func TestTimeIntoSlot(t *testing.T) { func TestWithinVotingWindow(t *testing.T) { genesisTime := uint64(time.Now().Add(-37 * time.Second).Unix()) - require.Equal(t, true, WithinVotingWindow(genesisTime)) + require.Equal(t, true, WithinVotingWindow(genesisTime, 3)) genesisTime = uint64(time.Now().Add(-40 * time.Second).Unix()) - require.Equal(t, false, WithinVotingWindow(genesisTime)) + require.Equal(t, false, WithinVotingWindow(genesisTime, 3)) }
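
The `WithinVotingWindow` change above is the "compute correctly the time into voting window" fix from the commit message: lateness is now measured from the start of the slot being processed rather than from the current wall-clock slot. The standalone sketch below illustrates that behavioral difference; the constants and helper names are illustrative stand-ins, not the Prysm `params` values or API.

package main

import (
	"fmt"
	"time"
)

// Illustrative values: 12-second slots and 3 intervals per slot give the
// 4-second attestation voting window used on mainnet. These stand in for the
// params.BeaconConfig() fields read by the real helper.
const (
	secondsPerSlot   = 12
	intervalsPerSlot = 3
)

// slotStart returns the wall-clock time at which the given slot begins.
func slotStart(genesis time.Time, slot uint64) time.Time {
	return genesis.Add(time.Duration(slot*secondsPerSlot) * time.Second)
}

// withinVotingWindowOld mirrors the pre-patch behavior: it only checks how far
// we are into the current wall-clock slot, regardless of which slot the block
// being processed belongs to.
func withinVotingWindowOld(genesis time.Time) bool {
	votingWindow := time.Duration(secondsPerSlot/intervalsPerSlot) * time.Second
	timeIntoSlot := time.Since(genesis) % (secondsPerSlot * time.Second)
	return timeIntoSlot < votingWindow
}

// withinVotingWindowNew mirrors the post-patch behavior: lateness is measured
// from the start of the given slot, so a block from an older slot is never
// treated as "early" just because the clock recently rolled into a new slot.
func withinVotingWindowNew(genesis time.Time, slot uint64) bool {
	votingWindow := time.Duration(secondsPerSlot/intervalsPerSlot) * time.Second
	return time.Since(slotStart(genesis, slot)) < votingWindow
}

func main() {
	// Genesis 37 seconds ago: the clock is 1 second into slot 3.
	genesis := time.Now().Add(-37 * time.Second)
	fmt.Println(withinVotingWindowOld(genesis))    // true: 1s into the current slot
	fmt.Println(withinVotingWindowNew(genesis, 3)) // true: slot 3 started 1s ago
	fmt.Println(withinVotingWindowNew(genesis, 2)) // false: slot 2 started 13s ago
}

This matches the updated test: with genesis 37 seconds in the past, slot 3 is still within its voting window, while 40 seconds in the past it is not.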
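The commit message also notes that the second FCU call now runs under a slot-scoped context instead of the parent block-processing context: `sendFCUWithAttributes` derives `slotCtx` from `context.Background()` with the `slotDeadline` timeout. A minimal sketch of why that matters, assuming a hypothetical `notifyEngine` placeholder and a made-up 4-second deadline rather than the real engine call:

package main

import (
	"context"
	"fmt"
	"time"
)

const slotDeadline = 4 * time.Second // illustrative stand-in for the per-slot deadline

// notifyEngine is a placeholder for the forkchoice-update call; it only reports
// whether its context is still alive when the call would be made.
func notifyEngine(ctx context.Context, label string) {
	select {
	case <-ctx.Done():
		fmt.Println(label, "skipped:", ctx.Err())
	default:
		fmt.Println(label, "sent")
	}
}

func main() {
	parent, cancel := context.WithCancel(context.Background())
	cancel() // the caller has already returned or the request was cancelled

	// Deriving from the parent would silently drop the background FCU call.
	notifyEngine(parent, "FCU with parent ctx")

	// Deriving from context.Background() with its own slot deadline, as the
	// patch does in sendFCUWithAttributes, keeps the call alive until the
	// deadline even after the parent context is gone.
	slotCtx, cancelSlot := context.WithTimeout(context.Background(), slotDeadline)
	defer cancelSlot()
	notifyEngine(slotCtx, "FCU with slot ctx")
}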
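The refactored `postBlockProcess` registers its follow-up work as deferred helpers (`handleSecondFCUCall`, `sendStateFeedOnBlock`, `reportProcessingTime`, `reportAttestationInclusion`). Go runs defers in LIFO order, so the second FCU call, registered first, fires last, after the state feed event and the metrics, and it still fires on early returns such as the non-canonical-block case. A small standalone sketch of that ordering, with placeholder step names rather than the real helpers:

package main

import "fmt"

// postBlockSteps mimics the defer layout of the refactored postBlockProcess:
// the helpers are registered up front and run in reverse order once the main
// body returns, whether it completes fully or bails out early.
func postBlockSteps() error {
	defer fmt.Println("4: second FCU call with payload attributes") // registered first, runs last
	defer fmt.Println("3: send BlockProcessed state feed event")
	defer fmt.Println("2: report processing time metric")
	defer fmt.Println("1: report attestation inclusion")

	fmt.Println("0: insert block into forkchoice and compute head")
	return nil
}

func main() {
	if err := postBlockSteps(); err != nil {
		fmt.Println("error:", err)
	}
}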