Process atts and update head before proposing (#10653)

* Process atts and updeate head

* Fix ctx

* New test and old tests

* Update validator_test.go

* Update validator_test.go

* Update service.go

* Rename to UpdateHead

* Update receive_attestation.go

* Update receive_attestation.go

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
terencechain 2022-05-09 16:12:50 -07:00 committed by GitHub
parent 5d94030b4f
commit bbdf19cfd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 125 additions and 44 deletions

View File

@ -31,6 +31,12 @@ type ChainInfoFetcher interface {
HeadDomainFetcher
}
// HeadUpdater defines a common interface for methods in blockchain service
// which allow to update the head info
type HeadUpdater interface {
UpdateHead(context.Context) error
}
// TimeFetcher retrieves the Ethereum consensus data that's related to time.
type TimeFetcher interface {
GenesisTime() time.Time

View File

@ -128,39 +128,52 @@ func (s *Service) spawnProcessAttestationsRoutine(stateFeed *event.Feed) {
return
}
// Continue when there's no fork choice attestation, there's nothing to process and update head.
// This covers the condition when the node is still initial syncing to the head of the chain.
if s.cfg.AttPool.ForkchoiceAttestationCount() == 0 {
continue
if err := s.UpdateHead(s.ctx); err != nil {
log.WithError(err).Error("Could not process attestations and update head")
return
}
s.processAttestations(s.ctx)
justified := s.store.JustifiedCheckpt()
if justified == nil {
log.WithError(errNilJustifiedInStore).Error("Could not get justified checkpoint")
continue
}
balances, err := s.justifiedBalances.get(s.ctx, bytesutil.ToBytes32(justified.Root))
if err != nil {
log.WithError(err).Errorf("Unable to get justified balances for root %v", justified.Root)
continue
}
newHeadRoot, err := s.updateHead(s.ctx, balances)
if err != nil {
log.WithError(err).Warn("Resolving fork due to new attestation")
}
if s.headRoot() != newHeadRoot {
log.WithFields(logrus.Fields{
"oldHeadRoot": fmt.Sprintf("%#x", s.headRoot()),
"newHeadRoot": fmt.Sprintf("%#x", newHeadRoot),
}).Debug("Head changed due to attestations")
}
s.notifyEngineIfChangedHead(s.ctx, newHeadRoot)
}
}
}()
}
// UpdateHead updates the canonical head of the chain based on information from fork-choice attestations and votes.
// It requires no external inputs.
func (s *Service) UpdateHead(ctx context.Context) error {
// Continue when there's no fork choice attestation, there's nothing to process and update head.
// This covers the condition when the node is still initial syncing to the head of the chain.
if s.cfg.AttPool.ForkchoiceAttestationCount() == 0 {
return nil
}
// Only one process can process attestations and update head at a time.
s.processAttestationsLock.Lock()
defer s.processAttestationsLock.Unlock()
s.processAttestations(ctx)
justified := s.store.JustifiedCheckpt()
if justified == nil {
return errNilJustifiedInStore
}
balances, err := s.justifiedBalances.get(ctx, bytesutil.ToBytes32(justified.Root))
if err != nil {
return err
}
newHeadRoot, err := s.updateHead(ctx, balances)
if err != nil {
log.WithError(err).Warn("Resolving fork due to new attestation")
}
if s.headRoot() != newHeadRoot {
log.WithFields(logrus.Fields{
"oldHeadRoot": fmt.Sprintf("%#x", s.headRoot()),
"newHeadRoot": fmt.Sprintf("%#x", newHeadRoot),
}).Debug("Head changed due to attestations")
}
s.notifyEngineIfChangedHead(ctx, newHeadRoot)
return nil
}
// This calls notify Forkchoice Update in the event that the head has changed
func (s *Service) notifyEngineIfChangedHead(ctx context.Context, newHeadRoot [32]byte) {
if s.headRoot() == newHeadRoot {

View File

@ -200,3 +200,37 @@ func TestNotifyEngineIfChangedHead(t *testing.T) {
require.Equal(t, types.ValidatorIndex(1), vId)
require.Equal(t, [8]byte{1}, payloadID)
}
func TestService_ProcessAttestationsAndUpdateHead(t *testing.T) {
ctx := context.Background()
opts := testServiceOptsWithDB(t)
opts = append(opts, WithAttestationPool(attestations.NewPool()), WithStateNotifier(&mockBeaconNode{}))
service, err := NewService(ctx, opts...)
require.NoError(t, err)
service.genesisTime = prysmTime.Now().Add(-1 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
genesisState, pks := util.DeterministicGenesisState(t, 64)
require.NoError(t, genesisState.SetGenesisTime(uint64(prysmTime.Now().Unix())-params.BeaconConfig().SecondsPerSlot))
require.NoError(t, service.saveGenesisData(ctx, genesisState))
atts, err := util.GenerateAttestations(genesisState, pks, 1, 0, false)
require.NoError(t, err)
tRoot := bytesutil.ToBytes32(atts[0].Data.Target.Root)
copied := genesisState.Copy()
copied, err = transition.ProcessSlots(ctx, copied, 1)
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, copied, tRoot))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: tRoot[:]}))
require.NoError(t, service.cfg.ForkChoiceStore.InsertOptimisticBlock(ctx, 0, tRoot, tRoot, params.BeaconConfig().ZeroHash, 1, 1))
require.NoError(t, service.cfg.AttPool.SaveForkchoiceAttestations(atts))
b := util.NewBeaconBlock()
wb, err := wrapper.WrappedSignedBeaconBlock(b)
require.NoError(t, err)
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wb))
service.head.root = r // Old head
require.Equal(t, 1, len(service.cfg.AttPool.ForkchoiceAttestations()))
require.NoError(t, err, service.UpdateHead(ctx))
require.Equal(t, tRoot, service.head.root) // Validate head is the new one
require.Equal(t, 0, len(service.cfg.AttPool.ForkchoiceAttestations())) // Validate att pool is empty
}

