From b0117a7c307d37bd1f5f693072acc2ebbb061471 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Fri, 19 May 2023 18:41:53 +0100 Subject: [PATCH] [devnet] separate logging - p2p (#7547) Co-authored-by: Alex Sharp --- cl/phase1/main.go | 10 +-- cmd/bootnode/main.go | 4 +- cmd/caplin-phase1/main.go | 2 +- cmd/observer/main.go | 4 +- cmd/observer/observer/command.go | 4 +- cmd/observer/observer/server.go | 20 ++--- cmd/sentinel/main.go | 2 +- cmd/sentinel/sentinel/sentinel.go | 7 +- cmd/sentinel/sentinel/service/service.go | 12 +-- cmd/sentinel/sentinel/service/start.go | 18 ++-- cmd/sentry/sentry/sentry_grpc_server.go | 2 +- cmd/utils/flags.go | 1 - eth/backend.go | 2 +- p2p/discover/v4_udp_test.go | 45 ++++++---- p2p/discover/v5_udp_test.go | 108 +++++++++++++---------- p2p/discover/v5wire/crypto_test.go | 4 +- p2p/discover/v5wire/encoding_test.go | 43 +++++---- p2p/enode/localnode.go | 6 +- p2p/enode/localnode_test.go | 18 ++-- p2p/server.go | 58 ++++++------ p2p/server_test.go | 40 ++++----- turbo/logging/logging.go | 3 +- 22 files changed, 224 insertions(+), 189 deletions(-) diff --git a/cl/phase1/main.go b/cl/phase1/main.go index fca9bdaf0..3324a8437 100644 --- a/cl/phase1/main.go +++ b/cl/phase1/main.go @@ -95,7 +95,7 @@ func runConsensusLayerNode(cliCtx *cli.Context) error { // Start the sentinel service log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(cfg.LogLvl), log.StderrHandler)) log.Info("[Sentinel] running sentinel with configuration", "cfg", cfg) - s, err := startSentinel(cliCtx, *cfg, cpState) + s, err := startSentinel(cliCtx, *cfg, cpState, log.Root()) if err != nil { log.Error("Could not start sentinel service", "err", err) } @@ -131,7 +131,7 @@ Loop: return nil } -func startSentinel(cliCtx *cli.Context, cfg lcCli.ConsensusClientCliCfg, beaconState *state.BeaconState) (sentinelrpc.SentinelClient, error) { +func startSentinel(cliCtx *cli.Context, cfg lcCli.ConsensusClientCliCfg, beaconState *state.BeaconState, logger log.Logger) (sentinelrpc.SentinelClient, error) { forkDigest, err := fork.ComputeForkDigest(cfg.BeaconCfg, cfg.GenesisCfg) if err != nil { return nil, err @@ -150,12 +150,12 @@ func startSentinel(cliCtx *cli.Context, cfg lcCli.ConsensusClientCliCfg, beaconS FinalizedEpoch: beaconState.FinalizedCheckpoint().Epoch(), HeadSlot: beaconState.FinalizedCheckpoint().Epoch() * cfg.BeaconCfg.SlotsPerEpoch, HeadRoot: beaconState.FinalizedCheckpoint().BlockRoot(), - }) + }, logger) if err != nil { - log.Error("Could not start sentinel", "err", err) + logger.Error("Could not start sentinel", "err", err) return nil, err } - log.Info("Sentinel started", "addr", cfg.ServerAddr) + logger.Info("Sentinel started", "addr", cfg.ServerAddr) return s, nil } diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index b954f536e..58c732ae5 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -51,7 +51,7 @@ func main() { ) flag.Parse() - logging.SetupLogger("bootnode") + logger := logging.SetupLogger("bootnode") natm, err := nat.Parse(*natdesc) if err != nil { @@ -121,7 +121,7 @@ func main() { if err != nil { panic(err) } - ln := enode.NewLocalNode(db, nodeKey) + ln := enode.NewLocalNode(db, nodeKey, logger) cfg := discover.Config{ PrivateKey: nodeKey, NetRestrict: restrictList, diff --git a/cmd/caplin-phase1/main.go b/cmd/caplin-phase1/main.go index 54a91021e..ea53e7f2b 100644 --- a/cmd/caplin-phase1/main.go +++ b/cmd/caplin-phase1/main.go @@ -91,7 +91,7 @@ func runCaplinNode(cliCtx *cli.Context) error { FinalizedEpoch: state.FinalizedCheckpoint().Epoch(), HeadSlot: state.FinalizedCheckpoint().Epoch() * cfg.BeaconCfg.SlotsPerEpoch, HeadRoot: state.FinalizedCheckpoint().BlockRoot(), - }) + }, log.Root()) if err != nil { log.Error("Could not start sentinel", "err", err) } diff --git a/cmd/observer/main.go b/cmd/observer/main.go index 5173d4828..12f613e13 100644 --- a/cmd/observer/main.go +++ b/cmd/observer/main.go @@ -15,8 +15,8 @@ import ( "github.com/ledgerwatch/log/v3" ) -func mainWithFlags(ctx context.Context, flags observer.CommandFlags) error { - server, err := observer.NewServer(flags) +func mainWithFlags(ctx context.Context, flags observer.CommandFlags, logger log.Logger) error { + server, err := observer.NewServer(flags, logger) if err != nil { return err } diff --git a/cmd/observer/observer/command.go b/cmd/observer/observer/command.go index e22df0d67..357dcee8a 100644 --- a/cmd/observer/observer/command.go +++ b/cmd/observer/observer/command.go @@ -219,7 +219,7 @@ func (command *Command) withErigonLogPath() { command.command.Flags().StringVar(&command.flags.ErigonLogPath, flag.Name, flag.Value, flag.Usage) } -func (command *Command) ExecuteContext(ctx context.Context, runFunc func(ctx context.Context, flags CommandFlags) error) error { +func (command *Command) ExecuteContext(ctx context.Context, runFunc func(ctx context.Context, flags CommandFlags, logger log.Logger) error) error { command.command.PersistentPostRun = func(cmd *cobra.Command, args []string) { debug.Exit() } @@ -231,7 +231,7 @@ func (command *Command) ExecuteContext(ctx context.Context, runFunc func(ctx con return err } defer debug.Exit() - err = runFunc(cmd.Context(), command.flags) + err = runFunc(cmd.Context(), command.flags, logger) if errors.Is(err, context.Canceled) { return nil } diff --git a/cmd/observer/observer/server.go b/cmd/observer/observer/server.go index da04e8f42..f785561a7 100644 --- a/cmd/observer/observer/server.go +++ b/cmd/observer/observer/server.go @@ -30,10 +30,10 @@ type Server struct { natInterface nat.Interface discConfig discover.Config - log log.Logger + logger log.Logger } -func NewServer(flags CommandFlags) (*Server, error) { +func NewServer(flags CommandFlags, logger log.Logger) (*Server, error) { nodeDBPath := filepath.Join(flags.DataDir, "nodes", "eth66") nodeKeyConfig := p2p.NodeKeyConfig{} @@ -42,7 +42,7 @@ func NewServer(flags CommandFlags) (*Server, error) { return nil, err } - localNode, err := makeLocalNode(nodeDBPath, privateKey, flags.Chain) + localNode, err := makeLocalNode(nodeDBPath, privateKey, flags.Chain, logger) if err != nil { return nil, err } @@ -67,13 +67,11 @@ func NewServer(flags CommandFlags) (*Server, error) { return nil, fmt.Errorf("bootnodes parse error: %w", err) } - logger := log.New() - discConfig := discover.Config{ PrivateKey: privateKey, NetRestrict: netRestrictList, Bootnodes: bootnodes, - Log: logger.New(), + Log: logger, } instance := Server{ @@ -86,12 +84,12 @@ func NewServer(flags CommandFlags) (*Server, error) { return &instance, nil } -func makeLocalNode(nodeDBPath string, privateKey *ecdsa.PrivateKey, chain string) (*enode.LocalNode, error) { +func makeLocalNode(nodeDBPath string, privateKey *ecdsa.PrivateKey, chain string, logger log.Logger) (*enode.LocalNode, error) { db, err := enode.OpenDB(nodeDBPath, "") if err != nil { return nil, err } - localNode := enode.NewLocalNode(db, privateKey) + localNode := enode.NewLocalNode(db, privateKey, logger) localNode.SetFallbackIP(net.IP{127, 0, 0, 1}) forksEntry, err := makeForksENREntry(chain) @@ -144,13 +142,13 @@ func (server *Server) detectNATExternalIP() (net.IP, error) { return nil, errors.New("no NAT flag configured") } if _, hasExtIP := server.natInterface.(nat.ExtIP); !hasExtIP { - server.log.Debug("Detecting external IP...") + server.logger.Debug("Detecting external IP...") } ip, err := server.natInterface.ExternalIP() if err != nil { return nil, fmt.Errorf("NAT ExternalIP error: %w", err) } - server.log.Debug("External IP detected", "ip", ip) + server.logger.Debug("External IP detected", "ip", ip) return ip, nil } @@ -179,7 +177,7 @@ func (server *Server) Listen(ctx context.Context) (*discover.UDPv4, error) { server.mapNATPort(ctx, realAddr) } - server.log.Debug("Discovery UDP listener is up", "addr", realAddr) + server.logger.Debug("Discovery UDP listener is up", "addr", realAddr) return discover.ListenV4(ctx, conn, server.localNode, server.discConfig) } diff --git a/cmd/sentinel/main.go b/cmd/sentinel/main.go index e3a722ad8..07040fca9 100644 --- a/cmd/sentinel/main.go +++ b/cmd/sentinel/main.go @@ -52,7 +52,7 @@ func runSentinelNode(cliCtx *cli.Context) error { NetworkConfig: cfg.NetworkCfg, BeaconConfig: cfg.BeaconCfg, NoDiscovery: cfg.NoDiscovery, - }, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, nil) + }, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, nil, log.Root()) if err != nil { log.Error("[Sentinel] Could not start sentinel", "err", err) return err diff --git a/cmd/sentinel/sentinel/sentinel.go b/cmd/sentinel/sentinel/sentinel.go index 4004e9e2a..6c9190f67 100644 --- a/cmd/sentinel/sentinel/sentinel.go +++ b/cmd/sentinel/sentinel/sentinel.go @@ -76,6 +76,7 @@ type Sentinel struct { subManager *GossipManager metrics bool listenForPeersDoneCh chan struct{} + logger log.Logger } func (s *Sentinel) createLocalNode( @@ -88,7 +89,7 @@ func (s *Sentinel) createLocalNode( if err != nil { return nil, fmt.Errorf("could not open node's peer database: %w", err) } - localNode := enode.NewLocalNode(db, privKey) + localNode := enode.NewLocalNode(db, privKey, s.logger) ipEntry := enr.IP(ipAddr) udpEntry := enr.UDP(udpPort) @@ -231,12 +232,14 @@ func New( ctx context.Context, cfg *SentinelConfig, db kv.RoDB, + logger log.Logger, ) (*Sentinel, error) { s := &Sentinel{ ctx: ctx, cfg: cfg, db: db, metrics: true, + logger: logger, } // Setup discovery @@ -299,7 +302,7 @@ func (s *Sentinel) RecvGossip() <-chan *pubsub.Message { func (s *Sentinel) Start() error { if s.started { - log.Warn("[Sentinel] already running") + s.logger.Warn("[Sentinel] already running") } var err error s.listener, err = s.createListener() diff --git a/cmd/sentinel/sentinel/service/service.go b/cmd/sentinel/sentinel/service/service.go index a174a0515..d4d1efe39 100644 --- a/cmd/sentinel/sentinel/service/service.go +++ b/cmd/sentinel/sentinel/service/service.go @@ -28,14 +28,16 @@ type SentinelServer struct { sentinel *sentinel.Sentinel gossipNotifier *gossipNotifier - mu sync.RWMutex + mu sync.RWMutex + logger log.Logger } -func NewSentinelServer(ctx context.Context, sentinel *sentinel.Sentinel) *SentinelServer { +func NewSentinelServer(ctx context.Context, sentinel *sentinel.Sentinel, logger log.Logger) *SentinelServer { return &SentinelServer{ sentinel: sentinel, ctx: ctx, gossipNotifier: newGossipNotifier(), + logger: logger, } } @@ -121,7 +123,7 @@ func (s *SentinelServer) SubscribeGossip(_ *sentinelrpc.EmptyMessage, stream sen }, BlobIndex: packet.blobIndex, }); err != nil { - log.Warn("[Sentinel] Could not relay gossip packet", "reason", err) + s.logger.Warn("[Sentinel] Could not relay gossip packet", "reason", err) } } } @@ -256,7 +258,7 @@ func (s *SentinelServer) startServerBackgroundLoop() { peers := s.sentinel.PeersList() s.sentinel.Stop() status := s.sentinel.Status() - s.sentinel, err = createSentinel(s.sentinel.Config(), s.sentinel.DB()) + s.sentinel, err = createSentinel(s.sentinel.Config(), s.sentinel.DB(), s.logger) if err != nil { log.Warn("Could not coordinate sentinel", "err", err) continue @@ -274,7 +276,7 @@ func (s *SentinelServer) startServerBackgroundLoop() { func (s *SentinelServer) handleGossipPacket(pkt *pubsub.Message) error { var err error - log.Trace("[Sentinel Gossip] Received Packet", "topic", pkt.Topic) + s.logger.Trace("[Sentinel Gossip] Received Packet", "topic", pkt.Topic) data := pkt.GetData() // If we use snappy codec then decompress it accordingly. diff --git a/cmd/sentinel/sentinel/service/start.go b/cmd/sentinel/sentinel/service/start.go index 9bd026084..0a73bafd3 100644 --- a/cmd/sentinel/sentinel/service/start.go +++ b/cmd/sentinel/sentinel/service/start.go @@ -25,8 +25,8 @@ type ServerConfig struct { Addr string } -func createSentinel(cfg *sentinel.SentinelConfig, db kv.RoDB) (*sentinel.Sentinel, error) { - sent, err := sentinel.New(context.Background(), cfg, db) +func createSentinel(cfg *sentinel.SentinelConfig, db kv.RoDB, logger log.Logger) (*sentinel.Sentinel, error) { + sent, err := sentinel.New(context.Background(), cfg, db, logger) if err != nil { return nil, err } @@ -44,35 +44,35 @@ func createSentinel(cfg *sentinel.SentinelConfig, db kv.RoDB) (*sentinel.Sentine for _, v := range gossipTopics { if err := sent.Unsubscribe(v); err != nil { - log.Error("[Sentinel] failed to start sentinel", "err", err) + logger.Error("[Sentinel] failed to start sentinel", "err", err) continue } // now lets separately connect to the gossip topics. this joins the room subscriber, err := sent.SubscribeGossip(v) if err != nil { - log.Error("[Sentinel] failed to start sentinel", "err", err) + logger.Error("[Sentinel] failed to start sentinel", "err", err) } // actually start the subscription, aka listening and sending packets to the sentinel recv channel err = subscriber.Listen() if err != nil { - log.Error("[Sentinel] failed to start sentinel", "err", err) + logger.Error("[Sentinel] failed to start sentinel", "err", err) } } return sent, nil } -func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status) (sentinelrpc.SentinelClient, error) { +func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status, logger log.Logger) (sentinelrpc.SentinelClient, error) { ctx := context.Background() - sent, err := createSentinel(cfg, db) + sent, err := createSentinel(cfg, db, logger) if err != nil { return nil, err } rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer) - log.Info("[Sentinel] Sentinel started", "enr", sent.String()) + logger.Info("[Sentinel] Sentinel started", "enr", sent.String()) if initialStatus != nil { sent.SetStatus(initialStatus) } - server := NewSentinelServer(ctx, sent) + server := NewSentinelServer(ctx, sent, logger) if creds == nil { creds = insecure.NewCredentials() } diff --git a/cmd/sentry/sentry/sentry_grpc_server.go b/cmd/sentry/sentry/sentry_grpc_server.go index da7388f33..b532317bc 100644 --- a/cmd/sentry/sentry/sentry_grpc_server.go +++ b/cmd/sentry/sentry/sentry_grpc_server.go @@ -967,7 +967,7 @@ func (ss *GrpcServer) SetStatus(ctx context.Context, statusData *proto_sentry.St } // Add protocol - if err = srv.Start(ss.ctx); err != nil { + if err = srv.Start(ss.ctx, ss.logger); err != nil { srv.Stop() return reply, fmt.Errorf("could not start server: %w", err) } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 3b56e195b..5c09a3c66 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -936,7 +936,6 @@ func NewP2PConfig( NoDiscovery: nodiscover, PrivateKey: serverKey, Name: nodeName, - Log: log.New(), NodeDatabase: enodeDBPath, AllowedPorts: allowedPorts, TmpDir: dirs.Tmp, diff --git a/eth/backend.go b/eth/backend.go index 7afc70895..d9149a998 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -601,7 +601,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere FinalizedEpoch: state.FinalizedCheckpoint().Epoch(), HeadSlot: state.FinalizedCheckpoint().Epoch() * beaconCfg.SlotsPerEpoch, HeadRoot: state.FinalizedCheckpoint().BlockRoot(), - }) + }, logger) if err != nil { return nil, err } diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index ac7e5910e..71d86277e 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -60,11 +60,11 @@ type udpTest struct { remoteaddr *net.UDPAddr } -func newUDPTest(t *testing.T) *udpTest { - return newUDPTestContext(context.Background(), t) +func newUDPTest(t *testing.T, logger log.Logger) *udpTest { + return newUDPTestContext(context.Background(), t, logger) } -func newUDPTestContext(ctx context.Context, t *testing.T) *udpTest { +func newUDPTestContext(ctx context.Context, t *testing.T, logger log.Logger) *udpTest { ctx = disableLookupSlowdown(ctx) replyTimeout := contextGetReplyTimeout(ctx) @@ -86,7 +86,7 @@ func newUDPTestContext(ctx context.Context, t *testing.T) *udpTest { if err != nil { panic(err) } - ln := enode.NewLocalNode(test.db, test.localkey) + ln := enode.NewLocalNode(test.db, test.localkey, logger) test.udp, err = ListenV4(ctx, test.pipe, ln, Config{ PrivateKey: test.localkey, Log: testlog.Logger(t, log.LvlError), @@ -166,7 +166,8 @@ func (test *udpTest) waitPacketOut(validate interface{}) (closed bool) { } func TestUDPv4_packetErrors(t *testing.T) { - test := newUDPTest(t) + logger := log.New() + test := newUDPTest(t, logger) defer test.close() test.packetIn(errExpired, &v4wire.Ping{From: testRemote, To: testLocalAnnounced, Version: 4}) @@ -177,7 +178,8 @@ func TestUDPv4_packetErrors(t *testing.T) { func TestUDPv4_pingTimeout(t *testing.T) { t.Parallel() - test := newUDPTest(t) + logger := log.New() + test := newUDPTest(t, logger) defer test.close() key := newkey() @@ -198,11 +200,12 @@ func TestUDPv4_responseTimeouts(t *testing.T) { t.Skip("unstable test on darwin") } t.Parallel() + logger := log.New() ctx := context.Background() ctx = contextWithReplyTimeout(ctx, respTimeout) - test := newUDPTestContext(ctx, t) + test := newUDPTestContext(ctx, t, logger) defer test.close() rand.Seed(time.Now().UnixNano()) @@ -274,7 +277,8 @@ func TestUDPv4_responseTimeouts(t *testing.T) { func TestUDPv4_findnodeTimeout(t *testing.T) { t.Parallel() - test := newUDPTest(t) + logger := log.New() + test := newUDPTest(t, logger) defer test.close() toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222} @@ -290,7 +294,8 @@ func TestUDPv4_findnodeTimeout(t *testing.T) { } func TestUDPv4_findnode(t *testing.T) { - test := newUDPTest(t) + logger := log.New() + test := newUDPTest(t, logger) defer test.close() // put a few nodes into the table. their exact @@ -347,7 +352,8 @@ func TestUDPv4_findnode(t *testing.T) { } func TestUDPv4_findnodeMultiReply(t *testing.T) { - test := newUDPTest(t) + logger := log.New() + test := newUDPTest(t, logger) defer test.close() rid := enode.PubkeyToIDV4(&test.remotekey.PublicKey) @@ -403,7 +409,8 @@ func TestUDPv4_findnodeMultiReply(t *testing.T) { // This test checks that reply matching of pong verifies the ping hash. func TestUDPv4_pingMatch(t *testing.T) { - test := newUDPTest(t) + logger := log.New() + test := newUDPTest(t, logger) defer test.close() randToken := make([]byte, 32) @@ -417,7 +424,8 @@ func TestUDPv4_pingMatch(t *testing.T) { // This test checks that reply matching of pong verifies the sender IP address. func TestUDPv4_pingMatchIP(t *testing.T) { - test := newUDPTest(t) + logger := log.New() + test := newUDPTest(t, logger) defer test.close() test.packetIn(nil, &v4wire.Ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp}) @@ -434,7 +442,8 @@ func TestUDPv4_pingMatchIP(t *testing.T) { } func TestUDPv4_successfulPing(t *testing.T) { - test := newUDPTest(t) + logger := log.New() + test := newUDPTest(t, logger) added := make(chan *node, 1) test.table.nodeAddedHook = func(n *node) { added <- n } defer test.close() @@ -500,7 +509,8 @@ func TestUDPv4_successfulPing(t *testing.T) { // This test checks that EIP-868 requests work. func TestUDPv4_EIP868(t *testing.T) { - test := newUDPTest(t) + logger := log.New() + test := newUDPTest(t, logger) defer test.close() test.udp.localNode.Set(enr.WithEntry("foo", "bar")) @@ -542,6 +552,7 @@ func TestUDPv4_smallNetConvergence(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() + logger := log.New() ctx := context.Background() ctx = disableLookupSlowdown(ctx) @@ -558,7 +569,7 @@ func TestUDPv4_smallNetConvergence(t *testing.T) { cfg.PingBackDelay = time.Nanosecond cfg.TableRevalidateInterval = time.Hour - nodes[i] = startLocalhostV4(ctx, t, cfg) + nodes[i] = startLocalhostV4(ctx, t, cfg, logger) } defer func() { @@ -604,7 +615,7 @@ func TestUDPv4_smallNetConvergence(t *testing.T) { } } -func startLocalhostV4(ctx context.Context, t *testing.T, cfg Config) *UDPv4 { +func startLocalhostV4(ctx context.Context, t *testing.T, cfg Config, logger log.Logger) *UDPv4 { t.Helper() cfg.PrivateKey = newkey() @@ -613,7 +624,7 @@ func startLocalhostV4(ctx context.Context, t *testing.T, cfg Config) *UDPv4 { if err != nil { panic(err) } - ln := enode.NewLocalNode(db, cfg.PrivateKey) + ln := enode.NewLocalNode(db, cfg.PrivateKey, logger) // Prefix logs with node ID. lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString()) diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index af9d0407f..988e95b0e 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -38,7 +38,7 @@ import ( "github.com/ledgerwatch/erigon/rlp" ) -func startLocalhostV5(t *testing.T, cfg Config) *UDPv5 { +func startLocalhostV5(t *testing.T, cfg Config, logger log.Logger) *UDPv5 { cfg.PrivateKey = newkey() tmpDir := t.TempDir() db, err := enode.OpenDB("", tmpDir) @@ -46,7 +46,7 @@ func startLocalhostV5(t *testing.T, cfg Config) *UDPv5 { panic(err) } t.Cleanup(db.Close) - ln := enode.NewLocalNode(db, cfg.PrivateKey) + ln := enode.NewLocalNode(db, cfg.PrivateKey, logger) // Prefix logs with node ID. lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString()) @@ -80,10 +80,11 @@ func TestUDPv5_pingHandling(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + logger := log.New() + test := newUDPV5Test(t, logger) t.Cleanup(test.close) - test.packetIn(&v5wire.Ping{ReqID: []byte("foo")}) + test.packetIn(&v5wire.Ping{ReqID: []byte("foo")}, logger) test.waitPacketOut(func(p *v5wire.Pong, addr *net.UDPAddr, _ v5wire.Nonce) { if !bytes.Equal(p.ReqID, []byte("foo")) { t.Error("wrong request ID in response:", p.ReqID) @@ -100,7 +101,8 @@ func TestUDPv5_unknownPacket(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + logger := log.New() + test := newUDPV5Test(t, logger) t.Cleanup(test.close) nonce := v5wire.Nonce{1, 2, 3} @@ -118,16 +120,16 @@ func TestUDPv5_unknownPacket(t *testing.T) { } // Unknown packet from unknown node. - test.packetIn(&v5wire.Unknown{Nonce: nonce}) + test.packetIn(&v5wire.Unknown{Nonce: nonce}, logger) test.waitPacketOut(func(p *v5wire.Whoareyou, addr *net.UDPAddr, _ v5wire.Nonce) { check(p, 0) }) // Make node known. - n := test.getNode(test.remotekey, test.remoteaddr).Node() + n := test.getNode(test.remotekey, test.remoteaddr, logger).Node() test.table.addSeenNode(wrapNode(n)) - test.packetIn(&v5wire.Unknown{Nonce: nonce}) + test.packetIn(&v5wire.Unknown{Nonce: nonce}, logger) test.waitPacketOut(func(p *v5wire.Whoareyou, addr *net.UDPAddr, _ v5wire.Nonce) { check(p, n.Seq()) }) @@ -139,7 +141,8 @@ func TestUDPv5_findnodeHandling(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + logger := log.New() + test := newUDPV5Test(t, logger) t.Cleanup(test.close) // Create test nodes and insert them into the table. @@ -151,28 +154,28 @@ func TestUDPv5_findnodeHandling(t *testing.T) { fillTable(test.table, wrapNodes(nodes248)) // Requesting with distance zero should return the node's own record. - test.packetIn(&v5wire.Findnode{ReqID: []byte{0}, Distances: []uint{0}}) + test.packetIn(&v5wire.Findnode{ReqID: []byte{0}, Distances: []uint{0}}, logger) test.expectNodes([]byte{0}, 1, []*enode.Node{test.udp.Self()}) // Requesting with distance > 256 shouldn't crash. - test.packetIn(&v5wire.Findnode{ReqID: []byte{1}, Distances: []uint{4234098}}) + test.packetIn(&v5wire.Findnode{ReqID: []byte{1}, Distances: []uint{4234098}}, logger) test.expectNodes([]byte{1}, 1, nil) // Requesting with empty distance list shouldn't crash either. - test.packetIn(&v5wire.Findnode{ReqID: []byte{2}, Distances: []uint{}}) + test.packetIn(&v5wire.Findnode{ReqID: []byte{2}, Distances: []uint{}}, logger) test.expectNodes([]byte{2}, 1, nil) // This request gets no nodes because the corresponding bucket is empty. - test.packetIn(&v5wire.Findnode{ReqID: []byte{3}, Distances: []uint{254}}) + test.packetIn(&v5wire.Findnode{ReqID: []byte{3}, Distances: []uint{254}}, logger) test.expectNodes([]byte{3}, 1, nil) // This request gets all the distance-253 nodes. - test.packetIn(&v5wire.Findnode{ReqID: []byte{4}, Distances: []uint{253}}) + test.packetIn(&v5wire.Findnode{ReqID: []byte{4}, Distances: []uint{253}}, logger) test.expectNodes([]byte{4}, 4, nodes253) // This request gets all the distance-249 nodes and some more at 248 because // the bucket at 249 is not full. - test.packetIn(&v5wire.Findnode{ReqID: []byte{5}, Distances: []uint{249, 248}}) + test.packetIn(&v5wire.Findnode{ReqID: []byte{5}, Distances: []uint{249, 248}}, logger) nodes := make([]*enode.Node, 0, len(nodes249)+len(nodes248[:10])) nodes = append(nodes, nodes249...) nodes = append(nodes, nodes248[:10]...) @@ -226,10 +229,11 @@ func TestUDPv5_pingCall(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + logger := log.New() + test := newUDPV5Test(t, logger) t.Cleanup(test.close) - remote := test.getNode(test.remotekey, test.remoteaddr).Node() + remote := test.getNode(test.remotekey, test.remoteaddr, logger).Node() done := make(chan error, 1) // This ping times out. @@ -248,7 +252,7 @@ func TestUDPv5_pingCall(t *testing.T) { done <- err }() test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, _ v5wire.Nonce) { - test.packetInFrom(test.remotekey, test.remoteaddr, &v5wire.Pong{ReqID: p.ReqID}) + test.packetInFrom(test.remotekey, test.remoteaddr, &v5wire.Pong{ReqID: p.ReqID}, logger) }) if err := <-done; err != nil { t.Fatal(err) @@ -261,7 +265,7 @@ func TestUDPv5_pingCall(t *testing.T) { }() test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, _ v5wire.Nonce) { wrongAddr := &net.UDPAddr{IP: net.IP{33, 44, 55, 22}, Port: 10101} - test.packetInFrom(test.remotekey, wrongAddr, &v5wire.Pong{ReqID: p.ReqID}) + test.packetInFrom(test.remotekey, wrongAddr, &v5wire.Pong{ReqID: p.ReqID}, logger) }) if err := <-done; err != errTimeout { t.Fatalf("want errTimeout for reply from wrong IP, got %q", err) @@ -275,13 +279,14 @@ func TestUDPv5_findnodeCall(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + logger := log.New() + test := newUDPV5Test(t, logger) t.Cleanup(test.close) // Launch the request: var ( distances = []uint{230} - remote = test.getNode(test.remotekey, test.remoteaddr).Node() + remote = test.getNode(test.remotekey, test.remoteaddr, logger).Node() nodes = nodesAtDistance(remote.ID(), int(distances[0]), 8) done = make(chan error, 1) response []*enode.Node @@ -301,12 +306,12 @@ func TestUDPv5_findnodeCall(t *testing.T) { ReqID: p.ReqID, Total: 2, Nodes: nodesToRecords(nodes[:4]), - }) + }, logger) test.packetIn(&v5wire.Nodes{ ReqID: p.ReqID, Total: 2, Nodes: nodesToRecords(nodes[4:]), - }) + }, logger) }) // Check results: @@ -327,10 +332,11 @@ func TestUDPv5_callResend(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + logger := log.New() + test := newUDPV5Test(t, logger) t.Cleanup(test.close) - remote := test.getNode(test.remotekey, test.remoteaddr).Node() + remote := test.getNode(test.remotekey, test.remoteaddr, logger).Node() done := make(chan error, 2) go func() { _, err := test.udp.ping(remote) @@ -343,15 +349,15 @@ func TestUDPv5_callResend(t *testing.T) { // Ping answered by WHOAREYOU. test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, nonce v5wire.Nonce) { - test.packetIn(&v5wire.Whoareyou{Nonce: nonce}) + test.packetIn(&v5wire.Whoareyou{Nonce: nonce}, logger) }) // Ping should be re-sent. test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, _ v5wire.Nonce) { - test.packetIn(&v5wire.Pong{ReqID: p.ReqID}) + test.packetIn(&v5wire.Pong{ReqID: p.ReqID}, logger) }) // Answer the other ping. test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, _ v5wire.Nonce) { - test.packetIn(&v5wire.Pong{ReqID: p.ReqID}) + test.packetIn(&v5wire.Pong{ReqID: p.ReqID}, logger) }) if err := <-done; err != nil { t.Fatalf("unexpected ping error: %v", err) @@ -367,10 +373,11 @@ func TestUDPv5_multipleHandshakeRounds(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + logger := log.New() + test := newUDPV5Test(t, logger) t.Cleanup(test.close) - remote := test.getNode(test.remotekey, test.remoteaddr).Node() + remote := test.getNode(test.remotekey, test.remoteaddr, logger).Node() done := make(chan error, 1) go func() { _, err := test.udp.ping(remote) @@ -379,11 +386,11 @@ func TestUDPv5_multipleHandshakeRounds(t *testing.T) { // Ping answered by WHOAREYOU. test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, nonce v5wire.Nonce) { - test.packetIn(&v5wire.Whoareyou{Nonce: nonce}) + test.packetIn(&v5wire.Whoareyou{Nonce: nonce}, logger) }) // Ping answered by WHOAREYOU again. test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, nonce v5wire.Nonce) { - test.packetIn(&v5wire.Whoareyou{Nonce: nonce}) + test.packetIn(&v5wire.Whoareyou{Nonce: nonce}, logger) }) if err := <-done; err != errTimeout { t.Fatalf("unexpected ping error: %q", err) @@ -396,7 +403,8 @@ func TestUDPv5_talkHandling(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + logger := log.New() + test := newUDPV5Test(t, logger) t.Cleanup(test.close) var recvMessage []byte @@ -410,7 +418,7 @@ func TestUDPv5_talkHandling(t *testing.T) { ReqID: []byte("foo"), Protocol: "test", Message: []byte("test request"), - }) + }, logger) test.waitPacketOut(func(p *v5wire.TalkResponse, addr *net.UDPAddr, _ v5wire.Nonce) { if !bytes.Equal(p.ReqID, []byte("foo")) { t.Error("wrong request ID in response:", p.ReqID) @@ -429,7 +437,7 @@ func TestUDPv5_talkHandling(t *testing.T) { ReqID: []byte("2"), Protocol: "wrong", Message: []byte("test request"), - }) + }, logger) test.waitPacketOut(func(p *v5wire.TalkResponse, addr *net.UDPAddr, _ v5wire.Nonce) { if !bytes.Equal(p.ReqID, []byte("2")) { t.Error("wrong request ID in response:", p.ReqID) @@ -449,10 +457,11 @@ func TestUDPv5_talkRequest(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() - test := newUDPV5Test(t) + logger := log.New() + test := newUDPV5Test(t, logger) t.Cleanup(test.close) - remote := test.getNode(test.remotekey, test.remoteaddr).Node() + remote := test.getNode(test.remotekey, test.remoteaddr, logger).Node() done := make(chan error, 1) // This request times out. @@ -480,7 +489,7 @@ func TestUDPv5_talkRequest(t *testing.T) { test.packetInFrom(test.remotekey, test.remoteaddr, &v5wire.TalkResponse{ ReqID: p.ReqID, Message: []byte("test response"), - }) + }, logger) }) if err := <-done; err != nil { t.Fatal(err) @@ -493,8 +502,9 @@ func TestUDPv5_LocalNode(t *testing.T) { t.Skip("fix me on win please") } t.Parallel() + logger := log.New() var cfg Config - node := startLocalhostV5(t, cfg) + node := startLocalhostV5(t, cfg, logger) defer node.Close() localNd := node.LocalNode() @@ -580,11 +590,11 @@ func (c *testCodec) decodeFrame(input []byte) (frame testCodecFrame, p v5wire.Pa return frame, p, err } -func newUDPV5Test(t *testing.T) *udpV5Test { - return newUDPV5TestContext(context.Background(), t) +func newUDPV5Test(t *testing.T, logger log.Logger) *udpV5Test { + return newUDPV5TestContext(context.Background(), t, logger) } -func newUDPV5TestContext(ctx context.Context, t *testing.T) *udpV5Test { +func newUDPV5TestContext(ctx context.Context, t *testing.T, logger log.Logger) *udpV5Test { ctx = disableLookupSlowdown(ctx) replyTimeout := contextGetReplyTimeout(ctx) @@ -609,7 +619,7 @@ func newUDPV5TestContext(ctx context.Context, t *testing.T) *udpV5Test { panic(err) } - ln := enode.NewLocalNode(test.db, test.localkey) + ln := enode.NewLocalNode(test.db, test.localkey, logger) ln.SetStaticIP(net.IP{10, 0, 0, 1}) ln.Set(enr.UDP(30303)) test.udp, err = ListenV5(ctx, test.pipe, ln, Config{ @@ -632,16 +642,16 @@ func newUDPV5TestContext(ctx context.Context, t *testing.T) *udpV5Test { } // handles a packet as if it had been sent to the transport. -func (test *udpV5Test) packetIn(packet v5wire.Packet) { +func (test *udpV5Test) packetIn(packet v5wire.Packet, logger log.Logger) { test.t.Helper() - test.packetInFrom(test.remotekey, test.remoteaddr, packet) + test.packetInFrom(test.remotekey, test.remoteaddr, packet, logger) } // handles a packet as if it had been sent to the transport by the key/endpoint. -func (test *udpV5Test) packetInFrom(key *ecdsa.PrivateKey, addr *net.UDPAddr, packet v5wire.Packet) { +func (test *udpV5Test) packetInFrom(key *ecdsa.PrivateKey, addr *net.UDPAddr, packet v5wire.Packet, logger log.Logger) { test.t.Helper() - ln := test.getNode(key, addr) + ln := test.getNode(key, addr, logger) codec := &testCodec{test: test, id: ln.ID()} enc, _, err := codec.Encode(test.udp.Self().ID(), addr.String(), packet, nil) if err != nil { @@ -653,7 +663,7 @@ func (test *udpV5Test) packetInFrom(key *ecdsa.PrivateKey, addr *net.UDPAddr, pa } // getNode ensures the test knows about a node at the given endpoint. -func (test *udpV5Test) getNode(key *ecdsa.PrivateKey, addr *net.UDPAddr) *enode.LocalNode { +func (test *udpV5Test) getNode(key *ecdsa.PrivateKey, addr *net.UDPAddr, logger log.Logger) *enode.LocalNode { id := enode.PubkeyToIDV4(&key.PublicKey) ln := test.nodesByID[id] if ln == nil { @@ -664,7 +674,7 @@ func (test *udpV5Test) getNode(key *ecdsa.PrivateKey, addr *net.UDPAddr) *enode. } test.t.Cleanup(db.Close) - ln = enode.NewLocalNode(db, key) + ln = enode.NewLocalNode(db, key, logger) ln.SetStaticIP(addr.IP) ln.Set(enr.UDP(addr.Port)) test.nodesByID[id] = ln diff --git a/p2p/discover/v5wire/crypto_test.go b/p2p/discover/v5wire/crypto_test.go index 7f28528c1..c5ce9c66e 100644 --- a/p2p/discover/v5wire/crypto_test.go +++ b/p2p/discover/v5wire/crypto_test.go @@ -28,6 +28,7 @@ import ( "github.com/ledgerwatch/erigon/common/hexutil" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/p2p/enode" + "github.com/ledgerwatch/log/v3" ) func TestVector_ECDH(t *testing.T) { @@ -41,10 +42,11 @@ func TestVector_ECDH(t *testing.T) { } func TestVector_KDF(t *testing.T) { + logger := log.New() var ( ephKey = hexPrivkey("0xfb757dc581730490a1d7a00deea65e9b1936924caaea8f44d476014856b68736") cdata = hexutil.MustDecode("0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000") - net = newHandshakeTest(t.TempDir()) + net = newHandshakeTest(t.TempDir(), logger) ) defer net.close() diff --git a/p2p/discover/v5wire/encoding_test.go b/p2p/discover/v5wire/encoding_test.go index 9fb7575dc..ebfea2e21 100644 --- a/p2p/discover/v5wire/encoding_test.go +++ b/p2p/discover/v5wire/encoding_test.go @@ -35,6 +35,7 @@ import ( "github.com/ledgerwatch/erigon/common/mclock" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/p2p/enode" + "github.com/ledgerwatch/log/v3" ) // To regenerate discv5 test vectors, run @@ -69,8 +70,9 @@ func TestMinSizes(t *testing.T) { // This test checks the basic handshake flow where A talks to B and A has no secrets. func TestHandshake(t *testing.T) { t.Parallel() + logger := log.New() tmpDir := t.TempDir() - net := newHandshakeTest(tmpDir) + net := newHandshakeTest(tmpDir, logger) defer net.close() // A -> B RANDOM PACKET @@ -101,8 +103,9 @@ func TestHandshake(t *testing.T) { // This test checks that handshake attempts are removed within the timeout. func TestHandshake_timeout(t *testing.T) { t.Parallel() + logger := log.New() tmpDir := t.TempDir() - net := newHandshakeTest(tmpDir) + net := newHandshakeTest(tmpDir, logger) defer net.close() // A -> B RANDOM PACKET @@ -127,8 +130,9 @@ func TestHandshake_timeout(t *testing.T) { // This test checks handshake behavior when no record is sent in the auth response. func TestHandshake_norecord(t *testing.T) { t.Parallel() + logger := log.New() tmpDir := t.TempDir() - net := newHandshakeTest(tmpDir) + net := newHandshakeTest(tmpDir, logger) defer net.close() // A -> B RANDOM PACKET @@ -169,7 +173,8 @@ func TestHandshake_rekey(t *testing.T) { t.Parallel() tmpDir := t.TempDir() - net := newHandshakeTest(tmpDir) + logger := log.New() + net := newHandshakeTest(tmpDir, logger) defer net.close() session := &session{ @@ -209,7 +214,8 @@ func TestHandshake_rekey(t *testing.T) { func TestHandshake_rekey2(t *testing.T) { t.Parallel() tmpDir := t.TempDir() - net := newHandshakeTest(tmpDir) + logger := log.New() + net := newHandshakeTest(tmpDir, logger) defer net.close() initKeysA := &session{ @@ -244,7 +250,8 @@ func TestHandshake_rekey2(t *testing.T) { func TestHandshake_BadHandshakeAttack(t *testing.T) { t.Parallel() tmpDir := t.TempDir() - net := newHandshakeTest(tmpDir) + logger := log.New() + net := newHandshakeTest(tmpDir, logger) defer net.close() // A -> B RANDOM PACKET @@ -285,7 +292,8 @@ func TestHandshake_BadHandshakeAttack(t *testing.T) { func TestDecodeErrorsV5(t *testing.T) { t.Parallel() tmpDir := t.TempDir() - net := newHandshakeTest(tmpDir) + logger := log.New() + net := newHandshakeTest(tmpDir, logger) defer net.close() net.nodeA.expectDecodeErr(t, errTooShort, []byte{}) @@ -296,6 +304,7 @@ func TestDecodeErrorsV5(t *testing.T) { // This test checks that all test vectors can be decoded. func TestTestVectorsV5(t *testing.T) { + logger := log.New() var ( idA = enode.PubkeyToIDV4(&testKeyA.PublicKey) idB = enode.PubkeyToIDV4(&testKeyB.PublicKey) @@ -315,7 +324,7 @@ func TestTestVectorsV5(t *testing.T) { challenge0A, challenge1A, challenge0B = c, c, c challenge1A.RecordSeq = 1 tmpDir := t.TempDir() - net := newHandshakeTest(tmpDir) + net := newHandshakeTest(tmpDir, logger) challenge0A.Node = net.nodeA.n() challenge0B.Node = net.nodeB.n() challenge1A.Node = net.nodeA.n() @@ -374,7 +383,7 @@ func TestTestVectorsV5(t *testing.T) { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - net := newHandshakeTest(tmpDir) + net := newHandshakeTest(tmpDir, logger) defer net.close() // Override all random inputs. @@ -443,7 +452,8 @@ func testVectorComment(net *handshakeTest, p Packet, challenge *Whoareyou, nonce // This benchmark checks performance of handshake packet decoding. func BenchmarkV5_DecodeHandshakePingSecp256k1(b *testing.B) { tmpDir := b.TempDir() - net := newHandshakeTest(tmpDir) + logger := log.New() + net := newHandshakeTest(tmpDir, logger) defer net.close() var ( @@ -472,7 +482,8 @@ func BenchmarkV5_DecodeHandshakePingSecp256k1(b *testing.B) { // This benchmark checks how long it takes to decode an encrypted ping packet. func BenchmarkV5_DecodePing(b *testing.B) { tmpDir := b.TempDir() - net := newHandshakeTest(tmpDir) + logger := log.New() + net := newHandshakeTest(tmpDir, logger) defer net.close() session := &session{ @@ -511,10 +522,10 @@ type handshakeTestNode struct { c *Codec } -func newHandshakeTest(tmpDir string) *handshakeTest { +func newHandshakeTest(tmpDir string, logger log.Logger) *handshakeTest { t := new(handshakeTest) - t.nodeA.init(testKeyA, net.IP{127, 0, 0, 1}, &t.clock, tmpDir) - t.nodeB.init(testKeyB, net.IP{127, 0, 0, 1}, &t.clock, tmpDir) + t.nodeA.init(testKeyA, net.IP{127, 0, 0, 1}, &t.clock, tmpDir, logger) + t.nodeB.init(testKeyB, net.IP{127, 0, 0, 1}, &t.clock, tmpDir, logger) return t } @@ -523,12 +534,12 @@ func (t *handshakeTest) close() { t.nodeB.ln.Database().Close() } -func (n *handshakeTestNode) init(key *ecdsa.PrivateKey, ip net.IP, clock mclock.Clock, tmpDir string) { +func (n *handshakeTestNode) init(key *ecdsa.PrivateKey, ip net.IP, clock mclock.Clock, tmpDir string, logger log.Logger) { db, err := enode.OpenDB("", tmpDir) if err != nil { panic(err) } - n.ln = enode.NewLocalNode(db, key) + n.ln = enode.NewLocalNode(db, key, logger) n.ln.SetStaticIP(ip) if n.ln.Node().Seq() != 1 { panic(fmt.Errorf("unexpected seq %d", n.ln.Node().Seq())) diff --git a/p2p/enode/localnode.go b/p2p/enode/localnode.go index 8d3df4439..d146469ba 100644 --- a/p2p/enode/localnode.go +++ b/p2p/enode/localnode.go @@ -53,6 +53,7 @@ type LocalNode struct { entries map[string]enr.Entry endpoint4 lnEndpoint endpoint6 lnEndpoint + logger log.Logger } type lnEndpoint struct { @@ -62,7 +63,7 @@ type lnEndpoint struct { } // NewLocalNode creates a local node. -func NewLocalNode(db *DB, key *ecdsa.PrivateKey) *LocalNode { +func NewLocalNode(db *DB, key *ecdsa.PrivateKey, logger log.Logger) *LocalNode { ln := &LocalNode{ id: PubkeyToIDV4(&key.PublicKey), db: db, @@ -74,6 +75,7 @@ func NewLocalNode(db *DB, key *ecdsa.PrivateKey) *LocalNode { endpoint6: lnEndpoint{ track: netutil.NewIPTracker(iptrackWindow, iptrackContactWindow, iptrackMinStatements), }, + logger: logger, } ln.seq = db.localSeq(ln.id) ln.invalidate() @@ -281,7 +283,7 @@ func (ln *LocalNode) sign() { panic(fmt.Errorf("enode: can't verify local record: %w", err)) } ln.cur.Store(n) - log.Trace("New local node record", "seq", ln.seq, "id", n.ID(), "ip", n.IP(), "udp", n.UDP(), "tcp", n.TCP()) + ln.logger.Trace("New local node record", "seq", ln.seq, "id", n.ID(), "ip", n.IP(), "udp", n.UDP(), "tcp", n.TCP()) } func (ln *LocalNode) bumpSeq() { diff --git a/p2p/enode/localnode_test.go b/p2p/enode/localnode_test.go index 62c2eed56..a09a70869 100644 --- a/p2p/enode/localnode_test.go +++ b/p2p/enode/localnode_test.go @@ -23,21 +23,23 @@ import ( "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/p2p/enr" + "github.com/ledgerwatch/log/v3" "github.com/stretchr/testify/assert" ) -func newLocalNodeForTesting(tmpDir string) (*LocalNode, *DB) { +func newLocalNodeForTesting(tmpDir string, logger log.Logger) (*LocalNode, *DB) { db, err := OpenDB("", tmpDir) if err != nil { panic(err) } key, _ := crypto.GenerateKey() - return NewLocalNode(db, key), db + return NewLocalNode(db, key, logger), db } func TestLocalNode(t *testing.T) { tmpDir := t.TempDir() - ln, db := newLocalNodeForTesting(tmpDir) + logger := log.New() + ln, db := newLocalNodeForTesting(tmpDir, logger) defer db.Close() if ln.Node().ID() != ln.ID() { @@ -55,7 +57,8 @@ func TestLocalNode(t *testing.T) { func TestLocalNodeSeqPersist(t *testing.T) { tmpDir := t.TempDir() - ln, db := newLocalNodeForTesting(tmpDir) + logger := log.New() + ln, db := newLocalNodeForTesting(tmpDir, logger) defer db.Close() if s := ln.Node().Seq(); s != 1 { @@ -69,7 +72,7 @@ func TestLocalNodeSeqPersist(t *testing.T) { // Create a new instance, it should reload the sequence number. // The number increases just after that because a new record is // created without the "x" entry. - ln2 := NewLocalNode(db, ln.key) + ln2 := NewLocalNode(db, ln.key, logger) if s := ln2.Node().Seq(); s != 3 { t.Fatalf("wrong seq %d on new instance, want 3", s) } @@ -77,7 +80,7 @@ func TestLocalNodeSeqPersist(t *testing.T) { // Create a new instance with a different node key on the same database. // This should reset the sequence number. key, _ := crypto.GenerateKey() - ln3 := NewLocalNode(db, key) + ln3 := NewLocalNode(db, key, logger) if s := ln3.Node().Seq(); s != 1 { t.Fatalf("wrong seq %d on instance with changed key, want 1", s) } @@ -91,7 +94,8 @@ func TestLocalNodeEndpoint(t *testing.T) { staticIP = net.IP{127, 0, 1, 2} ) tmpDir := t.TempDir() - ln, db := newLocalNodeForTesting(tmpDir) + logger := log.New() + ln, db := newLocalNodeForTesting(tmpDir, logger) defer db.Close() // Nothing is set initially. diff --git a/p2p/server.go b/p2p/server.go index e6ce19845..6fa4f8766 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -165,9 +165,6 @@ type Config struct { // whenever a message is sent to or received from a peer EnableMsgEvents bool - // Log is a custom logger to use with the p2p.Server. - Log log.Logger `toml:",omitempty"` - // it is actually used but a linter got confused clock mclock.Clock //nolint:structcheck @@ -194,7 +191,7 @@ type Server struct { ourHandshake *protoHandshake loopWG sync.WaitGroup // loop, listenLoop peerFeed event.Feed - log log.Logger + logger log.Logger nodedb *enode.DB localnode *enode.LocalNode @@ -470,22 +467,19 @@ func (srv *Server) Running() bool { // Start starts running the server. // Servers can not be re-used after stopping. -func (srv *Server) Start(ctx context.Context) error { +func (srv *Server) Start(ctx context.Context, logger log.Logger) error { srv.lock.Lock() defer srv.lock.Unlock() if srv.running { return errors.New("server already running") } - srv.log = srv.Config.Log - if srv.log == nil { - srv.log = log.Root() - } + srv.logger = logger if srv.clock == nil { srv.clock = mclock.System{} } if srv.NoDial && srv.ListenAddr == "" { - srv.log.Warn("P2P server will be useless, neither dialing nor listening") + srv.logger.Warn("P2P server will be useless, neither dialing nor listening") } // static fields @@ -549,7 +543,7 @@ func (srv *Server) setupLocalNode() error { return err } srv.nodedb = db - srv.localnode = enode.NewLocalNode(db, srv.PrivateKey) + srv.localnode = enode.NewLocalNode(db, srv.PrivateKey, srv.logger) srv.localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) srv.updateLocalNodeStaticAddrCache() // TODO: check conflicts @@ -608,7 +602,7 @@ func (srv *Server) setupDiscovery(ctx context.Context) error { return err } realaddr := conn.LocalAddr().(*net.UDPAddr) - srv.log.Trace("UDP listener up", "addr", realaddr) + srv.logger.Trace("UDP listener up", "addr", realaddr) if srv.NAT != nil { if !realaddr.IP.IsLoopback() && srv.NAT.SupportsMapping() { srv.loopWG.Add(1) @@ -634,7 +628,7 @@ func (srv *Server) setupDiscovery(ctx context.Context) error { NetRestrict: srv.NetRestrict, Bootnodes: srv.BootstrapNodes, Unhandled: unhandled, - Log: srv.log, + Log: srv.logger, } ntab, err := discover.ListenV4(ctx, conn, srv.localnode, cfg) if err != nil { @@ -650,7 +644,7 @@ func (srv *Server) setupDiscovery(ctx context.Context) error { PrivateKey: srv.PrivateKey, NetRestrict: srv.NetRestrict, Bootnodes: srv.BootstrapNodesV5, - Log: srv.log, + Log: srv.logger, } var err error if sconn != nil { @@ -670,7 +664,7 @@ func (srv *Server) setupDialScheduler() { self: srv.localnode.ID(), maxDialPeers: srv.maxDialedConns(), maxActiveDials: srv.MaxPendingPeers, - log: srv.Log, + log: srv.logger, netRestrict: srv.NetRestrict, dialer: srv.Dialer, clock: srv.clock, @@ -754,7 +748,7 @@ func (srv *Server) doPeerOp(fn peerOpFunc) { func (srv *Server) run() { defer debug.LogPanic() if len(srv.Config.Protocols) > 0 { - srv.log.Info("Started P2P networking", "version", srv.Config.Protocols[0].Version, "self", *srv.localnodeAddrCache.Load(), "name", srv.Name) + srv.logger.Info("Started P2P networking", "version", srv.Config.Protocols[0].Version, "self", *srv.localnodeAddrCache.Load(), "name", srv.Name) } defer srv.loopWG.Done() defer srv.nodedb.Close() @@ -781,7 +775,7 @@ running: case n := <-srv.addtrusted: // This channel is used by AddTrustedPeer to add a node // to the trusted node set. - srv.log.Trace("Adding trusted node", "node", n) + srv.logger.Trace("Adding trusted node", "node", n) trusted[n.ID()] = true if p, ok := peers[n.ID()]; ok { p.rw.set(trustedConn, true) @@ -790,7 +784,7 @@ running: case n := <-srv.removetrusted: // This channel is used by RemoveTrustedPeer to remove a node // from the trusted node set. - srv.log.Trace("Removing trusted node", "node", n) + srv.logger.Trace("Removing trusted node", "node", n) delete(trusted, n.ID()) if p, ok := peers[n.ID()]; ok { p.rw.set(trustedConn, false) @@ -818,7 +812,7 @@ running: // The handshakes are done and it passed all checks. p := srv.launchPeer(c, c.pubkey) peers[c.node.ID()] = p - srv.log.Trace("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name()) + srv.logger.Trace("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name()) srv.dialsched.peerAdded(c) if p.Inbound() { inboundCount++ @@ -830,7 +824,7 @@ running: // A peer disconnected. d := common.PrettyDuration(mclock.Now() - pd.created) delete(peers, pd.ID()) - srv.log.Trace("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err) + srv.logger.Trace("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err) srv.dialsched.peerRemoved(pd.rw) if pd.Inbound() { inboundCount-- @@ -838,7 +832,7 @@ running: } } - srv.log.Trace("P2P networking is spinning down") + srv.logger.Trace("P2P networking is spinning down") // Terminate discovery. If there is a running lookup it will terminate soon. if srv.ntab != nil { @@ -856,7 +850,7 @@ running: // is closed. for len(peers) > 0 { p := <-srv.delpeer - p.log.Trace("<-delpeer (spindown)") + srv.logger.Trace("<-delpeer (spindown)") delete(peers, p.ID()) } } @@ -881,7 +875,7 @@ func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount in // listenLoop runs in its own goroutine and accepts // inbound connections. func (srv *Server) listenLoop(ctx context.Context) { - srv.log.Trace("TCP listener up", "addr", srv.listener.Addr()) + srv.logger.Trace("TCP listener up", "addr", srv.listener.Addr()) // The slots limit accepts of new connections. slots := semaphore.NewWeighted(int64(srv.MaxPendingPeers)) @@ -896,7 +890,7 @@ func (srv *Server) listenLoop(ctx context.Context) { // Wait for a free slot before accepting. if slotErr := slots.Acquire(ctx, 1); slotErr != nil { if !errors.Is(slotErr, context.Canceled) { - srv.log.Error("Failed to get a peer connection slot", "err", slotErr) + srv.logger.Error("Failed to get a peer connection slot", "err", slotErr) } return } @@ -910,7 +904,7 @@ func (srv *Server) listenLoop(ctx context.Context) { fd, err = srv.listener.Accept() if netutil.IsTemporaryError(err) { if time.Since(lastLog) > 1*time.Second { - srv.log.Trace("Temporary read error", "err", err) + srv.logger.Trace("Temporary read error", "err", err) lastLog = time.Now() } time.Sleep(time.Millisecond * 200) @@ -920,7 +914,7 @@ func (srv *Server) listenLoop(ctx context.Context) { select { case <-srv.quit: default: - srv.log.Error("Server listener failed to accept a connection", "err", err) + srv.logger.Error("Server listener failed to accept a connection", "err", err) } slots.Release(1) return @@ -930,7 +924,7 @@ func (srv *Server) listenLoop(ctx context.Context) { remoteIP := netutil.AddrIP(fd.RemoteAddr()) if err := srv.checkInboundConn(fd, remoteIP); err != nil { - srv.log.Trace("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err) + srv.logger.Trace("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err) _ = fd.Close() slots.Release(1) continue @@ -941,7 +935,7 @@ func (srv *Server) listenLoop(ctx context.Context) { addr = tcp } fd = newMeteredConn(fd, true, addr) - srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) + srv.logger.Trace("Accepted connection", "addr", fd.RemoteAddr()) } go func() { defer debug.LogPanic() @@ -1003,7 +997,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro dialPubkey = new(ecdsa.PublicKey) if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil { err = errors.New("dial destination doesn't have a secp256k1 public key") - srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) + srv.logger.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return err } } @@ -1011,7 +1005,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro // Run the RLPx handshake. remotePubkey, err := c.doEncHandshake(srv.PrivateKey) if err != nil { - srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) + srv.logger.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return err } copy(c.pubkey[:], crypto.MarshalPubkey(remotePubkey)) @@ -1020,7 +1014,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro } else { c.node = nodeFromConn(remotePubkey, c.fd) } - clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags) + clog := srv.logger.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags) err = srv.checkpoint(c, srv.checkpointPostHandshake) if err != nil { clog.Trace("Rejected peer", "err", err) @@ -1069,7 +1063,7 @@ func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error { } func (srv *Server) launchPeer(c *conn, pubkey [64]byte) *Peer { - p := newPeer(srv.log, c, srv.Protocols, pubkey, srv.MetricsEnabled) + p := newPeer(srv.logger, c, srv.Protocols, pubkey, srv.MetricsEnabled) if srv.EnableMsgEvents { // If message events are enabled, pass the peerFeed // to the peer. diff --git a/p2p/server_test.go b/p2p/server_test.go index 1decb3ead..dfa972af6 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -33,7 +33,6 @@ import ( "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/p2p/enr" "github.com/ledgerwatch/erigon/p2p/rlpx" - "github.com/ledgerwatch/erigon/turbo/testlog" "github.com/ledgerwatch/log/v3" ) @@ -68,7 +67,7 @@ func (c *testTransport) close(err error) { c.closeErr = err } -func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *Server { +func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer), logger log.Logger) *Server { config := Config{ Name: "test", MaxPeers: 10, @@ -76,7 +75,6 @@ func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) * ListenAddr: "127.0.0.1:0", NoDiscovery: true, PrivateKey: newkey(), - Log: testlog.Logger(t, log.LvlError), } server := &Server{ Config: config, @@ -85,17 +83,18 @@ func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) * return newTestTransport(remoteKey, fd, dialDest) }, } - if err := server.TestStart(); err != nil { + if err := server.TestStart(logger); err != nil { t.Fatalf("Could not start server: %v", err) } return server } -func (srv *Server) TestStart() error { - return srv.Start(context.Background()) +func (srv *Server) TestStart(logger log.Logger) error { + return srv.Start(context.Background(), logger) } func TestServerListen(t *testing.T) { + logger := log.New() // start the test server connected := make(chan *Peer) remid := &newkey().PublicKey @@ -104,7 +103,7 @@ func TestServerListen(t *testing.T) { t.Error("peer func called with wrong node id") } connected <- p - }) + }, logger) defer close(connected) defer srv.Stop() @@ -131,6 +130,7 @@ func TestServerListen(t *testing.T) { } func TestServerDial(t *testing.T) { + logger := log.New() // run a one-shot TCP server to handle the connection. listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -149,7 +149,7 @@ func TestServerDial(t *testing.T) { // start the server connected := make(chan *Peer) remid := &newkey().PublicKey - srv := startTestServer(t, remid, func(p *Peer) { connected <- p }) + srv := startTestServer(t, remid, func(p *Peer) { connected <- p }, logger) defer close(connected) defer srv.Stop() @@ -212,12 +212,12 @@ func TestServerDial(t *testing.T) { // This test checks that RemovePeer disconnects the peer if it is connected. func TestServerRemovePeerDisconnect(t *testing.T) { + logger := log.New() srv1 := &Server{Config: Config{ PrivateKey: newkey(), MaxPeers: 1, MaxPendingPeers: 1, NoDiscovery: true, - Log: testlog.Logger(t, log.LvlTrace).New("server", "1"), }} srv2 := &Server{Config: Config{ PrivateKey: newkey(), @@ -226,13 +226,12 @@ func TestServerRemovePeerDisconnect(t *testing.T) { NoDiscovery: true, NoDial: true, ListenAddr: "127.0.0.1:0", - Log: testlog.Logger(t, log.LvlTrace).New("server", "2"), }} - if err := srv1.TestStart(); err != nil { + if err := srv1.TestStart(logger); err != nil { t.Fatal("cant start srv1") } defer srv1.Stop() - if err := srv2.TestStart(); err != nil { + if err := srv2.TestStart(logger); err != nil { t.Fatal("cant start srv2") } defer srv2.Stop() @@ -249,6 +248,7 @@ func TestServerRemovePeerDisconnect(t *testing.T) { // This test checks that connections are disconnected just after the encryption handshake // when the server is at capacity. Trusted connections should still be accepted. func TestServerAtCap(t *testing.T) { + logger := log.New() trustedNode := newkey() trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey) srv := &Server{ @@ -259,10 +259,9 @@ func TestServerAtCap(t *testing.T) { NoDial: true, NoDiscovery: true, TrustedNodes: []*enode.Node{newNode(trustedID, "")}, - Log: testlog.Logger(t, log.LvlTrace), }, } - if err := srv.TestStart(); err != nil { + if err := srv.TestStart(logger); err != nil { t.Fatalf("could not start: %v", err) } defer srv.Stop() @@ -329,6 +328,7 @@ func TestServerAtCap(t *testing.T) { } func TestServerPeerLimits(t *testing.T) { + logger := log.New() srvkey := newkey() clientkey := newkey() clientnode := enode.NewV4(&clientkey.PublicKey, nil, 0, 0) @@ -350,11 +350,10 @@ func TestServerPeerLimits(t *testing.T) { NoDial: true, NoDiscovery: true, Protocols: []Protocol{discard}, - Log: testlog.Logger(t, log.LvlTrace), }, newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return tp }, } - if err := srv.TestStart(); err != nil { + if err := srv.TestStart(logger); err != nil { t.Fatalf("couldn't start server: %v", err) } defer srv.Stop() @@ -391,6 +390,7 @@ func TestServerPeerLimits(t *testing.T) { } func TestServerSetupConn(t *testing.T) { + logger := log.New() var ( clientkey, srvkey = newkey(), newkey() clientpub = &clientkey.PublicKey @@ -454,15 +454,13 @@ func TestServerSetupConn(t *testing.T) { NoDial: true, NoDiscovery: true, Protocols: []Protocol{discard}, - Log: testlog.Logger(t, log.LvlTrace), } srv := &Server{ Config: cfg, newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return test.tt }, //nolint:scopelint - log: cfg.Log, } if !test.dontstart { - if err := srv.TestStart(); err != nil { + if err := srv.TestStart(logger); err != nil { t.Fatalf("couldn't start server: %v", err) } defer srv.Stop() @@ -540,6 +538,7 @@ func randomID() (id enode.ID) { // This test checks that inbound connections are throttled by IP. func TestServerInboundThrottle(t *testing.T) { + logger := log.New() const timeout = 5 * time.Second newTransportCalled := make(chan struct{}) srv := &Server{ @@ -551,7 +550,6 @@ func TestServerInboundThrottle(t *testing.T) { NoDial: true, NoDiscovery: true, Protocols: []Protocol{discard}, - Log: testlog.Logger(t, log.LvlTrace), }, newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { newTransportCalled <- struct{}{} @@ -562,7 +560,7 @@ func TestServerInboundThrottle(t *testing.T) { return listenFakeAddr(network, laddr, fakeAddr) }, } - if err := srv.TestStart(); err != nil { + if err := srv.TestStart(logger); err != nil { t.Fatal("can't start: ", err) } defer srv.Stop() diff --git a/turbo/logging/logging.go b/turbo/logging/logging.go index 8e46a6a32..588f6c862 100644 --- a/turbo/logging/logging.go +++ b/turbo/logging/logging.go @@ -106,7 +106,7 @@ func SetupLoggerCmd(filePrefix string, cmd *cobra.Command) log.Logger { // SetupLoggerCmd perform the logging using parametrs specifying by `flag` package, and sets it to the root logger // This is the function which is NOT used by Erigon itself, but instead by utility commans -func SetupLogger(filePrefix string) { +func SetupLogger(filePrefix string) log.Logger { var logConsoleVerbosity = flag.String(LogConsoleVerbosityFlag.Name, "", LogConsoleVerbosityFlag.Usage) var logDirVerbosity = flag.String(LogDirVerbosityFlag.Name, "", LogDirVerbosityFlag.Usage) var logDirPath = flag.String(LogDirPathFlag.Name, "", LogDirPathFlag.Usage) @@ -134,6 +134,7 @@ func SetupLogger(filePrefix string) { } initSeparatedLogging(log.Root(), filePrefix, *logDirPath, consoleLevel, dirLevel, consoleJson, *dirJson) + return log.Root() } // initSeparatedLogging construct a log handler accrosing to the configuration parameters passed to it