mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-17 15:28:45 +00:00
a8e501b3cf
* update deps * update deps * update protos/* * update deps * reset protos * update protos * update shared/params/config * update protos * update /shared * update shared/slotutil and shared/testutil * update beacon-chain/core/helpers * updates beacon-chain/state * update beacon-chain/forkchoice * update beacon-chain/blockchain * update beacon-chain/cache * update beacon-chain/core * update beacon-chain/db * update beacon-chain/node * update beacon-chain/p2p * update beacon-chain/rpc * update beacon-chain/sync * go mod tidy * make sure that beacon-chain build suceeds * go fmt * update e2e tests * update slasher * remove redundant alias * update validator * gazelle * fix build errors in unit tests * go fmt * update deps * update fuzz/BUILD.bazel * fix unit tests * more unit test fixes * fix blockchain UTs * more unit test fixes
244 lines
8.3 KiB
Go
244 lines
8.3 KiB
Go
package detection
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/prysmaticlabs/eth2-types"
|
|
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/shared/event"
|
|
"github.com/prysmaticlabs/prysm/slasher/beaconclient"
|
|
"github.com/prysmaticlabs/prysm/slasher/db"
|
|
"github.com/prysmaticlabs/prysm/slasher/detection/attestations"
|
|
"github.com/prysmaticlabs/prysm/slasher/detection/attestations/iface"
|
|
"github.com/prysmaticlabs/prysm/slasher/detection/proposals"
|
|
proposerIface "github.com/prysmaticlabs/prysm/slasher/detection/proposals/iface"
|
|
"github.com/sirupsen/logrus"
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
// Status detection statuses type.
|
|
type Status int
|
|
|
|
const (
|
|
// None slasher was not initialised.
|
|
None Status = iota
|
|
// Started service start has been called,
|
|
Started
|
|
// Syncing beacon client is still syncing.
|
|
Syncing
|
|
// HistoricalDetection slasher is replaying all attestations that
|
|
// were included in the canonical chain.
|
|
HistoricalDetection
|
|
// Ready slasher is ready to detect requests.
|
|
Ready
|
|
)
|
|
|
|
// String returns the string value of the status
|
|
func (s Status) String() string {
|
|
strings := [...]string{"None", "Started", "Syncing", "HistoricalDetection", "Ready"}
|
|
|
|
// prevent panicking in case of status is out-of-range
|
|
if s < None || s > Ready {
|
|
return "Unknown"
|
|
}
|
|
|
|
return strings[s]
|
|
}
|
|
|
|
// Service struct for the detection service of the slasher.
|
|
type Service struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
slasherDB db.Database
|
|
blocksChan chan *ethpb.SignedBeaconBlock
|
|
attsChan chan *ethpb.IndexedAttestation
|
|
notifier beaconclient.Notifier
|
|
chainFetcher beaconclient.ChainFetcher
|
|
beaconClient *beaconclient.Service
|
|
attesterSlashingsFeed *event.Feed
|
|
proposerSlashingsFeed *event.Feed
|
|
minMaxSpanDetector iface.SpanDetector
|
|
proposalsDetector proposerIface.ProposalsDetector
|
|
historicalDetection bool
|
|
status Status
|
|
}
|
|
|
|
// Config options for the detection service.
|
|
type Config struct {
|
|
Notifier beaconclient.Notifier
|
|
SlasherDB db.Database
|
|
ChainFetcher beaconclient.ChainFetcher
|
|
BeaconClient *beaconclient.Service
|
|
AttesterSlashingsFeed *event.Feed
|
|
ProposerSlashingsFeed *event.Feed
|
|
HistoricalDetection bool
|
|
}
|
|
|
|
// New instantiates a new service.
|
|
func New(ctx context.Context, cfg *Config) *Service {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
return &Service{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
notifier: cfg.Notifier,
|
|
chainFetcher: cfg.ChainFetcher,
|
|
slasherDB: cfg.SlasherDB,
|
|
beaconClient: cfg.BeaconClient,
|
|
blocksChan: make(chan *ethpb.SignedBeaconBlock, 1),
|
|
attsChan: make(chan *ethpb.IndexedAttestation, 1),
|
|
attesterSlashingsFeed: cfg.AttesterSlashingsFeed,
|
|
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
|
|
minMaxSpanDetector: attestations.NewSpanDetector(cfg.SlasherDB),
|
|
proposalsDetector: proposals.NewProposeDetector(cfg.SlasherDB),
|
|
historicalDetection: cfg.HistoricalDetection,
|
|
status: None,
|
|
}
|
|
}
|
|
|
|
// Stop the notifier service.
|
|
func (s *Service) Stop() error {
|
|
s.cancel()
|
|
log.Info("Stopping service")
|
|
return nil
|
|
}
|
|
|
|
// Status returns an error if detection service is not ready yet.
|
|
func (s *Service) Status() error {
|
|
if s.status == Ready {
|
|
return nil
|
|
}
|
|
return errors.New(s.status.String())
|
|
}
|
|
|
|
// Start the detection service runtime.
|
|
func (s *Service) Start() {
|
|
// We wait for the gRPC beacon client to be ready and the beacon node
|
|
// to be fully synced before proceeding.
|
|
s.status = Started
|
|
ch := make(chan bool)
|
|
sub := s.notifier.ClientReadyFeed().Subscribe(ch)
|
|
s.status = Syncing
|
|
<-ch
|
|
sub.Unsubscribe()
|
|
|
|
if s.historicalDetection {
|
|
// The detection service runs detection on all historical
|
|
// chain data since genesis.
|
|
s.status = HistoricalDetection
|
|
s.detectHistoricalChainData(s.ctx)
|
|
}
|
|
s.status = Ready
|
|
// We listen to a stream of blocks and attestations from the beacon node.
|
|
go s.beaconClient.ReceiveBlocks(s.ctx)
|
|
go s.beaconClient.ReceiveAttestations(s.ctx)
|
|
// We subscribe to incoming blocks from the beacon node via
|
|
// our gRPC client to keep detecting slashable offenses.
|
|
go s.detectIncomingBlocks(s.ctx, s.blocksChan)
|
|
go s.detectIncomingAttestations(s.ctx, s.attsChan)
|
|
}
|
|
|
|
func (s *Service) detectHistoricalChainData(ctx context.Context) {
|
|
ctx, span := trace.StartSpan(ctx, "detection.detectHistoricalChainData")
|
|
defer span.End()
|
|
// We fetch both the latest persisted chain head in our DB as well
|
|
// as the current chain head from the beacon node via gRPC.
|
|
latestStoredHead, err := s.slasherDB.ChainHead(ctx)
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not retrieve chain head from DB")
|
|
return
|
|
}
|
|
currentChainHead, err := s.chainFetcher.ChainHead(ctx)
|
|
if err != nil {
|
|
log.WithError(err).Error("Cannot retrieve chain head from beacon node")
|
|
return
|
|
}
|
|
var latestStoredEpoch types.Epoch
|
|
if latestStoredHead != nil {
|
|
latestStoredEpoch = latestStoredHead.HeadEpoch
|
|
}
|
|
log.Infof("Performing historical detection from epoch %d to %d", latestStoredEpoch, currentChainHead.HeadEpoch)
|
|
|
|
// We retrieve historical chain data from the last persisted chain head in the
|
|
// slasher DB up to the current beacon node's head epoch we retrieved via gRPC.
|
|
// If no data was persisted from previous sessions, we request data starting from
|
|
// the genesis epoch.
|
|
var storedEpoch types.Epoch
|
|
for epoch := latestStoredEpoch; epoch < currentChainHead.HeadEpoch; epoch++ {
|
|
if ctx.Err() != nil {
|
|
log.WithError(err).Errorf("Could not fetch attestations for epoch: %d", epoch)
|
|
return
|
|
}
|
|
indexedAtts, err := s.beaconClient.RequestHistoricalAttestations(ctx, epoch)
|
|
if err != nil {
|
|
log.WithError(err).Errorf("Could not fetch attestations for epoch: %d", epoch)
|
|
return
|
|
}
|
|
if err := s.slasherDB.SaveIndexedAttestations(ctx, indexedAtts); err != nil {
|
|
log.WithError(err).Error("could not save indexed attestations")
|
|
return
|
|
}
|
|
|
|
for _, att := range indexedAtts {
|
|
if ctx.Err() == context.Canceled {
|
|
log.WithError(ctx.Err()).Error("context has been canceled, ending detection")
|
|
return
|
|
}
|
|
slashings, err := s.DetectAttesterSlashings(ctx, att)
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not detect attester slashings")
|
|
continue
|
|
}
|
|
if len(slashings) < 1 {
|
|
if err := s.minMaxSpanDetector.UpdateSpans(ctx, att); err != nil {
|
|
log.WithError(err).Error("Could not update spans")
|
|
}
|
|
}
|
|
s.submitAttesterSlashings(ctx, slashings)
|
|
|
|
if err := s.UpdateHighestAttestation(ctx, att); err != nil {
|
|
log.WithError(err).Errorf("Could not update highest attestation")
|
|
}
|
|
}
|
|
latestStoredHead = ðpb.ChainHead{HeadEpoch: epoch}
|
|
if err := s.slasherDB.SaveChainHead(ctx, latestStoredHead); err != nil {
|
|
log.WithError(err).Error("Could not persist chain head to disk")
|
|
}
|
|
storedEpoch = epoch
|
|
s.slasherDB.RemoveOldestFromCache(ctx)
|
|
if epoch == currentChainHead.HeadEpoch-1 {
|
|
currentChainHead, err = s.chainFetcher.ChainHead(ctx)
|
|
if err != nil {
|
|
log.WithError(err).Error("Cannot retrieve chain head from beacon node")
|
|
return
|
|
}
|
|
if epoch != currentChainHead.HeadEpoch-1 {
|
|
log.Infof("Continuing historical detection from epoch %d to %d", epoch, currentChainHead.HeadEpoch)
|
|
}
|
|
}
|
|
}
|
|
log.Infof("Completed slashing detection on historical chain data up to epoch %d", storedEpoch)
|
|
}
|
|
|
|
func (s *Service) submitAttesterSlashings(ctx context.Context, slashings []*ethpb.AttesterSlashing) {
|
|
ctx, span := trace.StartSpan(ctx, "detection.submitAttesterSlashings")
|
|
defer span.End()
|
|
for i := 0; i < len(slashings); i++ {
|
|
s.attesterSlashingsFeed.Send(slashings[i])
|
|
}
|
|
}
|
|
|
|
func (s *Service) submitProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) {
|
|
ctx, span := trace.StartSpan(ctx, "detection.submitProposerSlashing")
|
|
defer span.End()
|
|
if slashing != nil && slashing.Header_1 != nil && slashing.Header_2 != nil {
|
|
log.WithFields(logrus.Fields{
|
|
"header1Slot": slashing.Header_1.Header.Slot,
|
|
"header2Slot": slashing.Header_2.Header.Slot,
|
|
"proposerIdxHeader1": slashing.Header_1.Header.ProposerIndex,
|
|
"proposerIdxHeader2": slashing.Header_2.Header.ProposerIndex,
|
|
}).Info("Found a proposer slashing! Submitting to beacon node")
|
|
s.proposerSlashingsFeed.Send(slashing)
|
|
}
|
|
}
|