prysm-pulse/beacon-chain/sync/pending_attestations_queue.go
kasey 918129cf36
Replace statefeed Initialize (#12285)
* refactor initialization to blocking startup method

* require genesisSetter in blockchain, fix tests

* work-around gazelle weirdness

* fix dep gazelle ignores

* only call SetGenesis once

* fix typo

* validator test setup and fix to return right error

* move waitForChainStart to Start

* wire up sync Service.genesisWaiter

* fix p2p genesisWaiter plumbing

* remove extra clock type, integrate into genesis

and rename

* use time.Now when no Nower is specified

* remove unused ClockSetter

* simplify rpc context checking

* fix typo

* use clock everywhere in sync; [32]byte val root

* don't use DeepEqual to compare [32]byte and []byte

* don't use clock in init sync, not wired up yet

* use clock waiter in blockchain as well

* use cancelable contexts in tests with goroutines

* missed a reference to WithClockSetter

* Update beacon-chain/startup/genesis.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/blockchain/service_test.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* more clear docs

* doc for NewClock

* move clock typedef to more logical file name

* adding documentation

* gaz

* fixes for capella

* reducing test raciness

* fix races in committee cache tests

* lint

* add tests on Duration slot math helper

* startup package test coverage

* fix bad merge

* set non-zero genesis time in tests that call Start

* happy deepsource, happy me-epsource

* replace Synced event with channel

* remove unused error

* remove accidental wip commit

* gaz!

* remove unused event constants

* remove sync statefeed subscription to fix deadlock

* remove state notifier

* fix build

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: nisdas <nishdas93@gmail.com>
2023-05-03 04:34:01 +00:00

210 lines
8.5 KiB
Go

package sync
import (
"context"
"encoding/hex"
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/prysmaticlabs/prysm/v4/async"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/crypto/rand"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// processPendingAttsPeriod is the interval at which a node cleans up and
// processes the pending attestations queue. It is obtained from
// slots.DivideSlotBy(2), i.e. the queue is handled twice per slot.
var processPendingAttsPeriod = slots.DivideSlotBy(2 /* twice per slot */)
// processPendingAttsQueue registers processPendingAtts with async.RunEvery so
// that the pending attestation queue is processed on every
// `processPendingAttsPeriod`, using the service context s.ctx. An error
// returned by processPendingAtts is logged at debug level.
func (s *Service) processPendingAttsQueue() {
	// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
	mutex := new(sync.Mutex)
	async.RunEvery(s.ctx, processPendingAttsPeriod, func() {
		mutex.Lock()
		// Deferred so the mutex is released even if processing panics.
		defer mutex.Unlock()
		if err := s.processPendingAtts(s.ctx); err != nil {
			// WithError already attaches the error as a structured field, so it
			// is not repeated in the message text.
			log.WithError(err).Debug("Could not process pending attestation")
		}
	})
}
// processPendingAtts makes one pass over the pending attestations queue:
//  1. Clean up invalid (stale) pending attestations from the queue.
//  2. For every block root still queued whose block is in the DB together with
//     its state or state summary, process the attestations and remove the root
//     from the queue.
//  3. Collect every other root and hand the batch to sendBatchRootRequest so
//     the missing blocks can be requested.
//
// It returns the error reported by sendBatchRootRequest.
func (s *Service) processPendingAtts(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "processPendingAtts")
	defer span.End()

	// Before a node processes pending attestations queue, it verifies
	// the attestations in the queue are still valid. Attestations will
	// be deleted from the queue if invalid (ie. getting staled from falling too many slots behind).
	s.validatePendingAtts(ctx, s.cfg.clock.CurrentSlot())

	// Snapshot the keys under the read lock so the lock is not held during the
	// DB lookups and validation below.
	s.pendingAttsLock.RLock()
	roots := make([][32]byte, 0, len(s.blkRootToPendingAtts))
	for br := range s.blkRootToPendingAtts {
		roots = append(roots, br)
	}
	s.pendingAttsLock.RUnlock()

	var pendingRoots [][32]byte
	randGen := rand.NewGenerator()
	for _, bRoot := range roots {
		s.pendingAttsLock.RLock()
		attestations := s.blkRootToPendingAtts[bRoot]
		s.pendingAttsLock.RUnlock()

		// The lock was released after the snapshot, so the entry may have been
		// removed in the meantime. With nothing queued for this root there is
		// nothing to process or request, and indexing attestations[0] below
		// would panic.
		if len(attestations) == 0 {
			continue
		}

		// has the pending attestation's missing block arrived and the node processed block yet?
		if s.cfg.beaconDB.HasBlock(ctx, bRoot) && (s.cfg.beaconDB.HasState(ctx, bRoot) || s.cfg.beaconDB.HasStateSummary(ctx, bRoot)) {
			s.processAttestations(ctx, attestations)
			log.WithFields(logrus.Fields{
				"blockRoot":        hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
				"pendingAttsCount": len(attestations),
			}).Debug("Verified and saved pending attestations to pool")

			// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
			s.pendingAttsLock.Lock()
			delete(s.blkRootToPendingAtts, bRoot)
			s.pendingAttsLock.Unlock()
		} else {
			// Pending attestation's missing block has not arrived yet.
			log.WithFields(logrus.Fields{
				"currentSlot": s.cfg.clock.CurrentSlot(),
				"attSlot":     attestations[0].Message.Aggregate.Data.Slot,
				"attCount":    len(attestations),
				"blockRoot":   hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
			}).Debug("Requesting block for pending attestation")
			pendingRoots = append(pendingRoots, bRoot)
		}
	}
	return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
}
// processAttestations runs every given pending attestation through the
// validation steps for its form (aggregated or unaggregated). Attestations
// that are accepted are saved to the attestation pool, marked as seen and
// broadcast again. An attestation that fails a step is logged at debug level
// and skipped; nothing is returned to the caller.
func (s *Service) processAttestations(ctx context.Context, attestations []*ethpb.SignedAggregateAttestationAndProof) {
	for _, signedAtt := range attestations {
		msg := signedAtt.Message
		aggregate := msg.Aggregate

		// The pending attestations can arrive in both aggregated and unaggregated forms,
		// each form has distinct validation steps.
		if helpers.IsAggregated(aggregate) {
			// An aggregate is only saved to the pool when it passes the
			// aggregated validation steps.
			res, err := s.validateAggregatedAtt(ctx, signedAtt)
			if err != nil {
				log.WithError(err).Debug("Pending aggregated attestation failed validation")
			}
			// The block check is evaluated regardless of the validation result.
			blockOK := s.validateBlockInAttestation(ctx, signedAtt)
			if !blockOK || res != pubsub.ValidationAccept {
				continue
			}
			if err := s.cfg.attPool.SaveAggregatedAttestation(aggregate); err != nil {
				log.WithError(err).Debug("Could not save aggregate attestation")
				continue
			}
			s.setAggregatorIndexEpochSeen(aggregate.Data.Target.Epoch, msg.AggregatorIndex)

			// Now that the node can process the attestation, broadcast it again.
			if err := s.cfg.p2p.Broadcast(ctx, signedAtt); err != nil {
				log.WithError(err).Debug("Could not broadcast")
			}
			continue
		}

		// Unaggregated form.
		//
		// This is an important validation before retrieving attestation pre state to defend against
		// attestation's target intentionally reference checkpoint that's long ago.
		// Verify current finalized checkpoint is an ancestor of the block defined by the attestation's beacon block root.
		if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(aggregate.Data.BeaconBlockRoot)) {
			log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Could not verify finalized consistency")
			continue
		}
		if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, aggregate); err != nil {
			log.WithError(err).Debug("Could not verify FFG consistency")
			continue
		}

		preState, err := s.cfg.chain.AttestationTargetState(ctx, aggregate.Data.Target)
		if err != nil {
			log.WithError(err).Debug("Could not retrieve attestation prestate")
			continue
		}

		res, err := s.validateUnaggregatedAttWithState(ctx, aggregate, preState)
		if err != nil {
			log.WithError(err).Debug("Pending unaggregated attestation failed validation")
			continue
		}
		if res != pubsub.ValidationAccept {
			continue
		}

		if err := s.cfg.attPool.SaveUnaggregatedAttestation(aggregate); err != nil {
			log.WithError(err).Debug("Could not save unaggregated attestation")
			continue
		}
		s.setSeenCommitteeIndicesSlot(aggregate.Data.Slot, aggregate.Data.CommitteeIndex, aggregate.AggregationBits)

		// The active validator count is needed to compute the subnet to broadcast on.
		valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(aggregate.Data.Slot))
		if err != nil {
			log.WithError(err).Debug("Could not retrieve active validator count")
			continue
		}

		// Now that the node can process the attestation, broadcast it again.
		subnet := helpers.ComputeSubnetForAttestation(valCount, aggregate)
		if err := s.cfg.p2p.BroadcastAttestation(ctx, subnet, aggregate); err != nil {
			log.WithError(err).Debug("Could not broadcast")
		}
	}
}
// savePendingAtt queues att in the pending attestations map. The map key is
// the root of the missing block the attestation voted for; the value is the
// list of pending attestations for that root. The attestation is dropped when
// the list for that root already holds one with the same aggregator index.
func (s *Service) savePendingAtt(att *ethpb.SignedAggregateAttestationAndProof) {
	blockRoot := bytesutil.ToBytes32(att.Message.Aggregate.Data.BeaconBlockRoot)

	s.pendingAttsLock.Lock()
	defer s.pendingAttsLock.Unlock()

	// A missing key yields a nil slice: the scan below is then a no-op and
	// append creates the first entry for this root.
	queued := s.blkRootToPendingAtts[blockRoot]
	for _, existing := range queued {
		// Skip if an attestation from the same aggregator is already queued.
		if existing.Message.AggregatorIndex == att.Message.AggregatorIndex {
			return
		}
	}
	s.blkRootToPendingAtts[blockRoot] = append(queued, att)
}
// validatePendingAtts prunes the pending attestations queue in place. A
// pending attestation is dropped once `slot` is at least SlotsPerEpoch past
// the attestation's own slot, i.e. once it has fallen one epoch behind. A
// block root left without any attestation is removed from the map.
func (s *Service) validatePendingAtts(ctx context.Context, slot primitives.Slot) {
	_, span := trace.StartSpan(ctx, "validatePendingAtts")
	defer span.End()

	s.pendingAttsLock.Lock()
	defer s.pendingAttsLock.Unlock()

	for root, queued := range s.blkRootToPendingAtts {
		// Filter in place: surviving attestations are compacted to the front
		// of the same backing array, keeping their relative order.
		fresh := queued[:0]
		for _, pending := range queued {
			if slot < pending.Message.Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch {
				fresh = append(fresh, pending)
			}
		}

		// Drop the key entirely when nothing is left, to avoid dangling keys.
		if len(fresh) == 0 {
			delete(s.blkRootToPendingAtts, root)
			continue
		}
		s.blkRootToPendingAtts[root] = fresh
	}
}