rpc service and stageloop logger updates (#7696)

This is another update to logging to replace the root logger with a
contextual logger
This commit is contained in:
Mark Holt 2023-06-10 07:39:39 +01:00 committed by GitHub
parent 2ea9eb538e
commit 1e575ea172
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 19 additions and 20 deletions

View File

@ -489,7 +489,6 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp
// register apis and create handler stack
httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)
logger.Trace("TraceRequests = %t\n", cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, logger)
allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
@ -609,7 +608,6 @@ type engineInfo struct {
}
func startAuthenticatedRpcServer(cfg httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) (*engineInfo, error) {
logger.Trace("TraceRequests = %t\n", cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, logger)
engineListener, engineSrv, engineHttpEndpoint, err := createEngineListener(cfg, rpcAPI, logger)

View File

@ -198,7 +198,7 @@ func newClient(initctx context.Context, connect reconnectFunc, logger log.Logger
if err != nil {
return nil, err
}
c := initClient(conn, randomIDGenerator(), new(serviceRegistry), logger)
c := initClient(conn, randomIDGenerator(), &serviceRegistry{logger: logger}, logger)
c.reconnectFunc = connect
return c, nil
}

View File

@ -134,7 +134,7 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
if conn.remoteAddr() != "" {
h.logger = h.logger.New("conn", conn.remoteAddr())
}
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe), "unsubscribe")
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe), "unsubscribe", h.logger)
return h
}

View File

@ -59,7 +59,7 @@ type Server struct {
// NewServer creates a new server instance with no registered handlers.
func NewServer(batchConcurrency uint, traceRequests, disableStreaming bool, logger log.Logger) *Server {
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency,
server := &Server{services: serviceRegistry{logger: logger}, idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency,
disableStreaming: disableStreaming, traceRequests: traceRequests, logger: logger}
// Register the default service providing meta information about the RPC service such
// as the services and methods it offers.

View File

@ -41,6 +41,7 @@ var (
type serviceRegistry struct {
mu sync.Mutex
services map[string]service
logger log.Logger
}
// service represents a registered object.
@ -59,6 +60,7 @@ type callback struct {
errPos int // err return idx, of -1 when method cannot return error
isSubscribe bool // true if this is a subscription callback
streamable bool // support JSON streaming (more efficient for large responses)
logger log.Logger
}
func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
@ -66,7 +68,7 @@ func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
if name == "" {
return fmt.Errorf("no service name for type %s", rcvrVal.Type().String())
}
callbacks := suitableCallbacks(rcvrVal)
callbacks := suitableCallbacks(rcvrVal, r.logger)
if len(callbacks) == 0 {
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
@ -116,7 +118,7 @@ func (r *serviceRegistry) subscription(service, name string) *callback {
// suitableCallbacks iterates over the methods of the given type. It determines if a method
// satisfies the criteria for a RPC callback or a subscription callback and adds it to the
// collection of callbacks. See server documentation for a summary of these criteria.
func suitableCallbacks(receiver reflect.Value) map[string]*callback {
func suitableCallbacks(receiver reflect.Value, logger log.Logger) map[string]*callback {
typ := receiver.Type()
callbacks := make(map[string]*callback)
for m := 0; m < typ.NumMethod(); m++ {
@ -125,7 +127,7 @@ func suitableCallbacks(receiver reflect.Value) map[string]*callback {
continue // method not exported
}
name := formatName(method.Name)
cb := newCallback(receiver, method.Func, name)
cb := newCallback(receiver, method.Func, name, logger)
if cb == nil {
continue // function invalid
}
@ -136,9 +138,9 @@ func suitableCallbacks(receiver reflect.Value) map[string]*callback {
// newCallback turns fn (a function) into a callback object. It returns nil if the function
// is unsuitable as an RPC callback.
func newCallback(receiver, fn reflect.Value, name string) *callback {
func newCallback(receiver, fn reflect.Value, name string, logger log.Logger) *callback {
fntype := fn.Type()
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)}
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype), logger: logger}
// Determine parameter types. They must all be exported or builtin types.
c.makeArgTypes()
@ -149,7 +151,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback {
outs[i] = fntype.Out(i)
}
if len(outs) > 2 {
log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs)))
logger.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs)))
return nil
}
// If an error is returned, it must be the last returned value.
@ -158,7 +160,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback {
c.errPos = 0
case len(outs) == 2:
if isErrorType(outs[0]) || !isErrorType(outs[1]) {
log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name))
logger.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name))
return nil
}
c.errPos = 1
@ -214,7 +216,7 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value
// Catch panic while running the callback.
defer func() {
if err := recover(); err != nil {
log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, dbg.Stack()))
c.logger.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, dbg.Stack()))
errRes = errors.New("method handler crashed")
}
}()

View File

@ -17,7 +17,6 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"github.com/ledgerwatch/erigon/dataflow"
@ -303,7 +302,7 @@ func (hd *HeaderDownload) logAnchorState() {
sort.Strings(ss)
hd.logger.Debug("[downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len())
for _, s := range ss {
log.Debug(s)
hd.logger.Debug(s)
}
}
@ -476,7 +475,7 @@ func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool, peer [6
}
}
}
//log.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8])
//hd.logger.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8])
}
func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.Time, timeout time.Duration) {
@ -570,7 +569,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
if terminalTotalDifficulty != nil {
if td.Cmp(terminalTotalDifficulty) >= 0 {
hd.highestInDb = link.blockHeight
log.Info(POSPandaBanner)
hd.logger.Info(POSPandaBanner)
dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderInserted)
return true, true, 0, lastTime, nil
}

View File

@ -99,9 +99,9 @@ func StageLoop(ctx context.Context,
return
}
log.Error("Staged Sync", "err", err)
logger.Error("Staged Sync", "err", err)
if recoveryErr := hd.RecoverFromDb(db); recoveryErr != nil {
log.Error("Failed to recover header sentriesClient", "err", recoveryErr)
logger.Error("Failed to recover header sentriesClient", "err", recoveryErr)
}
time.Sleep(500 * time.Millisecond) // just to avoid too much similar errors in logs
continue
@ -112,7 +112,7 @@ func StageLoop(ctx context.Context,
if loopMinTime != 0 {
waitTime := loopMinTime - time.Since(start)
log.Info("Wait time until next loop", "for", waitTime)
logger.Info("Wait time until next loop", "for", waitTime)
c := time.After(waitTime)
select {
case <-ctx.Done():