mirror of https://gitlab.com/pulsechaincom/prysm-pulse.git (synced 2025-01-20 16:41:11 +00:00)
06a5548424
* `Test_processAttestations`: Remove duplicated tests.
* Sort indexed attestations by data root.
* `processAttestations`: Don't return duplicate slashings anymore. Fixes https://github.com/prysmaticlabs/prysm/issues/13592.
* `AttesterDoubleVote`: Rename fields.
* Detect double votes in different batches. In order to do that:
  1. Each attestation of the batch is tested against the other attestations of the batch.
  2. Each attestation of the batch is tested against the content of the database.
  3. Attestations are saved into the database.
  Fixes https://github.com/prysmaticlabs/prysm/issues/13590.
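As a rough illustration of the three-step cross-batch double-vote detection described in the commit message, the sketch below uses hypothetical helpers (checkDoubleVotesOnBatch, checkDoubleVotesOnDisk, saveAttestations); it is not the actual Prysm implementation, only a shape of the approach.

// Illustrative sketch only: checkDoubleVotesOnBatch, checkDoubleVotesOnDisk and
// saveAttestations are hypothetical helpers, not Prysm's real APIs.
func detectDoubleVotes(
	ctx context.Context,
	batch []*slashertypes.IndexedAttestationWrapper,
) ([]*ethpb.AttesterSlashing, error) {
	// 1. Test each attestation of the batch against the other attestations of the batch.
	slashings := checkDoubleVotesOnBatch(batch)

	// 2. Test each attestation of the batch against the content of the database,
	// so double votes spread across different batches are also detected.
	fromDisk, err := checkDoubleVotesOnDisk(ctx, batch)
	if err != nil {
		return nil, err
	}
	slashings = append(slashings, fromDisk...)

	// 3. Save the attestations into the database so that future batches
	// can be checked against them.
	if err := saveAttestations(ctx, batch); err != nil {
		return nil, err
	}

	return slashings, nil
}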
262 lines
8.6 KiB
Go
package slasher

import (
	"context"
	"time"

	"github.com/pkg/errors"
	slashertypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/slasher/types"
	fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
	"github.com/prysmaticlabs/prysm/v4/time/slots"
	"github.com/sirupsen/logrus"
)

const (
	couldNotSaveAttRecord            = "Could not save attestation records to DB"
	couldNotCheckSlashableAtt        = "Could not check slashable attestations"
	couldNotProcessAttesterSlashings = "Could not process attester slashings"
)

// Receive indexed attestations from some source event feed,
// validating their integrity before appending them to an attestation queue
// for batch processing in a separate routine.
func (s *Service) receiveAttestations(ctx context.Context, indexedAttsChan chan *ethpb.IndexedAttestation) {
	defer s.wg.Done()

	sub := s.serviceCfg.IndexedAttestationsFeed.Subscribe(indexedAttsChan)
	defer sub.Unsubscribe()
	for {
		select {
		case att := <-indexedAttsChan:
			if !validateAttestationIntegrity(att) {
				continue
			}
			dataRoot, err := att.Data.HashTreeRoot()
			if err != nil {
				log.WithError(err).Error("Could not get hash tree root of attestation")
				continue
			}
			attWrapper := &slashertypes.IndexedAttestationWrapper{
				IndexedAttestation: att,
				DataRoot:           dataRoot,
			}
			s.attsQueue.push(attWrapper)
		case err := <-sub.Err():
			log.WithError(err).Debug("Subscriber closed with error")
			return
		case <-ctx.Done():
			return
		}
	}
}
|
|
|
|
// Receive beacon blocks from some source event feed,
|
|
func (s *Service) receiveBlocks(ctx context.Context, beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader) {
|
|
defer s.wg.Done()
|
|
|
|
sub := s.serviceCfg.BeaconBlockHeadersFeed.Subscribe(beaconBlockHeadersChan)
|
|
defer sub.Unsubscribe()
|
|
for {
|
|
select {
|
|
case blockHeader := <-beaconBlockHeadersChan:
|
|
if !validateBlockHeaderIntegrity(blockHeader) {
|
|
continue
|
|
}
|
|
headerRoot, err := blockHeader.Header.HashTreeRoot()
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get hash tree root of signed block header")
|
|
continue
|
|
}
|
|
wrappedProposal := &slashertypes.SignedBlockHeaderWrapper{
|
|
SignedBeaconBlockHeader: blockHeader,
|
|
HeaderRoot: headerRoot,
|
|
}
|
|
s.blksQueue.push(wrappedProposal)
|
|
case err := <-sub.Err():
|
|
log.WithError(err).Debug("Subscriber closed with error")
|
|
return
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}

// Process queued attestations every time a slot ticker fires. We retrieve
// these attestations from a queue, then group them all by validator chunk index.
// This grouping will allow us to perform detection on batches of attestations
// per validator chunk index which can be done concurrently.
func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-chan primitives.Slot) {
	defer s.wg.Done()

	for {
		select {
		case currentSlot := <-slotTicker:
			// Retrieve all attestations from the queue.
			attestations := s.attsQueue.dequeue()

			// Process the retrieved attestations.
			s.processAttestations(ctx, attestations, currentSlot)
		case <-ctx.Done():
			return
		}
	}
}

func (s *Service) processAttestations(
	ctx context.Context,
	attestations []*slashertypes.IndexedAttestationWrapper,
	currentSlot primitives.Slot,
) map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing {
	// Get the current epoch from the current slot.
	currentEpoch := slots.ToEpoch(currentSlot)

	// Split the queued attestations into those that are valid now, those that
	// will only be valid in a future epoch, and the number dropped.
	validAttestations, validInFutureAttestations, numDropped := s.filterAttestations(attestations, currentEpoch)

	// Increase corresponding Prometheus metrics.
	deferredAttestationsTotal.Add(float64(len(validInFutureAttestations)))
	droppedAttestationsTotal.Add(float64(numDropped))
	processedAttestationsTotal.Add(float64(len(validAttestations)))

	// We add back those attestations that are valid in the future to the queue.
	s.attsQueue.extend(validInFutureAttestations)

	// Compute some counts.
	queuedAttestationsCount := s.attsQueue.size()
	validAttestationsCount := len(validAttestations)
	validInFutureAttestationsCount := len(validInFutureAttestations)

	// Log useful information.
	log.WithFields(logrus.Fields{
		"currentSlot":     currentSlot,
		"currentEpoch":    currentEpoch,
		"numValidAtts":    validAttestationsCount,
		"numDeferredAtts": validInFutureAttestationsCount,
		"numDroppedAtts":  numDropped,
		"attsQueueSize":   queuedAttestationsCount,
	}).Info("Processing queued attestations for slashing detection")

	// Check for attestation slashings (double, surrounding, surrounded votes).
	slashings, err := s.checkSlashableAttestations(ctx, currentEpoch, validAttestations)
	if err != nil {
		log.WithError(err).Error(couldNotCheckSlashableAtt)
		return nil
	}

	// Process attester slashings by verifying their signatures, submitting
	// to the beacon node's operations pool, and logging them.
	processedAttesterSlashings, err := s.processAttesterSlashings(ctx, slashings)
	if err != nil {
		log.WithError(err).Error(couldNotProcessAttesterSlashings)
		return nil
	}

	return processedAttesterSlashings
}

// Process queued blocks every time a slot ticker fires. We retrieve
// these blocks from a queue, then perform double proposal detection.
func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan primitives.Slot) {
	defer s.wg.Done()

	for {
		select {
		case currentSlot := <-slotTicker:
			blocks := s.blksQueue.dequeue()
			currentEpoch := slots.ToEpoch(currentSlot)

			receivedBlocksTotal.Add(float64(len(blocks)))

			log.WithFields(logrus.Fields{
				"currentSlot":  currentSlot,
				"currentEpoch": currentEpoch,
				"numBlocks":    len(blocks),
			}).Info("Processing queued blocks for slashing detection")

			start := time.Now()
			// Check for slashings.
			slashings, err := s.detectProposerSlashings(ctx, blocks)
			if err != nil {
				log.WithError(err).Error("Could not detect proposer slashings")
				continue
			}

			// Process proposer slashings by verifying their signatures, submitting
			// to the beacon node's operations pool, and logging them.
			if err := s.processProposerSlashings(ctx, slashings); err != nil {
				log.WithError(err).Error("Could not process proposer slashings")
				continue
			}

			log.WithField("elapsed", time.Since(start)).Debug("Done checking slashable blocks")

			processedBlocksTotal.Add(float64(len(blocks)))
		case <-ctx.Done():
			return
		}
	}
}

// Prunes slasher data on each slot tick to prevent unnecessary build-up of disk space usage.
func (s *Service) pruneSlasherData(ctx context.Context, slotTicker <-chan primitives.Slot) {
	defer s.wg.Done()

	for {
		select {
		case <-slotTicker:
			headEpoch := slots.ToEpoch(s.serviceCfg.HeadStateFetcher.HeadSlot())
			if err := s.pruneSlasherDataWithinSlidingWindow(ctx, headEpoch); err != nil {
				log.WithError(err).Error("Could not prune slasher data")
				continue
			}
		case <-ctx.Done():
			return
		}
	}
}

// Prunes slasher data by using a sliding window of [current_epoch - HISTORY_LENGTH, current_epoch].
// All data before that window is unnecessary for slasher, so it can be periodically deleted.
// Say HISTORY_LENGTH is 4 and we have data for epochs 0, 1, 2, 3. Once we hit epoch 4, the sliding window
// we care about is 1, 2, 3, 4, so we can delete data for epoch 0.
func (s *Service) pruneSlasherDataWithinSlidingWindow(ctx context.Context, currentEpoch primitives.Epoch) error {
	var maxPruningEpoch primitives.Epoch
	if currentEpoch >= s.params.historyLength {
		maxPruningEpoch = currentEpoch - s.params.historyLength
	} else {
		// If the current epoch is less than the history length, we should not
		// attempt to prune at all.
		return nil
	}
	start := time.Now()
	log.WithFields(logrus.Fields{
		"currentEpoch":          currentEpoch,
		"pruningAllBeforeEpoch": maxPruningEpoch,
	}).Info("Pruning old attestations and proposals for slasher")
	numPrunedAtts, err := s.serviceCfg.Database.PruneAttestationsAtEpoch(
		ctx, maxPruningEpoch,
	)
	if err != nil {
		return errors.Wrap(err, "Could not prune attestations")
	}
	numPrunedProposals, err := s.serviceCfg.Database.PruneProposalsAtEpoch(
		ctx, maxPruningEpoch,
	)
	if err != nil {
		return errors.Wrap(err, "Could not prune proposals")
	}
	fields := logrus.Fields{}
	if numPrunedAtts > 0 {
		fields["numPrunedAtts"] = numPrunedAtts
	}
	if numPrunedProposals > 0 {
		fields["numPrunedProposals"] = numPrunedProposals
	}
	fields["elapsed"] = time.Since(start)
	log.WithFields(fields).Info("Done pruning old attestations and proposals for slasher")
	return nil
}