Check init sync before getting payload attributes (#13479)

* Check init sync before getting payload attributes

This PR adds a helper to forkchoice to return the delay of the latest
imported block. It also adds a helper with an heuristic to check if the
node is during init sync. If the highest imported node was imported with
a delay of less than an epoch then the node is considered in regular
sync. If on the other hand, in addition the highest imported node is
more than two epochs old, then the node is considered in init Sync.

The helper to check this only uses forkchoice and therefore requires a
read lock. There are four paths to call this

1) During regular block processing, we defer a function to send the
   second FCU call with attributes. This function may not be called at
all if we are not regularly syncing
2) During regular block processing, we check in the path
   `postBlockProces->getFCUArgs->computePayloadAttributes` the payload
attributes if we are syncing a late block. In this case forkchoice is
already locked and we add a call in `getFCUArgs` to return early if not
regularly syncing
3) During handling of late blocks on `lateBlockTasks` we simply return
   early if not in regular sync (This is the biggest change as it takes
a longer FC lock for lateBlockTasks)
4) On Attestation processing, in UpdateHead, we are already locked so we
   just add a check to not update head on this path if not regularly
syncing.

* fix build

* Fix mocks
This commit is contained in:
Potuz 2024-01-17 12:39:28 -03:00 committed by GitHub
parent 87b53db3b4
commit 79bb7efbf8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 91 additions and 10 deletions

View File

@ -556,3 +556,18 @@ func (s *Service) RecentBlockSlot(root [32]byte) (primitives.Slot, error) {
defer s.cfg.ForkChoiceStore.RUnlock()
return s.cfg.ForkChoiceStore.Slot(root)
}
// inRegularSync applies the following heuristics to decide if the node is in
// regular sync mode vs init sync mode using only forkchoice.
// It checks that the highest received block is behind the current time by at least 2 epochs
// and that it was imported at least one epoch late if both of these
// tests pass then the node is in init sync. The caller of this function MUST
// have a lock on forkchoice
func (s *Service) inRegularSync() bool {
currentSlot := s.CurrentSlot()
fc := s.cfg.ForkChoiceStore
if currentSlot-fc.HighestReceivedBlockSlot() < 2*params.BeaconConfig().SlotsPerEpoch {
return true
}
return fc.HighestReceivedBlockDelay() < params.BeaconConfig().SlotsPerEpoch
}

View File

@ -593,3 +593,26 @@ func TestService_IsFinalized(t *testing.T) {
require.Equal(t, true, c.IsFinalized(ctx, br))
require.Equal(t, false, c.IsFinalized(ctx, [32]byte{'c'}))
}
func TestService_inRegularSync(t *testing.T) {
ctx := context.Background()
c := &Service{cfg: &config{ForkChoiceStore: doublylinkedtree.New()}, head: &head{root: [32]byte{'b'}}}
ojc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
ofc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
st, blkRoot, err := prepareForkchoiceState(ctx, 100, [32]byte{'a'}, [32]byte{}, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot))
require.Equal(t, false, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
st, blkRoot, err = prepareForkchoiceState(ctx, 128, [32]byte{'b'}, [32]byte{'a'}, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot))
require.Equal(t, false, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-5*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
require.Equal(t, true, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
c.cfg.ForkChoiceStore.SetGenesisTime(uint64(time.Now().Unix()))
require.Equal(t, true, c.inRegularSync())
}

View File

@ -67,7 +67,9 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
startTime := time.Now()
fcuArgs := &fcuConfig{}
defer s.handleSecondFCUCall(cfg, fcuArgs)
if s.inRegularSync() {
defer s.handleSecondFCUCall(cfg, fcuArgs)
}
defer s.sendLightClientFeeds(cfg)
defer s.sendStateFeedOnBlock(cfg)
defer reportProcessingTime(startTime)
@ -580,10 +582,15 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
if s.CurrentSlot() == s.HeadSlot() {
return
}
s.cfg.ForkChoiceStore.RLock()
defer s.cfg.ForkChoiceStore.RUnlock()
// return early if we are in init sync
if !s.inRegularSync() {
return
}
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.MissedSlot,
})
s.headLock.RLock()
headRoot := s.headRoot()
headState := s.headState(ctx)
@ -598,12 +605,9 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
if err := transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
log.WithError(err).Debug("could not update next slot state cache")
}
// handleEpochBoundary requires a forkchoice lock to obtain the target root.
s.cfg.ForkChoiceStore.RLock()
if err := s.handleEpochBoundary(ctx, currentSlot, headState, headRoot[:]); err != nil {
log.WithError(err).Error("lateBlockTasks: could not update epoch boundary caches")
}
s.cfg.ForkChoiceStore.RUnlock()
// return early if we already started building a block for the current
// head root
_, has := s.cfg.PayloadIDCache.PayloadID(s.CurrentSlot()+1, headRoot)
@ -629,9 +633,7 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
if fcuArgs.attributes.IsEmpty() {
return
}
s.cfg.ForkChoiceStore.RLock()
_, err = s.notifyForkchoiceUpdate(ctx, fcuArgs)
s.cfg.ForkChoiceStore.RUnlock()
if err != nil {
log.WithError(err).Debug("could not perform late block tasks: failed to update forkchoice with engine")
}

View File

