mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-07 02:02:18 +00:00
174 lines
6.2 KiB
Go
174 lines
6.2 KiB
Go
// Package slasher implements slashing detection for eth2, able to catch slashable attestations
|
|
// and proposals that it receives via two event feeds, respectively. Any found slashings
|
|
// are then submitted to the beacon node's slashing operations pool. See the design document
|
|
// here https://hackmd.io/@prysmaticlabs/slasher.
|
|
package slasher
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
types "github.com/prysmaticlabs/eth2-types"
|
|
"github.com/prysmaticlabs/prysm/async/event"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
|
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
|
|
"github.com/prysmaticlabs/prysm/config/params"
|
|
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/time/slots"
|
|
)
|
|
|
|
// ServiceConfig for the slasher service in the beacon node.
|
|
// This struct allows us to specify required dependencies and
|
|
// parameters for slasher to function as needed.
|
|
type ServiceConfig struct {
|
|
IndexedAttestationsFeed *event.Feed
|
|
BeaconBlockHeadersFeed *event.Feed
|
|
Database db.SlasherDatabase
|
|
StateNotifier statefeed.Notifier
|
|
AttestationStateFetcher blockchain.AttestationStateFetcher
|
|
StateGen stategen.StateManager
|
|
SlashingPoolInserter slashings.PoolInserter
|
|
HeadStateFetcher blockchain.HeadFetcher
|
|
SyncChecker sync.Checker
|
|
}
|
|
|
|
// SlashingChecker is an interface for defining services that the beacon node may interact with to provide slashing data.
|
|
type SlashingChecker interface {
|
|
IsSlashableBlock(ctx context.Context, proposal *ethpb.SignedBeaconBlockHeader) (*ethpb.ProposerSlashing, error)
|
|
IsSlashableAttestation(ctx context.Context, attestation *ethpb.IndexedAttestation) ([]*ethpb.AttesterSlashing, error)
|
|
HighestAttestations(
|
|
ctx context.Context, indices []types.ValidatorIndex,
|
|
) ([]*ethpb.HighestAttestation, error)
|
|
}
|
|
|
|
// Service defining a slasher implementation as part of
|
|
// the beacon node, able to detect eth2 slashable offenses.
|
|
type Service struct {
|
|
params *Parameters
|
|
serviceCfg *ServiceConfig
|
|
indexedAttsChan chan *ethpb.IndexedAttestation
|
|
beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader
|
|
attsQueue *attestationsQueue
|
|
blksQueue *blocksQueue
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
genesisTime time.Time
|
|
attsSlotTicker *slots.SlotTicker
|
|
blocksSlotTicker *slots.SlotTicker
|
|
pruningSlotTicker *slots.SlotTicker
|
|
}
|
|
|
|
// New instantiates a new slasher from configuration values.
|
|
func New(ctx context.Context, srvCfg *ServiceConfig) (*Service, error) {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
return &Service{
|
|
params: DefaultParams(),
|
|
serviceCfg: srvCfg,
|
|
indexedAttsChan: make(chan *ethpb.IndexedAttestation, 1),
|
|
beaconBlockHeadersChan: make(chan *ethpb.SignedBeaconBlockHeader, 1),
|
|
attsQueue: newAttestationsQueue(),
|
|
blksQueue: newBlocksQueue(),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}, nil
|
|
}
|
|
|
|
// Start listening for received indexed attestations and blocks
|
|
// and perform slashing detection on them.
|
|
func (s *Service) Start() {
|
|
go s.run()
|
|
}
|
|
|
|
func (s *Service) run() {
|
|
stateChannel := make(chan *feed.Event, 1)
|
|
stateSub := s.serviceCfg.StateNotifier.StateFeed().Subscribe(stateChannel)
|
|
stateEvent := <-stateChannel
|
|
|
|
// Wait for us to receive the genesis time via a chain started notification.
|
|
if stateEvent.Type == statefeed.ChainStarted {
|
|
data, ok := stateEvent.Data.(*statefeed.ChainStartedData)
|
|
if !ok {
|
|
log.Error("Could not receive chain start notification, want *statefeed.ChainStartedData")
|
|
return
|
|
}
|
|
s.genesisTime = data.StartTime
|
|
log.WithField("genesisTime", s.genesisTime).Info("Starting slasher, received chain start event")
|
|
} else if stateEvent.Type == statefeed.Initialized {
|
|
// Alternatively, if the chain has already started, we then read the genesis
|
|
// time value from this data.
|
|
data, ok := stateEvent.Data.(*statefeed.InitializedData)
|
|
if !ok {
|
|
log.Error("Could not receive chain start notification, want *statefeed.ChainStartedData")
|
|
return
|
|
}
|
|
s.genesisTime = data.StartTime
|
|
log.WithField("genesisTime", s.genesisTime).Info("Starting slasher, chain already initialized")
|
|
} else {
|
|
// This should not happen.
|
|
log.Error("Could start slasher, could not receive chain start event")
|
|
return
|
|
}
|
|
|
|
stateSub.Unsubscribe()
|
|
s.waitForSync(s.genesisTime)
|
|
|
|
indexedAttsChan := make(chan *ethpb.IndexedAttestation, 1)
|
|
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)
|
|
log.Info("Completed chain sync, starting slashing detection")
|
|
go s.receiveAttestations(s.ctx, indexedAttsChan)
|
|
go s.receiveBlocks(s.ctx, beaconBlockHeadersChan)
|
|
|
|
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
|
|
s.attsSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
|
|
s.blocksSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
|
|
s.pruningSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
|
|
go s.processQueuedAttestations(s.ctx, s.attsSlotTicker.C())
|
|
go s.processQueuedBlocks(s.ctx, s.blocksSlotTicker.C())
|
|
go s.pruneSlasherData(s.ctx, s.pruningSlotTicker.C())
|
|
}
|
|
|
|
// Stop the slasher service.
|
|
func (s *Service) Stop() error {
|
|
s.cancel()
|
|
if s.attsSlotTicker != nil {
|
|
s.attsSlotTicker.Done()
|
|
}
|
|
if s.blocksSlotTicker != nil {
|
|
s.blocksSlotTicker.Done()
|
|
}
|
|
if s.pruningSlotTicker != nil {
|
|
s.pruningSlotTicker.Done()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Status of the slasher service.
|
|
func (s *Service) Status() error {
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) waitForSync(genesisTime time.Time) {
|
|
if slots.SinceGenesis(genesisTime) == 0 || !s.serviceCfg.SyncChecker.Syncing() {
|
|
return
|
|
}
|
|
slotTicker := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
|
|
defer slotTicker.Done()
|
|
for {
|
|
select {
|
|
case <-slotTicker.C():
|
|
// If node is still syncing, do not operate slasher.
|
|
if s.serviceCfg.SyncChecker.Syncing() {
|
|
continue
|
|
}
|
|
return
|
|
case <-s.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|