prysm-pulse/beacon-chain/slasher/service.go

230 lines
7.9 KiB
Go
Raw Normal View History

// Package slasher implements slashing detection for eth2, able to catch slashable attestations
// and proposals that it receives via two event feeds, respectively. Any found slashings
// are then submitted to the beacon node's slashing operations pool. See the design document
// here https://hackmd.io/@prysmaticlabs/slasher.
package slasher
import (
"context"
"time"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/async/event"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/config/params"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/time/slots"
)
const (
shutdownTimeout = time.Minute * 5
)
// ServiceConfig for the slasher service in the beacon node.
// This struct allows us to specify required dependencies and
// parameters for slasher to function as needed.
type ServiceConfig struct {
IndexedAttestationsFeed *event.Feed
BeaconBlockHeadersFeed *event.Feed
Database db.SlasherDatabase
StateNotifier statefeed.Notifier
AttestationStateFetcher blockchain.AttestationStateFetcher
StateGen stategen.StateManager
SlashingPoolInserter slashings.PoolInserter
HeadStateFetcher blockchain.HeadFetcher
SyncChecker sync.Checker
}
// SlashingChecker is an interface for defining services that the beacon node may interact with to provide slashing data.
type SlashingChecker interface {
IsSlashableBlock(ctx context.Context, proposal *ethpb.SignedBeaconBlockHeader) (*ethpb.ProposerSlashing, error)
IsSlashableAttestation(ctx context.Context, attestation *ethpb.IndexedAttestation) ([]*ethpb.AttesterSlashing, error)
HighestAttestations(
ctx context.Context, indices []types.ValidatorIndex,
) ([]*ethpb.HighestAttestation, error)
}
// Service defining a slasher implementation as part of
// the beacon node, able to detect eth2 slashable offenses.
type Service struct {
params *Parameters
serviceCfg *ServiceConfig
indexedAttsChan chan *ethpb.IndexedAttestation
beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader
attsQueue *attestationsQueue
blksQueue *blocksQueue
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
attsSlotTicker *slots.SlotTicker
blocksSlotTicker *slots.SlotTicker
pruningSlotTicker *slots.SlotTicker
latestEpochWrittenForValidator map[types.ValidatorIndex]types.Epoch
}
// New instantiates a new slasher from configuration values.
func New(ctx context.Context, srvCfg *ServiceConfig) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
return &Service{
params: DefaultParams(),
serviceCfg: srvCfg,
indexedAttsChan: make(chan *ethpb.IndexedAttestation, 1),
beaconBlockHeadersChan: make(chan *ethpb.SignedBeaconBlockHeader, 1),
attsQueue: newAttestationsQueue(),
blksQueue: newBlocksQueue(),
ctx: ctx,
cancel: cancel,
latestEpochWrittenForValidator: make(map[types.ValidatorIndex]types.Epoch),
}, nil
}
// Start listening for received indexed attestations and blocks
// and perform slashing detection on them.
func (s *Service) Start() {
go s.run() // Start functions must be non-blocking.
}
func (s *Service) run() {
s.waitForChainInitialization()
s.waitForSync(s.genesisTime)
log.Info("Completed chain sync, starting slashing detection")
// Get the latest eopch written for each validator from disk on startup.
headState, err := s.serviceCfg.HeadStateFetcher.HeadState(s.ctx)
if err != nil {
log.WithError(err).Error("Failed to fetch head state")
return
}
numVals := headState.NumValidators()
validatorIndices := make([]types.ValidatorIndex, numVals)
for i := 0; i < numVals; i++ {
validatorIndices[i] = types.ValidatorIndex(i)
}
start := time.Now()
log.Info("Reading last epoch written for each validator...")
epochsByValidator, err := s.serviceCfg.Database.LastEpochWrittenForValidators(
s.ctx, validatorIndices,
)
if err != nil {
log.Error(err)
return
}
for _, item := range epochsByValidator {
s.latestEpochWrittenForValidator[item.ValidatorIndex] = item.Epoch
}
log.WithField("elapsed", time.Since(start)).Info(
"Finished retrieving last epoch written per validator",
)
indexedAttsChan := make(chan *ethpb.IndexedAttestation, 1)
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)
go s.receiveAttestations(s.ctx, indexedAttsChan)
go s.receiveBlocks(s.ctx, beaconBlockHeadersChan)
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
s.attsSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
s.blocksSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
s.pruningSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
go s.processQueuedAttestations(s.ctx, s.attsSlotTicker.C())
go s.processQueuedBlocks(s.ctx, s.blocksSlotTicker.C())
go s.pruneSlasherData(s.ctx, s.pruningSlotTicker.C())
}
// Stop the slasher service.
func (s *Service) Stop() error {
s.cancel()
if s.attsSlotTicker != nil {
s.attsSlotTicker.Done()
}
if s.blocksSlotTicker != nil {
s.blocksSlotTicker.Done()
}
if s.pruningSlotTicker != nil {
s.pruningSlotTicker.Done()
}
// Flush the latest epoch written map to disk.
start := time.Now()
// New context as the service context has already been canceled.
ctx, innerCancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer innerCancel()
log.Info("Flushing last epoch written for each validator to disk, please wait")
if err := s.serviceCfg.Database.SaveLastEpochsWrittenForValidators(
ctx, s.latestEpochWrittenForValidator,
); err != nil {
log.Error(err)
}
log.WithField("elapsed", time.Since(start)).Debug(
"Finished saving last epoch written per validator",
)
return nil
}
// Status of the slasher service.
func (_ *Service) Status() error {
return nil
}
func (s *Service) waitForChainInitialization() {
stateChannel := make(chan *feed.Event, 1)
stateSub := s.serviceCfg.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
defer close(stateChannel)
for {
select {
case stateEvent := <-stateChannel:
// Wait for us to receive the genesis time via a chain started notification.
if stateEvent.Type == statefeed.Initialized {
// Alternatively, if the chain has already started, we then read the genesis
// time value from this data.
data, ok := stateEvent.Data.(*statefeed.InitializedData)
if !ok {
log.Error(
"Could not receive chain start notification, want *statefeed.ChainStartedData",
)
return
}
s.genesisTime = data.StartTime
log.WithField("genesisTime", s.genesisTime).Info(
"Slasher received chain initialization event",
)
return
}
case err := <-stateSub.Err():
log.WithError(err).Error(
"Slasher could not subscribe to state events",
)
return
case <-s.ctx.Done():
return
}
}
}
func (s *Service) waitForSync(genesisTime time.Time) {
if slots.SinceGenesis(genesisTime) < params.BeaconConfig().SlotsPerEpoch || !s.serviceCfg.SyncChecker.Syncing() {
return
}
slotTicker := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-slotTicker.C():
// If node is still syncing, do not operate slasher.
if s.serviceCfg.SyncChecker.Syncing() {
continue
}
return
case <-s.ctx.Done():
return
}
}
}