metrics: use prometheus histogram and summary interfaces (#8808)

milen 2023-11-24 18:50:57 +01:00 committed by GitHub
parent 230b013096
commit 9b74cf0384
13 changed files with 89 additions and 78 deletions

View File

@@ -13,7 +13,7 @@ func (b *CachingBeaconState) EncodeSSZ(buf []byte) ([]byte, error) {
 	}
 	h.PutSince()
 	sz := metrics.NewHistTimer("encode_ssz_beacon_state_size")
-	sz.Update(float64(len(bts)))
+	sz.Observe(float64(len(bts)))
 	return bts, err
 }
@@ -23,7 +23,7 @@ func (b *CachingBeaconState) DecodeSSZ(buf []byte, version int) error {
 		return err
 	}
 	sz := metrics.NewHistTimer("decode_ssz_beacon_state_size")
-	sz.Update(float64(len(buf)))
+	sz.Observe(float64(len(buf)))
 	h.PutSince()
 	return b.initBeaconState()
 }
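This hunk shows the renamed calls in their usual pairing: one HistTimer times the operation (PutSince) while a second records the encoded size (Observe). A minimal sketch of the pattern, assuming the erigon-lib metrics API as it stands after this diff; the function and metric names are illustrative:

```go
// sketch only: encode stands in for the real SSZ encoder
func encodeWithMetrics(encode func() []byte) []byte {
	h := metrics.NewHistTimer("example_encode_duration") // hypothetical name
	bts := encode()
	h.PutSince() // records time since NewHistTimer via ObserveDuration

	sz := metrics.NewHistTimer("example_encode_size")
	sz.Observe(float64(len(bts))) // Observe is promoted from prometheus.Histogram
	return bts
}
```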

View File

@@ -83,5 +83,5 @@ func sendMetrics(ctx context.Context, start time.Time, isSuccessful bool) {
 	}
 	meters.request[isSuccessful].Set(1)
-	meters.timer.UpdateDuration(start)
+	meters.timer.ObserveDuration(start)
 }

View File

@@ -19,18 +19,16 @@ package core
 import (
 	"fmt"
-	"github.com/ledgerwatch/erigon-lib/metrics"
 	"time"
+	"github.com/ledgerwatch/log/v3"
 	"golang.org/x/crypto/sha3"
 	"golang.org/x/exp/slices"
-	"github.com/ledgerwatch/log/v3"
 	"github.com/ledgerwatch/erigon-lib/chain"
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/common/cmp"
+	"github.com/ledgerwatch/erigon-lib/metrics"
 	"github.com/ledgerwatch/erigon/common/math"
 	"github.com/ledgerwatch/erigon/common/u256"
 	"github.com/ledgerwatch/erigon/consensus"
@@ -42,7 +40,7 @@ import (
 )
 var (
-	BlockExecutionTimer = metrics.GetOrCreateSummary("chain_execution_seconds")
+	blockExecutionTimer = metrics.GetOrCreateSummary("chain_execution_seconds")
 )
 type SyncMode string
@@ -85,7 +83,7 @@ func ExecuteBlockEphemerally(
 	logger log.Logger,
 ) (*EphemeralExecResult, error) {
-	defer BlockExecutionTimer.UpdateDuration(time.Now())
+	defer blockExecutionTimer.ObserveDuration(time.Now())
 	block.Uncles()
 	ibs := state.New(stateReader)
 	header := block.Header()
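The deferred call relies on a Go evaluation rule worth noting: arguments to a deferred function are evaluated when the defer statement executes, so time.Now() is captured on entry and ObserveDuration measures the whole function on return. A minimal sketch (timer name illustrative):

```go
var exampleTimer = metrics.GetOrCreateSummary("example_work_seconds") // hypothetical metric

func doWork() {
	// time.Now() runs here, at the defer statement; the deferred
	// ObserveDuration runs on return, so the full body is measured.
	defer exampleTimer.ObserveDuration(time.Now())
	// ... work ...
}
```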

View File

@@ -833,12 +833,12 @@ func (tx *MdbxTx) Commit() error {
 	}
 	if tx.db.opts.label == kv.ChainDB {
-		kv.DbCommitPreparation.Update(latency.Preparation.Seconds())
+		kv.DbCommitPreparation.Observe(latency.Preparation.Seconds())
 		//kv.DbCommitAudit.Update(latency.Audit.Seconds())
-		kv.DbCommitWrite.Update(latency.Write.Seconds())
-		kv.DbCommitSync.Update(latency.Sync.Seconds())
-		kv.DbCommitEnding.Update(latency.Ending.Seconds())
-		kv.DbCommitTotal.Update(latency.Whole.Seconds())
+		kv.DbCommitWrite.Observe(latency.Write.Seconds())
+		kv.DbCommitSync.Observe(latency.Sync.Seconds())
+		kv.DbCommitEnding.Observe(latency.Ending.Seconds())
+		kv.DbCommitTotal.Observe(latency.Whole.Seconds())
 		//kv.DbGcWorkPnlMergeTime.Update(latency.GCDetails.WorkPnlMergeTime.Seconds())
 		//kv.DbGcWorkPnlMergeVolume.Set(uint64(latency.GCDetails.WorkPnlMergeVolume))

View File

@@ -0,0 +1,14 @@
+package metrics
+
+import (
+	"time"
+)
+
+type DurationObserver interface {
+	// ObserveDuration observes duration since start time
+	ObserveDuration(start time.Time)
+}
+
+func secondsSince(start time.Time) float64 {
+	return time.Since(start).Seconds()
+}
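Pulling ObserveDuration into its own interface lets timing code accept either metric kind. A hedged sketch of a caller written against the shared interface (timeIt is illustrative and not part of this commit; shown as if inside the metrics package):

```go
// timeIt measures fn into any metric that satisfies DurationObserver,
// i.e. both the histogram and summary wrappers added in this commit.
func timeIt(obs DurationObserver, fn func()) {
	start := time.Now()
	fn()
	obs.ObserveDuration(start)
}
```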

View File

@@ -0,0 +1,20 @@
+package metrics
+
+import (
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type Histogram interface {
+	prometheus.Histogram
+	DurationObserver
+}
+
+type histogram struct {
+	prometheus.Histogram
+}
+
+func (h *histogram) ObserveDuration(start time.Time) {
+	h.Observe(secondsSince(start))
+}
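Because the concrete histogram embeds prometheus.Histogram, it inherits Observe and the collector methods and only adds ObserveDuration; that is what makes it satisfy the exported Histogram interface. A compile-time assertion of that contract, a common Go idiom not present in the diff:

```go
// fails to compile if *histogram ever stops satisfying Histogram
var _ Histogram = (*histogram)(nil)
```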

View File

@@ -2,25 +2,8 @@ package metrics
 import (
 	"fmt"
-	"time"
-
-	"github.com/prometheus/client_golang/prometheus"
 )
-
-type Histogram interface {
-	// UpdateDuration updates request duration based on the given startTime.
-	UpdateDuration(time.Time)
-
-	// Update updates h with v.
-	//
-	// Negative values and NaNs are ignored.
-	Update(float64)
-}
-
-type Summary interface {
-	Histogram
-}
-
 // NewCounter registers and returns new counter with the given name.
 //
 // name must be valid Prometheus-compatible metric with possible labels.
@@ -105,18 +88,6 @@ func GetOrCreateGauge(name string) Gauge {
 	return &gauge{g}
 }
-
-type summary struct {
-	prometheus.Summary
-}
-
-func (sm summary) UpdateDuration(startTime time.Time) {
-	sm.Observe(time.Since(startTime).Seconds())
-}
-
-func (sm summary) Update(v float64) {
-	sm.Observe(v)
-}
-
 // NewSummary creates and returns new summary with the given name.
 //
 // name must be valid Prometheus-compatible metric with possible labels.
@@ -133,7 +104,7 @@ func NewSummary(name string) Summary {
 		panic(fmt.Errorf("could not create new summary: %w", err))
 	}
-	return summary{s}
+	return &summary{s}
 }
 // GetOrCreateSummary returns registered summary with the given name
@@ -156,19 +127,7 @@ func GetOrCreateSummary(name string) Summary {
 		panic(fmt.Errorf("could not get or create new summary: %w", err))
 	}
-	return summary{s}
-}
-
-type histogram struct {
-	prometheus.Histogram
-}
-
-func (h histogram) UpdateDuration(startTime time.Time) {
-	h.Observe(time.Since(startTime).Seconds())
-}
-
-func (h histogram) Update(v float64) {
-	h.Observe(v)
+	return &summary{s}
 }
 // NewHistogram creates and returns new histogram with the given name.
@@ -187,7 +146,7 @@ func NewHistogram(name string) Histogram {
 		panic(fmt.Errorf("could not create new histogram: %w", err))
 	}
-	return histogram{h}
+	return &histogram{h}
 }
 // GetOrCreateHistogram returns registered histogram with the given name
@@ -210,5 +169,5 @@ func GetOrCreateHistogram(name string) Histogram {
 		panic(fmt.Errorf("could not get or create new histogram: %w", err))
 	}
-	return histogram{h}
+	return &histogram{h}
 }
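After this change the package exposes only the prometheus-backed Summary and Histogram interfaces, so call sites switch from Update/UpdateDuration to Observe/ObserveDuration. A short usage sketch against the new surface, with illustrative metric names:

```go
func exampleUsage() {
	s := metrics.GetOrCreateSummary("example_latency_seconds")
	start := time.Now()
	// ... timed work ...
	s.ObserveDuration(start) // from the DurationObserver half of the interface

	h := metrics.GetOrCreateHistogram("example_payload_bytes")
	h.Observe(1024) // from the prometheus.Histogram half
}
```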

View File

@@ -0,0 +1,20 @@
+package metrics
+
+import (
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type Summary interface {
+	prometheus.Summary
+	DurationObserver
+}
+
+type summary struct {
+	prometheus.Summary
+}
+
+func (s *summary) ObserveDuration(start time.Time) {
+	s.Observe(secondsSince(start))
+}
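The summary wrapper mirrors the histogram one because prometheus summaries and histograms share the same Observe(float64) entry point; they differ in that a summary estimates quantiles in-process while a histogram counts samples into buckets for server-side aggregation. A hedged sketch of the raw client_golang equivalents (the option values are assumptions, not taken from this diff):

```go
func rawPrometheusSketch() {
	sum := prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "example_summary_seconds",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	})
	hist := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "example_histogram_seconds",
		Buckets: prometheus.DefBuckets,
	})
	sum.Observe(0.42)  // quantile estimates maintained in-process
	hist.Observe(0.42) // bucket counts, aggregated at query time
}
```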

View File

@@ -22,7 +22,7 @@ func NewHistTimer(name string) *HistTimer {
 }
 func (h *HistTimer) PutSince() {
-	h.Histogram.UpdateDuration(h.start)
+	h.Histogram.ObserveDuration(h.start)
 }
 func (h *HistTimer) Tag(pairs ...string) *HistTimer {
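With PutSince delegating to the shared ObserveDuration, a HistTimer is just a histogram plus the start timestamp captured by NewHistTimer. Typical use, matching the SSZ encoder earlier in this diff (metric name illustrative):

```go
func timedStep(work func()) {
	h := metrics.NewHistTimer("example_step_duration") // hypothetical name
	work()
	h.PutSince() // observes time.Since(h.start) into the underlying histogram
}
```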

View File

@@ -442,7 +442,7 @@ func (a *Aggregator) aggregate(ctx context.Context, step uint64) error {
 		start := time.Now()
 		collation, err := d.collateStream(ctx, step, txFrom, txTo, d.tx)
 		mxRunningCollations.Dec()
-		mxCollateTook.UpdateDuration(start)
+		mxCollateTook.ObserveDuration(start)
 		//mxCollationSize.Set(uint64(collation.valuesComp.Count()))
 		mxCollationSizeHist.SetInt(collation.historyComp.Count())
@@ -481,8 +481,8 @@ func (a *Aggregator) aggregate(ctx context.Context, step uint64) error {
 		mxPruningProgress.Dec()
 		mxPruningProgress.Dec()
-		mxPruneTook.Update(d.stats.LastPruneTook.Seconds())
-		mxPruneHistTook.Update(d.stats.LastPruneHistTook.Seconds())
+		mxPruneTook.Observe(d.stats.LastPruneTook.Seconds())
+		mxPruneHistTook.Observe(d.stats.LastPruneHistTook.Seconds())
 	}
 	// when domain files are build and db is pruned, we can merge them
@@ -503,7 +503,7 @@ func (a *Aggregator) aggregate(ctx context.Context, step uint64) error {
 		start := time.Now()
 		collation, err := d.collate(ctx, step*a.aggregationStep, (step+1)*a.aggregationStep, d.tx)
 		mxRunningCollations.Dec()
-		mxCollateTook.UpdateDuration(start)
+		mxCollateTook.ObserveDuration(start)
 		if err != nil {
 			return fmt.Errorf("index collation %q has failed: %w", d.filenameBase, err)
@@ -523,7 +523,7 @@ func (a *Aggregator) aggregate(ctx context.Context, step uint64) error {
 		}
 		mxRunningMerges.Dec()
-		mxBuildTook.UpdateDuration(start)
+		mxBuildTook.ObserveDuration(start)
 		d.integrateFiles(sf, step*a.aggregationStep, (step+1)*a.aggregationStep)
@@ -547,7 +547,7 @@ func (a *Aggregator) aggregate(ctx context.Context, step uint64) error {
 		if err := d.prune(ctx, txFrom, txTo, math.MaxUint64, logEvery); err != nil {
 			return err
 		}
-		mxPruneTook.UpdateDuration(startPrune)
+		mxPruneTook.ObserveDuration(startPrune)
 		mxPruningProgress.Dec()
 	}
@@ -565,7 +565,7 @@ func (a *Aggregator) aggregate(ctx context.Context, step uint64) error {
 		"range", fmt.Sprintf("%.2fM-%.2fM", float64(txFrom)/10e5, float64(txTo)/10e5),
 		"took", time.Since(stepStartedAt))
-	mxStepTook.UpdateDuration(stepStartedAt)
+	mxStepTook.ObserveDuration(stepStartedAt)
 	return nil
 }
@@ -601,7 +601,7 @@ func (a *Aggregator) mergeLoopStep(ctx context.Context, maxEndTxNum uint64, work
 	closeAll = false
 	for _, s := range []DomainStats{a.accounts.stats, a.code.stats, a.storage.stats} {
-		mxBuildTook.Update(s.LastFileBuildingTook.Seconds())
+		mxBuildTook.Observe(s.LastFileBuildingTook.Seconds())
 	}
 	a.logger.Info("[stat] finished merge step",
@@ -855,9 +855,9 @@ func (a *Aggregator) ComputeCommitment(saveStateAfter, trace bool) (rootHash []b
 	}
 	mxCommitmentKeys.AddUint64(a.commitment.comKeys)
-	mxCommitmentTook.Update(a.commitment.comTook.Seconds())
+	mxCommitmentTook.Observe(a.commitment.comTook.Seconds())
-	defer func(t time.Time) { mxCommitmentWriteTook.UpdateDuration(t) }(time.Now())
+	defer func(t time.Time) { mxCommitmentWriteTook.ObserveDuration(t) }(time.Now())
 	for pref, update := range branchNodeUpdates {
 		prefix := []byte(pref)
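The ComputeCommitment hunk keeps the closure form defer func(t time.Time) { ... }(time.Now()). Since defer arguments are evaluated at the defer statement in either form, the closure is equivalent to deferring the method call directly; a sketch of the two forms (the metric is hypothetical):

```go
var exampleWriteTook = metrics.GetOrCreateSummary("example_write_seconds") // hypothetical

func equivalentDefers() {
	// closure form, as in the diff: t is bound when the defer statement runs
	defer func(t time.Time) { exampleWriteTook.ObserveDuration(t) }(time.Now())

	// direct form with identical timing semantics, since time.Now()
	// would likewise be evaluated here:
	//   defer exampleWriteTook.ObserveDuration(time.Now())
}
```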

View File

@@ -710,7 +710,7 @@ func (d *Domain) aggregate(ctx context.Context, step uint64, txFrom, txTo uint64
 	start := time.Now()
 	collation, err := d.collateStream(ctx, step, txFrom, txTo, tx)
 	mxRunningCollations.Dec()
-	mxCollateTook.UpdateDuration(start)
+	mxCollateTook.ObserveDuration(start)
 	mxCollationSize.SetInt(collation.valuesComp.Count())
 	mxCollationSizeHist.SetInt(collation.historyComp.Count())

View File

@@ -303,7 +303,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
 		return err
 	}
-	defer newBlockTimer.UpdateDuration(time.Now())
+	defer newBlockTimer.ObserveDuration(time.Now())
 	//t := time.Now()
 	coreDB, cache := p.coreDBWithCache()
@@ -414,7 +414,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
 		return fmt.Errorf("txpool not started yet")
 	}
-	defer processBatchTxsTimer.UpdateDuration(time.Now())
+	defer processBatchTxsTimer.ObserveDuration(time.Now())
 	coreDB, cache := p.coreDBWithCache()
 	coreTx, err := coreDB.BeginRo(ctx)
 	if err != nil {
@@ -718,7 +718,7 @@ func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots) {
 		return
 	}
-	defer addRemoteTxsTimer.UpdateDuration(time.Now())
+	defer addRemoteTxsTimer.ObserveDuration(time.Now())
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	for i, txn := range newTxs.Txs {
@@ -1702,7 +1702,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
 			if announcements.Len() == 0 {
 				return
 			}
-			defer propagateNewTxsTimer.UpdateDuration(time.Now())
+			defer propagateNewTxsTimer.ObserveDuration(time.Now())
 			announcements = announcements.DedupCopy()
@@ -1804,7 +1804,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
 			var sizes []uint32
 			types, sizes, hashes = p.AppendAllAnnouncements(types, sizes, hashes[:0])
 			go send.PropagatePooledTxsToPeersList(newPeers, types, sizes, hashes)
-			propagateToNewPeerTimer.UpdateDuration(t)
+			propagateToNewPeerTimer.ObserveDuration(t)
 		}
 	}
 }
@@ -1830,7 +1830,7 @@ func (p *TxPool) flushNoFsync(ctx context.Context, db kv.RwDB) (written uint64,
 }
 func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) {
-	defer writeToDBTimer.UpdateDuration(time.Now())
+	defer writeToDBTimer.ObserveDuration(time.Now())
 	// 1. get global lock on txpool and flush it to db, without fsync (to release lock asap)
 	// 2. then fsync db without txpool lock
 	written, err = p.flushNoFsync(ctx, db)

View File

@@ -449,7 +449,7 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage, stream *jsoniter
 		if answer != nil && answer.Error != nil {
 			failedReqeustGauge.Inc()
 		}
-		newRPCServingTimerMS(msg.Method, answer == nil || answer.Error == nil).UpdateDuration(start)
+		newRPCServingTimerMS(msg.Method, answer == nil || answer.Error == nil).ObserveDuration(start)
 	}
 	return answer
 }
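newRPCServingTimerMS returns a per-method, per-outcome timer, so each RPC method gets its own latency series. One hedged way such a helper could be built on GetOrCreateSummary, assuming a VictoriaMetrics-style name-with-labels format (the helper body and format string are assumptions, not the diff's actual implementation):

```go
// hypothetical reconstruction for illustration only
func newRPCServingTimer(method string, valid bool) metrics.Summary {
	flag := "failure"
	if valid {
		flag = "success"
	}
	// GetOrCreateSummary caches by full name, so repeated calls for the
	// same method/flag pair observe into the same series
	return metrics.GetOrCreateSummary(
		fmt.Sprintf(`rpc_duration_seconds{method="%s",success="%s"}`, method, flag))
}
```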