mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
adopt --metrics.addr flag in integration (#889)
This commit is contained in:
parent
6c0ff33366
commit
091819a51c
@ -48,3 +48,6 @@ issues:
|
||||
- path: cmd/faucet/
|
||||
linters:
|
||||
- deadcode
|
||||
exclude:
|
||||
# gosec: Profiling endpoint is automatically exposed on /debug/pprof
|
||||
- G108
|
||||
|
@ -19,3 +19,6 @@ issues:
|
||||
linters:
|
||||
- gosec
|
||||
- unused
|
||||
exclude:
|
||||
# gosec: Profiling endpoint is automatically exposed on /debug/pprof
|
||||
- G108
|
||||
|
@ -37,7 +37,7 @@ var rootCmd = &cobra.Command{
|
||||
}
|
||||
|
||||
func init() {
|
||||
utils.CobraFlags(rootCmd, append(debug.Flags, utils.MetricsEnabledFlag, utils.MetricsEnabledExpensiveFlag))
|
||||
utils.CobraFlags(rootCmd, append(debug.Flags, utils.MetricsEnabledFlag, utils.MetricsEnabledExpensiveFlag, utils.MetricsHTTPFlag, utils.MetricsPortFlag))
|
||||
}
|
||||
|
||||
func rootContext() context.Context {
|
||||
|
119
ethdb/kv_bolt.go
119
ethdb/kv_bolt.go
@ -4,114 +4,25 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/bolt"
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/common/dbutils"
|
||||
"github.com/ledgerwatch/turbo-geth/log"
|
||||
"github.com/ledgerwatch/turbo-geth/metrics"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
boltPagesAllocGauge = metrics.NewRegisteredGauge("bolt/pages/alloc_bytes", nil)
|
||||
boltPagesFreeGauge = metrics.NewRegisteredGauge("bolt/pages/free", nil)
|
||||
boltPagesPendingGauge = metrics.NewRegisteredGauge("bolt/pages/pending", nil)
|
||||
boltFreelistInuseGauge = metrics.NewRegisteredGauge("bolt/freelist/inuse", nil)
|
||||
boltTxGauge = metrics.NewRegisteredGauge("bolt/tx/total", nil)
|
||||
boltTxOpenGauge = metrics.NewRegisteredGauge("bolt/tx/open", nil)
|
||||
boltTxCursorGauge = metrics.NewRegisteredGauge("bolt/tx/cursors_total", nil)
|
||||
boltRebalanceGauge = metrics.NewRegisteredGauge("bolt/rebalance/total", nil)
|
||||
boltRebalanceTimer = metrics.NewRegisteredTimer("bolt/rebalance/time", nil)
|
||||
boltSplitGauge = metrics.NewRegisteredGauge("bolt/split/total", nil)
|
||||
boltSpillGauge = metrics.NewRegisteredGauge("bolt/spill/total", nil)
|
||||
boltSpillTimer = metrics.NewRegisteredTimer("bolt/spill/time", nil)
|
||||
boltWriteGauge = metrics.NewRegisteredGauge("bolt/write/total", nil)
|
||||
boltWriteTimer = metrics.NewRegisteredTimer("bolt/write/time", nil)
|
||||
)
|
||||
|
||||
var valueBytesMetrics []metrics.Gauge
|
||||
var keyBytesMetrics []metrics.Gauge
|
||||
var totalBytesPutMetrics []metrics.Gauge
|
||||
var totalBytesDeleteMetrics []metrics.Gauge
|
||||
var keyNMetrics []metrics.Gauge
|
||||
|
||||
func init() {
|
||||
if metrics.Enabled {
|
||||
for i := range dbutils.Buckets {
|
||||
b := strings.ToLower(string(dbutils.Buckets[i]))
|
||||
b = strings.Replace(b, "-", "_", -1)
|
||||
valueBytesMetrics = append(valueBytesMetrics, metrics.NewRegisteredGauge("db/bucket/value_bytes/"+b, nil))
|
||||
keyBytesMetrics = append(keyBytesMetrics, metrics.NewRegisteredGauge("db/bucket/key_bytes/"+b, nil))
|
||||
totalBytesPutMetrics = append(totalBytesPutMetrics, metrics.NewRegisteredGauge("db/bucket/bytes_put_total/"+b, nil))
|
||||
totalBytesDeleteMetrics = append(totalBytesDeleteMetrics, metrics.NewRegisteredGauge("db/bucket/bytes_delete_total/"+b, nil))
|
||||
keyNMetrics = append(keyNMetrics, metrics.NewRegisteredGauge("db/bucket/keys/"+b, nil))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func collectBoltMetrics(ctx context.Context, db *bolt.DB, ticker *time.Ticker) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
stats := db.Stats()
|
||||
boltPagesFreeGauge.Update(int64(stats.FreePageN))
|
||||
boltPagesPendingGauge.Update(int64(stats.PendingPageN))
|
||||
boltPagesAllocGauge.Update(int64(stats.FreeAlloc))
|
||||
boltFreelistInuseGauge.Update(int64(stats.FreelistInuse))
|
||||
|
||||
boltTxGauge.Update(int64(stats.TxN))
|
||||
boltTxOpenGauge.Update(int64(stats.OpenTxN))
|
||||
boltTxCursorGauge.Update(int64(stats.TxStats.CursorCount))
|
||||
|
||||
boltRebalanceGauge.Update(int64(stats.TxStats.Rebalance))
|
||||
boltRebalanceTimer.Update(stats.TxStats.RebalanceTime)
|
||||
|
||||
boltSplitGauge.Update(int64(stats.TxStats.Split))
|
||||
boltSpillGauge.Update(int64(stats.TxStats.Spill))
|
||||
boltSpillTimer.Update(stats.TxStats.SpillTime)
|
||||
|
||||
boltWriteGauge.Update(int64(stats.TxStats.Write))
|
||||
boltWriteTimer.Update(stats.TxStats.WriteTime)
|
||||
|
||||
if len(valueBytesMetrics) == 0 {
|
||||
continue
|
||||
}
|
||||
writeStats := db.WriteStats()
|
||||
for i := range dbutils.Buckets {
|
||||
st, ok := writeStats[string(dbutils.Buckets[i])]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
valueBytesMetrics[i].Update(int64(st.ValueBytesN))
|
||||
keyBytesMetrics[i].Update(int64(st.KeyBytesN))
|
||||
totalBytesPutMetrics[i].Update(int64(st.TotalBytesPut))
|
||||
totalBytesDeleteMetrics[i].Update(int64(st.TotalBytesDelete))
|
||||
keyNMetrics[i].Update(int64(st.KeyN))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type boltOpts struct {
|
||||
Bolt *bolt.Options
|
||||
path string
|
||||
}
|
||||
|
||||
type BoltKV struct {
|
||||
opts boltOpts
|
||||
bolt *bolt.DB
|
||||
log log.Logger
|
||||
stopMetrics context.CancelFunc
|
||||
wg *sync.WaitGroup
|
||||
opts boltOpts
|
||||
bolt *bolt.DB
|
||||
log log.Logger
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
type boltTx struct {
|
||||
@ -186,18 +97,6 @@ func (opts boltOpts) Open() (KV, error) {
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
|
||||
if metrics.Enabled {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
db.stopMetrics = cancel
|
||||
db.wg.Add(1)
|
||||
go func() {
|
||||
defer db.wg.Done()
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
collectBoltMetrics(ctx, boltDB, ticker)
|
||||
}()
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
@ -218,10 +117,6 @@ func NewBolt() boltOpts {
|
||||
// Close closes BoltKV
|
||||
// All transactions must be closed before closing the database.
|
||||
func (db *BoltKV) Close() {
|
||||
if db.stopMetrics != nil {
|
||||
db.stopMetrics()
|
||||
}
|
||||
|
||||
db.wg.Wait()
|
||||
|
||||
if db.bolt != nil {
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
@ -54,6 +55,13 @@ var (
|
||||
Usage: "Request a stack trace at a specific logging statement (e.g. \"block.go:271\")",
|
||||
Value: "",
|
||||
}
|
||||
metricsAddrFlag = cli.StringFlag{
|
||||
Name: "metrics.addr",
|
||||
}
|
||||
metricsPortFlag = cli.UintFlag{
|
||||
Name: "metrics.port",
|
||||
Value: 6060,
|
||||
}
|
||||
debugFlag = cli.BoolFlag{
|
||||
Name: "debug",
|
||||
Usage: "Prepends log messages with call-site location (file and line number)",
|
||||
@ -227,9 +235,25 @@ func SetupCobra(cmd *cobra.Command) error {
|
||||
return err
|
||||
}
|
||||
|
||||
metricsAddr, err := flags.GetString(metricsAddrFlag.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metricsPort, err := flags.GetInt(metricsPortFlag.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if metrics.Enabled && metricsAddr != "" {
|
||||
address := fmt.Sprintf("%s:%d", metricsAddr, metricsPort)
|
||||
log.Info("Enabling stand-alone metrics HTTP endpoint", "addr", address)
|
||||
exp.Setup(address)
|
||||
}
|
||||
|
||||
withMetrics := metrics.Enabled && metricsAddr == ""
|
||||
if pprof {
|
||||
// metrics and pprof server
|
||||
StartPProf(fmt.Sprintf("%s:%d", pprofAddr, pprofPort), metrics.Enabled)
|
||||
StartPProf(fmt.Sprintf("%s:%d", pprofAddr, pprofPort), withMetrics)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -304,7 +328,9 @@ func StartPProf(address string, withMetrics bool) {
|
||||
exp.Exp(metrics.DefaultRegistry, http.NewServeMux())
|
||||
}
|
||||
http.Handle("/memsize/", http.StripPrefix("/memsize", &Memsize))
|
||||
log.Info("Starting pprof server", "addr", fmt.Sprintf("http://%s/debug/pprof", address))
|
||||
cpuMsg := fmt.Sprintf("go tool pprof -lines -http=: http://%s/%s", address, "debug/pprof/profile?seconds=20")
|
||||
heapMsg := fmt.Sprintf("go tool pprof -lines -http=: http://%s/%s", address, "debug/pprof/heap")
|
||||
log.Info("Starting pprof server", "cpu", cpuMsg, "heap", heapMsg)
|
||||
go func() {
|
||||
if err := http.ListenAndServe(address, nil); err != nil {
|
||||
log.Error("Failure in running pprof server", "err", err)
|
||||
|
@ -64,6 +64,9 @@ func Setup(address string) {
|
||||
m := http.NewServeMux()
|
||||
m.Handle("/debug/metrics", ExpHandler(metrics.DefaultRegistry))
|
||||
m.Handle("/debug/metrics/prometheus", prometheus.Handler(metrics.DefaultRegistry))
|
||||
m.Handle("/debug/metrics/prometheus2", promhttp.HandlerFor(prometheus2.DefaultGatherer, promhttp.HandlerOpts{
|
||||
EnableOpenMetrics: true,
|
||||
}))
|
||||
log.Info("Starting metrics server", "addr", fmt.Sprintf("http://%s/debug/metrics", address))
|
||||
go func() {
|
||||
if err := http.ListenAndServe(address, m); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user