[Caplin] beginnings of instrumentation (#7486)

This PR is ready for review, but it is waiting on the following upstream PR:

https://github.com/VictoriaMetrics/metrics/pull/45

so that we do not need to use a replace directive.
This commit is contained in:
a 2023-05-11 11:38:56 -05:00 committed by GitHub
parent 404e395bb4
commit fd6acd4b31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 197 additions and 40 deletions

View File

@ -52,7 +52,6 @@ func (m *merkleHasher) merkleizeTrieLeaves(leaves [][32]byte) ([32]byte, error)
func (m *merkleHasher) merkleizeTrieLeavesFlat(leaves []byte, out []byte) (err error) {
m.mu.Lock()
defer m.mu.Unlock()
layer := m.getBufferFromFlat(leaves)
for len(layer) > 1 {
if err := gohashtree.Hash(layer, layer); err != nil {

View File

@ -4,12 +4,12 @@ import (
"crypto/sha256"
"encoding/binary"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state/lru"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state/raw"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state/shuffling"
)
@ -194,13 +194,13 @@ func (b *BeaconState) _refreshActiveBalances() {
func (b *BeaconState) initCaches() error {
var err error
if b.activeValidatorsCache, err = lru.New[uint64, []uint64](5); err != nil {
if b.activeValidatorsCache, err = lru.New[uint64, []uint64]("beacon_active_validators_cache", 5); err != nil {
return err
}
if b.shuffledSetsCache, err = lru.New[common.Hash, []uint64](5); err != nil {
if b.shuffledSetsCache, err = lru.New[common.Hash, []uint64]("beacon_shuffled_sets_cache", 5); err != nil {
return err
}
if b.committeeCache, err = lru.New[[16]byte, []uint64](256); err != nil {
if b.committeeCache, err = lru.New[[16]byte, []uint64]("beacon_committee_cache", 256); err != nil {
return err
}
return nil

View File

@ -159,6 +159,7 @@ func (b *BeaconState) GetBeaconCommitee(slot, committeeIndex uint64) ([]uint64,
var cacheKey [16]byte
binary.BigEndian.PutUint64(cacheKey[:], slot)
binary.BigEndian.PutUint64(cacheKey[8:], committeeIndex)
if cachedCommittee, ok := b.committeeCache.Get(cacheKey); ok {
return cachedCommittee, nil
}

View File

@ -0,0 +1,33 @@
package lru
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
lru "github.com/hashicorp/golang-lru/v2"
)
// Cache is a wrapper around the hashicorp LRU cache that counts Get hits and
// misses in the golang_lru_cache_hit and golang_lru_cache_miss counters, each
// labelled cache="<metricName>".
type Cache[K comparable, V any] struct {
	*lru.Cache[K, V]
	metricName string

	// hits and misses are resolved once in New so that Get does not have to
	// format a metric name and look it up in the registry on every call.
	hits   *metrics.Counter
	misses *metrics.Counter
}

// New builds an LRU cache with the given size and registers its hit/miss
// counters under metricName. It returns the error of the underlying
// constructor unchanged when the cache cannot be created.
//
// The counters are created here rather than on first use, so they exist
// (at zero) as soon as the cache does.
func New[K comparable, V any](metricName string, size int) (*Cache[K, V], error) {
	v, err := lru.NewWithEvict[K, V](size, nil)
	if err != nil {
		return nil, err
	}
	return &Cache[K, V]{
		Cache:      v,
		metricName: metricName,
		hits:       metrics.GetOrCreateCounter(fmt.Sprintf(`golang_lru_cache_hit{%s="%s"}`, "cache", metricName)),
		misses:     metrics.GetOrCreateCounter(fmt.Sprintf(`golang_lru_cache_miss{%s="%s"}`, "cache", metricName)),
	}, nil
}

// Get looks k up in the underlying cache and increments the hit or miss
// counter according to the outcome. The value and found flag are returned
// exactly as the underlying cache reported them.
func (c *Cache[K, V]) Get(k K) (V, bool) {
	v, ok := c.Cache.Get(k)
	if ok {
		c.hits.Inc()
	} else {
		c.misses.Inc()
	}
	return v, ok
}

View File

@ -1,23 +1,38 @@
package state
import (
"github.com/ledgerwatch/erigon/metrics/methelp"
"github.com/ledgerwatch/erigon-lib/types/clonable"
)
// EncodeSSZ encodes the state by delegating to b.BeaconState.EncodeSSZ and,
// on success, records the elapsed time in the encode_ssz_beacon_state_dur
// histogram and the length of the returned bytes in the
// encode_ssz_beacon_state_size histogram. Nothing is recorded when encoding
// fails; the error is returned as-is with a nil slice.
func (b *BeaconState) EncodeSSZ(buf []byte) ([]byte, error) {
	h := methelp.NewHistTimer("encode_ssz_beacon_state_dur")
	bts, err := b.BeaconState.EncodeSSZ(buf)
	if err != nil {
		return nil, err
	}
	h.PutSince()
	// The size is a plain observation; HistTimer is used only as a handle to
	// the histogram, its start time is ignored.
	sz := methelp.NewHistTimer("encode_ssz_beacon_state_size")
	sz.Update(float64(len(bts)))
	return bts, nil
}
// DecodeSSZ decodes buf into the state via b.BeaconState.DecodeSSZ. On
// success it records the input length in decode_ssz_beacon_state_size and the
// elapsed time in decode_ssz_beacon_state_dur, then returns the result of
// initBeaconState. A decode error is returned immediately and nothing is
// recorded.
func (b *BeaconState) DecodeSSZ(buf []byte, version int) error {
	timer := methelp.NewHistTimer("decode_ssz_beacon_state_dur")
	err := b.BeaconState.DecodeSSZ(buf, version)
	if err != nil {
		return err
	}
	methelp.NewHistTimer("decode_ssz_beacon_state_size").Update(float64(len(buf)))
	timer.PutSince()
	return b.initBeaconState()
}
// EncodingSizeSSZ returns the SSZ encoding size of the beacon state, as
// reported by b.BeaconState.EncodingSizeSSZ.
func (b *BeaconState) EncodingSizeSSZ() (size int) {
	return b.BeaconState.EncodingSizeSSZ()
}
func (b *BeaconState) Clone() clonable.Clonable {

View File

@ -3,16 +3,13 @@ package state
import (
"sort"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state/lru"
)
func copyLRU[K comparable, V any](dst *lru.Cache[K, V], src *lru.Cache[K, V]) *lru.Cache[K, V] {
if dst == nil {
dst = new(lru.Cache[K, V])
}
dst.Purge()
for _, key := range src.Keys() {
val, has := src.Get(key)

View File

@ -8,6 +8,7 @@ import (
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state"
"github.com/ledgerwatch/erigon/metrics/methelp"
)
// processBlock takes a block and transitions the state to the next slot, using the provided execution payload if enabled.
@ -19,48 +20,65 @@ func processBlock(state *state.BeaconState, signedBlock *cltypes.SignedBeaconBlo
return fmt.Errorf("processBlock: wrong state version for block at slot %d", block.Slot)
}
h := methelp.NewHistTimer("beacon_process_block")
c := h.Tag("process_step", "block_header")
// Process the block header.
if err := ProcessBlockHeader(state, block, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process block header: %v", err)
}
c.PutSince()
// Process execution payload if enabled.
if version >= clparams.BellatrixVersion && executionEnabled(state, block.Body.ExecutionPayload) {
if state.Version() >= clparams.CapellaVersion {
// Process withdrawals in the execution payload.
c = h.Tag("process_step", "withdrawals")
if err := ProcessWithdrawals(state, block.Body.ExecutionPayload.Withdrawals, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process withdrawals: %v", err)
}
c.PutSince()
}
// Process the execution payload.
c = h.Tag("process_step", "execution_payload")
if err := ProcessExecutionPayload(state, block.Body.ExecutionPayload); err != nil {
return fmt.Errorf("processBlock: failed to process execution payload: %v", err)
}
c.PutSince()
}
// Process RANDAO reveal.
c = h.Tag("process_step", "randao_reveal")
if err := ProcessRandao(state, block.Body.RandaoReveal, block.ProposerIndex, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process RANDAO reveal: %v", err)
}
c.PutSince()
// Process Eth1 data.
c = h.Tag("process_step", "eth1_data")
if err := ProcessEth1Data(state, block.Body.Eth1Data); err != nil {
return fmt.Errorf("processBlock: failed to process Eth1 data: %v", err)
}
c.PutSince()
// Process block body operations.
c = h.Tag("process_step", "operations")
if err := processOperations(state, block.Body, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process block body operations: %v", err)
}
c.PutSince()
// Process sync aggregate in case of Altair version.
if version >= clparams.AltairVersion {
c = h.Tag("process_step", "sync_aggregate")
if err := ProcessSyncAggregate(state, block.Body.SyncAggregate, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process sync aggregate: %v", err)
}
c.PutSince()
}
h.PutSince()
return nil
}
@ -68,41 +86,61 @@ func processOperations(state *state.BeaconState, blockBody *cltypes.BeaconBody,
if len(blockBody.Deposits) != int(maximumDeposits(state)) {
return errors.New("outstanding deposits do not match maximum deposits")
}
h := methelp.NewHistTimer("beacon_process_block_operations")
// Process each proposer slashing
c := h.Tag("operation", "proposer_slashings")
for _, slashing := range blockBody.ProposerSlashings {
if err := ProcessProposerSlashing(state, slashing); err != nil {
return fmt.Errorf("ProcessProposerSlashing: %s", err)
}
}
c.PutSince()
// Process each attester slashing
c = h.Tag("operation", "attester_slashings")
for _, slashing := range blockBody.AttesterSlashings {
if err := ProcessAttesterSlashing(state, slashing); err != nil {
return fmt.Errorf("ProcessAttesterSlashing: %s", err)
}
}
c.PutSince()
// Process each attestations
c = h.Tag("operation", "attestations", "validation", "false")
if fullValidation {
c = h.Tag("operation", "attestations", "validation", "true")
}
if err := ProcessAttestations(state, blockBody.Attestations, fullValidation); err != nil {
return fmt.Errorf("ProcessAttestation: %s", err)
}
c.PutSince()
// Process each deposit
c = h.Tag("operation", "deposit")
for _, dep := range blockBody.Deposits {
if err := ProcessDeposit(state, dep, fullValidation); err != nil {
return fmt.Errorf("ProcessDeposit: %s", err)
}
}
c.PutSince()
// Process each voluntary exit.
c = h.Tag("operation", "voluntary_exit")
for _, exit := range blockBody.VoluntaryExits {
if err := ProcessVoluntaryExit(state, exit, fullValidation); err != nil {
return fmt.Errorf("ProcessVoluntaryExit: %s", err)
}
}
c.PutSince()
// Process each execution change. this will only have entries after the capella fork.
c = h.Tag("operation", "execution_change")
for _, addressChange := range blockBody.ExecutionChanges {
if err := ProcessBlsToExecutionChange(state, addressChange, fullValidation); err != nil {
return err
}
}
c.PutSince()
return nil
}

View File

@ -8,21 +8,25 @@ import (
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state"
"github.com/ledgerwatch/erigon/metrics/methelp"
"golang.org/x/exp/slices"
)
func ProcessAttestations(s *state.BeaconState, attestations []*cltypes.Attestation, fullValidation bool) error {
var err error
attestingIndiciesSet := make([][]uint64, len(attestations))
h := methelp.NewHistTimer("beacon_process_attestations")
baseRewardPerIncrement := s.BaseRewardPerIncrement()
c := h.Tag("attestation_step", "process")
for i, attestation := range attestations {
if attestingIndiciesSet[i], err = processAttestation(s, attestation, baseRewardPerIncrement); err != nil {
return err
}
}
c.PutSince()
if fullValidation {
c = h.Tag("attestation_step", "validate")
valid, err := verifyAttestations(s, attestations, attestingIndiciesSet)
if err != nil {
return err
@ -30,6 +34,7 @@ func ProcessAttestations(s *state.BeaconState, attestations []*cltypes.Attestati
if !valid {
return errors.New("ProcessAttestation: wrong bls data")
}
c.PutSince()
}
return nil
@ -41,19 +46,27 @@ func processAttestationPostAltair(s *state.BeaconState, attestation *cltypes.Att
stateSlot := s.Slot()
beaconConfig := s.BeaconConfig()
h := methelp.NewHistTimer("beacon_process_attestation_post_altair")
c := h.Tag("step", "get_participation_flag")
participationFlagsIndicies, err := s.GetAttestationParticipationFlagIndicies(attestation.Data, stateSlot-data.Slot)
if err != nil {
return nil, err
}
c.PutSince()
c = h.Tag("step", "get_attesting_indices")
attestingIndicies, err := s.GetAttestingIndicies(attestation.Data, attestation.AggregationBits, true)
if err != nil {
return nil, err
}
c.PutSince()
var proposerRewardNumerator uint64
isCurrentEpoch := data.Target.Epoch == currentEpoch
c = h.Tag("step", "update_attestation")
for _, attesterIndex := range attestingIndicies {
val, err := s.ValidatorEffectiveBalance(int(attesterIndex))
if err != nil {
@ -70,11 +83,14 @@ func processAttestationPostAltair(s *state.BeaconState, attestation *cltypes.Att
proposerRewardNumerator += baseReward * weight
}
}
c.PutSince()
// Reward proposer
c = h.Tag("step", "get_proposer_index")
proposer, err := s.GetBeaconProposerIndex()
if err != nil {
return nil, err
}
c.PutSince()
proposerRewardDenominator := (beaconConfig.WeightDenominator - beaconConfig.ProposerWeight) * beaconConfig.WeightDenominator / beaconConfig.ProposerWeight
reward := proposerRewardNumerator / proposerRewardDenominator
return attestingIndicies, state.IncreaseBalance(s.BeaconState, proposer, reward)

View File

@ -20,7 +20,6 @@ import (
"math"
"net"
"net/http"
"time"
"github.com/ledgerwatch/erigon-lib/kv"
@ -40,8 +39,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const (
@ -236,10 +233,10 @@ func New(
db kv.RoDB,
) (*Sentinel, error) {
s := &Sentinel{
ctx: ctx,
cfg: cfg,
db: db,
// metrics: true,
ctx: ctx,
cfg: cfg,
db: db,
metrics: true,
}
// Setup discovery
@ -265,18 +262,6 @@ func New(
return nil, err
}
if s.metrics {
http.Handle("/metrics", promhttp.Handler())
go func() {
server := &http.Server{
Addr: ":2112",
ReadHeaderTimeout: time.Hour,
}
if err := server.ListenAndServe(); err != nil {
panic(err)
}
}()
rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer)
str, err := rcmgrObs.NewStatsTraceReporter()
if err != nil {

View File

@ -11,6 +11,8 @@ import (
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel"
"github.com/ledgerwatch/log/v3"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
@ -61,11 +63,11 @@ func createSentinel(cfg *sentinel.SentinelConfig, db kv.RoDB) (*sentinel.Sentine
func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status) (sentinelrpc.SentinelClient, error) {
ctx := context.Background()
sent, err := createSentinel(cfg, db)
if err != nil {
return nil, err
}
rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer)
log.Info("[Sentinel] Sentinel started", "enr", sent.String())
if initialStatus != nil {
sent.SetStatus(initialStatus)
@ -89,7 +91,11 @@ WaitingLoop:
}
}
conn, err := grpc.DialContext(ctx, srvCfg.Addr, grpc.WithTransportCredentials(creds), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMessageSize)))
conn, err := grpc.DialContext(ctx,
srvCfg.Addr,
grpc.WithTransportCredentials(creds),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMessageSize)),
)
if err != nil {
return nil, err
}

2
go.mod
View File

@ -271,3 +271,5 @@ require (
)
replace github.com/tendermint/tendermint => github.com/bnb-chain/tendermint v0.31.12
replace github.com/VictoriaMetrics/metrics => github.com/greyireland/metrics v0.0.5

4
go.sum
View File

@ -29,8 +29,6 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40=
github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o=
github.com/VictoriaMetrics/metrics v1.23.1 h1:/j8DzeJBxSpL2qSIdqnRFLvQQhbJyJbbEi22yMm7oL0=
github.com/VictoriaMetrics/metrics v1.23.1/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8=
github.com/agnivade/levenshtein v1.1.1/go.mod h1:veldBMzWxcCG2ZvUTKD2kJNRdCk5hVbJomOvKkmgYbo=
@ -358,6 +356,8 @@ github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/greyireland/metrics v0.0.5 h1:FgHLl8lF4D0i77NlgJM7txwdwGStSH5x/thxv2o0IPA=
github.com/greyireland/metrics v0.0.5/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=

View File

@ -8,6 +8,8 @@ import (
metrics2 "github.com/VictoriaMetrics/metrics"
"github.com/ledgerwatch/log/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
)
// Setup starts a dedicated metrics server at the given address.
@ -15,13 +17,22 @@ import (
func Setup(address string) {
http.HandleFunc("/debug/metrics/prometheus", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
metrics2.WritePrometheus(w, true)
metrics2.WritePrometheus(w, false)
contentType := expfmt.Negotiate(r.Header)
enc := expfmt.NewEncoder(w, contentType)
mf, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return
}
for _, m := range mf {
enc.Encode(m)
}
})
//m.Handle("/debug/metrics", ExpHandler(metrics.DefaultRegistry))
//m.Handle("/debug/metrics/prometheus2", promhttp.HandlerFor(prometheus2.DefaultGatherer, promhttp.HandlerOpts{
// EnableOpenMetrics: true,
//}))
log.Info("Starting metrics server", "addr", fmt.Sprintf("http://%s/debug/metrics/prometheus", address))
//http.Handle("/debug/metrics/prometheus2", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))
log.Info("Starting metrics server", "addr",
fmt.Sprintf("http://%s/debug/metrics/prometheus", address),
)
go func() {
if err := http.ListenAndServe(address, nil); err != nil { // nolint:gosec
log.Error("Failure in running metrics server", "err", err)

54
metrics/methelp/timer.go Normal file
View File

@ -0,0 +1,54 @@
package methelp
import (
"fmt"
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
)
// HistTimer pairs a histogram with a start time so that the time elapsed
// since the timer was created can be recorded into that histogram.
type HistTimer struct {
*metrics.Histogram
// start is set to time.Now() when the timer is created; PutSince measures from it.
start time.Time
// name is the metric name without any "{...}" tag section; Tag and Child
// derive new metric names from it.
name string
}
// NewHistTimer returns a timer backed by the histogram registered under name,
// with its start time set to now. name may carry a "{...}" tag section; the
// part before the first "{" is kept as the base name used by Tag and Child.
func NewHistTimer(name string) *HistTimer {
	base, _, _ := strings.Cut(name, "{")
	timer := new(HistTimer)
	timer.Histogram = metrics.GetOrCreateCompatibleHistogram(name)
	timer.start = time.Now()
	timer.name = base
	return timer
}
// PutSince records the time elapsed since the timer's start into its
// histogram.
func (h *HistTimer) PutSince() {
	h.UpdateDuration(h.start)
}
// Tag returns a new timer, started now, whose histogram is the receiver's
// base metric name decorated with the given labels.
//
// pairs is read as alternating key, value entries and rendered as
// name{k1="v1",k2="v2"}. A trailing key without a value is given the value
// "UNEQUAL_KEY_VALUE_TAGS". With no pairs the bare base name is used. Only
// the base name is carried over, so tags present in the name originally
// passed to NewHistTimer are not included.
func (h *HistTimer) Tag(pairs ...string) *HistTimer {
	var metricName strings.Builder
	metricName.WriteString(h.name)
	for i := 0; i < len(pairs); i += 2 {
		// The placeholder for a dangling key is substituted here instead of
		// being appended to pairs: appending could write into the spare
		// capacity of a slice the caller handed over with `pairs...`.
		value := "UNEQUAL_KEY_VALUE_TAGS"
		if i+1 < len(pairs) {
			value = pairs[i+1]
		}
		if i == 0 {
			metricName.WriteByte('{')
		} else {
			metricName.WriteByte(',')
		}
		fmt.Fprintf(&metricName, `%s="%s"`, pairs[i], value)
	}
	if len(pairs) > 0 {
		metricName.WriteByte('}')
	}
	return &HistTimer{
		Histogram: metrics.GetOrCreateCompatibleHistogram(metricName.String()),
		start:     time.Now(),
		name:      h.name,
	}
}
// Child returns a new timer, started now, for the metric named
// "<base name>_<suffix>". A single leading underscore on suffix is dropped
// first so the separator is not doubled.
func (h *HistTimer) Child(suffix string) *HistTimer {
	return NewHistTimer(h.name + "_" + strings.TrimPrefix(suffix, "_"))
}