mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-07 10:12:19 +00:00
7f7866ff2a
* Starting a quick PoC * Rate limit to one epoch worth of blocks in memory * Proof of concept working * Quick comment out * Save previous finalized checkpoint * Test * Minor fixes * More run time fixes * Remove panic * Feature flag * Removed unused methods * Fixed tests * E2e test * comment * Compatible with current initial sync * Starting * New cache * Cache getters and setters * It should be part of state gen * Need to use cache for DB * Don't have to use finalized state * Rm unused file * some changes to memory mgmt when using mempool * More run time fixes * Can sync to head * Feedback * Revert "some changes to memory mgmt when using mempool" This reverts commit f5b3e7ff4714fef9f0397007f519a45fa259ad24. * Fixed sync tests * Fixed existing tests * Test for state summary getter * Gaz * Fix kafka passthrough * Fixed inputs * Gaz * Fixed build * Fixed visibility * Trying without the ignore * Didn't work.. * Fix kafka Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
190 lines
7.0 KiB
Go
190 lines
7.0 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"sync"
|
|
"time"
|
|
|
|
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
|
"github.com/prysmaticlabs/prysm/shared/bls"
|
|
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
|
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
"github.com/prysmaticlabs/prysm/shared/runutil"
|
|
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
|
"github.com/sirupsen/logrus"
|
|
"go.opencensus.io/trace"
|
|
"golang.org/x/exp/rand"
|
|
)
|
|
|
|
// This defines how often a node cleans up and processes pending attestations in the queue.
|
|
var processPendingAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
|
|
|
|
// This processes pending attestation queues on every `processPendingAttsPeriod`.
|
|
func (s *Service) processPendingAttsQueue() {
|
|
ctx := context.Background()
|
|
mutex := new(sync.Mutex)
|
|
runutil.RunEvery(s.ctx, processPendingAttsPeriod, func() {
|
|
mutex.Lock()
|
|
if err := s.processPendingAtts(ctx); err != nil {
|
|
log.WithError(err).Errorf("Could not process pending attestation: %v", err)
|
|
}
|
|
mutex.Unlock()
|
|
})
|
|
}
|
|
|
|
// This defines how pending attestations are processed. It contains features:
|
|
// 1. Clean up invalid pending attestations from the queue.
|
|
// 2. Check if pending attestations can be processed when the block has arrived.
|
|
// 3. Request block from a random peer if unable to proceed step 2.
|
|
func (s *Service) processPendingAtts(ctx context.Context) error {
|
|
ctx, span := trace.StartSpan(ctx, "processPendingAtts")
|
|
defer span.End()
|
|
|
|
pids := s.p2p.Peers().Connected()
|
|
|
|
// Before a node processes pending attestations queue, it verifies
|
|
// the attestations in the queue are still valid. Attestations will
|
|
// be deleted from the queue if invalid (ie. getting staled from falling too many slots behind).
|
|
s.validatePendingAtts(ctx, s.chain.CurrentSlot())
|
|
|
|
roots := make([][32]byte, 0, len(s.blkRootToPendingAtts))
|
|
s.pendingAttsLock.RLock()
|
|
for br := range s.blkRootToPendingAtts {
|
|
roots = append(roots, br)
|
|
}
|
|
s.pendingAttsLock.RUnlock()
|
|
|
|
for _, bRoot := range roots {
|
|
s.pendingAttsLock.RLock()
|
|
attestations := s.blkRootToPendingAtts[bRoot]
|
|
s.pendingAttsLock.RUnlock()
|
|
// Has the pending attestation's missing block arrived and the node processed block yet?
|
|
hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bRoot) || s.stateSummaryCache.Has(bRoot)
|
|
if s.db.HasBlock(ctx, bRoot) && (s.db.HasState(ctx, bRoot) || hasStateSummary) {
|
|
numberOfBlocksRecoveredFromAtt.Inc()
|
|
for _, att := range attestations {
|
|
// The pending attestations can arrive in both aggregated and unaggregated forms,
|
|
// each from has distinct validation steps.
|
|
if helpers.IsAggregated(att.Aggregate) {
|
|
// Save the pending aggregated attestation to the pool if it passes the aggregated
|
|
// validation steps.
|
|
if s.validateBlockInAttestation(ctx, att) && s.validateAggregatedAtt(ctx, att) {
|
|
if err := s.attPool.SaveAggregatedAttestation(att.Aggregate); err != nil {
|
|
return err
|
|
}
|
|
numberOfAttsRecovered.Inc()
|
|
|
|
// Broadcasting the attestation again once a node is able to process it.
|
|
if err := s.p2p.Broadcast(ctx, att); err != nil {
|
|
log.WithError(err).Error("Failed to broadcast")
|
|
}
|
|
}
|
|
} else {
|
|
// Save the pending unaggregated attestation to the pool if the BLS signature is
|
|
// valid.
|
|
if _, err := bls.SignatureFromBytes(att.Aggregate.Signature); err != nil {
|
|
continue
|
|
}
|
|
if err := s.attPool.SaveUnaggregatedAttestation(att.Aggregate); err != nil {
|
|
return err
|
|
}
|
|
numberOfAttsRecovered.Inc()
|
|
|
|
// Broadcasting the attestation again once a node is able to process it.
|
|
if err := s.p2p.Broadcast(ctx, att); err != nil {
|
|
log.WithError(err).Error("Failed to broadcast")
|
|
}
|
|
}
|
|
}
|
|
log.WithFields(logrus.Fields{
|
|
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
|
|
"pendingAttsCount": len(attestations),
|
|
}).Info("Verified and saved pending attestations to pool")
|
|
|
|
// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
|
|
s.pendingAttsLock.Lock()
|
|
delete(s.blkRootToPendingAtts, bRoot)
|
|
s.pendingAttsLock.Unlock()
|
|
} else {
|
|
// Pending attestation's missing block has not arrived yet.
|
|
log.WithFields(logrus.Fields{
|
|
"currentSlot": s.chain.CurrentSlot(),
|
|
"attSlot": attestations[0].Aggregate.Data.Slot,
|
|
"attCount": len(attestations),
|
|
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
|
|
}).Debug("Requesting block for pending attestation")
|
|
|
|
// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
|
|
// have a head slot newer or equal to the pending attestation's target boundary slot.
|
|
if len(pids) == 0 {
|
|
return nil
|
|
}
|
|
pid := pids[rand.Int()%len(pids)]
|
|
targetSlot := helpers.SlotToEpoch(attestations[0].Aggregate.Data.Target.Epoch)
|
|
for _, p := range pids {
|
|
if cs, _ := s.p2p.Peers().ChainState(p); cs != nil && cs.HeadSlot >= targetSlot {
|
|
pid = p
|
|
break
|
|
}
|
|
}
|
|
|
|
req := [][32]byte{bRoot}
|
|
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
|
|
traceutil.AnnotateError(span, err)
|
|
log.Errorf("Could not send recent block request: %v", err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// This defines how pending attestations is saved in the map. The key is the
|
|
// root of the missing block. The value is the list of pending attestations
|
|
// that voted for that block root.
|
|
func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) {
|
|
root := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot)
|
|
|
|
s.pendingAttsLock.Lock()
|
|
defer s.pendingAttsLock.Unlock()
|
|
_, ok := s.blkRootToPendingAtts[root]
|
|
if !ok {
|
|
s.blkRootToPendingAtts[root] = []*ethpb.AggregateAttestationAndProof{att}
|
|
return
|
|
}
|
|
|
|
s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], att)
|
|
}
|
|
|
|
// This validates the pending attestations in the queue are still valid.
|
|
// If not valid, a node will remove it in the queue in place. The validity
|
|
// check specifies the pending attestation could not fall one epoch behind
|
|
// of the current slot.
|
|
func (s *Service) validatePendingAtts(ctx context.Context, slot uint64) {
|
|
ctx, span := trace.StartSpan(ctx, "validatePendingAtts")
|
|
defer span.End()
|
|
|
|
s.pendingAttsLock.Lock()
|
|
defer s.pendingAttsLock.Unlock()
|
|
|
|
for bRoot, atts := range s.blkRootToPendingAtts {
|
|
for i := len(atts) - 1; i >= 0; i-- {
|
|
if slot >= atts[i].Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch {
|
|
// Remove the pending attestation from the list in place.
|
|
atts = append(atts[:i], atts[i+1:]...)
|
|
numberOfAttsNotRecovered.Inc()
|
|
}
|
|
}
|
|
s.blkRootToPendingAtts[bRoot] = atts
|
|
|
|
// If the pending attestations list of a given block root is empty,
|
|
// a node will remove the key from the map to avoid dangling keys.
|
|
if len(s.blkRootToPendingAtts[bRoot]) == 0 {
|
|
delete(s.blkRootToPendingAtts, bRoot)
|
|
numberOfBlocksNotRecoveredFromAtt.Inc()
|
|
}
|
|
}
|
|
}
|