From 1e575ea1723deac76b9bb54ffccd2ae989d54409 Mon Sep 17 00:00:00 2001 From: Mark Holt <135143369+mh0lt@users.noreply.github.com> Date: Sat, 10 Jun 2023 07:39:39 +0100 Subject: [PATCH] rpc service and stageloop logger updates (#7696) This is another update to logging to replace the root logger with a contextual logger --- cmd/rpcdaemon/cli/config.go | 2 -- rpc/client.go | 2 +- rpc/handler.go | 2 +- rpc/server.go | 2 +- rpc/service.go | 18 ++++++++++-------- turbo/stages/headerdownload/header_algos.go | 7 +++---- turbo/stages/stageloop.go | 6 +++--- 7 files changed, 19 insertions(+), 20 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 4601c9049..ec17355d2 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -489,7 +489,6 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp // register apis and create handler stack httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort) - logger.Trace("TraceRequests = %t\n", cfg.TraceRequests) srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, logger) allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath) @@ -609,7 +608,6 @@ type engineInfo struct { } func startAuthenticatedRpcServer(cfg httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) (*engineInfo, error) { - logger.Trace("TraceRequests = %t\n", cfg.TraceRequests) srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, logger) engineListener, engineSrv, engineHttpEndpoint, err := createEngineListener(cfg, rpcAPI, logger) diff --git a/rpc/client.go b/rpc/client.go index e60c99db4..06b0bb6ea 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -198,7 +198,7 @@ func newClient(initctx context.Context, connect reconnectFunc, logger log.Logger if err != nil { return nil, err } - c := initClient(conn, randomIDGenerator(), new(serviceRegistry), logger) + c := initClient(conn, randomIDGenerator(), &serviceRegistry{logger: logger}, logger) c.reconnectFunc = connect return c, nil } diff --git a/rpc/handler.go b/rpc/handler.go index b1da7fa23..679aa2c8e 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -134,7 +134,7 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * if conn.remoteAddr() != "" { h.logger = h.logger.New("conn", conn.remoteAddr()) } - h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe), "unsubscribe") + h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe), "unsubscribe", h.logger) return h } diff --git a/rpc/server.go b/rpc/server.go index b0805702c..25603cb5f 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -59,7 +59,7 @@ type Server struct { // NewServer creates a new server instance with no registered handlers. func NewServer(batchConcurrency uint, traceRequests, disableStreaming bool, logger log.Logger) *Server { - server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency, + server := &Server{services: serviceRegistry{logger: logger}, idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency, disableStreaming: disableStreaming, traceRequests: traceRequests, logger: logger} // Register the default service providing meta information about the RPC service such // as the services and methods it offers. diff --git a/rpc/service.go b/rpc/service.go index e04225f82..c22521945 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -41,6 +41,7 @@ var ( type serviceRegistry struct { mu sync.Mutex services map[string]service + logger log.Logger } // service represents a registered object. @@ -59,6 +60,7 @@ type callback struct { errPos int // err return idx, of -1 when method cannot return error isSubscribe bool // true if this is a subscription callback streamable bool // support JSON streaming (more efficient for large responses) + logger log.Logger } func (r *serviceRegistry) registerName(name string, rcvr interface{}) error { @@ -66,7 +68,7 @@ func (r *serviceRegistry) registerName(name string, rcvr interface{}) error { if name == "" { return fmt.Errorf("no service name for type %s", rcvrVal.Type().String()) } - callbacks := suitableCallbacks(rcvrVal) + callbacks := suitableCallbacks(rcvrVal, r.logger) if len(callbacks) == 0 { return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } @@ -116,7 +118,7 @@ func (r *serviceRegistry) subscription(service, name string) *callback { // suitableCallbacks iterates over the methods of the given type. It determines if a method // satisfies the criteria for a RPC callback or a subscription callback and adds it to the // collection of callbacks. See server documentation for a summary of these criteria. -func suitableCallbacks(receiver reflect.Value) map[string]*callback { +func suitableCallbacks(receiver reflect.Value, logger log.Logger) map[string]*callback { typ := receiver.Type() callbacks := make(map[string]*callback) for m := 0; m < typ.NumMethod(); m++ { @@ -125,7 +127,7 @@ func suitableCallbacks(receiver reflect.Value) map[string]*callback { continue // method not exported } name := formatName(method.Name) - cb := newCallback(receiver, method.Func, name) + cb := newCallback(receiver, method.Func, name, logger) if cb == nil { continue // function invalid } @@ -136,9 +138,9 @@ func suitableCallbacks(receiver reflect.Value) map[string]*callback { // newCallback turns fn (a function) into a callback object. It returns nil if the function // is unsuitable as an RPC callback. -func newCallback(receiver, fn reflect.Value, name string) *callback { +func newCallback(receiver, fn reflect.Value, name string, logger log.Logger) *callback { fntype := fn.Type() - c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)} + c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype), logger: logger} // Determine parameter types. They must all be exported or builtin types. c.makeArgTypes() @@ -149,7 +151,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback { outs[i] = fntype.Out(i) } if len(outs) > 2 { - log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs))) + logger.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs))) return nil } // If an error is returned, it must be the last returned value. @@ -158,7 +160,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback { c.errPos = 0 case len(outs) == 2: if isErrorType(outs[0]) || !isErrorType(outs[1]) { - log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name)) + logger.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name)) return nil } c.errPos = 1 @@ -214,7 +216,7 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value // Catch panic while running the callback. defer func() { if err := recover(); err != nil { - log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, dbg.Stack())) + c.logger.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, dbg.Stack())) errRes = errors.New("method handler crashed") } }() diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 2981ce174..be3079217 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -17,7 +17,6 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/log/v3" "golang.org/x/exp/slices" "github.com/ledgerwatch/erigon/dataflow" @@ -303,7 +302,7 @@ func (hd *HeaderDownload) logAnchorState() { sort.Strings(ss) hd.logger.Debug("[downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len()) for _, s := range ss { - log.Debug(s) + hd.logger.Debug(s) } } @@ -476,7 +475,7 @@ func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool, peer [6 } } } - //log.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8]) + //hd.logger.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8]) } func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.Time, timeout time.Duration) { @@ -570,7 +569,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult if terminalTotalDifficulty != nil { if td.Cmp(terminalTotalDifficulty) >= 0 { hd.highestInDb = link.blockHeight - log.Info(POSPandaBanner) + hd.logger.Info(POSPandaBanner) dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderInserted) return true, true, 0, lastTime, nil } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index a78804b4a..45725d46c 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -99,9 +99,9 @@ func StageLoop(ctx context.Context, return } - log.Error("Staged Sync", "err", err) + logger.Error("Staged Sync", "err", err) if recoveryErr := hd.RecoverFromDb(db); recoveryErr != nil { - log.Error("Failed to recover header sentriesClient", "err", recoveryErr) + logger.Error("Failed to recover header sentriesClient", "err", recoveryErr) } time.Sleep(500 * time.Millisecond) // just to avoid too much similar errors in logs continue @@ -112,7 +112,7 @@ func StageLoop(ctx context.Context, if loopMinTime != 0 { waitTime := loopMinTime - time.Since(start) - log.Info("Wait time until next loop", "for", waitTime) + logger.Info("Wait time until next loop", "for", waitTime) c := time.After(waitTime) select { case <-ctx.Done():