Simplify fcu 1 (#13387)

* Remove unsafe proposer indices cache

* Simplify FCU #1

This PR starts the process of gradually simplifying FCU
It removes the responsibility of getting the state and block from this
function and informing if head has changed. It is only called when the
imported block has actually become head.

* Add a call to FCU in edge cases
This commit is contained in:
Potuz 2023-12-30 09:20:20 -03:00 committed by GitHub
parent cff5e2b5fe
commit 9809f5ac77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 90 deletions

View File

@ -44,50 +44,44 @@ func (s *Service) getStateAndBlock(ctx context.Context, r [32]byte) (state.Beaco
return headState, newHeadBlock, nil
}
type fcuConfig struct {
headState state.BeaconState
headBlock interfaces.ReadOnlySignedBeaconBlock
headRoot [32]byte
proposingSlot primitives.Slot
}
// fockchoiceUpdateWithExecution is a wrapper around notifyForkchoiceUpdate. It decides whether a new call to FCU should be made.
// it returns true if the new head is updated
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, newHeadRoot [32]byte, proposingSlot primitives.Slot) (bool, error) {
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuConfig) error {
_, span := trace.StartSpan(ctx, "beacon-chain.blockchain.forkchoiceUpdateWithExecution")
defer span.End()
// Note: Use the service context here to avoid the parent context being ended during a forkchoice update.
ctx = trace.NewContext(s.ctx, span)
isNewHead := s.isNewHead(newHeadRoot)
if !isNewHead {
return false, nil
}
headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return false, nil
}
_, tracked := s.trackedProposer(headState, proposingSlot)
_, tracked := s.trackedProposer(args.headState, args.proposingSlot)
if (tracked || features.Get().PrepareAllPayloads) && !features.Get().DisableReorgLateBlocks {
if s.shouldOverrideFCU(newHeadRoot, proposingSlot) {
return false, nil
if s.shouldOverrideFCU(args.headRoot, args.proposingSlot) {
return nil
}
}
_, err = s.notifyForkchoiceUpdate(ctx, &notifyForkchoiceUpdateArg{
headState: headState,
headRoot: newHeadRoot,
headBlock: headBlock.Block(),
_, err := s.notifyForkchoiceUpdate(ctx, &notifyForkchoiceUpdateArg{
headState: args.headState,
headRoot: args.headRoot,
headBlock: args.headBlock.Block(),
})
if err != nil {
return false, errors.Wrap(err, "could not notify forkchoice update")
return errors.Wrap(err, "could not notify forkchoice update")
}
if err := s.saveHead(ctx, newHeadRoot, headBlock, headState); err != nil {
if err := s.saveHead(ctx, args.headRoot, args.headBlock, args.headState); err != nil {
log.WithError(err).Error("could not save head")
}
// Only need to prune attestations from pool if the head has changed.
if err := s.pruneAttsFromPool(headBlock); err != nil {
if err := s.pruneAttsFromPool(args.headBlock); err != nil {
log.WithError(err).Error("could not prune attestations from pool")
}
return true, nil
return nil
}
// shouldOverrideFCU checks whether the incoming block is still subject to being

View File

@ -58,33 +58,14 @@ func TestService_getHeadStateAndBlock(t *testing.T) {
}
func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
opts := testServiceOptsWithDB(t)
service, err := NewService(ctx, opts...)
require.NoError(t, err)
service.cfg.PayloadIDCache = cache.NewPayloadIDCache()
_, err = service.forkchoiceUpdateWithExecution(ctx, service.headRoot(), service.CurrentSlot()+1)
require.NoError(t, err)
hookErr := "could not notify forkchoice update"
invalidStateErr := "could not get state summary: could not find block in DB"
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)
gb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, service.saveInitSyncBlock(ctx, [32]byte{'a'}, gb))
_, err = service.forkchoiceUpdateWithExecution(ctx, [32]byte{'a'}, service.CurrentSlot()+1)
require.NoError(t, err)
require.LogsContain(t, hook, invalidStateErr)
service.cfg.TrackedValidatorsCache = cache.NewTrackedValidatorsCache()
hook.Reset()
service.head = &head{
root: [32]byte{'a'},
block: nil, /* should not panic if notify head uses correct head */
}
// Block in Cache
b := util.NewBeaconBlock()
b.Block.Slot = 2
wsb, err := blocks.NewSignedBeaconBlock(b)
@ -99,12 +80,6 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
state: st,
}
service.cfg.PayloadIDCache.Set(2, [32]byte{2}, [8]byte{1})
_, err = service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot())
require.NoError(t, err)
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)
// Block in DB
b = util.NewBeaconBlock()
b.Block.Slot = 3
util.SaveBlock(t, ctx, service.cfg.BeaconDB, b)
@ -117,19 +92,17 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
state: st,
}
service.cfg.PayloadIDCache.Set(2, [32]byte{2}, [8]byte{1})
_, err = service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()+1)
require.NoError(t, err)
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)
args := &fcuConfig{
headState: st,
headRoot: r1,
headBlock: wsb,
proposingSlot: service.CurrentSlot() + 1,
}
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args))
payloadID, has := service.cfg.PayloadIDCache.PayloadID(2, [32]byte{2})
require.Equal(t, true, has)
require.Equal(t, primitives.PayloadID{1}, payloadID)
// Test zero headRoot returns immediately.
headRoot := service.headRoot()
_, err = service.forkchoiceUpdateWithExecution(ctx, [32]byte{}, service.CurrentSlot()+1)
require.NoError(t, err)
require.Equal(t, service.headRoot(), headRoot)
}
func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testing.T) {
@ -173,9 +146,13 @@ func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testin
service.head.block = sb
service.head.state = st
service.cfg.PayloadIDCache.Set(service.CurrentSlot()+1, [32]byte{} /* root */, [8]byte{})
_, err = service.forkchoiceUpdateWithExecution(ctx, r, service.CurrentSlot()+1)
require.NoError(t, err)
args := &fcuConfig{
headState: st,
headBlock: sb,
headRoot: r,
proposingSlot: service.CurrentSlot() + 1,
}
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args))
}
func TestShouldOverrideFCU(t *testing.T) {

View File

@ -73,6 +73,7 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn
if err != nil {
log.WithError(err).Warn("Could not update head")
}
newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
if blockRoot != headRoot {
receivedWeight, err := s.cfg.ForkChoiceStore.Weight(blockRoot)
if err != nil {
@ -88,10 +89,25 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn
"headRoot": fmt.Sprintf("%#x", headRoot),
"headWeight": headWeight,
}).Debug("Head block is not the received block")
}
newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
if headRoot == blockRoot {
if s.isNewHead(headRoot) {
headState, headBlock, err := s.getStateAndBlock(ctx, headRoot)
if err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return nil
}
// verify conditions for FCU, notifies FCU, and saves the new head.
// This function also prunes attestations, other similar operations happen in prunePostBlockOperationPools.
args := &fcuConfig{
headState: headState,
headBlock: headBlock,
headRoot: headRoot,
proposingSlot: s.CurrentSlot() + 1,
}
if err := s.forkchoiceUpdateWithExecution(ctx, args); err != nil {
return err
}
}
} else {
// Updating next slot state cache can happen in the background
// except in the epoch boundary in which case we lock to handle
// the shuffling and proposer caches updates.
@ -114,19 +130,23 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn
}
}()
}
// verify conditions for FCU, notifies FCU, and saves the new head.
// This function also prunes attestations, other similar operations happen in prunePostBlockOperationPools.
args := &fcuConfig{
headState: postState,
headBlock: signed,
headRoot: headRoot,
proposingSlot: s.CurrentSlot() + 1,
}
if err := s.forkchoiceUpdateWithExecution(ctx, args); err != nil {
return err
}
}
// verify conditions for FCU, notifies FCU, and saves the new head.
// This function also prunes attestations, other similar operations happen in prunePostBlockOperationPools.
if _, err := s.forkchoiceUpdateWithExecution(ctx, headRoot, s.CurrentSlot()+1); err != nil {
return err
}
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(blockRoot)
if err != nil {
log.WithError(err).Debug("Could not check if block is optimistic")
optimistic = true
}
// Send notification of the processed block to the state feed.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,

