prysm-pulse/beacon-chain/monitor/service.go
kasey 918129cf36
Replace statefeed Initialize (#12285)
* refactor initialization to blocking startup method

* require genesisSetter in blockchain, fix tests

* work-around gazelle weirdness

* fix dep gazelle ignores

* only call SetGenesis once

* fix typo

* validator test setup and fix to return right error

* move waitForChainStart to Start

* wire up sync Service.genesisWaiter

* fix p2p genesisWaiter plumbing

* remove extra clock type, integrate into genesis

and rename

* use time.Now when no Nower is specified

* remove unused ClockSetter

* simplify rpc context checking

* fix typo

* use clock everywhere in sync; [32]byte val root

* don't use DeepEqual to compare [32]byte and []byte

* don't use clock in init sync, not wired up yet

* use clock waiter in blockchain as well

* use cancelable contexts in tests with goroutines

* missed a reference to WithClockSetter

* Update beacon-chain/startup/genesis.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/blockchain/service_test.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* more clear docs

* doc for NewClock

* move clock typedef to more logical file name

* adding documentation

* gaz

* fixes for capella

* reducing test raciness

* fix races in committee cache tests

* lint

* add tests on Duration slot math helper

* startup package test coverage

* fix bad merge

* set non-zero genesis time in tests that call Start

* happy deepsource, happy me-epsource

* replace Synced event with channel

* remove unused error

* remove accidental wip commit

* gaz!

* remove unused event constants

* remove sync statefeed subscription to fix deadlock

* remove state notifier

* fix build

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: nisdas <nishdas93@gmail.com>
2023-05-03 04:34:01 +00:00

296 lines
9.8 KiB
Go

package monitor
import (
"context"
"errors"
"sort"
"sync"
"github.com/prysmaticlabs/prysm/v4/async/event"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
)
// errContextClosedWhileWaiting is returned by waitForSync when the service
// context is done before the initial-sync channel signals.
var errContextClosedWhileWaiting = errors.New("context closed while waiting for beacon to sync to latest Head")
// ValidatorLatestPerformance keeps track of the latest participation of the validator.
// Within this file only balance is written: initializePerformanceStructures
// seeds it from the head state.
// NOTE(review): the remaining fields are presumably maintained by the block
// and attestation processing code elsewhere in the package — confirm there.
type ValidatorLatestPerformance struct {
attestedSlot primitives.Slot
inclusionSlot primitives.Slot
timelySource bool
timelyTarget bool
timelyHead bool
// balance starts as the validator's balance in the head state read at
// startup, or 0 when that lookup failed.
balance uint64
balanceChange int64
}
// ValidatorAggregatedPerformance keeps track of the accumulated performance of
// the tracked validator since start of monitor service.
// NOTE(review): only startEpoch and startBalance are written in this file; the
// total* counters are presumably incremented by the processing code elsewhere
// in the package — confirm there.
type ValidatorAggregatedPerformance struct {
// startEpoch is the epoch of the head state at the time the monitor
// initialized its performance structures.
startEpoch primitives.Epoch
// startBalance is the validator's balance in that same head state, or 0 when
// the balance could not be fetched.
startBalance uint64
totalAttestedCount uint64
totalRequestedCount uint64
totalDistance uint64
totalCorrectSource uint64
totalCorrectTarget uint64
totalCorrectHead uint64
totalProposedCount uint64
totalAggregations uint64
totalSyncCommitteeContributions uint64
totalSyncCommitteeAggregations uint64
}
// ValidatorMonitorConfig bundles the dependencies of the monitor service: the
// event feed notifiers it subscribes to, the head state provider and the
// channel that signals the end of initial sync. The validator indices to track
// are not part of the config; they are passed to NewService separately.
type ValidatorMonitorConfig struct {
// StateNotifier provides the state feed that Start subscribes to.
StateNotifier statefeed.Notifier
// AttestationNotifier provides the operation feed that monitorRoutine
// subscribes to.
AttestationNotifier operation.Notifier
// HeadFetcher is queried by run for the head state once sync has completed.
HeadFetcher blockchain.HeadFetcher
// StateGen is not referenced in this file.
// NOTE(review): presumably used by processing code elsewhere in the package — confirm.
StateGen stategen.StateManager
// InitialSyncComplete is the channel waitForSync blocks on before monitoring starts.
InitialSyncComplete chan struct{}
}
// Service is the main structure that tracks validators and reports logs and
// metrics of their performances throughout their lifetime.
type Service struct {
// config holds the dependencies handed to NewService.
config *ValidatorMonitorConfig
// ctx is a cancelable child of the context given to NewService; cancel is
// invoked by Stop.
ctx context.Context
cancel context.CancelFunc
// isLogging is set to true by run once the performance structures are
// initialized (run holds the lock for that write), cleared by Stop and
// reported by Status.
isLogging bool
// Locks access to TrackedValidators, latestPerformance, aggregatedPerformance,
// trackedSyncCommitteeIndices and lastSyncedEpoch
sync.RWMutex
// TrackedValidators has one entry, set to true, per validator index passed to NewService.
TrackedValidators map[primitives.ValidatorIndex]bool
// latestPerformance and aggregatedPerformance are seeded per tracked validator
// by initializePerformanceStructures.
latestPerformance map[primitives.ValidatorIndex]ValidatorLatestPerformance
aggregatedPerformance map[primitives.ValidatorIndex]ValidatorAggregatedPerformance
// trackedSyncCommitteeIndices holds the non-empty sync subcommittee indices
// found by updateSyncCommitteeTrackedVals; validators without any have no entry.
trackedSyncCommitteeIndices map[primitives.ValidatorIndex][]primitives.CommitteeIndex
// lastSyncedEpoch is the epoch of the state last passed to
// updateSyncCommitteeTrackedVals.
lastSyncedEpoch primitives.Epoch
}
// NewService builds a validator monitor service that tracks the given
// validator indices. The supplied context is wrapped in a cancelable child
// context whose cancel function is kept on the service. The performance and
// sync committee maps start out empty and the service is not logging yet.
// The returned error is always nil.
func NewService(ctx context.Context, config *ValidatorMonitorConfig, tracked []primitives.ValidatorIndex) (*Service, error) {
	svcCtx, cancel := context.WithCancel(ctx)

	// Every requested index becomes a key in the tracked set.
	validators := make(map[primitives.ValidatorIndex]bool, len(tracked))
	for _, idx := range tracked {
		validators[idx] = true
	}

	return &Service{
		config:                      config,
		ctx:                         svcCtx,
		cancel:                      cancel,
		TrackedValidators:           validators,
		latestPerformance:           make(map[primitives.ValidatorIndex]ValidatorLatestPerformance),
		aggregatedPerformance:       make(map[primitives.ValidatorIndex]ValidatorAggregatedPerformance),
		trackedSyncCommitteeIndices: make(map[primitives.ValidatorIndex][]primitives.CommitteeIndex),
	}, nil
}
// Start logs the tracked validator indices in ascending order, subscribes to
// the state feed and launches run in its own goroutine. It returns without
// waiting for the beacon node to sync; that wait happens inside run.
func (s *Service) Start() {
	s.Lock()
	defer s.Unlock()

	// Map iteration order is random, so sort the indices for a stable log line.
	indices := make([]primitives.ValidatorIndex, 0, len(s.TrackedValidators))
	for validatorIdx := range s.TrackedValidators {
		indices = append(indices, validatorIdx)
	}
	sort.Slice(indices, func(a, b int) bool {
		return indices[a] < indices[b]
	})

	fields := logrus.Fields{"ValidatorIndices": indices}
	log.WithFields(fields).Info("Starting service")

	events := make(chan *feed.Event, 1)
	subscription := s.config.StateNotifier.StateFeed().Subscribe(events)
	go s.run(events, subscription)
}
// run is the body of the goroutine launched by Start. It blocks until the
// InitialSyncComplete channel signals, seeds the performance structures and
// the sync committee assignments from the current head state, flags the
// service as logging and then hands over to monitorRoutine, which only returns
// on context cancellation or a state subscription error.
//
// stateChannel is the channel subscribed to the state feed and stateSub is the
// matching subscription. The subscription is released on every exit path once
// the channel has been validated, including the early returns taken before
// monitorRoutine is reached.
func (s *Service) run(stateChannel chan *feed.Event, stateSub event.Subscription) {
	if stateChannel == nil {
		log.Error("State state is nil")
		return
	}
	// Without this, an early return below would leave a subscriber that nobody
	// reads from registered on the state feed. monitorRoutine also defers
	// Unsubscribe, so the call may happen twice.
	// NOTE(review): relies on the event.Subscription contract that Unsubscribe
	// may be called any number of times — confirm for custom implementations.
	defer stateSub.Unsubscribe()

	if err := s.waitForSync(s.config.InitialSyncComplete); err != nil {
		// WithError only builds a log entry; it has to be emitted explicitly.
		log.WithError(err).Error("Could not wait for beacon node to sync")
		return
	}

	st, err := s.config.HeadFetcher.HeadState(s.ctx)
	if err != nil {
		log.WithError(err).Error("Could not get head state")
		return
	}
	if st == nil {
		log.Error("Head state is nil")
		return
	}

	epoch := slots.ToEpoch(st.Slot())
	log.WithField("Epoch", epoch).Info("Synced to head epoch, starting reporting performance")

	s.Lock()
	s.initializePerformanceStructures(st, epoch)
	s.Unlock()

	// updateSyncCommitteeTrackedVals takes the lock itself, so it must be
	// called with the lock released.
	s.updateSyncCommitteeTrackedVals(st)

	s.Lock()
	s.isLogging = true
	s.Unlock()

	s.monitorRoutine(stateChannel, stateSub)
}
// initializePerformanceStructures creates fresh latestPerformance and
// aggregatedPerformance entries for every tracked validator, using the
// validator's balance in the given state as the starting balance and epoch as
// the start epoch. When the balance cannot be read the error is logged and a
// starting balance of 0 is recorded. The function does no locking of its own;
// run calls it while holding the service lock.
func (s *Service) initializePerformanceStructures(state state.BeaconState, epoch primitives.Epoch) {
	for validatorIdx := range s.TrackedValidators {
		startBalance, err := state.BalanceAtIndex(validatorIdx)
		if err != nil {
			entry := log.WithError(err).WithField("ValidatorIndex", validatorIdx)
			entry.Error("Could not fetch starting balance, skipping aggregated logs.")
			startBalance = 0
		}

		s.latestPerformance[validatorIdx] = ValidatorLatestPerformance{balance: startBalance}
		s.aggregatedPerformance[validatorIdx] = ValidatorAggregatedPerformance{
			startBalance: startBalance,
			startEpoch:   epoch,
		}
	}
}
// Status reports whether the monitor is actively logging. It returns nil once
// run has finished initialization and a "not running" error before that or
// after Stop. The flag is read under the read lock because run writes it from
// another goroutine while holding the write lock.
func (s *Service) Status() error {
	s.RLock()
	defer s.RUnlock()
	if s.isLogging {
		return nil
	}
	return errors.New("not running")
}
// Stop marks the service as no longer logging and cancels the service context,
// which makes waitForSync and monitorRoutine return. It always returns nil.
// The flag is cleared under the lock because run writes it from another
// goroutine while holding the same lock.
func (s *Service) Stop() error {
	defer s.cancel()
	s.Lock()
	s.isLogging = false
	s.Unlock()
	return nil
}
// waitForSync blocks until a receive on syncChan succeeds (a value is sent or
// the channel is closed) and then returns nil. If the service context is done
// first, it logs at debug level and returns errContextClosedWhileWaiting.
func (s *Service) waitForSync(syncChan chan struct{}) error {
	select {
	case <-s.ctx.Done():
		log.Debug("Context closed, exiting goroutine")
		return errContextClosedWhileWaiting
	case <-syncChan:
	}
	return nil
}
// monitorRoutine is the event loop of the monitor. It subscribes to the
// operation feed, then dispatches events from that feed and from the already
// subscribed state feed to the matching process* method:
//
//   - BlockProcessed state events carrying a verified block go to processBlock;
//   - unaggregated and aggregated attestations, exits and sync committee
//     contributions from the operation feed go to their respective handlers.
//
// Events whose payload is not of the expected type are logged and dropped;
// other event types are ignored. The loop ends when the service context is
// done or the state subscription reports on its error channel. Both
// subscriptions are released on return.
func (s *Service) monitorRoutine(stateChannel chan *feed.Event, stateSub event.Subscription) {
	defer stateSub.Unsubscribe()

	operations := make(chan *feed.Event, 1)
	operationsSub := s.config.AttestationNotifier.OperationFeed().Subscribe(operations)
	defer operationsSub.Unsubscribe()

	for {
		select {
		case <-s.ctx.Done():
			log.Debug("Context closed, exiting goroutine")
			return
		case err := <-stateSub.Err():
			log.WithError(err).Error("Could not subscribe to state notifier")
			return
		case evt := <-stateChannel:
			if evt.Type != statefeed.BlockProcessed {
				continue
			}
			processed, ok := evt.Data.(*statefeed.BlockProcessedData)
			switch {
			case !ok:
				log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
			case processed.Verified:
				// Unverified blocks are deliberately skipped.
				s.processBlock(s.ctx, processed.SignedBlock)
			}
		case evt := <-operations:
			switch evt.Type {
			case operation.UnaggregatedAttReceived:
				if payload, ok := evt.Data.(*operation.UnAggregatedAttReceivedData); ok {
					s.processUnaggregatedAttestation(s.ctx, payload.Attestation)
				} else {
					log.Error("Event feed data is not of type *operation.UnAggregatedAttReceivedData")
				}
			case operation.AggregatedAttReceived:
				if payload, ok := evt.Data.(*operation.AggregatedAttReceivedData); ok {
					s.processAggregatedAttestation(s.ctx, payload.Attestation)
				} else {
					log.Error("Event feed data is not of type *operation.AggregatedAttReceivedData")
				}
			case operation.ExitReceived:
				if payload, ok := evt.Data.(*operation.ExitReceivedData); ok {
					s.processExit(payload.Exit)
				} else {
					log.Error("Event feed data is not of type *operation.ExitReceivedData")
				}
			case operation.SyncCommitteeContributionReceived:
				if payload, ok := evt.Data.(*operation.SyncCommitteeContributionReceivedData); ok {
					s.processSyncCommitteeContribution(payload.Contribution)
				} else {
					log.Error("Event feed data is not of type *operation.SyncCommitteeContributionReceivedData")
				}
			}
		}
	}
}
// trackedIndex reports whether idx is a key of the TrackedValidators map.
// It does no locking of its own, so the caller must hold the service lock.
func (s *Service) trackedIndex(idx primitives.ValidatorIndex) bool {
	_, tracked := s.TrackedValidators[idx]
	return tracked
}
// updateSyncCommitteeTrackedVals recomputes, from the given state, the current
// period sync subcommittee indices of every tracked validator. Validators with
// at least one index get their entry replaced; validators with none, or whose
// lookup failed (which is logged), have their entry removed. Finally
// lastSyncedEpoch is set to the epoch of the state's slot. The method takes
// the service lock itself, so callers must not hold it.
func (s *Service) updateSyncCommitteeTrackedVals(state state.BeaconState) {
	s.Lock()
	defer s.Unlock()

	for validatorIdx := range s.TrackedValidators {
		subcommittees, err := helpers.CurrentPeriodSyncSubcommitteeIndices(state, validatorIdx)
		switch {
		case err != nil:
			entry := log.WithError(err).WithField("ValidatorIndex", validatorIdx)
			entry.Error("Sync committee assignments will not be reported")
			delete(s.trackedSyncCommitteeIndices, validatorIdx)
		case len(subcommittees) == 0:
			delete(s.trackedSyncCommitteeIndices, validatorIdx)
		default:
			s.trackedSyncCommitteeIndices[validatorIdx] = subcommittees
		}
	}

	s.lastSyncedEpoch = slots.ToEpoch(state.Slot())
}