View File

@ -50,21 +50,22 @@ const headSyncMinEpochsAfterCheckpoint = 128
// Service represents a service that handles the internal
// logic of managing the full PoS beacon chain.
type Service struct {
cfg *config
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
head *head
headLock sync.RWMutex
originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized
nextEpochBoundarySlot types.Slot
boundaryRoots [][32]byte
checkpointStateCache *cache.CheckpointStateCache
initSyncBlocks map[[32]byte]interfaces.SignedBeaconBlock
initSyncBlocksLock sync.RWMutex
justifiedBalances *stateBalanceCache
wsVerifier *WeakSubjectivityVerifier
store *store.Store
cfg *config
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
head *head
headLock sync.RWMutex
originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized
nextEpochBoundarySlot types.Slot
boundaryRoots [][32]byte
checkpointStateCache *cache.CheckpointStateCache
initSyncBlocks map[[32]byte]interfaces.SignedBeaconBlock
initSyncBlocksLock sync.RWMutex
justifiedBalances *stateBalanceCache
wsVerifier *WeakSubjectivityVerifier
store *store.Store
processAttestationsLock sync.Mutex
}
// config options for the service.

View File

@ -451,6 +451,8 @@ func (s *ChainService) IsOptimisticForRoot(_ context.Context, _ [32]byte) (bool,
return s.Optimistic, nil
}
// ProcessAttestationsAndUpdateHead mocks the same method in the chain service.
func (s *ChainService) UpdateHead(_ context.Context) error { return nil }
// ReceiveAttesterSlashing mocks the same method in the chain service.
func (s *ChainService) ReceiveAttesterSlashing(context.Context, *ethpb.AttesterSlashing) {
}
func (s *ChainService) ReceiveAttesterSlashing(context.Context, *ethpb.AttesterSlashing) {}

View File