View File

@ -130,28 +130,31 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
start = time.Now()
// return early if we haven't changed head
newHeadRoot, err := s.cfg.ForkChoiceStore.Head(ctx)
if err != nil {
log.WithError(err).Error("Could not compute head from new attestations")
// Fallback to our current head root in the event of a failure.
s.headLock.RLock()
newHeadRoot = s.headRoot()
s.headLock.RUnlock()
return
}
if !s.isNewHead(newHeadRoot) {
return
}
headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("could not get head block")
return
}
newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
changed, err := s.forkchoiceUpdateWithExecution(s.ctx, newHeadRoot, proposingSlot)
if err != nil {
args := &fcuConfig{
headState: headState,
headRoot: newHeadRoot,
headBlock: headBlock,
proposingSlot: proposingSlot,
}
if err := s.forkchoiceUpdateWithExecution(s.ctx, args); err != nil {
log.WithError(err).Error("could not update forkchoice")
}
if changed {
s.headLock.RLock()
log.WithFields(logrus.Fields{
"oldHeadRoot": fmt.Sprintf("%#x", s.headRoot()),
"newHeadRoot": fmt.Sprintf("%#x", newHeadRoot),
}).Debug("Head changed due to attestations")
s.headLock.RUnlock()
}
log.WithField("newHeadRoot", fmt.Sprintf("%#x", newHeadRoot)).Debug("Head changed due to attestations")
}
// This processes fork choice attestations from the pool to account for validator votes and fork choice.