prysm-pulse/beacon-chain/monitor/service.go
Radosław Kapka 0b261cba5e
Unify log fields (#13654)
* unify fields

* fix tests
2024-02-22 22:40:36 +00:00

290 lines
9.6 KiB
Go

package monitor
import (
"context"
"errors"
"sort"
"sync"
"github.com/prysmaticlabs/prysm/v5/async/event"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
)
// Error when the context is closed while waiting for sync.
var errContextClosedWhileWaiting = errors.New("context closed while waiting for beacon to sync to latest Head")
// ValidatorLatestPerformance keeps track of the latest participation of the validator
type ValidatorLatestPerformance struct {
attestedSlot primitives.Slot
inclusionSlot primitives.Slot
timelySource bool
timelyTarget bool
timelyHead bool
balance uint64
balanceChange int64
}
// ValidatorAggregatedPerformance keeps track of the accumulated performance of
// the tracked validator since start of monitor service.
type ValidatorAggregatedPerformance struct {
startEpoch primitives.Epoch
startBalance uint64
totalAttestedCount uint64
totalRequestedCount uint64
totalDistance uint64
totalCorrectSource uint64
totalCorrectTarget uint64
totalCorrectHead uint64
totalProposedCount uint64
totalAggregations uint64
totalSyncCommitteeContributions uint64
totalSyncCommitteeAggregations uint64
}
// ValidatorMonitorConfig contains the list of validator indices that the
// monitor service tracks, and the event feed notifier that the
// monitor needs to subscribe.
type ValidatorMonitorConfig struct {
StateNotifier statefeed.Notifier
AttestationNotifier operation.Notifier
HeadFetcher blockchain.HeadFetcher
StateGen stategen.StateManager
InitialSyncComplete chan struct{}
}
// Service is the main structure that tracks validators and reports logs and
// metrics of their performances throughout their lifetime.
type Service struct {
config *ValidatorMonitorConfig
ctx context.Context
cancel context.CancelFunc
isLogging bool
// Locks access to TrackedValidators, latestPerformance, aggregatedPerformance,
// trackedSyncedCommitteeIndices and lastSyncedEpoch
sync.RWMutex
TrackedValidators map[primitives.ValidatorIndex]bool
latestPerformance map[primitives.ValidatorIndex]ValidatorLatestPerformance
aggregatedPerformance map[primitives.ValidatorIndex]ValidatorAggregatedPerformance
trackedSyncCommitteeIndices map[primitives.ValidatorIndex][]primitives.CommitteeIndex
lastSyncedEpoch primitives.Epoch
}
// NewService sets up a new validator monitor service instance when given a list of validator indices to track.
func NewService(ctx context.Context, config *ValidatorMonitorConfig, tracked []primitives.ValidatorIndex) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
r := &Service{
config: config,
ctx: ctx,
cancel: cancel,
TrackedValidators: make(map[primitives.ValidatorIndex]bool, len(tracked)),
latestPerformance: make(map[primitives.ValidatorIndex]ValidatorLatestPerformance),
aggregatedPerformance: make(map[primitives.ValidatorIndex]ValidatorAggregatedPerformance),
trackedSyncCommitteeIndices: make(map[primitives.ValidatorIndex][]primitives.CommitteeIndex),
isLogging: false,
}
for _, idx := range tracked {
r.TrackedValidators[idx] = true
}
return r, nil
}
// Start sets up the TrackedValidators map and then calls to wait until the beacon is synced.
func (s *Service) Start() {
s.Lock()
defer s.Unlock()
tracked := make([]primitives.ValidatorIndex, 0, len(s.TrackedValidators))
for idx := range s.TrackedValidators {
tracked = append(tracked, idx)
}
sort.Slice(tracked, func(i, j int) bool { return tracked[i] < tracked[j] })
log.WithFields(logrus.Fields{
"validatorIndices": tracked,
}).Info("Starting service")
go s.run()
}
// run waits until the beacon is synced and starts the monitoring system.
func (s *Service) run() {
if err := s.waitForSync(s.config.InitialSyncComplete); err != nil {
log.WithError(err)
return
}
st, err := s.config.HeadFetcher.HeadState(s.ctx)
if err != nil {
log.WithError(err).Error("Could not get head state")
return
}
if st == nil {
log.Error("Head state is nil")
return
}
epoch := slots.ToEpoch(st.Slot())
log.WithField("epoch", epoch).Info("Synced to head epoch, starting reporting performance")
s.Lock()
s.initializePerformanceStructures(st, epoch)
s.Unlock()
s.updateSyncCommitteeTrackedVals(st)
s.Lock()
s.isLogging = true
s.Unlock()
stateChannel := make(chan *feed.Event, 1)
stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)
s.monitorRoutine(stateChannel, stateSub)
}
// initializePerformanceStructures initializes the validatorLatestPerformance
// and validatorAggregatedPerformance for each tracked validator.
func (s *Service) initializePerformanceStructures(state state.BeaconState, epoch primitives.Epoch) {
for idx := range s.TrackedValidators {
balance, err := state.BalanceAtIndex(idx)
if err != nil {
log.WithError(err).WithField("validatorIndex", idx).Error(
"Could not fetch starting balance, skipping aggregated logs.")
balance = 0
}
s.aggregatedPerformance[idx] = ValidatorAggregatedPerformance{
startEpoch: epoch,
startBalance: balance,
}
s.latestPerformance[idx] = ValidatorLatestPerformance{
balance: balance,
}
}
}
// Status retrieves the status of the service.
func (s *Service) Status() error {
if s.isLogging {
return nil
}
return errors.New("not running")
}
// Stop stops the service.
func (s *Service) Stop() error {
defer s.cancel()
s.isLogging = false
return nil
}
// waitForSync waits until the beacon node is synced to the latest head.
func (s *Service) waitForSync(syncChan chan struct{}) error {
select {
case <-syncChan:
return nil
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return errContextClosedWhileWaiting
}
}
// monitorRoutine is the main dispatcher, it registers event channels for the
// state feed and the operation feed. It then calls the appropriate function
// when we get messages after syncing a block or processing attestations/sync
// committee contributions.
func (s *Service) monitorRoutine(stateChannel chan *feed.Event, stateSub event.Subscription) {
defer stateSub.Unsubscribe()
opChannel := make(chan *feed.Event, 1)
opSub := s.config.AttestationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
for {
select {
case e := <-stateChannel:
if e.Type == statefeed.BlockProcessed {
data, ok := e.Data.(*statefeed.BlockProcessedData)
if !ok {
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
} else if data.Verified {
// We only process blocks that have been verified
s.processBlock(s.ctx, data.SignedBlock)
}
}
case e := <-opChannel:
switch e.Type {
case operation.UnaggregatedAttReceived:
data, ok := e.Data.(*operation.UnAggregatedAttReceivedData)
if !ok {
log.Error("Event feed data is not of type *operation.UnAggregatedAttReceivedData")
} else {
s.processUnaggregatedAttestation(s.ctx, data.Attestation)
}
case operation.AggregatedAttReceived:
data, ok := e.Data.(*operation.AggregatedAttReceivedData)
if !ok {
log.Error("Event feed data is not of type *operation.AggregatedAttReceivedData")
} else {
s.processAggregatedAttestation(s.ctx, data.Attestation)
}
case operation.ExitReceived:
data, ok := e.Data.(*operation.ExitReceivedData)
if !ok {
log.Error("Event feed data is not of type *operation.ExitReceivedData")
} else {
s.processExit(data.Exit)
}
case operation.SyncCommitteeContributionReceived:
data, ok := e.Data.(*operation.SyncCommitteeContributionReceivedData)
if !ok {
log.Error("Event feed data is not of type *operation.SyncCommitteeContributionReceivedData")
} else {
s.processSyncCommitteeContribution(data.Contribution)
}
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
case err := <-stateSub.Err():
log.WithError(err).Error("Could not subscribe to state notifier")
return
}
}
}
// TrackedIndex returns true if input validator index exists in tracked validator list.
// It assumes the caller holds the service Lock
func (s *Service) trackedIndex(idx primitives.ValidatorIndex) bool {
_, ok := s.TrackedValidators[idx]
return ok
}
// updateSyncCommitteeTrackedVals updates the sync committee assignments of our
// tracked validators. It gets called when we sync a block after the Sync Period changes.
func (s *Service) updateSyncCommitteeTrackedVals(state state.BeaconState) {
s.Lock()
defer s.Unlock()
for idx := range s.TrackedValidators {
syncIdx, err := helpers.CurrentPeriodSyncSubcommitteeIndices(state, idx)
if err != nil {
log.WithError(err).WithField("validatorIndex", idx).Error(
"Sync committee assignments will not be reported")
delete(s.trackedSyncCommitteeIndices, idx)
} else if len(syncIdx) == 0 {
delete(s.trackedSyncCommitteeIndices, idx)
} else {
s.trackedSyncCommitteeIndices[idx] = syncIdx
}
}
s.lastSyncedEpoch = slots.ToEpoch(state.Slot())
}