Monitor sync committee (#9923)

* Add sync committeee contributions to monitor

* gaz

* Raul's review

* Added lock around TrackedValidators

* add comment to trackedIndex

* add missing locks because of trackedIndex

* Terence fixes 2

* moved TrackedValidator to service from config

* Terence comment fix

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
This commit is contained in:
Potuz 2021-11-23 22:56:34 -03:00 committed by GitHub
parent 448d62d6e3
commit a2c1185032
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 374 additions and 110 deletions

View File

@ -5,8 +5,6 @@ import (
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/testing/util"
@ -28,10 +26,10 @@ func TestSyncCommitteeCache_CanUpdateAndRetrieve(t *testing.T) {
}{
{
name: "only current epoch",
currentSyncCommittee: convertToCommittee([][]byte{
currentSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[1], pubKeys[2], pubKeys[3], pubKeys[2], pubKeys[2],
}),
nextSyncCommittee: convertToCommittee([][]byte{}),
nextSyncCommittee: util.ConvertToCommittee([][]byte{}),
currentSyncMap: map[types.ValidatorIndex][]types.CommitteeIndex{
1: {0},
2: {1, 3, 4},
@ -45,8 +43,8 @@ func TestSyncCommitteeCache_CanUpdateAndRetrieve(t *testing.T) {
},
{
name: "only next epoch",
currentSyncCommittee: convertToCommittee([][]byte{}),
nextSyncCommittee: convertToCommittee([][]byte{
currentSyncCommittee: util.ConvertToCommittee([][]byte{}),
nextSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[1], pubKeys[2], pubKeys[3], pubKeys[2], pubKeys[2],
}),
currentSyncMap: map[types.ValidatorIndex][]types.CommitteeIndex{
@ -62,14 +60,14 @@ func TestSyncCommitteeCache_CanUpdateAndRetrieve(t *testing.T) {
},
{
name: "some current epoch and some next epoch",
currentSyncCommittee: convertToCommittee([][]byte{
currentSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[1],
pubKeys[2],
pubKeys[3],
pubKeys[2],
pubKeys[2],
}),
nextSyncCommittee: convertToCommittee([][]byte{
nextSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[7],
pubKeys[6],
pubKeys[5],
@ -90,14 +88,14 @@ func TestSyncCommitteeCache_CanUpdateAndRetrieve(t *testing.T) {
},
{
name: "some current epoch and some next epoch duplicated across",
currentSyncCommittee: convertToCommittee([][]byte{
currentSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[1],
pubKeys[2],
pubKeys[3],
pubKeys[2],
pubKeys[2],
}),
nextSyncCommittee: convertToCommittee([][]byte{
nextSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[2],
pubKeys[1],
pubKeys[3],
@ -117,13 +115,13 @@ func TestSyncCommitteeCache_CanUpdateAndRetrieve(t *testing.T) {
},
{
name: "all duplicated",
currentSyncCommittee: convertToCommittee([][]byte{
currentSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[100],
pubKeys[100],
pubKeys[100],
pubKeys[100],
}),
nextSyncCommittee: convertToCommittee([][]byte{
nextSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[100],
pubKeys[100],
pubKeys[100],
@ -138,13 +136,13 @@ func TestSyncCommitteeCache_CanUpdateAndRetrieve(t *testing.T) {
},
{
name: "unknown keys",
currentSyncCommittee: convertToCommittee([][]byte{
currentSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[100],
pubKeys[100],
pubKeys[100],
pubKeys[100],
}),
nextSyncCommittee: convertToCommittee([][]byte{
nextSyncCommittee: util.ConvertToCommittee([][]byte{
pubKeys[100],
pubKeys[100],
pubKeys[100],
@ -189,13 +187,13 @@ func TestSyncCommitteeCache_RootDoesNotExist(t *testing.T) {
func TestSyncCommitteeCache_CanRotate(t *testing.T) {
c := cache.NewSyncCommittee()
s, _ := util.DeterministicGenesisStateAltair(t, 64)
require.NoError(t, s.SetCurrentSyncCommittee(convertToCommittee([][]byte{{1}})))
require.NoError(t, s.SetCurrentSyncCommittee(util.ConvertToCommittee([][]byte{{1}})))
require.NoError(t, c.UpdatePositionsInCommittee([32]byte{'a'}, s))
require.NoError(t, s.SetCurrentSyncCommittee(convertToCommittee([][]byte{{2}})))
require.NoError(t, s.SetCurrentSyncCommittee(util.ConvertToCommittee([][]byte{{2}})))
require.NoError(t, c.UpdatePositionsInCommittee([32]byte{'b'}, s))
require.NoError(t, s.SetCurrentSyncCommittee(convertToCommittee([][]byte{{3}})))
require.NoError(t, s.SetCurrentSyncCommittee(util.ConvertToCommittee([][]byte{{3}})))
require.NoError(t, c.UpdatePositionsInCommittee([32]byte{'c'}, s))
require.NoError(t, s.SetCurrentSyncCommittee(convertToCommittee([][]byte{{4}})))
require.NoError(t, s.SetCurrentSyncCommittee(util.ConvertToCommittee([][]byte{{4}})))
require.NoError(t, c.UpdatePositionsInCommittee([32]byte{'d'}, s))
_, err := c.CurrentPeriodIndexPosition([32]byte{'a'}, 0)
@ -204,19 +202,3 @@ func TestSyncCommitteeCache_CanRotate(t *testing.T) {
_, err = c.CurrentPeriodIndexPosition([32]byte{'c'}, 0)
require.NoError(t, err)
}
func convertToCommittee(inputKeys [][]byte) *ethpb.SyncCommittee {
var pubKeys [][]byte
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSize; i++ {
if i < uint64(len(inputKeys)) {
pubKeys = append(pubKeys, bytesutil.PadTo(inputKeys[i], params.BeaconConfig().BLSPubkeyLength))
} else {
pubKeys = append(pubKeys, bytesutil.PadTo([]byte{}, params.BeaconConfig().BLSPubkeyLength))
}
}
return &ethpb.SyncCommittee{
Pubkeys: pubKeys,
AggregatePubkey: bytesutil.PadTo([]byte{}, params.BeaconConfig().BLSPubkeyLength),
}
}

View File

@ -8,6 +8,7 @@ go_library(
"process_attestation.go",
"process_block.go",
"process_exit.go",
"process_sync_committee.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/monitor",
@ -38,10 +39,12 @@ go_test(
"process_attestation_test.go",
"process_block_test.go",
"process_exit_test.go",
"process_sync_committee_test.go",
"service_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//config/params:go_default_library",

View File

@ -55,6 +55,7 @@ var (
"validator_index",
},
)
// proposedSlotsCounter used to track proposed blocks
proposedSlotsCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
@ -77,4 +78,16 @@ var (
"validator_index",
},
)
// syncCommitteeContributionCounter used to track sync committee
// contributions
syncCommitteeContributionCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "monitor",
Name: "sync_committee_contributions_total",
Help: "Number of Sync committee contributions performed",
},
[]string{
"validator_index",
},
)
)

View File

@ -20,12 +20,13 @@ import (
// updatedPerformanceFromTrackedVal returns true if the validator is tracked and if the
// given slot is different than the last attested slot from this validator.
// It assumes that a read lock is held on the monitor service.
func (s *Service) updatedPerformanceFromTrackedVal(idx types.ValidatorIndex, slot types.Slot) bool {
if !s.TrackedIndex(types.ValidatorIndex(idx)) {
if !s.trackedIndex(idx) {
return false
}
if lp, ok := s.latestPerformance[types.ValidatorIndex(idx)]; ok {
if lp, ok := s.latestPerformance[idx]; ok {
return lp.attestedSlot != slot
}
return false
@ -73,6 +74,8 @@ func (s *Service) processIncludedAttestation(ctx context.Context, state state.Be
log.WithError(err).Error("Could not get attesting indices")
return
}
s.Lock()
defer s.Unlock()
for _, idx := range attestingIndices {
if s.updatedPerformanceFromTrackedVal(types.ValidatorIndex(idx), att.Data.Slot) {
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Data)
@ -87,7 +90,7 @@ func (s *Service) processIncludedAttestation(ctx context.Context, state state.Be
aggregatedPerf.totalRequestedCount++
latestPerf := s.latestPerformance[types.ValidatorIndex(idx)]
balanceChg := balance - latestPerf.balance
balanceChg := int64(balance - latestPerf.balance)
latestPerf.balanceChange = balanceChg
latestPerf.balance = balance
latestPerf.attestedSlot = att.Data.Slot
@ -165,10 +168,13 @@ func (s *Service) processIncludedAttestation(ctx context.Context, state state.Be
// processUnaggregatedAttestation logs when the beacon node sees an unaggregated attestation from one of our
// tracked validators
func (s *Service) processUnaggregatedAttestation(ctx context.Context, att *ethpb.Attestation) {
s.RLock()
defer s.RUnlock()
root := bytesutil.ToBytes32(att.Data.BeaconBlockRoot)
state := s.config.StateGen.StateByRootIfCachedNoCopy(root)
if state == nil {
log.Debug("Skipping unaggregated attestation due to state not found in cache")
log.WithField("BeaconBlockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debug(
"Skipping unaggregated attestation due to state not found in cache")
return
}
attestingIndices, err := attestingIndices(ctx, state, att)
@ -187,7 +193,9 @@ func (s *Service) processUnaggregatedAttestation(ctx context.Context, att *ethpb
// processAggregatedAttestation logs when we see an aggregation from one of our tracked validators or an aggregated
// attestation from one of our tracked validators
func (s *Service) processAggregatedAttestation(ctx context.Context, att *ethpb.AggregateAttestationAndProof) {
if s.TrackedIndex(att.AggregatorIndex) {
s.Lock()
defer s.Unlock()
if s.trackedIndex(att.AggregatorIndex) {
log.WithFields(logrus.Fields{
"ValidatorIndex": att.AggregatorIndex,
}).Info("Processed attestation aggregation")
@ -201,7 +209,8 @@ func (s *Service) processAggregatedAttestation(ctx context.Context, att *ethpb.A
copy(root[:], att.Aggregate.Data.BeaconBlockRoot)
state := s.config.StateGen.StateByRootIfCachedNoCopy(root)
if state == nil {
log.Debug("Skipping agregated attestation due to state not found in cache")
log.WithField("BeaconBlockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debug(
"Skipping agregated attestation due to state not found in cache")
return
}
attestingIndices, err := attestingIndices(ctx, state, att.Aggregate)

View File

@ -41,21 +41,26 @@ func setupService(t *testing.T) *Service {
balance: 31900000000,
},
}
aggregatedPerformance := map[types.ValidatorIndex]ValidatorAggregatedPerformance{
1: {},
2: {},
12: {},
15: {},
}
trackedSyncCommitteeIndices := map[types.ValidatorIndex][]types.CommitteeIndex{
1: {0, 1, 2, 3},
12: {4, 5},
}
return &Service{
config: &ValidatorMonitorConfig{
StateGen: stategen.New(beaconDB),
TrackedValidators: trackedVals,
StateGen: stategen.New(beaconDB),
},
latestPerformance: latestPerformance,
aggregatedPerformance: aggregatedPerformance,
TrackedValidators: trackedVals,
latestPerformance: latestPerformance,
aggregatedPerformance: aggregatedPerformance,
trackedSyncCommitteeIndices: trackedSyncCommitteeIndices,
lastSyncedEpoch: 0,
}
}

View File

@ -9,6 +9,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/time/slots"
"github.com/sirupsen/logrus"
)
@ -39,13 +40,26 @@ func (s *Service) processBlock(ctx context.Context, b block.SignedBeaconBlock) {
return
}
currEpoch := slots.ToEpoch(blk.Slot())
s.RLock()
lastSyncedEpoch := s.lastSyncedEpoch
s.RUnlock()
if currEpoch != lastSyncedEpoch &&
slots.SyncCommitteePeriod(currEpoch) == slots.SyncCommitteePeriod(lastSyncedEpoch) {
s.updateSyncCommitteeTrackedVals(state)
}
s.processSyncAggregate(state, blk)
s.processProposedBlock(state, root, blk)
s.processAttestations(ctx, state, blk)
}
// processProposedBlock logs the event that one of our tracked validators proposed a block that was included
func (s *Service) processProposedBlock(state state.BeaconState, root [32]byte, blk block.BeaconBlock) {
if s.TrackedIndex(blk.ProposerIndex()) {
s.Lock()
defer s.Unlock()
if s.trackedIndex(blk.ProposerIndex()) {
// update metrics
proposedSlotsCounter.WithLabelValues(fmt.Sprintf("%d", blk.ProposerIndex())).Inc()
@ -57,7 +71,7 @@ func (s *Service) processProposedBlock(state state.BeaconState, root [32]byte, b
}
latestPerf := s.latestPerformance[blk.ProposerIndex()]
balanceChg := balance - latestPerf.balance
balanceChg := int64(balance - latestPerf.balance)
latestPerf.balanceChange = balanceChg
latestPerf.balance = balance
s.latestPerformance[blk.ProposerIndex()] = latestPerf
@ -80,9 +94,11 @@ func (s *Service) processProposedBlock(state state.BeaconState, root [32]byte, b
// processSlashings logs the event of one of our tracked validators was slashed
func (s *Service) processSlashings(blk block.BeaconBlock) {
s.RLock()
defer s.RUnlock()
for _, slashing := range blk.Body().ProposerSlashings() {
idx := slashing.Header_1.Header.ProposerIndex
if s.TrackedIndex(idx) {
if s.trackedIndex(idx) {
log.WithFields(logrus.Fields{
"ProposerIndex": idx,
"Slot:": blk.Slot(),
@ -95,7 +111,7 @@ func (s *Service) processSlashings(blk block.BeaconBlock) {
for _, slashing := range blk.Body().AttesterSlashings() {
for _, idx := range blocks.SlashableAttesterIndices(slashing) {
if s.TrackedIndex(types.ValidatorIndex(idx)) {
if s.trackedIndex(types.ValidatorIndex(idx)) {
log.WithFields(logrus.Fields{
"AttesterIndex": idx,
"Slot:": blk.Slot(),

View File

@ -6,6 +6,7 @@ import (
"testing"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
@ -116,11 +117,9 @@ func TestProcessSlashings(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
hook := logTest.NewGlobal()
s := &Service{
config: &ValidatorMonitorConfig{
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
}
s.processSlashings(wrapper.WrappedPhase0BeaconBlock(tt.block))
@ -178,31 +177,55 @@ func TestProcessProposedBlock(t *testing.T) {
}
func TestProcessBlock_ProposerAndSlashedTrackedVals(t *testing.T) {
func TestProcessBlock_AllEventsTrackedVals(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
s := setupService(t)
genesis, keys := util.DeterministicGenesisState(t, 64)
genesis, keys := util.DeterministicGenesisStateAltair(t, 64)
c, err := altair.NextSyncCommittee(ctx, genesis)
require.NoError(t, err)
require.NoError(t, genesis.SetCurrentSyncCommittee(c))
genConfig := util.DefaultBlockGenConfig()
genConfig.NumProposerSlashings = 1
b, err := util.GenerateFullBlock(genesis, keys, genConfig, 1)
b, err := util.GenerateFullBlockAltair(genesis, keys, genConfig, 1)
require.NoError(t, err)
s := setupService(t)
pubKeys := make([][]byte, 3)
pubKeys[0] = genesis.Validators()[0].PublicKey
pubKeys[1] = genesis.Validators()[1].PublicKey
pubKeys[2] = genesis.Validators()[2].PublicKey
currentSyncCommittee := util.ConvertToCommittee([][]byte{
pubKeys[0], pubKeys[1], pubKeys[2], pubKeys[1], pubKeys[1],
})
require.NoError(t, genesis.SetCurrentSyncCommittee(currentSyncCommittee))
idx := b.Block.Body.ProposerSlashings[0].Header_1.Header.ProposerIndex
if !s.TrackedIndex(idx) {
s.config.TrackedValidators[idx] = nil
s.RLock()
if !s.trackedIndex(idx) {
s.TrackedValidators[idx] = nil
s.latestPerformance[idx] = ValidatorLatestPerformance{
balance: 31900000000,
}
s.aggregatedPerformance[idx] = ValidatorAggregatedPerformance{}
}
s.RUnlock()
s.updateSyncCommitteeTrackedVals(genesis)
require.NoError(t, err)
root, err := b.GetBlock().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, s.config.StateGen.SaveState(ctx, root, genesis))
wanted1 := fmt.Sprintf("\"Proposed block was included\" BalanceChange=100000000 BlockRoot=%#x NewBalance=32000000000 ParentRoot=0x67a9fe4d0d8d ProposerIndex=15 Slot=1 Version=0 prefix=monitor", bytesutil.Trunc(root[:]))
wanted1 := fmt.Sprintf("\"Proposed block was included\" BalanceChange=100000000 BlockRoot=%#x NewBalance=32000000000 ParentRoot=0xf732eaeb7fae ProposerIndex=15 Slot=1 Version=1 prefix=monitor", bytesutil.Trunc(root[:]))
wanted2 := fmt.Sprintf("\"Proposer slashing was included\" ProposerIndex=%d Root1=0x000100000000 Root2=0x000200000000 SlashingSlot=0 Slot:=1 prefix=monitor", idx)
wrapped := wrapper.WrappedPhase0SignedBeaconBlock(b)
wanted3 := "\"Sync committee contribution included\" BalanceChange=0 Contributions=3 ExpectedContrib=3 NewBalance=32000000000 ValidatorIndex=1 prefix=monitor"
wanted4 := "\"Sync committee contribution included\" BalanceChange=0 Contributions=1 ExpectedContrib=1 NewBalance=32000000000 ValidatorIndex=2 prefix=monitor"
wrapped, err := wrapper.WrappedAltairSignedBeaconBlock(b)
require.NoError(t, err)
s.processBlock(ctx, wrapped)
require.LogsContain(t, hook, wanted1)
require.LogsContain(t, hook, wanted2)
require.LogsContain(t, hook, wanted3)
require.LogsContain(t, hook, wanted4)
}

View File

@ -9,9 +9,11 @@ import (
// processExitsFromBlock logs the event of one of our tracked validators' exit was
// included in a block
func (s *Service) processExitsFromBlock(blk block.BeaconBlock) {
s.RLock()
defer s.RUnlock()
for _, exit := range blk.Body().VoluntaryExits() {
idx := exit.Exit.ValidatorIndex
if s.TrackedIndex(idx) {
if s.trackedIndex(idx) {
log.WithFields(logrus.Fields{
"ValidatorIndex": idx,
"Slot": blk.Slot(),
@ -23,7 +25,9 @@ func (s *Service) processExitsFromBlock(blk block.BeaconBlock) {
// processExit logs the event of one of our tracked validators' exit was processed
func (s *Service) processExit(exit *ethpb.SignedVoluntaryExit) {
idx := exit.Exit.ValidatorIndex
if s.TrackedIndex(idx) {
s.RLock()
defer s.RUnlock()
if s.trackedIndex(idx) {
log.WithFields(logrus.Fields{
"ValidatorIndex": idx,
}).Info("Voluntary exit was processed")

View File

@ -13,11 +13,9 @@ import (
func TestProcessExitsFromBlockTrackedIndices(t *testing.T) {
hook := logTest.NewGlobal()
s := &Service{
config: &ValidatorMonitorConfig{
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
}
@ -49,11 +47,9 @@ func TestProcessExitsFromBlockTrackedIndices(t *testing.T) {
func TestProcessExitsFromBlockUntrackedIndices(t *testing.T) {
hook := logTest.NewGlobal()
s := &Service{
config: &ValidatorMonitorConfig{
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
}
@ -85,11 +81,9 @@ func TestProcessExitsFromBlockUntrackedIndices(t *testing.T) {
func TestProcessExitP2PTrackedIndices(t *testing.T) {
hook := logTest.NewGlobal()
s := &Service{
config: &ValidatorMonitorConfig{
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
}
@ -107,11 +101,9 @@ func TestProcessExitP2PTrackedIndices(t *testing.T) {
func TestProcessExitP2PUntrackedIndices(t *testing.T) {
hook := logTest.NewGlobal()
s := &Service{
config: &ValidatorMonitorConfig{
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
}

View File

@ -0,0 +1,79 @@
package monitor
import (
"fmt"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/sirupsen/logrus"
)
// processSyncCommitteeContribution logs the event that one of our tracked
// validators' aggregated sync contribution has been processed.
// TODO: We do not log if a sync contribution was included in an aggregate (we
// log them when they are included in blocks)
func (s *Service) processSyncCommitteeContribution(contribution *ethpb.SignedContributionAndProof) {
idx := contribution.Message.AggregatorIndex
s.Lock()
defer s.Unlock()
if s.trackedIndex(idx) {
aggPerf := s.aggregatedPerformance[idx]
aggPerf.totalSyncComitteeAggregations++
s.aggregatedPerformance[idx] = aggPerf
log.WithField("ValidatorIndex", contribution.Message.AggregatorIndex).Info("Sync committee aggregation processed")
}
}
// processSyncAggregate logs the event that one of our tracked validators is a sync-committee member and its
// contribution was included
func (s *Service) processSyncAggregate(state state.BeaconState, blk block.BeaconBlock) {
if blk == nil || blk.Body() == nil {
return
}
bits, err := blk.Body().SyncAggregate()
if err != nil {
log.WithError(err).Error("Cannot get SyncAggregate")
return
}
s.Lock()
defer s.Unlock()
for validatorIdx, committeeIndices := range s.trackedSyncCommitteeIndices {
if len(committeeIndices) > 0 {
contrib := 0
for _, idx := range committeeIndices {
if bits.SyncCommitteeBits.BitAt(uint64(idx)) {
contrib++
}
}
balance, err := state.BalanceAtIndex(validatorIdx)
if err != nil {
log.Error("Could not get balance")
return
}
latestPerf := s.latestPerformance[validatorIdx]
balanceChg := int64(balance - latestPerf.balance)
latestPerf.balanceChange = balanceChg
latestPerf.balance = balance
s.latestPerformance[validatorIdx] = latestPerf
aggPerf := s.aggregatedPerformance[validatorIdx]
aggPerf.totalSyncComitteeContributions += uint64(contrib)
s.aggregatedPerformance[validatorIdx] = aggPerf
syncCommitteeContributionCounter.WithLabelValues(
fmt.Sprintf("%d", validatorIdx)).Add(float64(contrib))
log.WithFields(logrus.Fields{
"ValidatorIndex": validatorIdx,
"ExpectedContrib": len(committeeIndices),
"Contributions": contrib,
"NewBalance": balance,
"BalanceChange": balanceChg,
}).Info("Sync committee contribution included")
}
}
}

View File

@ -0,0 +1,59 @@
package monitor
import (
"testing"
"github.com/prysmaticlabs/go-bitfield"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/testing/util"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestProcessSyncCommitteeContribution(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
contrib := &ethpb.SignedContributionAndProof{
Message: &ethpb.ContributionAndProof{
AggregatorIndex: 1,
},
}
s.processSyncCommitteeContribution(contrib)
require.LogsContain(t, hook, "\"Sync committee aggregation processed\" ValidatorIndex=1")
require.LogsDoNotContain(t, hook, "ValidatorIndex=2")
}
func TestProcessSyncAggregate(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
beaconState, _ := util.DeterministicGenesisStateAltair(t, 256)
block := &ethpb.BeaconBlockAltair{
Slot: 2,
Body: &ethpb.BeaconBlockBodyAltair{
SyncAggregate: &ethpb.SyncAggregate{
SyncCommitteeBits: bitfield.Bitvector512{
0x31, 0xff, 0xff, 0xff, 0xff, 0x3f, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
},
},
}
wrappedBlock, err := wrapper.WrappedAltairBeaconBlock(block)
require.NoError(t, err)
s.processSyncAggregate(beaconState, wrappedBlock)
require.LogsContain(t, hook, "\"Sync committee contribution included\" BalanceChange=0 Contributions=1 ExpectedContrib=4 NewBalance=32000000000 ValidatorIndex=1 prefix=monitor")
require.LogsContain(t, hook, "\"Sync committee contribution included\" BalanceChange=100000000 Contributions=2 ExpectedContrib=2 NewBalance=32000000000 ValidatorIndex=12 prefix=monitor")
require.LogsDoNotContain(t, hook, "ValidatorIndex=2")
}

View File

@ -1,8 +1,13 @@
package monitor
import (
"sync"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/time/slots"
)
// ValidatorLatestPerformance keeps track of the latest participation of the validator
@ -13,41 +18,71 @@ type ValidatorLatestPerformance struct {
timelyTarget bool
timelyHead bool
balance uint64
balanceChange uint64
balanceChange int64
}
// ValidatorAggregatedPerformance keeps track of the accumulated performance of
// the validator since launch
type ValidatorAggregatedPerformance struct {
totalAttestedCount uint64
totalRequestedCount uint64
totalDistance uint64
totalCorrectSource uint64
totalCorrectTarget uint64
totalCorrectHead uint64
totalProposedCount uint64
totalAggregations uint64
totalAttestedCount uint64
totalRequestedCount uint64
totalDistance uint64
totalCorrectSource uint64
totalCorrectTarget uint64
totalCorrectHead uint64
totalProposedCount uint64
totalAggregations uint64
totalSyncComitteeContributions uint64
totalSyncComitteeAggregations uint64
}
// ValidatorMonitorConfig contains the list of validator indices that the
// monitor service tracks, as well as the event feed notifier that the
// monitor needs to subscribe.
type ValidatorMonitorConfig struct {
StateGen stategen.StateManager
TrackedValidators map[types.ValidatorIndex]interface{}
StateGen stategen.StateManager
}
// Service is the main structure that tracks validators and reports logs and
// metrics of their performances throughout their lifetime.
type Service struct {
config *ValidatorMonitorConfig
latestPerformance map[types.ValidatorIndex]ValidatorLatestPerformance
aggregatedPerformance map[types.ValidatorIndex]ValidatorAggregatedPerformance
config *ValidatorMonitorConfig
// Locks access to TrackedValidators, latestPerformance, aggregatedPerformance,
// trackedSyncedCommitteeIndices and lastSyncedEpoch
sync.RWMutex
TrackedValidators map[types.ValidatorIndex]interface{}
latestPerformance map[types.ValidatorIndex]ValidatorLatestPerformance
aggregatedPerformance map[types.ValidatorIndex]ValidatorAggregatedPerformance
trackedSyncCommitteeIndices map[types.ValidatorIndex][]types.CommitteeIndex
lastSyncedEpoch types.Epoch
}
// TrackedIndex returns if the given validator index corresponds to one of the
// validators we follow
func (s *Service) TrackedIndex(idx types.ValidatorIndex) bool {
_, ok := s.config.TrackedValidators[idx]
// validators we follow.
// It assumes the caller holds the service Lock
func (s *Service) trackedIndex(idx types.ValidatorIndex) bool {
_, ok := s.TrackedValidators[idx]
return ok
}
// updateSyncCommitteeTrackedVals updates the sync committee assignments of our
// tracked validators. It gets called when we sync a block after the Sync Period changes.
func (s *Service) updateSyncCommitteeTrackedVals(state state.BeaconState) {
s.Lock()
defer s.Unlock()
for idx := range s.TrackedValidators {
syncIdx, err := helpers.CurrentPeriodSyncSubcommitteeIndices(state, idx)
if err != nil {
log.WithError(err).WithField("ValidatorIndex", idx).Error(
"Sync committee assignments will not be reported")
delete(s.trackedSyncCommitteeIndices, idx)
} else if len(syncIdx) == 0 {
delete(s.trackedSyncCommitteeIndices, idx)
} else {
s.trackedSyncCommitteeIndices[idx] = syncIdx
}
}
s.lastSyncedEpoch = slots.ToEpoch(state.Slot())
}

View File

@ -5,17 +5,41 @@ import (
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/testing/util"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestTrackedIndex(t *testing.T) {
s := &Service{
config: &ValidatorMonitorConfig{
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
TrackedValidators: map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
},
}
require.Equal(t, s.TrackedIndex(types.ValidatorIndex(1)), true)
require.Equal(t, s.TrackedIndex(types.ValidatorIndex(3)), false)
require.Equal(t, s.trackedIndex(types.ValidatorIndex(1)), true)
require.Equal(t, s.trackedIndex(types.ValidatorIndex(3)), false)
}
func TestUpdateSyncCommitteeTrackedVals(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
state, _ := util.DeterministicGenesisStateAltair(t, 1024)
pubKeys := make([][]byte, 3)
pubKeys[0] = state.Validators()[0].PublicKey
pubKeys[1] = state.Validators()[1].PublicKey
pubKeys[2] = state.Validators()[2].PublicKey
currentSyncCommittee := util.ConvertToCommittee([][]byte{
pubKeys[0], pubKeys[1], pubKeys[2], pubKeys[1], pubKeys[1],
})
require.NoError(t, state.SetCurrentSyncCommittee(currentSyncCommittee))
s.updateSyncCommitteeTrackedVals(state)
require.LogsDoNotContain(t, hook, "Sync committee assignments will not be reported")
newTrackedSyncIndices := map[types.ValidatorIndex][]types.CommitteeIndex{
1: {1, 3, 4},
2: {2},
}
require.DeepEqual(t, s.trackedSyncCommitteeIndices, newTrackedSyncIndices)
}

View File

@ -1,6 +1,8 @@
package util
import (
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
)
@ -14,3 +16,21 @@ func HydrateSyncCommittee(s *ethpb.SyncCommitteeMessage) *ethpb.SyncCommitteeMes
}
return s
}
// ConvertToCommittee takes a list of pubkeys and returns a SyncCommittee with
// these keys as members. Some keys may appear repeated
func ConvertToCommittee(inputKeys [][]byte) *ethpb.SyncCommittee {
var pubKeys [][]byte
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSize; i++ {
if i < uint64(len(inputKeys)) {
pubKeys = append(pubKeys, bytesutil.PadTo(inputKeys[i], params.BeaconConfig().BLSPubkeyLength))
} else {
pubKeys = append(pubKeys, bytesutil.PadTo([]byte{}, params.BeaconConfig().BLSPubkeyLength))
}
}
return &ethpb.SyncCommittee{
Pubkeys: pubKeys,
AggregatePubkey: bytesutil.PadTo([]byte{}, params.BeaconConfig().BLSPubkeyLength),
}
}