Manu NALEPA 7a294e861e
Beacon node slasher improvement (#13549)
* Slasher: Ensure all gorouting are stopped before running `Stop` actions.

Fixes #13550.
In tests, `exitChan` are now useless since waitgroup are used to wait
for all goroutines to be stopped.

* `slasher.go`: Add comments and rename some variables. - NFC

* `detect_blocks.go`: Improve. - NFC

- Rename some variables.
- Add comments.
- Use second element of `range` when possible.

* `chunks.go`: Remove `_`receivers. - NFC

* `validateAttestationIntegrity`: Improve documentation. - NFC

* `filterAttestations`: Avoid `else`and rename variable. - NFC

* `slasher.go`: Fix and add comments.

* `SaveAttestationRecordsForValidators`: Remove unused code.

* `LastEpochWrittenForValidators`: Name variables consistently. - NFC

Avoid mixes between `indice(s)`and `index(es)`.

* `SaveLastEpochsWrittenForValidators`: Name variables consistently. - NFC

* `CheckAttesterDoubleVotes`: Rename variables and add comments. - NFC

* `schema.go`: Add comments. - NFC

* `processQueuedAttestations`: Add comments. - NFC

* `checkDoubleVotes`: Rename variable. - NFC

* `Test_processQueuedAttestations`: Ensure there is no error log.

* `shouldNotBeSlashable` => `shouldBeSlashable`

* `Test_processQueuedAttestations`: Add 2 test cases:
- Same target with different signing roots
- Same target with same signing roots

* `checkDoubleVotesOnDisk` ==> `checkDoubleVotes`.

Before this commit, `checkDoubleVotes` did two tasks:
- Checking if there are any slashable double votes in the input
  list of attestations with respect to each other.
- Checking if there are any slashable double votes in the input
  list of attestations with respect to our database.

However, `checkDoubleVotes` is called only in
`checkSlashableAttestations`.

And `checkSlashableAttestations` is called only in:
- `processQueuedAttestations`, and in
- `IsSlashableAttestation`

Study of case `processQueuedAttestations`:
---------------------------------------------
In `processQueuedAttestations`, `checkSlashableAttestations`
is ALWAYS called after
`Database.SaveAttestationRecordsForValidators`.

It means that, when calling `checkSlashableAttestations`,
`validAtts` are ALREADY stored in the DB.

Each attestation of `validAtts` will be checked twice:
- Against the other attestations of `validAtts` (the portion of
  deleted code)
- Against the content of the database.

One of those two checks is redundent.
==> We can remove the check against other attestations in `validAtts`.

Study of case `Database.SaveAttestationRecordsForValidators`:
----------------------------------------------------------------
In `Database.SaveAttestationRecordsForValidators`,
`checkSlashableAttestations` is ALWAYS called with a list of
attestations containing only ONE attestation.

This only attestaion will be checked twice:
- Against itself, and an attestation cannot conflict with itself.
- Against the content of the database.

==> We can remove the check against other attestations in `validAtts`.

=========================

In both cases, we showed that we can remove the check of attestation
against the content of `validAtts`, and the corresponding test
`Test_checkDoubleVotes_SlashableInputAttestations`.

* `Test_processQueuedBlocks_DetectsDoubleProposals`: Wrap proposals.

So we can add new proposals later.

* Fix slasher multiple proposals false negative.

If a first batch of blocks is sent with:
- validator 1 - slot 4 - signing root 1
- validator 1 - slot 5 - signing root 1

Then, if a second batch of blocks is sent with:
- validator 1 - slot 4 - signing root 2

Because we have two blocks proposed by the same validator (1) and for
the same slot (4), but with two different signing roots (1 and 2), the
validator 1 should be slashed.

This is not the case before this commit.
A new test case has been added as well to check this.

Fixes #13551

* `params.go`: Change comments. - NFC

* `CheckSlashable`: Keep the happy path without indentation.

* `detectAllAttesterSlashings` => `checkSurrounds`.

* Update beacon-chain/db/slasherkv/slasher.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/db/slasherkv/slasher.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* `CheckAttesterDoubleVotes`: Keep happy path without indentation.

Well, even if, in our case, "happy path" mean slashing.

* 'SaveAttestationRecordsForValidators': Save the first attestation.

In case of multiple votes, arbitrarily save the first attestation.
Saving the first one in particular has no functional impact,
since in any case all attestations will be tested against
the content of the database. So all but the first one will be
detected as slashable.

However, saving the first one and not an other one let us not
to modify the end to end tests, since they expect the first one
to be saved in the database.

* Rename `min` => `minimum`.

Not to conflict with the new `min` built-in function.

* `couldNotSaveSlashableAtt` ==> `couldNotCheckSlashableAtt`

---------

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
2024-01-31 09:49:14 +00:00

248 lines
8.1 KiB
Go

package slasher
import (
"context"
"time"
"github.com/pkg/errors"
slashertypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
)
const (
couldNotSaveAttRecord = "Could not save attestation records to DB"
couldNotCheckSlashableAtt = "Could not check slashable attestations"
couldNotProcessAttesterSlashings = "Could not process attester slashings"
)
// Receive indexed attestations from some source event feed,
// validating their integrity before appending them to an attestation queue
// for batch processing in a separate routine.
func (s *Service) receiveAttestations(ctx context.Context, indexedAttsChan chan *ethpb.IndexedAttestation) {
defer s.wg.Done()
sub := s.serviceCfg.IndexedAttestationsFeed.Subscribe(indexedAttsChan)
defer sub.Unsubscribe()
for {
select {
case att := <-indexedAttsChan:
if !validateAttestationIntegrity(att) {
continue
}
signingRoot, err := att.Data.HashTreeRoot()
if err != nil {
log.WithError(err).Error("Could not get hash tree root of attestation")
continue
}
attWrapper := &slashertypes.IndexedAttestationWrapper{
IndexedAttestation: att,
SigningRoot: signingRoot,
}
s.attsQueue.push(attWrapper)
case err := <-sub.Err():
log.WithError(err).Debug("Subscriber closed with error")
return
case <-ctx.Done():
return
}
}
}
// Receive beacon blocks from some source event feed,
func (s *Service) receiveBlocks(ctx context.Context, beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader) {
defer s.wg.Done()
sub := s.serviceCfg.BeaconBlockHeadersFeed.Subscribe(beaconBlockHeadersChan)
defer sub.Unsubscribe()
for {
select {
case blockHeader := <-beaconBlockHeadersChan:
if !validateBlockHeaderIntegrity(blockHeader) {
continue
}
signingRoot, err := blockHeader.Header.HashTreeRoot()
if err != nil {
log.WithError(err).Error("Could not get hash tree root of signed block header")
continue
}
wrappedProposal := &slashertypes.SignedBlockHeaderWrapper{
SignedBeaconBlockHeader: blockHeader,
SigningRoot: signingRoot,
}
s.blksQueue.push(wrappedProposal)
case err := <-sub.Err():
log.WithError(err).Debug("Subscriber closed with error")
return
case <-ctx.Done():
return
}
}
}
// Process queued attestations every time a slot ticker fires. We retrieve
// these attestations from a queue, then group them all by validator chunk index.
// This grouping will allow us to perform detection on batches of attestations
// per validator chunk index which can be done concurrently.
func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-chan primitives.Slot) {
defer s.wg.Done()
for {
select {
case currentSlot := <-slotTicker:
attestations := s.attsQueue.dequeue()
currentEpoch := slots.ToEpoch(currentSlot)
// We take all the attestations in the queue and filter out
// those which are valid now and valid in the future.
validAtts, validInFuture, numDropped := s.filterAttestations(attestations, currentEpoch)
deferredAttestationsTotal.Add(float64(len(validInFuture)))
droppedAttestationsTotal.Add(float64(numDropped))
// We add back those attestations that are valid in the future to the queue.
s.attsQueue.extend(validInFuture)
log.WithFields(logrus.Fields{
"currentSlot": currentSlot,
"currentEpoch": currentEpoch,
"numValidAtts": len(validAtts),
"numDeferredAtts": len(validInFuture),
"numDroppedAtts": numDropped,
}).Info("Processing queued attestations for slashing detection")
// Save the attestation records to our database.
// If multiple attestations are provided for the same validator index + target epoch combination,
// then last (validator index + target epoch) => signing root) link is kept into the database.
if err := s.serviceCfg.Database.SaveAttestationRecordsForValidators(
ctx, validAtts,
); err != nil {
log.WithError(err).Error(couldNotSaveAttRecord)
continue
}
// Check for slashings.
slashings, err := s.checkSlashableAttestations(ctx, currentEpoch, validAtts)
if err != nil {
log.WithError(err).Error(couldNotCheckSlashableAtt)
continue
}
// Process attester slashings by verifying their signatures, submitting
// to the beacon node's operations pool, and logging them.
if err := s.processAttesterSlashings(ctx, slashings); err != nil {
log.WithError(err).Error(couldNotProcessAttesterSlashings)
continue
}
processedAttestationsTotal.Add(float64(len(validAtts)))
case <-ctx.Done():
return
}
}
}
// Process queued blocks every time an epoch ticker fires. We retrieve
// these blocks from a queue, then perform double proposal detection.
func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan primitives.Slot) {
defer s.wg.Done()
for {
select {
case currentSlot := <-slotTicker:
blocks := s.blksQueue.dequeue()
currentEpoch := slots.ToEpoch(currentSlot)
receivedBlocksTotal.Add(float64(len(blocks)))
log.WithFields(logrus.Fields{
"currentSlot": currentSlot,
"currentEpoch": currentEpoch,
"numBlocks": len(blocks),
}).Info("Processing queued blocks for slashing detection")
start := time.Now()
// Check for slashings.
slashings, err := s.detectProposerSlashings(ctx, blocks)
if err != nil {
log.WithError(err).Error("Could not detect proposer slashings")
continue
}
// Process proposer slashings by verifying their signatures, submitting
// to the beacon node's operations pool, and logging them.
if err := s.processProposerSlashings(ctx, slashings); err != nil {
log.WithError(err).Error("Could not process proposer slashings")
continue
}
log.WithField("elapsed", time.Since(start)).Debug("Done checking slashable blocks")
processedBlocksTotal.Add(float64(len(blocks)))
case <-ctx.Done():
return
}
}
}
// Prunes slasher data on each slot tick to prevent unnecessary build-up of disk space usage.
func (s *Service) pruneSlasherData(ctx context.Context, slotTicker <-chan primitives.Slot) {
defer s.wg.Done()
for {
select {
case <-slotTicker:
headEpoch := slots.ToEpoch(s.serviceCfg.HeadStateFetcher.HeadSlot())
if err := s.pruneSlasherDataWithinSlidingWindow(ctx, headEpoch); err != nil {
log.WithError(err).Error("Could not prune slasher data")
continue
}
case <-ctx.Done():
return
}
}
}
// Prunes slasher data by using a sliding window of [current_epoch - HISTORY_LENGTH, current_epoch].
// All data before that window is unnecessary for slasher, so can be periodically deleted.
// Say HISTORY_LENGTH is 4 and we have data for epochs 0, 1, 2, 3. Once we hit epoch 4, the sliding window
// we care about is 1, 2, 3, 4, so we can delete data for epoch 0.
func (s *Service) pruneSlasherDataWithinSlidingWindow(ctx context.Context, currentEpoch primitives.Epoch) error {
var maxPruningEpoch primitives.Epoch
if currentEpoch >= s.params.historyLength {
maxPruningEpoch = currentEpoch - s.params.historyLength
} else {
// If the current epoch is less than the history length, we should not
// attempt to prune at all.
return nil
}
start := time.Now()
log.WithFields(logrus.Fields{
"currentEpoch": currentEpoch,
"pruningAllBeforeEpoch": maxPruningEpoch,
}).Info("Pruning old attestations and proposals for slasher")
numPrunedAtts, err := s.serviceCfg.Database.PruneAttestationsAtEpoch(
ctx, maxPruningEpoch,
)
if err != nil {
return errors.Wrap(err, "Could not prune attestations")
}
numPrunedProposals, err := s.serviceCfg.Database.PruneProposalsAtEpoch(
ctx, maxPruningEpoch,
)
if err != nil {
return errors.Wrap(err, "Could not prune proposals")
}
fields := logrus.Fields{}
if numPrunedAtts > 0 {
fields["numPrunedAtts"] = numPrunedAtts
}
if numPrunedProposals > 0 {
fields["numPrunedProposals"] = numPrunedProposals
}
fields["elapsed"] = time.Since(start)
log.WithFields(fields).Info("Done pruning old attestations and proposals for slasher")
return nil
}