Performance Metrics for Prysm (#11377)

* atts performance and blocks

* idiomatic observe

* all attestation related errors

* block metrics

* db metrics

* metrics func

* rem old metrics

* naming

* rem metric

* rem unneeded

* fix

* fix up

* rev

* fix

* rem
Raul Jordan 2022-08-31 21:26:19 -04:00 committed by GitHub
parent 587ba83aca
commit bcaae1c440
14 changed files with 186 additions and 56 deletions

View File

@ -170,6 +170,14 @@ var (
Name: "missed_payload_id_filled_count",
Help: "",
})
onBlockProcessingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "on_block_processing_milliseconds",
Help: "Total time in milliseconds to complete a call to onBlock()",
})
stateTransitionProcessingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "state_transition_processing_milliseconds",
Help: "Total time to call a state transition in onBlock()",
})
processAttsElapsedTime = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "process_attestations_milliseconds",

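The summaries above all follow the same idiomatic pattern: record a start time, do the work, then observe the elapsed milliseconds. A minimal, self-contained sketch of that pattern (the metric and function names here are hypothetical, not from this commit):

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// exampleProcessingTime is a hypothetical summary, declared the same way
// as onBlockProcessingTime above.
var exampleProcessingTime = promauto.NewSummary(prometheus.SummaryOpts{
	Name: "example_processing_milliseconds",
	Help: "Total time in milliseconds to complete a call to process()",
})

func process() {
	startTime := time.Now()
	// ... the work being measured ...
	exampleProcessingTime.Observe(float64(time.Since(startTime).Milliseconds()))
}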
View File

@ -98,6 +98,7 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.SignedBeaconBlo
if err := consensusblocks.BeaconBlockIsNil(signed); err != nil {
return invalidBlock{error: err}
}
startTime := time.Now()
b := signed.Block()
preState, err := s.getBlockPreState(ctx, b)
@ -115,10 +116,13 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.SignedBeaconBlo
if err != nil {
return err
}
stateTransitionStartTime := time.Now()
postState, err := transition.ExecuteStateTransition(ctx, preState, signed)
if err != nil {
return invalidBlock{error: err}
}
stateTransitionProcessingTime.Observe(float64(time.Since(stateTransitionStartTime).Milliseconds()))
postStateVersion, postStateHeader, err := getStateVersionAndPayload(postState)
if err != nil {
return err
@ -266,7 +270,11 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.SignedBeaconBlo
}
defer reportAttestationInclusion(b)
return s.handleEpochBoundary(ctx, postState)
if err := s.handleEpochBoundary(ctx, postState); err != nil {
return err
}
onBlockProcessingTime.Observe(float64(time.Since(startTime).Milliseconds()))
return nil
}
func getStateVersionAndPayload(st state.BeaconState) (int, *enginev1.ExecutionPayloadHeader, error) {

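With this placement, onBlockProcessingTime only observes calls that reach the end of onBlock(); the early error returns above are never recorded. If error paths should be timed as well, a deferred observation is the usual alternative — a sketch under that assumption, not what this commit does:

func (s *Service) onBlock(ctx context.Context, signed interfaces.SignedBeaconBlock) error {
	startTime := time.Now()
	// Observe on every exit path, including error returns.
	defer func() {
		onBlockProcessingTime.Observe(float64(time.Since(startTime).Milliseconds()))
	}()
	// ... rest of onBlock ...
	return nil
}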
View File

@ -7,6 +7,7 @@ go_library(
"beacon_committee.go",
"block.go",
"genesis.go",
"metrics.go",
"randao.go",
"rewards_penalties.go",
"shuffle.go",

View File

@ -51,10 +51,11 @@ func ValidateSlotTargetEpoch(data *ethpb.AttestationData) error {
// committee count as an argument allows cheaper computation at run time.
//
// Spec pseudocode definition:
// def is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, slot_signature: BLSSignature) -> bool:
// committee = get_beacon_committee(state, slot, index)
// modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)
// return bytes_to_uint64(hash(slot_signature)[0:8]) % modulo == 0
//
// def is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, slot_signature: BLSSignature) -> bool:
// committee = get_beacon_committee(state, slot, index)
// modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)
// return bytes_to_uint64(hash(slot_signature)[0:8]) % modulo == 0
func IsAggregator(committeeCount uint64, slotSig []byte) (bool, error) {
modulo := uint64(1)
if committeeCount/params.BeaconConfig().TargetAggregatorsPerCommittee > 1 {
@ -68,9 +69,10 @@ func IsAggregator(committeeCount uint64, slotSig []byte) (bool, error) {
// AggregateSignature returns the aggregated signature of the input attestations.
//
// Spec pseudocode definition:
// def get_aggregate_signature(attestations: Sequence[Attestation]) -> BLSSignature:
// signatures = [attestation.signature for attestation in attestations]
// return bls.Aggregate(signatures)
//
// def get_aggregate_signature(attestations: Sequence[Attestation]) -> BLSSignature:
// signatures = [attestation.signature for attestation in attestations]
// return bls.Aggregate(signatures)
func AggregateSignature(attestations []*ethpb.Attestation) (bls.Signature, error) {
sigs := make([]bls.Signature, len(attestations))
var err error
@ -95,14 +97,15 @@ func IsAggregated(attestation *ethpb.Attestation) bool {
//
// Spec pseudocode definition:
// def compute_subnet_for_attestation(committees_per_slot: uint64, slot: Slot, committee_index: CommitteeIndex) -> uint64:
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
// committees_since_epoch_start = committees_per_slot * slots_since_epoch_start
//
// return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
// committees_since_epoch_start = committees_per_slot * slots_since_epoch_start
//
// return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)
func ComputeSubnetForAttestation(activeValCount uint64, att *ethpb.Attestation) uint64 {
return ComputeSubnetFromCommitteeAndSlot(activeValCount, att.Data.CommitteeIndex, att.Data.Slot)
}
@ -112,14 +115,15 @@ func ComputeSubnetForAttestation(activeValCount uint64, att *ethpb.Attestation)
//
// Spec pseudocode definition:
// def compute_subnet_for_attestation(committees_per_slot: uint64, slot: Slot, committee_index: CommitteeIndex) -> uint64:
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
// committees_since_epoch_start = committees_per_slot * slots_since_epoch_start
//
// return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
// committees_since_epoch_start = committees_per_slot * slots_since_epoch_start
//
// return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)
func ComputeSubnetFromCommitteeAndSlot(activeValCount uint64, comIdx types.CommitteeIndex, attSlot types.Slot) uint64 {
slotSinceStart := slots.SinceEpochStarts(attSlot)
comCount := SlotCommitteeCount(activeValCount)
@ -133,13 +137,15 @@ func ComputeSubnetFromCommitteeAndSlot(activeValCount uint64, comIdx types.Commi
// slots.
//
// Example:
// ATTESTATION_PROPAGATION_SLOT_RANGE = 5
// clockDisparity = 24 seconds
// current_slot = 100
// invalid_attestation_slot = 92
// invalid_attestation_slot = 103
// valid_attestation_slot = 98
// valid_attestation_slot = 101
//
// ATTESTATION_PROPAGATION_SLOT_RANGE = 5
// clockDisparity = 24 seconds
// current_slot = 100
// invalid_attestation_slot = 92
// invalid_attestation_slot = 103
// valid_attestation_slot = 98
// valid_attestation_slot = 101
//
// The attestation must be within the range of 95 to 102 in the example above.
func ValidateAttestationTime(attSlot types.Slot, genesisTime time.Time, clockDisparity time.Duration) error {
if err := slots.ValidateClock(attSlot, uint64(genesisTime.Unix())); err != nil {
@ -170,13 +176,19 @@ func ValidateAttestationTime(attSlot types.Slot, genesisTime time.Time, clockDis
lowerBounds := lowerTime.Add(-clockDisparity)
// Verify attestation slot within the time range.
if attTime.Before(lowerBounds) || attTime.After(upperBounds) {
return fmt.Errorf(
"attestation slot %d not within attestation propagation range of %d to %d (current slot)",
attSlot,
lowerBoundsSlot,
currentSlot,
)
attError := fmt.Errorf(
"attestation slot %d not within attestation propagation range of %d to %d (current slot)",
attSlot,
lowerBoundsSlot,
currentSlot,
)
if attTime.Before(lowerBounds) {
attReceivedTooEarlyCount.Inc()
return attError
}
if attTime.After(upperBounds) {
attReceivedTooLateCount.Inc()
return attError
}
return nil
}

View File

@ -0,0 +1,17 @@
package helpers
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
attReceivedTooEarlyCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "attestation_too_early_total",
Help: "Increased when an attestation is considered too early",
})
attReceivedTooLateCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "attestation_too_late_total",
Help: "Increased when an attestation is considered too late",
})
)

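Counters declared this way can be asserted in unit tests via client_golang's testutil package; a minimal sketch (the test and the call that triggers the counter are hypothetical):

package helpers

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestAttReceivedTooEarlyCountIncrements(t *testing.T) {
	before := testutil.ToFloat64(attReceivedTooEarlyCount)
	// ... call ValidateAttestationTime with an attestation slot ahead of the clock ...
	after := testutil.ToFloat64(attReceivedTooEarlyCount)
	if after <= before {
		t.Errorf("expected attestation_too_early_total to increase, got %v -> %v", before, after)
	}
}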
View File

@ -57,6 +57,7 @@ go_library(
"//monitoring/tracing:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",

View File

@ -55,6 +55,14 @@ var (
Name: "validator_entry_cache_delete_total",
Help: "The total number of cache deletes on the validator entry cache.",
})
stateReadingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "db_beacon_state_reading_milliseconds",
Help: "Milliseconds it takes to read a beacon state from the DB",
})
stateSavingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "db_beacon_state_saving_milliseconds",
Help: "Milliseconds it takes to save a beacon state to the DB",
})
)
// BlockCacheSize specifies 1000 slots worth of blocks cached, which

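One design note on these summaries: prometheus.SummaryOpts without explicit Objectives exports only the _sum and _count series, which is enough for average latency but yields no quantiles. If percentiles were wanted, the declaration would add objectives — a sketch under a hypothetical metric name, not what this commit uses:

var stateReadingQuantiles = promauto.NewSummary(prometheus.SummaryOpts{
	Name: "db_beacon_state_reading_quantiles_milliseconds", // hypothetical name
	Help: "Milliseconds it takes to read a beacon state from the DB",
	// Track the median, 90th, and 99th percentiles with allowed error margins.
	Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})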
View File

@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/time"
"github.com/prysmaticlabs/prysm/v3/time/slots"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
@ -30,6 +31,7 @@ import (
func (s *Store) State(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.State")
defer span.End()
startTime := time.Now()
enc, err := s.stateBytes(ctx, blockRoot)
if err != nil {
return nil, err
@ -44,7 +46,12 @@ func (s *Store) State(ctx context.Context, blockRoot [32]byte) (state.BeaconStat
return nil, valErr
}
return s.unmarshalState(ctx, enc, valEntries)
st, err := s.unmarshalState(ctx, enc, valEntries)
if err != nil {
return nil, err
}
stateReadingTime.Observe(float64(time.Since(startTime).Milliseconds()))
return st, err
}
// StateOrError is just like State(), except it only returns a non-error response
@ -127,6 +134,7 @@ func (s *Store) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconSta
if states == nil {
return errors.New("nil state")
}
startTime := time.Now()
multipleEncs := make([][]byte, len(states))
for i, st := range states {
stateBytes, err := marshalState(ctx, st)
@ -136,7 +144,7 @@ func (s *Store) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconSta
multipleEncs[i] = stateBytes
}
return s.db.Update(func(tx *bolt.Tx) error {
if err := s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateBucket)
for i, rt := range blockRoots {
indicesByBucket := createStateIndicesFromStateSlot(ctx, states[i].Slot())
@ -148,7 +156,11 @@ func (s *Store) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconSta
}
}
return nil
})
}); err != nil {
return err
}
stateSavingTime.Observe(float64(time.Since(startTime).Milliseconds()))
return nil
}
type withValidators interface {
@ -760,8 +772,10 @@ func createStateIndicesFromStateSlot(ctx context.Context, slot types.Slot) map[s
// Only following states would be kept:
// 1.) state_slot % archived_interval == 0. (e.g. archived_interval=2048, states with slot 2048, 4096... etc)
// 2.) archived_interval - archived_interval/3 < state_slot % archived_interval
// (e.g. archived_interval=2048, states with slots after 1365).
// This is to tolerate skip slots. Not every state lies on the boundary.
//
// (e.g. archived_interval=2048, states with slots after 1365).
// This is to tolerate skip slots. Not every state lies on the boundary.
//
// 3.) state with current finalized root
// 4.) unfinalized States
func (s *Store) CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint types.Slot) error {

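Rules 1 and 2 above reduce to a predicate on the state slot; a hypothetical helper illustrating them (rules 3 and 4 depend on chain state and are omitted):

// keepArchivedState mirrors retention rules 1 and 2 described above.
// Illustrative only; not part of this commit.
func keepArchivedState(stateSlot, archivedInterval uint64) bool {
	rem := stateSlot % archivedInterval
	if rem == 0 {
		return true // rule 1: state sits on an archive boundary
	}
	// rule 2: keep the last third of the interval to tolerate skip slots
	return rem > archivedInterval-archivedInterval/3
}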
View File

@ -13,4 +13,16 @@ var (
Buckets: []float64{64, 256, 1024, 2048, 4096},
},
)
replayBlocksSummary = promauto.NewSummary(
prometheus.SummaryOpts{
Name: "replay_blocks_milliseconds",
Help: "Time it took to replay blocks",
},
)
replayToSlotSummary = promauto.NewSummary(
prometheus.SummaryOpts{
Name: "replay_to_slot_milliseconds",
Help: "Time it took to replay to slot",
},
)
)

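These two metrics are summaries (exporting _sum and _count), while the metric directly above is a histogram, which additionally allows server-side quantile estimation via histogram_quantile. If that were wanted here, the declaration would mirror the existing histogram — a sketch with a hypothetical name, not part of this commit:

var replayBlocksHistogram = promauto.NewHistogram(
	prometheus.HistogramOpts{
		Name: "replay_blocks_duration_milliseconds", // hypothetical name
		Help: "Time it took to replay blocks",
		// Explicit millisecond buckets, mirroring the histogram above.
		Buckets: []float64{64, 256, 1024, 2048, 4096},
	},
)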
View File

@ -119,6 +119,7 @@ func (rs *stateReplayer) ReplayBlocks(ctx context.Context) (state.BeaconState, e
log.WithFields(logrus.Fields{
"duration": duration,
}).Debug("Finished calling process_blocks on all blocks in ReplayBlocks")
replayBlocksSummary.Observe(float64(duration.Milliseconds()))
return s, nil
}
@ -150,14 +151,14 @@ func (rs *stateReplayer) ReplayToSlot(ctx context.Context, replayTo types.Slot)
// err will be handled after the bookend log
s, err = ReplayProcessSlots(ctx, s, replayTo)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("ReplayToSlot failed to seek to slot %d after applying blocks", replayTo))
}
duration := time.Since(start)
log.WithFields(logrus.Fields{
"duration": duration,
}).Debug("time spent in process_slots")
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("ReplayToSlot failed to seek to slot %d after applying blocks", replayTo))
}
replayToSlotSummary.Observe(float64(duration.Milliseconds()))
return s, nil
}

View File

@ -89,6 +89,38 @@ var (
Buckets: []float64{250, 500, 1000, 1500, 2000, 4000, 8000, 16000},
},
)
// Attestation processing granular error tracking.
attBadBlockCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "gossip_attestation_bad_block_total",
Help: "Increased when a gossip attestation references a bad block",
})
attBadLmdConsistencyCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "gossip_attestation_bad_lmd_consistency_total",
Help: "Increased when a gossip attestation has bad LMD GHOST consistency",
})
attBadSelectionProofCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "gossip_attestation_bad_selection_proof_total",
Help: "Increased when a gossip attestation has a bad selection proof",
})
attBadSignatureBatchCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "gossip_attestation_bad_signature_batch_total",
Help: "Increased when a gossip attestation has a bad signature batch",
})
// Attestation and block gossip verification performance.
aggregateAttestationVerificationGossipSummary = promauto.NewSummary(
prometheus.SummaryOpts{
Name: "gossip_aggregate_attestation_verification_milliseconds",
Help: "Time to verify gossiped attestations",
},
)
blockVerificationGossipSummary = promauto.NewSummary(
prometheus.SummaryOpts{
Name: "gossip_block_verification_milliseconds",
Help: "Time to verify gossiped blocks",
},
)
)
func (s *Service) updateMetrics() {

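The four granular error counters above share a prefix and differ only in the failure reason. An equivalent shape, if the fixed label cardinality is acceptable, is a single CounterVec keyed by reason — sketched here as a design alternative with a hypothetical metric name, not what the commit does:

var attRejectedCount = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "gossip_attestation_rejected_total", // hypothetical name
	Help: "Gossip attestations rejected, labeled by failure reason",
}, []string{"reason"})

At a rejection site this would read attRejectedCount.WithLabelValues("bad_block").Inc() in place of attBadBlockCount.Inc().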
View File

@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
prysmTime "github.com/prysmaticlabs/prysm/v3/time"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"go.opencensus.io/trace"
)
@ -27,6 +28,7 @@ import (
// validateAggregateAndProof verifies the aggregated signature and the selection proof is valid before forwarding to the
// network and downstream services.
func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
receivedTime := prysmTime.Now()
if pid == s.cfg.p2p.PeerID() {
return pubsub.ValidationAccept, nil
}
@ -75,8 +77,11 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
// Attestation's slot is within ATTESTATION_PROPAGATION_SLOT_RANGE and early attestation
// processing tolerance.
if err := helpers.ValidateAttestationTime(m.Message.Aggregate.Data.Slot, s.cfg.chain.GenesisTime(),
earlyAttestationProcessingTolerance); err != nil {
if err := helpers.ValidateAttestationTime(
m.Message.Aggregate.Data.Slot,
s.cfg.chain.GenesisTime(),
earlyAttestationProcessingTolerance,
); err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore, err
}
@ -89,6 +94,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
if s.hasBadBlock(bytesutil.ToBytes32(m.Message.Aggregate.Data.BeaconBlockRoot)) ||
s.hasBadBlock(bytesutil.ToBytes32(m.Message.Aggregate.Data.Target.Root)) ||
s.hasBadBlock(bytesutil.ToBytes32(m.Message.Aggregate.Data.Source.Root)) {
attBadBlockCount.Inc()
return pubsub.ValidationReject, errors.New("bad block referenced in attestation data")
}
@ -114,6 +120,8 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
msg.ValidatorData = m
aggregateAttestationVerificationGossipSummary.Observe(float64(prysmTime.Since(receivedTime).Milliseconds()))
return pubsub.ValidationAccept, nil
}
@ -127,6 +135,7 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe
// but it's invalid in the spirit of the protocol. Here we choose safety over profit.
if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, signed.Message.Aggregate); err != nil {
tracing.AnnotateError(span, err)
attBadLmdConsistencyCount.Inc()
return pubsub.ValidationReject, err
}
@ -168,6 +177,7 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe
if err != nil {
wrappedErr := errors.Wrapf(err, "Could not validate selection for validator %d", signed.Message.AggregatorIndex)
tracing.AnnotateError(span, wrappedErr)
attBadSelectionProofCount.Inc()
return pubsub.ValidationReject, wrappedErr
}

View File

@ -124,6 +124,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
if s.hasBadBlock(bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) ||
s.hasBadBlock(bytesutil.ToBytes32(att.Data.Target.Root)) ||
s.hasBadBlock(bytesutil.ToBytes32(att.Data.Source.Root)) {
attBadBlockCount.Inc()
return pubsub.ValidationReject, errors.New("attestation data references bad block root")
}
@ -141,6 +142,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
}
if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, att); err != nil {
tracing.AnnotateError(span, err)
attBadLmdConsistencyCount.Inc()
return pubsub.ValidationReject, err
}
@ -222,6 +224,7 @@ func (s *Service) validateUnaggregatedAttWithState(ctx context.Context, a *eth.A
set, err := blocks.AttestationSignatureBatch(ctx, bs, []*eth.Attestation{a})
if err != nil {
tracing.AnnotateError(span, err)
attBadSignatureBatchCount.Inc()
return pubsub.ValidationReject, err
}
return s.validateWithBatchVerifier(ctx, "attestation", set)

View File

@ -204,6 +204,8 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
"proposerIndex": blk.Block().ProposerIndex(),
"graffiti": string(blk.Block().Body().Graffiti()),
}).Debug("Received block")
blockVerificationGossipSummary.Observe(float64(prysmTime.Since(receivedTime).Milliseconds()))
return pubsub.ValidationAccept, nil
}
@ -253,16 +255,17 @@ func (s *Service) validateBeaconBlock(ctx context.Context, blk interfaces.Signed
// validateBellatrixBeaconBlock validates the block for the Bellatrix fork.
// spec code:
// If the execution is enabled for the block -- i.e. is_execution_enabled(state, block.body) then validate the following:
// [REJECT] The block's execution payload timestamp is correct with respect to the slot --
// i.e. execution_payload.timestamp == compute_timestamp_at_slot(state, block.slot).
//
// If execution_payload verification of block's parent by an execution node is not complete:
// [REJECT] The block's parent (defined by block.parent_root) passes all validation (excluding execution
// node verification of the block.body.execution_payload).
// otherwise:
// [IGNORE] The block's parent (defined by block.parent_root) passes all validation (including execution
// node verification of the block.body.execution_payload).
// If the execution is enabled for the block -- i.e. is_execution_enabled(state, block.body) then validate the following:
// [REJECT] The block's execution payload timestamp is correct with respect to the slot --
// i.e. execution_payload.timestamp == compute_timestamp_at_slot(state, block.slot).
//
// If execution_payload verification of block's parent by an execution node is not complete:
// [REJECT] The block's parent (defined by block.parent_root) passes all validation (excluding execution
// node verification of the block.body.execution_payload).
// otherwise:
// [IGNORE] The block's parent (defined by block.parent_root) passes all validation (including execution
// node verification of the block.body.execution_payload).
func (s *Service) validateBellatrixBeaconBlock(ctx context.Context, parentState state.BeaconState, blk interfaces.BeaconBlock) error {
// Error if block and state are not the same version
if parentState.Version() != blk.Version() {