@ -37,6 +37,9 @@ func (s *Service) getFCUArgs(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) er
if err := s.getFCUArgsEarlyBlock(cfg, fcuArgs); err != nil {
return err
}
if !s.inRegularSync() {
return nil
}
slot := cfg.signed.Block().Slot()
if slots.WithinVotingWindow(uint64(s.genesisTime.Unix()), slot) {
return nil

View File

@ -150,7 +150,9 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
headBlock: headBlock,
proposingSlot: proposingSlot,
}
fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, proposingSlot, newHeadRoot[:])
if s.inRegularSync() {
fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, proposingSlot, newHeadRoot[:])
}
if fcuArgs.attributes != nil && s.shouldOverrideFCU(newHeadRoot, proposingSlot) {
return
}

View File

@ -240,6 +240,20 @@ func (f *ForkChoice) HighestReceivedBlockSlot() primitives.Slot {
return f.store.highestReceivedNode.slot
}
// HighestReceivedBlockSlotDelay returns the number of slots that the highest
// received block was late when receiving it
func (f *ForkChoice) HighestReceivedBlockDelay() primitives.Slot {
n := f.store.highestReceivedNode
if n == nil {
return 0
}
secs, err := slots.SecondsSinceSlotStart(n.slot, f.store.genesisTime, n.timestamp)
if err != nil {
return 0
}
return primitives.Slot(secs / params.BeaconConfig().SecondsPerSlot)
}
// ReceivedBlocksLastEpoch returns the number of blocks received in the last epoch
func (f *ForkChoice) ReceivedBlocksLastEpoch() (uint64, error) {
count := uint64(0)

View File

@ -333,26 +333,29 @@ func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(1), count)
require.Equal(t, primitives.Slot(1), f.HighestReceivedBlockSlot())
require.Equal(t, primitives.Slot(0), f.HighestReceivedBlockDelay())
// 64
// Received block last epoch is 1
_, err = s.insert(context.Background(), 64, [32]byte{'A'}, b, b, 1, 1)
require.NoError(t, err)
s.genesisTime = uint64(time.Now().Add(time.Duration(-64*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second).Unix())
s.genesisTime = uint64(time.Now().Add(time.Duration((-64*int64(params.BeaconConfig().SecondsPerSlot))-1) * time.Second).Unix())
count, err = f.ReceivedBlocksLastEpoch()
require.NoError(t, err)
require.Equal(t, uint64(1), count)
require.Equal(t, primitives.Slot(64), f.HighestReceivedBlockSlot())
require.Equal(t, primitives.Slot(0), f.HighestReceivedBlockDelay())
// 64 65
// Received block last epoch is 2
_, err = s.insert(context.Background(), 65, [32]byte{'B'}, b, b, 1, 1)
require.NoError(t, err)
s.genesisTime = uint64(time.Now().Add(time.Duration(-65*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second).Unix())
s.genesisTime = uint64(time.Now().Add(time.Duration(-66*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second).Unix())
count, err = f.ReceivedBlocksLastEpoch()
require.NoError(t, err)
require.Equal(t, uint64(2), count)
require.Equal(t, primitives.Slot(65), f.HighestReceivedBlockSlot())
require.Equal(t, primitives.Slot(1), f.HighestReceivedBlockDelay())
// 64 65 66
// Received block last epoch is 3

View File

@ -64,6 +64,7 @@ type FastGetter interface {
FinalizedPayloadBlockHash() [32]byte
HasNode([32]byte) bool
HighestReceivedBlockSlot() primitives.Slot
HighestReceivedBlockDelay() primitives.Slot
IsCanonical(root [32]byte) bool
IsOptimistic(root [32]byte) (bool, error)
IsViableForCheckpoint(*forkchoicetypes.Checkpoint) (bool, error)

View File

@ -114,6 +114,13 @@ func (ro *ROForkChoice) HighestReceivedBlockSlot() primitives.Slot {
return ro.getter.HighestReceivedBlockSlot()
}
// HighestReceivedBlockDelay delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) HighestReceivedBlockDelay() primitives.Slot {
ro.l.RLock()
defer ro.l.RUnlock()
return ro.getter.HighestReceivedBlockDelay()
}
// ReceivedBlocksLastEpoch delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) ReceivedBlocksLastEpoch() (uint64, error) {
ro.l.RLock()

View File

@ -29,6 +29,7 @@ const (
unrealizedJustifiedPayloadBlockHashCalled
nodeCountCalled
highestReceivedBlockSlotCalled
highestReceivedBlockDelayCalled
receivedBlocksLastEpochCalled
weightCalled
isOptimisticCalled
@ -113,6 +114,11 @@ func TestROLocking(t *testing.T) {
call: highestReceivedBlockSlotCalled,
cb: func(g FastGetter) { g.HighestReceivedBlockSlot() },
},
{
name: "highestReceivedBlockDelayCalled",
call: highestReceivedBlockDelayCalled,
cb: func(g FastGetter) { g.HighestReceivedBlockDelay() },
},
{
name: "receivedBlocksLastEpochCalled",
call: receivedBlocksLastEpochCalled,
@ -245,6 +251,11 @@ func (ro *mockROForkchoice) HighestReceivedBlockSlot() primitives.Slot {
return 0
}
func (ro *mockROForkchoice) HighestReceivedBlockDelay() primitives.Slot {
ro.calls = append(ro.calls, highestReceivedBlockDelayCalled)
return 0
}
func (ro *mockROForkchoice) ReceivedBlocksLastEpoch() (uint64, error) {
ro.calls = append(ro.calls, receivedBlocksLastEpochCalled)
return 0, nil