go-pulse/metrics/influxdb/influxdbv2.go

97 lines
2.4 KiB
Go
Raw Permalink Normal View History

package influxdb
import (
"context"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
type v2Reporter struct {
reg metrics.Registry
interval time.Duration
endpoint string
token string
bucket string
organization string
namespace string
tags map[string]string
client influxdb2.Client
write api.WriteAPI
}
// InfluxDBV2WithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, token string, bucket string, organization string, namespace string, tags map[string]string) {
rep := &v2Reporter{
reg: r,
interval: d,
endpoint: endpoint,
token: token,
bucket: bucket,
organization: organization,
namespace: namespace,
tags: tags,
}
rep.client = influxdb2.NewClient(rep.endpoint, rep.token)
defer rep.client.Close()
// async write client
rep.write = rep.client.WriteAPI(rep.organization, rep.bucket)
errorsCh := rep.write.Errors()
// have to handle write errors in a separate goroutine like this b/c the channel is unbuffered and will block writes if not read
go func() {
for err := range errorsCh {
log.Warn("write error", "err", err.Error())
}
}()
rep.run()
}
func (r *v2Reporter) run() {
intervalTicker := time.NewTicker(r.interval)
pingTicker := time.NewTicker(time.Second * 5)
defer intervalTicker.Stop()
defer pingTicker.Stop()
for {
select {
case <-intervalTicker.C:
r.send(0)
case <-pingTicker.C:
_, err := r.client.Health(context.Background())
if err != nil {
log.Warn("Got error from influxdb client health check", "err", err.Error())
}
}
}
}
// send sends the measurements. If provided tstamp is >0, it is used. Otherwise,
// a 'fresh' timestamp is used.
func (r *v2Reporter) send(tstamp int64) {
r.reg.Each(func(name string, i interface{}) {
var now time.Time
if tstamp <= 0 {
now = time.Now()
} else {
now = time.Unix(tstamp, 0)
}
measurement, fields := readMeter(r.namespace, name, i)
if fields == nil {
return
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
})
// Force all unwritten data to be sent
r.write.Flush()
}