@ -779,6 +779,7 @@ func (b *BeaconNode) registerRPCService() error {
PeerManager: p2pService,
MetadataProvider: p2pService,
ChainInfoFetcher: chainService,
HeadUpdater: chainService,
HeadFetcher: chainService,
CanonicalFetcher: chainService,
ForkFetcher: chainService,

View File

@ -37,4 +37,5 @@ type Server struct {
V1Alpha1ValidatorServer *v1alpha1validator.Server
SyncChecker sync.Checker
CanonicalHistory *stategen.CanonicalHistory
HeadUpdater blockchain.HeadUpdater
}

View File

@ -14,6 +14,7 @@ import (
// providing RPC endpoints intended for validator clients.
type Server struct {
HeadFetcher blockchain.HeadFetcher
HeadUpdater blockchain.HeadUpdater
TimeFetcher blockchain.TimeFetcher
SyncChecker sync.Checker
AttestationsPool attestations.Pool

View File

@ -670,6 +670,7 @@ func TestProduceBlock(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
@ -774,6 +775,7 @@ func TestProduceBlockV2(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
@ -879,6 +881,7 @@ func TestProduceBlockV2(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
@ -1028,6 +1031,7 @@ func TestProduceBlockV2(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
@ -1174,6 +1178,7 @@ func TestProduceBlindedBlock(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
@ -1280,6 +1285,7 @@ func TestProduceBlindedBlock(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
@ -1429,6 +1435,7 @@ func TestProduceBlindedBlock(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},

View File

@ -46,4 +46,5 @@ type Server struct {
StateGen stategen.StateManager
SyncChecker sync.Checker
ReplayerBuilder stategen.ReplayerBuilder
HeadUpdater blockchain.HeadUpdater
}

View File

@ -82,6 +82,10 @@ func (vs *Server) buildPhase0BlockData(ctx context.Context, req *ethpb.BlockRequ
return nil, fmt.Errorf("syncing to latest head, not ready to respond")
}
if err := vs.HeadUpdater.UpdateHead(ctx); err != nil {
log.WithError(err).Error("Could not process attestations and update head")
}
// Retrieve the parent block as the current head of the canonical chain.
parentRoot, err := vs.HeadFetcher.HeadRoot(ctx)
if err != nil {

View File

@ -72,6 +72,7 @@ func TestProposer_GetBlock_OK(t *testing.T) {
HeadFetcher: &mock.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mock.ChainService{},
HeadUpdater: &mock.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
@ -158,6 +159,7 @@ func TestProposer_GetBlock_AddsUnaggregatedAtts(t *testing.T) {
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
HeadUpdater: &mock.ChainService{},
MockEth1Votes: true,
SlashingsPool: slashings.NewPool(),
AttPool: attestations.NewPool(),
@ -2105,6 +2107,7 @@ func TestProposer_GetBeaconBlock_PreForkEpoch(t *testing.T) {
HeadFetcher: &mock.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mock.ChainService{},
HeadUpdater: &mock.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
@ -2217,6 +2220,7 @@ func TestProposer_GetBeaconBlock_PostForkEpoch(t *testing.T) {
HeadFetcher: &mock.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mock.ChainService{},
HeadUpdater: &mock.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
@ -2370,6 +2374,7 @@ func TestProposer_GetBeaconBlock_BellatrixEpoch(t *testing.T) {
TimeFetcher: &mock.ChainService{Genesis: time.Now()},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mock.ChainService{},
HeadUpdater: &mock.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: c,

View File

@ -43,6 +43,7 @@ type Server struct {
AttestationCache *cache.AttestationCache
ProposerSlotIndexCache *cache.ProposerPayloadIDsCache
HeadFetcher blockchain.HeadFetcher
HeadUpdater blockchain.HeadUpdater
ForkFetcher blockchain.ForkFetcher
FinalizationFetcher blockchain.FinalizationFetcher
TimeFetcher blockchain.TimeFetcher

View File

@ -78,6 +78,7 @@ type Config struct {
BeaconMonitoringPort int
BeaconDB db.HeadAccessDatabase
ChainInfoFetcher blockchain.ChainInfoFetcher
HeadUpdater blockchain.HeadUpdater
HeadFetcher blockchain.HeadFetcher
CanonicalFetcher blockchain.CanonicalFetcher
ForkFetcher blockchain.ForkFetcher
@ -189,6 +190,7 @@ func (s *Service) Start() {
AttPool: s.cfg.AttestationsPool,
ExitPool: s.cfg.ExitPool,
HeadFetcher: s.cfg.HeadFetcher,
HeadUpdater: s.cfg.HeadUpdater,
ForkFetcher: s.cfg.ForkFetcher,
FinalizationFetcher: s.cfg.FinalizationFetcher,
TimeFetcher: s.cfg.GenesisTimeFetcher,
@ -215,6 +217,7 @@ func (s *Service) Start() {
}
validatorServerV1 := &validator.Server{
HeadFetcher: s.cfg.HeadFetcher,
HeadUpdater: s.cfg.HeadUpdater,
TimeFetcher: s.cfg.GenesisTimeFetcher,
SyncChecker: s.cfg.SyncService,
AttestationsPool: s.cfg.AttestationsPool,
@ -261,6 +264,7 @@ func (s *Service) Start() {
BeaconDB: s.cfg.BeaconDB,
AttestationsPool: s.cfg.AttestationsPool,
SlashingsPool: s.cfg.SlashingsPool,
HeadUpdater: s.cfg.HeadUpdater,
HeadFetcher: s.cfg.HeadFetcher,
FinalizationFetcher: s.cfg.FinalizationFetcher,
CanonicalFetcher: s.cfg.CanonicalFetcher,