prysm-pulse/beacon-chain/slasher/service.go
Raul Jordan 9edba29f64
Slasher Simulator Code for Testing Optimized Slasher Behavior (#9695)
* slashing simulator

* add in necessary items for slasher sim

* sim item

* fix up

* fixed build

* rev

* slasher sim in testing

* testonly

* gaz

* gaz

* fix viz

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: prestonvanloon <preston@prysmaticlabs.com>
2021-09-29 02:27:21 +00:00

158 lines
5.6 KiB
Go

// Package slasher implements slashing detection for eth2, able to catch slashable attestations
// and proposals that it receives via two event feeds, respectively. Any found slashings
// are then submitted to the beacon node's slashing operations pool. See the design document
// here https://hackmd.io/@prysmaticlabs/slasher.
package slasher
import (
"context"
"time"
"github.com/prysmaticlabs/prysm/async/event"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/config/params"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/time/slots"
)
// ServiceConfig for the slasher service in the beacon node.
// This struct allows us to specify required dependencies and
// parameters for slasher to function as needed.
type ServiceConfig struct {
IndexedAttestationsFeed *event.Feed
BeaconBlockHeadersFeed *event.Feed
Database db.SlasherDatabase
StateNotifier statefeed.Notifier
AttestationStateFetcher blockchain.AttestationStateFetcher
StateGen stategen.StateManager
SlashingPoolInserter slashings.PoolInserter
HeadStateFetcher blockchain.HeadFetcher
SyncChecker sync.Checker
}
// SlashingChecker is an interface for defining services that the beacon node may interact with to provide slashing data.
type SlashingChecker interface {
IsSlashableBlock(ctx context.Context, proposal *ethpb.SignedBeaconBlockHeader) (*ethpb.ProposerSlashing, error)
IsSlashableAttestation(ctx context.Context, attestation *ethpb.IndexedAttestation) ([]*ethpb.AttesterSlashing, error)
}
// Service defining a slasher implementation as part of
// the beacon node, able to detect eth2 slashable offenses.
type Service struct {
params *Parameters
serviceCfg *ServiceConfig
indexedAttsChan chan *ethpb.IndexedAttestation
beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader
attsQueue *attestationsQueue
blksQueue *blocksQueue
ctx context.Context
cancel context.CancelFunc
slotTicker *slots.SlotTicker
genesisTime time.Time
}
// New instantiates a new slasher from configuration values.
func New(ctx context.Context, srvCfg *ServiceConfig) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
return &Service{
params: DefaultParams(),
serviceCfg: srvCfg,
indexedAttsChan: make(chan *ethpb.IndexedAttestation, 1),
beaconBlockHeadersChan: make(chan *ethpb.SignedBeaconBlockHeader, 1),
attsQueue: newAttestationsQueue(),
blksQueue: newBlocksQueue(),
ctx: ctx,
cancel: cancel,
}, nil
}
// Start listening for received indexed attestations and blocks
// and perform slashing detection on them.
func (s *Service) Start() {
go s.run()
}
func (s *Service) run() {
stateChannel := make(chan *feed.Event, 1)
stateSub := s.serviceCfg.StateNotifier.StateFeed().Subscribe(stateChannel)
stateEvent := <-stateChannel
// Wait for us to receive the genesis time via a chain started notification.
if stateEvent.Type == statefeed.ChainStarted {
data, ok := stateEvent.Data.(*statefeed.ChainStartedData)
if !ok {
log.Error("Could not receive chain start notification, want *statefeed.ChainStartedData")
return
}
s.genesisTime = data.StartTime
log.WithField("genesisTime", s.genesisTime).Info("Starting slasher, received chain start event")
} else if stateEvent.Type == statefeed.Initialized {
// Alternatively, if the chain has already started, we then read the genesis
// time value from this data.
data, ok := stateEvent.Data.(*statefeed.InitializedData)
if !ok {
log.Error("Could not receive chain start notification, want *statefeed.ChainStartedData")
return
}
s.genesisTime = data.StartTime
log.WithField("genesisTime", s.genesisTime).Info("Starting slasher, chain already initialized")
} else {
// This should not happen.
log.Error("Could start slasher, could not receive chain start event")
return
}
stateSub.Unsubscribe()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
s.slotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
s.waitForSync(s.genesisTime)
indexedAttsChan := make(chan *ethpb.IndexedAttestation, 1)
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)
log.Info("Completed chain sync, starting slashing detection")
go s.processQueuedAttestations(s.ctx, s.slotTicker.C())
go s.processQueuedBlocks(s.ctx, s.slotTicker.C())
go s.receiveAttestations(s.ctx, indexedAttsChan)
go s.receiveBlocks(s.ctx, beaconBlockHeadersChan)
go s.pruneSlasherData(s.ctx, s.slotTicker.C())
}
// Stop the slasher service.
func (s *Service) Stop() error {
s.cancel()
if s.slotTicker != nil {
s.slotTicker.Done()
}
return nil
}
// Status of the slasher service.
func (s *Service) Status() error {
return nil
}
func (s *Service) waitForSync(genesisTime time.Time) {
if slots.SlotsSinceGenesis(genesisTime) == 0 || !s.serviceCfg.SyncChecker.Syncing() {
return
}
for {
select {
case <-s.slotTicker.C():
// If node is still syncing, do not operate slasher.
if s.serviceCfg.SyncChecker.Syncing() {
continue
}
return
case <-s.ctx.Done():
return
}
}
}