[devnet] separate logging - p2p (#7547)

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
This commit is contained in:
ledgerwatch 2023-05-19 18:41:53 +01:00 committed by GitHub
parent b382f96f6c
commit b0117a7c30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 224 additions and 189 deletions

View File

@ -95,7 +95,7 @@ func runConsensusLayerNode(cliCtx *cli.Context) error {
// Start the sentinel service
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(cfg.LogLvl), log.StderrHandler))
log.Info("[Sentinel] running sentinel with configuration", "cfg", cfg)
s, err := startSentinel(cliCtx, *cfg, cpState)
s, err := startSentinel(cliCtx, *cfg, cpState, log.Root())
if err != nil {
log.Error("Could not start sentinel service", "err", err)
}
@ -131,7 +131,7 @@ Loop:
return nil
}
func startSentinel(cliCtx *cli.Context, cfg lcCli.ConsensusClientCliCfg, beaconState *state.BeaconState) (sentinelrpc.SentinelClient, error) {
func startSentinel(cliCtx *cli.Context, cfg lcCli.ConsensusClientCliCfg, beaconState *state.BeaconState, logger log.Logger) (sentinelrpc.SentinelClient, error) {
forkDigest, err := fork.ComputeForkDigest(cfg.BeaconCfg, cfg.GenesisCfg)
if err != nil {
return nil, err
@ -150,12 +150,12 @@ func startSentinel(cliCtx *cli.Context, cfg lcCli.ConsensusClientCliCfg, beaconS
FinalizedEpoch: beaconState.FinalizedCheckpoint().Epoch(),
HeadSlot: beaconState.FinalizedCheckpoint().Epoch() * cfg.BeaconCfg.SlotsPerEpoch,
HeadRoot: beaconState.FinalizedCheckpoint().BlockRoot(),
})
}, logger)
if err != nil {
log.Error("Could not start sentinel", "err", err)
logger.Error("Could not start sentinel", "err", err)
return nil, err
}
log.Info("Sentinel started", "addr", cfg.ServerAddr)
logger.Info("Sentinel started", "addr", cfg.ServerAddr)
return s, nil
}

View File

@ -51,7 +51,7 @@ func main() {
)
flag.Parse()
logging.SetupLogger("bootnode")
logger := logging.SetupLogger("bootnode")
natm, err := nat.Parse(*natdesc)
if err != nil {
@ -121,7 +121,7 @@ func main() {
if err != nil {
panic(err)
}
ln := enode.NewLocalNode(db, nodeKey)
ln := enode.NewLocalNode(db, nodeKey, logger)
cfg := discover.Config{
PrivateKey: nodeKey,
NetRestrict: restrictList,

View File

@ -91,7 +91,7 @@ func runCaplinNode(cliCtx *cli.Context) error {
FinalizedEpoch: state.FinalizedCheckpoint().Epoch(),
HeadSlot: state.FinalizedCheckpoint().Epoch() * cfg.BeaconCfg.SlotsPerEpoch,
HeadRoot: state.FinalizedCheckpoint().BlockRoot(),
})
}, log.Root())
if err != nil {
log.Error("Could not start sentinel", "err", err)
}

View File

@ -15,8 +15,8 @@ import (
"github.com/ledgerwatch/log/v3"
)
func mainWithFlags(ctx context.Context, flags observer.CommandFlags) error {
server, err := observer.NewServer(flags)
func mainWithFlags(ctx context.Context, flags observer.CommandFlags, logger log.Logger) error {
server, err := observer.NewServer(flags, logger)
if err != nil {
return err
}

View File

@ -219,7 +219,7 @@ func (command *Command) withErigonLogPath() {
command.command.Flags().StringVar(&command.flags.ErigonLogPath, flag.Name, flag.Value, flag.Usage)
}
func (command *Command) ExecuteContext(ctx context.Context, runFunc func(ctx context.Context, flags CommandFlags) error) error {
func (command *Command) ExecuteContext(ctx context.Context, runFunc func(ctx context.Context, flags CommandFlags, logger log.Logger) error) error {
command.command.PersistentPostRun = func(cmd *cobra.Command, args []string) {
debug.Exit()
}
@ -231,7 +231,7 @@ func (command *Command) ExecuteContext(ctx context.Context, runFunc func(ctx con
return err
}
defer debug.Exit()
err = runFunc(cmd.Context(), command.flags)
err = runFunc(cmd.Context(), command.flags, logger)
if errors.Is(err, context.Canceled) {
return nil
}

View File

@ -30,10 +30,10 @@ type Server struct {
natInterface nat.Interface
discConfig discover.Config
log log.Logger
logger log.Logger
}
func NewServer(flags CommandFlags) (*Server, error) {
func NewServer(flags CommandFlags, logger log.Logger) (*Server, error) {
nodeDBPath := filepath.Join(flags.DataDir, "nodes", "eth66")
nodeKeyConfig := p2p.NodeKeyConfig{}
@ -42,7 +42,7 @@ func NewServer(flags CommandFlags) (*Server, error) {
return nil, err
}
localNode, err := makeLocalNode(nodeDBPath, privateKey, flags.Chain)
localNode, err := makeLocalNode(nodeDBPath, privateKey, flags.Chain, logger)
if err != nil {
return nil, err
}
@ -67,13 +67,11 @@ func NewServer(flags CommandFlags) (*Server, error) {
return nil, fmt.Errorf("bootnodes parse error: %w", err)
}
logger := log.New()
discConfig := discover.Config{
PrivateKey: privateKey,
NetRestrict: netRestrictList,
Bootnodes: bootnodes,
Log: logger.New(),
Log: logger,
}
instance := Server{
@ -86,12 +84,12 @@ func NewServer(flags CommandFlags) (*Server, error) {
return &instance, nil
}
func makeLocalNode(nodeDBPath string, privateKey *ecdsa.PrivateKey, chain string) (*enode.LocalNode, error) {
func makeLocalNode(nodeDBPath string, privateKey *ecdsa.PrivateKey, chain string, logger log.Logger) (*enode.LocalNode, error) {
db, err := enode.OpenDB(nodeDBPath, "")
if err != nil {
return nil, err
}
localNode := enode.NewLocalNode(db, privateKey)
localNode := enode.NewLocalNode(db, privateKey, logger)
localNode.SetFallbackIP(net.IP{127, 0, 0, 1})
forksEntry, err := makeForksENREntry(chain)
@ -144,13 +142,13 @@ func (server *Server) detectNATExternalIP() (net.IP, error) {
return nil, errors.New("no NAT flag configured")
}
if _, hasExtIP := server.natInterface.(nat.ExtIP); !hasExtIP {
server.log.Debug("Detecting external IP...")
server.logger.Debug("Detecting external IP...")
}
ip, err := server.natInterface.ExternalIP()
if err != nil {
return nil, fmt.Errorf("NAT ExternalIP error: %w", err)
}
server.log.Debug("External IP detected", "ip", ip)
server.logger.Debug("External IP detected", "ip", ip)
return ip, nil
}
@ -179,7 +177,7 @@ func (server *Server) Listen(ctx context.Context) (*discover.UDPv4, error) {
server.mapNATPort(ctx, realAddr)
}
server.log.Debug("Discovery UDP listener is up", "addr", realAddr)
server.logger.Debug("Discovery UDP listener is up", "addr", realAddr)
return discover.ListenV4(ctx, conn, server.localNode, server.discConfig)
}

View File

@ -52,7 +52,7 @@ func runSentinelNode(cliCtx *cli.Context) error {
NetworkConfig: cfg.NetworkCfg,
BeaconConfig: cfg.BeaconCfg,
NoDiscovery: cfg.NoDiscovery,
}, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, nil)
}, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, nil, log.Root())
if err != nil {
log.Error("[Sentinel] Could not start sentinel", "err", err)
return err

View File

@ -76,6 +76,7 @@ type Sentinel struct {
subManager *GossipManager
metrics bool
listenForPeersDoneCh chan struct{}
logger log.Logger
}
func (s *Sentinel) createLocalNode(
@ -88,7 +89,7 @@ func (s *Sentinel) createLocalNode(
if err != nil {
return nil, fmt.Errorf("could not open node's peer database: %w", err)
}
localNode := enode.NewLocalNode(db, privKey)
localNode := enode.NewLocalNode(db, privKey, s.logger)
ipEntry := enr.IP(ipAddr)
udpEntry := enr.UDP(udpPort)
@ -231,12 +232,14 @@ func New(
ctx context.Context,
cfg *SentinelConfig,
db kv.RoDB,
logger log.Logger,
) (*Sentinel, error) {
s := &Sentinel{
ctx: ctx,
cfg: cfg,
db: db,
metrics: true,
logger: logger,
}
// Setup discovery
@ -299,7 +302,7 @@ func (s *Sentinel) RecvGossip() <-chan *pubsub.Message {
func (s *Sentinel) Start() error {
if s.started {
log.Warn("[Sentinel] already running")
s.logger.Warn("[Sentinel] already running")
}
var err error
s.listener, err = s.createListener()

View File

@ -28,14 +28,16 @@ type SentinelServer struct {
sentinel *sentinel.Sentinel
gossipNotifier *gossipNotifier
mu sync.RWMutex
mu sync.RWMutex
logger log.Logger
}
func NewSentinelServer(ctx context.Context, sentinel *sentinel.Sentinel) *SentinelServer {
func NewSentinelServer(ctx context.Context, sentinel *sentinel.Sentinel, logger log.Logger) *SentinelServer {
return &SentinelServer{
sentinel: sentinel,
ctx: ctx,
gossipNotifier: newGossipNotifier(),
logger: logger,
}
}
@ -121,7 +123,7 @@ func (s *SentinelServer) SubscribeGossip(_ *sentinelrpc.EmptyMessage, stream sen
},
BlobIndex: packet.blobIndex,
}); err != nil {
log.Warn("[Sentinel] Could not relay gossip packet", "reason", err)
s.logger.Warn("[Sentinel] Could not relay gossip packet", "reason", err)
}
}
}
@ -256,7 +258,7 @@ func (s *SentinelServer) startServerBackgroundLoop() {
peers := s.sentinel.PeersList()
s.sentinel.Stop()
status := s.sentinel.Status()
s.sentinel, err = createSentinel(s.sentinel.Config(), s.sentinel.DB())
s.sentinel, err = createSentinel(s.sentinel.Config(), s.sentinel.DB(), s.logger)
if err != nil {
log.Warn("Could not coordinate sentinel", "err", err)
continue
@ -274,7 +276,7 @@ func (s *SentinelServer) startServerBackgroundLoop() {
func (s *SentinelServer) handleGossipPacket(pkt *pubsub.Message) error {
var err error
log.Trace("[Sentinel Gossip] Received Packet", "topic", pkt.Topic)
s.logger.Trace("[Sentinel Gossip] Received Packet", "topic", pkt.Topic)
data := pkt.GetData()
// If we use snappy codec then decompress it accordingly.

View File

@ -25,8 +25,8 @@ type ServerConfig struct {
Addr string
}
func createSentinel(cfg *sentinel.SentinelConfig, db kv.RoDB) (*sentinel.Sentinel, error) {
sent, err := sentinel.New(context.Background(), cfg, db)
func createSentinel(cfg *sentinel.SentinelConfig, db kv.RoDB, logger log.Logger) (*sentinel.Sentinel, error) {
sent, err := sentinel.New(context.Background(), cfg, db, logger)
if err != nil {
return nil, err
}
@ -44,35 +44,35 @@ func createSentinel(cfg *sentinel.SentinelConfig, db kv.RoDB) (*sentinel.Sentine
for _, v := range gossipTopics {
if err := sent.Unsubscribe(v); err != nil {
log.Error("[Sentinel] failed to start sentinel", "err", err)
logger.Error("[Sentinel] failed to start sentinel", "err", err)
continue
}
// now lets separately connect to the gossip topics. this joins the room
subscriber, err := sent.SubscribeGossip(v)
if err != nil {
log.Error("[Sentinel] failed to start sentinel", "err", err)
logger.Error("[Sentinel] failed to start sentinel", "err", err)
}
// actually start the subscription, aka listening and sending packets to the sentinel recv channel
err = subscriber.Listen()
if err != nil {
log.Error("[Sentinel] failed to start sentinel", "err", err)
logger.Error("[Sentinel] failed to start sentinel", "err", err)
}
}
return sent, nil
}
func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status) (sentinelrpc.SentinelClient, error) {
func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status, logger log.Logger) (sentinelrpc.SentinelClient, error) {
ctx := context.Background()
sent, err := createSentinel(cfg, db)
sent, err := createSentinel(cfg, db, logger)
if err != nil {
return nil, err
}
rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer)
log.Info("[Sentinel] Sentinel started", "enr", sent.String())
logger.Info("[Sentinel] Sentinel started", "enr", sent.String())
if initialStatus != nil {
sent.SetStatus(initialStatus)
}
server := NewSentinelServer(ctx, sent)
server := NewSentinelServer(ctx, sent, logger)
if creds == nil {
creds = insecure.NewCredentials()
}

View File

@ -967,7 +967,7 @@ func (ss *GrpcServer) SetStatus(ctx context.Context, statusData *proto_sentry.St
}
// Add protocol
if err = srv.Start(ss.ctx); err != nil {
if err = srv.Start(ss.ctx, ss.logger); err != nil {
srv.Stop()
return reply, fmt.Errorf("could not start server: %w", err)
}

View File

@ -936,7 +936,6 @@ func NewP2PConfig(
NoDiscovery: nodiscover,
PrivateKey: serverKey,
Name: nodeName,
Log: log.New(),
NodeDatabase: enodeDBPath,
AllowedPorts: allowedPorts,
TmpDir: dirs.Tmp,

View File

@ -601,7 +601,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
FinalizedEpoch: state.FinalizedCheckpoint().Epoch(),
HeadSlot: state.FinalizedCheckpoint().Epoch() * beaconCfg.SlotsPerEpoch,
HeadRoot: state.FinalizedCheckpoint().BlockRoot(),
})
}, logger)
if err != nil {
return nil, err
}

View File

@ -60,11 +60,11 @@ type udpTest struct {
remoteaddr *net.UDPAddr
}
func newUDPTest(t *testing.T) *udpTest {
return newUDPTestContext(context.Background(), t)
func newUDPTest(t *testing.T, logger log.Logger) *udpTest {
return newUDPTestContext(context.Background(), t, logger)
}
func newUDPTestContext(ctx context.Context, t *testing.T) *udpTest {
func newUDPTestContext(ctx context.Context, t *testing.T, logger log.Logger) *udpTest {
ctx = disableLookupSlowdown(ctx)
replyTimeout := contextGetReplyTimeout(ctx)
@ -86,7 +86,7 @@ func newUDPTestContext(ctx context.Context, t *testing.T) *udpTest {
if err != nil {
panic(err)
}
ln := enode.NewLocalNode(test.db, test.localkey)
ln := enode.NewLocalNode(test.db, test.localkey, logger)
test.udp, err = ListenV4(ctx, test.pipe, ln, Config{
PrivateKey: test.localkey,
Log: testlog.Logger(t, log.LvlError),
@ -166,7 +166,8 @@ func (test *udpTest) waitPacketOut(validate interface{}) (closed bool) {
}
func TestUDPv4_packetErrors(t *testing.T) {
test := newUDPTest(t)
logger := log.New()
test := newUDPTest(t, logger)
defer test.close()
test.packetIn(errExpired, &v4wire.Ping{From: testRemote, To: testLocalAnnounced, Version: 4})
@ -177,7 +178,8 @@ func TestUDPv4_packetErrors(t *testing.T) {
func TestUDPv4_pingTimeout(t *testing.T) {
t.Parallel()
test := newUDPTest(t)
logger := log.New()
test := newUDPTest(t, logger)
defer test.close()
key := newkey()
@ -198,11 +200,12 @@ func TestUDPv4_responseTimeouts(t *testing.T) {
t.Skip("unstable test on darwin")
}
t.Parallel()
logger := log.New()
ctx := context.Background()
ctx = contextWithReplyTimeout(ctx, respTimeout)
test := newUDPTestContext(ctx, t)
test := newUDPTestContext(ctx, t, logger)
defer test.close()
rand.Seed(time.Now().UnixNano())
@ -274,7 +277,8 @@ func TestUDPv4_responseTimeouts(t *testing.T) {
func TestUDPv4_findnodeTimeout(t *testing.T) {
t.Parallel()
test := newUDPTest(t)
logger := log.New()
test := newUDPTest(t, logger)
defer test.close()
toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
@ -290,7 +294,8 @@ func TestUDPv4_findnodeTimeout(t *testing.T) {
}
func TestUDPv4_findnode(t *testing.T) {
test := newUDPTest(t)
logger := log.New()
test := newUDPTest(t, logger)
defer test.close()
// put a few nodes into the table. their exact
@ -347,7 +352,8 @@ func TestUDPv4_findnode(t *testing.T) {
}
func TestUDPv4_findnodeMultiReply(t *testing.T) {
test := newUDPTest(t)
logger := log.New()
test := newUDPTest(t, logger)
defer test.close()
rid := enode.PubkeyToIDV4(&test.remotekey.PublicKey)
@ -403,7 +409,8 @@ func TestUDPv4_findnodeMultiReply(t *testing.T) {
// This test checks that reply matching of pong verifies the ping hash.
func TestUDPv4_pingMatch(t *testing.T) {
test := newUDPTest(t)
logger := log.New()
test := newUDPTest(t, logger)
defer test.close()
randToken := make([]byte, 32)
@ -417,7 +424,8 @@ func TestUDPv4_pingMatch(t *testing.T) {
// This test checks that reply matching of pong verifies the sender IP address.
func TestUDPv4_pingMatchIP(t *testing.T) {
test := newUDPTest(t)
logger := log.New()
test := newUDPTest(t, logger)
defer test.close()
test.packetIn(nil, &v4wire.Ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
@ -434,7 +442,8 @@ func TestUDPv4_pingMatchIP(t *testing.T) {
}
func TestUDPv4_successfulPing(t *testing.T) {
test := newUDPTest(t)
logger := log.New()
test := newUDPTest(t, logger)
added := make(chan *node, 1)
test.table.nodeAddedHook = func(n *node) { added <- n }
defer test.close()
@ -500,7 +509,8 @@ func TestUDPv4_successfulPing(t *testing.T) {
// This test checks that EIP-868 requests work.
func TestUDPv4_EIP868(t *testing.T) {
test := newUDPTest(t)
logger := log.New()
test := newUDPTest(t, logger)
defer test.close()
test.udp.localNode.Set(enr.WithEntry("foo", "bar"))
@ -542,6 +552,7 @@ func TestUDPv4_smallNetConvergence(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
logger := log.New()
ctx := context.Background()
ctx = disableLookupSlowdown(ctx)
@ -558,7 +569,7 @@ func TestUDPv4_smallNetConvergence(t *testing.T) {
cfg.PingBackDelay = time.Nanosecond
cfg.TableRevalidateInterval = time.Hour
nodes[i] = startLocalhostV4(ctx, t, cfg)
nodes[i] = startLocalhostV4(ctx, t, cfg, logger)
}
defer func() {
@ -604,7 +615,7 @@ func TestUDPv4_smallNetConvergence(t *testing.T) {
}
}
func startLocalhostV4(ctx context.Context, t *testing.T, cfg Config) *UDPv4 {
func startLocalhostV4(ctx context.Context, t *testing.T, cfg Config, logger log.Logger) *UDPv4 {
t.Helper()
cfg.PrivateKey = newkey()
@ -613,7 +624,7 @@ func startLocalhostV4(ctx context.Context, t *testing.T, cfg Config) *UDPv4 {
if err != nil {
panic(err)
}
ln := enode.NewLocalNode(db, cfg.PrivateKey)
ln := enode.NewLocalNode(db, cfg.PrivateKey, logger)
// Prefix logs with node ID.
lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString())

View File

@ -38,7 +38,7 @@ import (
"github.com/ledgerwatch/erigon/rlp"
)
func startLocalhostV5(t *testing.T, cfg Config) *UDPv5 {
func startLocalhostV5(t *testing.T, cfg Config, logger log.Logger) *UDPv5 {
cfg.PrivateKey = newkey()
tmpDir := t.TempDir()
db, err := enode.OpenDB("", tmpDir)
@ -46,7 +46,7 @@ func startLocalhostV5(t *testing.T, cfg Config) *UDPv5 {
panic(err)
}
t.Cleanup(db.Close)
ln := enode.NewLocalNode(db, cfg.PrivateKey)
ln := enode.NewLocalNode(db, cfg.PrivateKey, logger)
// Prefix logs with node ID.
lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString())
@ -80,10 +80,11 @@ func TestUDPv5_pingHandling(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
test.packetIn(&v5wire.Ping{ReqID: []byte("foo")})
test.packetIn(&v5wire.Ping{ReqID: []byte("foo")}, logger)
test.waitPacketOut(func(p *v5wire.Pong, addr *net.UDPAddr, _ v5wire.Nonce) {
if !bytes.Equal(p.ReqID, []byte("foo")) {
t.Error("wrong request ID in response:", p.ReqID)
@ -100,7 +101,8 @@ func TestUDPv5_unknownPacket(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
nonce := v5wire.Nonce{1, 2, 3}
@ -118,16 +120,16 @@ func TestUDPv5_unknownPacket(t *testing.T) {
}
// Unknown packet from unknown node.
test.packetIn(&v5wire.Unknown{Nonce: nonce})
test.packetIn(&v5wire.Unknown{Nonce: nonce}, logger)
test.waitPacketOut(func(p *v5wire.Whoareyou, addr *net.UDPAddr, _ v5wire.Nonce) {
check(p, 0)
})
// Make node known.
n := test.getNode(test.remotekey, test.remoteaddr).Node()
n := test.getNode(test.remotekey, test.remoteaddr, logger).Node()
test.table.addSeenNode(wrapNode(n))
test.packetIn(&v5wire.Unknown{Nonce: nonce})
test.packetIn(&v5wire.Unknown{Nonce: nonce}, logger)
test.waitPacketOut(func(p *v5wire.Whoareyou, addr *net.UDPAddr, _ v5wire.Nonce) {
check(p, n.Seq())
})
@ -139,7 +141,8 @@ func TestUDPv5_findnodeHandling(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
// Create test nodes and insert them into the table.
@ -151,28 +154,28 @@ func TestUDPv5_findnodeHandling(t *testing.T) {
fillTable(test.table, wrapNodes(nodes248))
// Requesting with distance zero should return the node's own record.
test.packetIn(&v5wire.Findnode{ReqID: []byte{0}, Distances: []uint{0}})
test.packetIn(&v5wire.Findnode{ReqID: []byte{0}, Distances: []uint{0}}, logger)
test.expectNodes([]byte{0}, 1, []*enode.Node{test.udp.Self()})
// Requesting with distance > 256 shouldn't crash.
test.packetIn(&v5wire.Findnode{ReqID: []byte{1}, Distances: []uint{4234098}})
test.packetIn(&v5wire.Findnode{ReqID: []byte{1}, Distances: []uint{4234098}}, logger)
test.expectNodes([]byte{1}, 1, nil)
// Requesting with empty distance list shouldn't crash either.
test.packetIn(&v5wire.Findnode{ReqID: []byte{2}, Distances: []uint{}})
test.packetIn(&v5wire.Findnode{ReqID: []byte{2}, Distances: []uint{}}, logger)
test.expectNodes([]byte{2}, 1, nil)
// This request gets no nodes because the corresponding bucket is empty.
test.packetIn(&v5wire.Findnode{ReqID: []byte{3}, Distances: []uint{254}})
test.packetIn(&v5wire.Findnode{ReqID: []byte{3}, Distances: []uint{254}}, logger)
test.expectNodes([]byte{3}, 1, nil)
// This request gets all the distance-253 nodes.
test.packetIn(&v5wire.Findnode{ReqID: []byte{4}, Distances: []uint{253}})
test.packetIn(&v5wire.Findnode{ReqID: []byte{4}, Distances: []uint{253}}, logger)
test.expectNodes([]byte{4}, 4, nodes253)
// This request gets all the distance-249 nodes and some more at 248 because
// the bucket at 249 is not full.
test.packetIn(&v5wire.Findnode{ReqID: []byte{5}, Distances: []uint{249, 248}})
test.packetIn(&v5wire.Findnode{ReqID: []byte{5}, Distances: []uint{249, 248}}, logger)
nodes := make([]*enode.Node, 0, len(nodes249)+len(nodes248[:10]))
nodes = append(nodes, nodes249...)
nodes = append(nodes, nodes248[:10]...)
@ -226,10 +229,11 @@ func TestUDPv5_pingCall(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
remote := test.getNode(test.remotekey, test.remoteaddr).Node()
remote := test.getNode(test.remotekey, test.remoteaddr, logger).Node()
done := make(chan error, 1)
// This ping times out.
@ -248,7 +252,7 @@ func TestUDPv5_pingCall(t *testing.T) {
done <- err
}()
test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, _ v5wire.Nonce) {
test.packetInFrom(test.remotekey, test.remoteaddr, &v5wire.Pong{ReqID: p.ReqID})
test.packetInFrom(test.remotekey, test.remoteaddr, &v5wire.Pong{ReqID: p.ReqID}, logger)
})
if err := <-done; err != nil {
t.Fatal(err)
@ -261,7 +265,7 @@ func TestUDPv5_pingCall(t *testing.T) {
}()
test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, _ v5wire.Nonce) {
wrongAddr := &net.UDPAddr{IP: net.IP{33, 44, 55, 22}, Port: 10101}
test.packetInFrom(test.remotekey, wrongAddr, &v5wire.Pong{ReqID: p.ReqID})
test.packetInFrom(test.remotekey, wrongAddr, &v5wire.Pong{ReqID: p.ReqID}, logger)
})
if err := <-done; err != errTimeout {
t.Fatalf("want errTimeout for reply from wrong IP, got %q", err)
@ -275,13 +279,14 @@ func TestUDPv5_findnodeCall(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
// Launch the request:
var (
distances = []uint{230}
remote = test.getNode(test.remotekey, test.remoteaddr).Node()
remote = test.getNode(test.remotekey, test.remoteaddr, logger).Node()
nodes = nodesAtDistance(remote.ID(), int(distances[0]), 8)
done = make(chan error, 1)
response []*enode.Node
@ -301,12 +306,12 @@ func TestUDPv5_findnodeCall(t *testing.T) {
ReqID: p.ReqID,
Total: 2,
Nodes: nodesToRecords(nodes[:4]),
})
}, logger)
test.packetIn(&v5wire.Nodes{
ReqID: p.ReqID,
Total: 2,
Nodes: nodesToRecords(nodes[4:]),
})
}, logger)
})
// Check results:
@ -327,10 +332,11 @@ func TestUDPv5_callResend(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
remote := test.getNode(test.remotekey, test.remoteaddr).Node()
remote := test.getNode(test.remotekey, test.remoteaddr, logger).Node()
done := make(chan error, 2)
go func() {
_, err := test.udp.ping(remote)
@ -343,15 +349,15 @@ func TestUDPv5_callResend(t *testing.T) {
// Ping answered by WHOAREYOU.
test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, nonce v5wire.Nonce) {
test.packetIn(&v5wire.Whoareyou{Nonce: nonce})
test.packetIn(&v5wire.Whoareyou{Nonce: nonce}, logger)
})
// Ping should be re-sent.
test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, _ v5wire.Nonce) {
test.packetIn(&v5wire.Pong{ReqID: p.ReqID})
test.packetIn(&v5wire.Pong{ReqID: p.ReqID}, logger)
})
// Answer the other ping.
test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, _ v5wire.Nonce) {
test.packetIn(&v5wire.Pong{ReqID: p.ReqID})
test.packetIn(&v5wire.Pong{ReqID: p.ReqID}, logger)
})
if err := <-done; err != nil {
t.Fatalf("unexpected ping error: %v", err)
@ -367,10 +373,11 @@ func TestUDPv5_multipleHandshakeRounds(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
remote := test.getNode(test.remotekey, test.remoteaddr).Node()
remote := test.getNode(test.remotekey, test.remoteaddr, logger).Node()
done := make(chan error, 1)
go func() {
_, err := test.udp.ping(remote)
@ -379,11 +386,11 @@ func TestUDPv5_multipleHandshakeRounds(t *testing.T) {
// Ping answered by WHOAREYOU.
test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, nonce v5wire.Nonce) {
test.packetIn(&v5wire.Whoareyou{Nonce: nonce})
test.packetIn(&v5wire.Whoareyou{Nonce: nonce}, logger)
})
// Ping answered by WHOAREYOU again.
test.waitPacketOut(func(p *v5wire.Ping, addr *net.UDPAddr, nonce v5wire.Nonce) {
test.packetIn(&v5wire.Whoareyou{Nonce: nonce})
test.packetIn(&v5wire.Whoareyou{Nonce: nonce}, logger)
})
if err := <-done; err != errTimeout {
t.Fatalf("unexpected ping error: %q", err)
@ -396,7 +403,8 @@ func TestUDPv5_talkHandling(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
var recvMessage []byte
@ -410,7 +418,7 @@ func TestUDPv5_talkHandling(t *testing.T) {
ReqID: []byte("foo"),
Protocol: "test",
Message: []byte("test request"),
})
}, logger)
test.waitPacketOut(func(p *v5wire.TalkResponse, addr *net.UDPAddr, _ v5wire.Nonce) {
if !bytes.Equal(p.ReqID, []byte("foo")) {
t.Error("wrong request ID in response:", p.ReqID)
@ -429,7 +437,7 @@ func TestUDPv5_talkHandling(t *testing.T) {
ReqID: []byte("2"),
Protocol: "wrong",
Message: []byte("test request"),
})
}, logger)
test.waitPacketOut(func(p *v5wire.TalkResponse, addr *net.UDPAddr, _ v5wire.Nonce) {
if !bytes.Equal(p.ReqID, []byte("2")) {
t.Error("wrong request ID in response:", p.ReqID)
@ -449,10 +457,11 @@ func TestUDPv5_talkRequest(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
remote := test.getNode(test.remotekey, test.remoteaddr).Node()
remote := test.getNode(test.remotekey, test.remoteaddr, logger).Node()
done := make(chan error, 1)
// This request times out.
@ -480,7 +489,7 @@ func TestUDPv5_talkRequest(t *testing.T) {
test.packetInFrom(test.remotekey, test.remoteaddr, &v5wire.TalkResponse{
ReqID: p.ReqID,
Message: []byte("test response"),
})
}, logger)
})
if err := <-done; err != nil {
t.Fatal(err)
@ -493,8 +502,9 @@ func TestUDPv5_LocalNode(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
logger := log.New()
var cfg Config
node := startLocalhostV5(t, cfg)
node := startLocalhostV5(t, cfg, logger)
defer node.Close()
localNd := node.LocalNode()
@ -580,11 +590,11 @@ func (c *testCodec) decodeFrame(input []byte) (frame testCodecFrame, p v5wire.Pa
return frame, p, err
}
func newUDPV5Test(t *testing.T) *udpV5Test {
return newUDPV5TestContext(context.Background(), t)
func newUDPV5Test(t *testing.T, logger log.Logger) *udpV5Test {
return newUDPV5TestContext(context.Background(), t, logger)
}
func newUDPV5TestContext(ctx context.Context, t *testing.T) *udpV5Test {
func newUDPV5TestContext(ctx context.Context, t *testing.T, logger log.Logger) *udpV5Test {
ctx = disableLookupSlowdown(ctx)
replyTimeout := contextGetReplyTimeout(ctx)
@ -609,7 +619,7 @@ func newUDPV5TestContext(ctx context.Context, t *testing.T) *udpV5Test {
panic(err)
}
ln := enode.NewLocalNode(test.db, test.localkey)
ln := enode.NewLocalNode(test.db, test.localkey, logger)
ln.SetStaticIP(net.IP{10, 0, 0, 1})
ln.Set(enr.UDP(30303))
test.udp, err = ListenV5(ctx, test.pipe, ln, Config{
@ -632,16 +642,16 @@ func newUDPV5TestContext(ctx context.Context, t *testing.T) *udpV5Test {
}
// handles a packet as if it had been sent to the transport.
func (test *udpV5Test) packetIn(packet v5wire.Packet) {
func (test *udpV5Test) packetIn(packet v5wire.Packet, logger log.Logger) {
test.t.Helper()
test.packetInFrom(test.remotekey, test.remoteaddr, packet)
test.packetInFrom(test.remotekey, test.remoteaddr, packet, logger)
}
// handles a packet as if it had been sent to the transport by the key/endpoint.
func (test *udpV5Test) packetInFrom(key *ecdsa.PrivateKey, addr *net.UDPAddr, packet v5wire.Packet) {
func (test *udpV5Test) packetInFrom(key *ecdsa.PrivateKey, addr *net.UDPAddr, packet v5wire.Packet, logger log.Logger) {
test.t.Helper()
ln := test.getNode(key, addr)
ln := test.getNode(key, addr, logger)
codec := &testCodec{test: test, id: ln.ID()}
enc, _, err := codec.Encode(test.udp.Self().ID(), addr.String(), packet, nil)
if err != nil {
@ -653,7 +663,7 @@ func (test *udpV5Test) packetInFrom(key *ecdsa.PrivateKey, addr *net.UDPAddr, pa
}
// getNode ensures the test knows about a node at the given endpoint.
func (test *udpV5Test) getNode(key *ecdsa.PrivateKey, addr *net.UDPAddr) *enode.LocalNode {
func (test *udpV5Test) getNode(key *ecdsa.PrivateKey, addr *net.UDPAddr, logger log.Logger) *enode.LocalNode {
id := enode.PubkeyToIDV4(&key.PublicKey)
ln := test.nodesByID[id]
if ln == nil {
@ -664,7 +674,7 @@ func (test *udpV5Test) getNode(key *ecdsa.PrivateKey, addr *net.UDPAddr) *enode.
}
test.t.Cleanup(db.Close)
ln = enode.NewLocalNode(db, key)
ln = enode.NewLocalNode(db, key, logger)
ln.SetStaticIP(addr.IP)
ln.Set(enr.UDP(addr.Port))
test.nodesByID[id] = ln

View File

@ -28,6 +28,7 @@ import (
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/p2p/enode"
"github.com/ledgerwatch/log/v3"
)
func TestVector_ECDH(t *testing.T) {
@ -41,10 +42,11 @@ func TestVector_ECDH(t *testing.T) {
}
func TestVector_KDF(t *testing.T) {
logger := log.New()
var (
ephKey = hexPrivkey("0xfb757dc581730490a1d7a00deea65e9b1936924caaea8f44d476014856b68736")
cdata = hexutil.MustDecode("0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000")
net = newHandshakeTest(t.TempDir())
net = newHandshakeTest(t.TempDir(), logger)
)
defer net.close()

View File

@ -35,6 +35,7 @@ import (
"github.com/ledgerwatch/erigon/common/mclock"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/p2p/enode"
"github.com/ledgerwatch/log/v3"
)
// To regenerate discv5 test vectors, run
@ -69,8 +70,9 @@ func TestMinSizes(t *testing.T) {
// This test checks the basic handshake flow where A talks to B and A has no secrets.
func TestHandshake(t *testing.T) {
t.Parallel()
logger := log.New()
tmpDir := t.TempDir()
net := newHandshakeTest(tmpDir)
net := newHandshakeTest(tmpDir, logger)
defer net.close()
// A -> B RANDOM PACKET
@ -101,8 +103,9 @@ func TestHandshake(t *testing.T) {
// This test checks that handshake attempts are removed within the timeout.
func TestHandshake_timeout(t *testing.T) {
t.Parallel()
logger := log.New()
tmpDir := t.TempDir()
net := newHandshakeTest(tmpDir)
net := newHandshakeTest(tmpDir, logger)
defer net.close()
// A -> B RANDOM PACKET
@ -127,8 +130,9 @@ func TestHandshake_timeout(t *testing.T) {
// This test checks handshake behavior when no record is sent in the auth response.
func TestHandshake_norecord(t *testing.T) {
t.Parallel()
logger := log.New()
tmpDir := t.TempDir()
net := newHandshakeTest(tmpDir)
net := newHandshakeTest(tmpDir, logger)
defer net.close()
// A -> B RANDOM PACKET
@ -169,7 +173,8 @@ func TestHandshake_rekey(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
net := newHandshakeTest(tmpDir)
logger := log.New()
net := newHandshakeTest(tmpDir, logger)
defer net.close()
session := &session{
@ -209,7 +214,8 @@ func TestHandshake_rekey(t *testing.T) {
func TestHandshake_rekey2(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
net := newHandshakeTest(tmpDir)
logger := log.New()
net := newHandshakeTest(tmpDir, logger)
defer net.close()
initKeysA := &session{
@ -244,7 +250,8 @@ func TestHandshake_rekey2(t *testing.T) {
func TestHandshake_BadHandshakeAttack(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
net := newHandshakeTest(tmpDir)
logger := log.New()
net := newHandshakeTest(tmpDir, logger)
defer net.close()
// A -> B RANDOM PACKET
@ -285,7 +292,8 @@ func TestHandshake_BadHandshakeAttack(t *testing.T) {
func TestDecodeErrorsV5(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
net := newHandshakeTest(tmpDir)
logger := log.New()
net := newHandshakeTest(tmpDir, logger)
defer net.close()
net.nodeA.expectDecodeErr(t, errTooShort, []byte{})
@ -296,6 +304,7 @@ func TestDecodeErrorsV5(t *testing.T) {
// This test checks that all test vectors can be decoded.
func TestTestVectorsV5(t *testing.T) {
logger := log.New()
var (
idA = enode.PubkeyToIDV4(&testKeyA.PublicKey)
idB = enode.PubkeyToIDV4(&testKeyB.PublicKey)
@ -315,7 +324,7 @@ func TestTestVectorsV5(t *testing.T) {
challenge0A, challenge1A, challenge0B = c, c, c
challenge1A.RecordSeq = 1
tmpDir := t.TempDir()
net := newHandshakeTest(tmpDir)
net := newHandshakeTest(tmpDir, logger)
challenge0A.Node = net.nodeA.n()
challenge0B.Node = net.nodeB.n()
challenge1A.Node = net.nodeA.n()
@ -374,7 +383,7 @@ func TestTestVectorsV5(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
net := newHandshakeTest(tmpDir)
net := newHandshakeTest(tmpDir, logger)
defer net.close()
// Override all random inputs.
@ -443,7 +452,8 @@ func testVectorComment(net *handshakeTest, p Packet, challenge *Whoareyou, nonce
// This benchmark checks performance of handshake packet decoding.
func BenchmarkV5_DecodeHandshakePingSecp256k1(b *testing.B) {
tmpDir := b.TempDir()
net := newHandshakeTest(tmpDir)
logger := log.New()
net := newHandshakeTest(tmpDir, logger)
defer net.close()
var (
@ -472,7 +482,8 @@ func BenchmarkV5_DecodeHandshakePingSecp256k1(b *testing.B) {
// This benchmark checks how long it takes to decode an encrypted ping packet.
func BenchmarkV5_DecodePing(b *testing.B) {
tmpDir := b.TempDir()
net := newHandshakeTest(tmpDir)
logger := log.New()
net := newHandshakeTest(tmpDir, logger)
defer net.close()
session := &session{
@ -511,10 +522,10 @@ type handshakeTestNode struct {
c *Codec
}
func newHandshakeTest(tmpDir string) *handshakeTest {
func newHandshakeTest(tmpDir string, logger log.Logger) *handshakeTest {
t := new(handshakeTest)
t.nodeA.init(testKeyA, net.IP{127, 0, 0, 1}, &t.clock, tmpDir)
t.nodeB.init(testKeyB, net.IP{127, 0, 0, 1}, &t.clock, tmpDir)
t.nodeA.init(testKeyA, net.IP{127, 0, 0, 1}, &t.clock, tmpDir, logger)
t.nodeB.init(testKeyB, net.IP{127, 0, 0, 1}, &t.clock, tmpDir, logger)
return t
}
@ -523,12 +534,12 @@ func (t *handshakeTest) close() {
t.nodeB.ln.Database().Close()
}
func (n *handshakeTestNode) init(key *ecdsa.PrivateKey, ip net.IP, clock mclock.Clock, tmpDir string) {
func (n *handshakeTestNode) init(key *ecdsa.PrivateKey, ip net.IP, clock mclock.Clock, tmpDir string, logger log.Logger) {
db, err := enode.OpenDB("", tmpDir)
if err != nil {
panic(err)
}
n.ln = enode.NewLocalNode(db, key)
n.ln = enode.NewLocalNode(db, key, logger)
n.ln.SetStaticIP(ip)
if n.ln.Node().Seq() != 1 {
panic(fmt.Errorf("unexpected seq %d", n.ln.Node().Seq()))

View File

@ -53,6 +53,7 @@ type LocalNode struct {
entries map[string]enr.Entry
endpoint4 lnEndpoint
endpoint6 lnEndpoint
logger log.Logger
}
type lnEndpoint struct {
@ -62,7 +63,7 @@ type lnEndpoint struct {
}
// NewLocalNode creates a local node.
func NewLocalNode(db *DB, key *ecdsa.PrivateKey) *LocalNode {
func NewLocalNode(db *DB, key *ecdsa.PrivateKey, logger log.Logger) *LocalNode {
ln := &LocalNode{
id: PubkeyToIDV4(&key.PublicKey),
db: db,
@ -74,6 +75,7 @@ func NewLocalNode(db *DB, key *ecdsa.PrivateKey) *LocalNode {
endpoint6: lnEndpoint{
track: netutil.NewIPTracker(iptrackWindow, iptrackContactWindow, iptrackMinStatements),
},
logger: logger,
}
ln.seq = db.localSeq(ln.id)
ln.invalidate()
@ -281,7 +283,7 @@ func (ln *LocalNode) sign() {
panic(fmt.Errorf("enode: can't verify local record: %w", err))
}
ln.cur.Store(n)
log.Trace("New local node record", "seq", ln.seq, "id", n.ID(), "ip", n.IP(), "udp", n.UDP(), "tcp", n.TCP())
ln.logger.Trace("New local node record", "seq", ln.seq, "id", n.ID(), "ip", n.IP(), "udp", n.UDP(), "tcp", n.TCP())
}
func (ln *LocalNode) bumpSeq() {

View File

@ -23,21 +23,23 @@ import (
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/p2p/enr"
"github.com/ledgerwatch/log/v3"
"github.com/stretchr/testify/assert"
)
func newLocalNodeForTesting(tmpDir string) (*LocalNode, *DB) {
func newLocalNodeForTesting(tmpDir string, logger log.Logger) (*LocalNode, *DB) {
db, err := OpenDB("", tmpDir)
if err != nil {
panic(err)
}
key, _ := crypto.GenerateKey()
return NewLocalNode(db, key), db
return NewLocalNode(db, key, logger), db
}
func TestLocalNode(t *testing.T) {
tmpDir := t.TempDir()
ln, db := newLocalNodeForTesting(tmpDir)
logger := log.New()
ln, db := newLocalNodeForTesting(tmpDir, logger)
defer db.Close()
if ln.Node().ID() != ln.ID() {
@ -55,7 +57,8 @@ func TestLocalNode(t *testing.T) {
func TestLocalNodeSeqPersist(t *testing.T) {
tmpDir := t.TempDir()
ln, db := newLocalNodeForTesting(tmpDir)
logger := log.New()
ln, db := newLocalNodeForTesting(tmpDir, logger)
defer db.Close()
if s := ln.Node().Seq(); s != 1 {
@ -69,7 +72,7 @@ func TestLocalNodeSeqPersist(t *testing.T) {
// Create a new instance, it should reload the sequence number.
// The number increases just after that because a new record is
// created without the "x" entry.
ln2 := NewLocalNode(db, ln.key)
ln2 := NewLocalNode(db, ln.key, logger)
if s := ln2.Node().Seq(); s != 3 {
t.Fatalf("wrong seq %d on new instance, want 3", s)
}
@ -77,7 +80,7 @@ func TestLocalNodeSeqPersist(t *testing.T) {
// Create a new instance with a different node key on the same database.
// This should reset the sequence number.
key, _ := crypto.GenerateKey()
ln3 := NewLocalNode(db, key)
ln3 := NewLocalNode(db, key, logger)
if s := ln3.Node().Seq(); s != 1 {
t.Fatalf("wrong seq %d on instance with changed key, want 1", s)
}
@ -91,7 +94,8 @@ func TestLocalNodeEndpoint(t *testing.T) {
staticIP = net.IP{127, 0, 1, 2}
)
tmpDir := t.TempDir()
ln, db := newLocalNodeForTesting(tmpDir)
logger := log.New()
ln, db := newLocalNodeForTesting(tmpDir, logger)
defer db.Close()
// Nothing is set initially.

View File

@ -165,9 +165,6 @@ type Config struct {
// whenever a message is sent to or received from a peer
EnableMsgEvents bool
// Log is a custom logger to use with the p2p.Server.
Log log.Logger `toml:",omitempty"`
// it is actually used but a linter got confused
clock mclock.Clock //nolint:structcheck
@ -194,7 +191,7 @@ type Server struct {
ourHandshake *protoHandshake
loopWG sync.WaitGroup // loop, listenLoop
peerFeed event.Feed
log log.Logger
logger log.Logger
nodedb *enode.DB
localnode *enode.LocalNode
@ -470,22 +467,19 @@ func (srv *Server) Running() bool {
// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start(ctx context.Context) error {
func (srv *Server) Start(ctx context.Context, logger log.Logger) error {
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
srv.log = srv.Config.Log
if srv.log == nil {
srv.log = log.Root()
}
srv.logger = logger
if srv.clock == nil {
srv.clock = mclock.System{}
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
srv.logger.Warn("P2P server will be useless, neither dialing nor listening")
}
// static fields
@ -549,7 +543,7 @@ func (srv *Server) setupLocalNode() error {
return err
}
srv.nodedb = db
srv.localnode = enode.NewLocalNode(db, srv.PrivateKey)
srv.localnode = enode.NewLocalNode(db, srv.PrivateKey, srv.logger)
srv.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
srv.updateLocalNodeStaticAddrCache()
// TODO: check conflicts
@ -608,7 +602,7 @@ func (srv *Server) setupDiscovery(ctx context.Context) error {
return err
}
realaddr := conn.LocalAddr().(*net.UDPAddr)
srv.log.Trace("UDP listener up", "addr", realaddr)
srv.logger.Trace("UDP listener up", "addr", realaddr)
if srv.NAT != nil {
if !realaddr.IP.IsLoopback() && srv.NAT.SupportsMapping() {
srv.loopWG.Add(1)
@ -634,7 +628,7 @@ func (srv *Server) setupDiscovery(ctx context.Context) error {
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
Log: srv.log,
Log: srv.logger,
}
ntab, err := discover.ListenV4(ctx, conn, srv.localnode, cfg)
if err != nil {
@ -650,7 +644,7 @@ func (srv *Server) setupDiscovery(ctx context.Context) error {
PrivateKey: srv.PrivateKey,
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodesV5,
Log: srv.log,
Log: srv.logger,
}
var err error
if sconn != nil {
@ -670,7 +664,7 @@ func (srv *Server) setupDialScheduler() {
self: srv.localnode.ID(),
maxDialPeers: srv.maxDialedConns(),
maxActiveDials: srv.MaxPendingPeers,
log: srv.Log,
log: srv.logger,
netRestrict: srv.NetRestrict,
dialer: srv.Dialer,
clock: srv.clock,
@ -754,7 +748,7 @@ func (srv *Server) doPeerOp(fn peerOpFunc) {
func (srv *Server) run() {
defer debug.LogPanic()
if len(srv.Config.Protocols) > 0 {
srv.log.Info("Started P2P networking", "version", srv.Config.Protocols[0].Version, "self", *srv.localnodeAddrCache.Load(), "name", srv.Name)
srv.logger.Info("Started P2P networking", "version", srv.Config.Protocols[0].Version, "self", *srv.localnodeAddrCache.Load(), "name", srv.Name)
}
defer srv.loopWG.Done()
defer srv.nodedb.Close()
@ -781,7 +775,7 @@ running:
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add a node
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
srv.logger.Trace("Adding trusted node", "node", n)
trusted[n.ID()] = true
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, true)
@ -790,7 +784,7 @@ running:
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove a node
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
srv.logger.Trace("Removing trusted node", "node", n)
delete(trusted, n.ID())
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, false)
@ -818,7 +812,7 @@ running:
// The handshakes are done and it passed all checks.
p := srv.launchPeer(c, c.pubkey)
peers[c.node.ID()] = p
srv.log.Trace("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
srv.logger.Trace("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
srv.dialsched.peerAdded(c)
if p.Inbound() {
inboundCount++
@ -830,7 +824,7 @@ running:
// A peer disconnected.
d := common.PrettyDuration(mclock.Now() - pd.created)
delete(peers, pd.ID())
srv.log.Trace("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
srv.logger.Trace("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
srv.dialsched.peerRemoved(pd.rw)
if pd.Inbound() {
inboundCount--
@ -838,7 +832,7 @@ running:
}
}
srv.log.Trace("P2P networking is spinning down")
srv.logger.Trace("P2P networking is spinning down")
// Terminate discovery. If there is a running lookup it will terminate soon.
if srv.ntab != nil {
@ -856,7 +850,7 @@ running:
// is closed.
for len(peers) > 0 {
p := <-srv.delpeer
p.log.Trace("<-delpeer (spindown)")
srv.logger.Trace("<-delpeer (spindown)")
delete(peers, p.ID())
}
}
@ -881,7 +875,7 @@ func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount in
// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop(ctx context.Context) {
srv.log.Trace("TCP listener up", "addr", srv.listener.Addr())
srv.logger.Trace("TCP listener up", "addr", srv.listener.Addr())
// The slots limit accepts of new connections.
slots := semaphore.NewWeighted(int64(srv.MaxPendingPeers))
@ -896,7 +890,7 @@ func (srv *Server) listenLoop(ctx context.Context) {
// Wait for a free slot before accepting.
if slotErr := slots.Acquire(ctx, 1); slotErr != nil {
if !errors.Is(slotErr, context.Canceled) {
srv.log.Error("Failed to get a peer connection slot", "err", slotErr)
srv.logger.Error("Failed to get a peer connection slot", "err", slotErr)
}
return
}
@ -910,7 +904,7 @@ func (srv *Server) listenLoop(ctx context.Context) {
fd, err = srv.listener.Accept()
if netutil.IsTemporaryError(err) {
if time.Since(lastLog) > 1*time.Second {
srv.log.Trace("Temporary read error", "err", err)
srv.logger.Trace("Temporary read error", "err", err)
lastLog = time.Now()
}
time.Sleep(time.Millisecond * 200)
@ -920,7 +914,7 @@ func (srv *Server) listenLoop(ctx context.Context) {
select {
case <-srv.quit:
default:
srv.log.Error("Server listener failed to accept a connection", "err", err)
srv.logger.Error("Server listener failed to accept a connection", "err", err)
}
slots.Release(1)
return
@ -930,7 +924,7 @@ func (srv *Server) listenLoop(ctx context.Context) {
remoteIP := netutil.AddrIP(fd.RemoteAddr())
if err := srv.checkInboundConn(fd, remoteIP); err != nil {
srv.log.Trace("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
srv.logger.Trace("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
_ = fd.Close()
slots.Release(1)
continue
@ -941,7 +935,7 @@ func (srv *Server) listenLoop(ctx context.Context) {
addr = tcp
}
fd = newMeteredConn(fd, true, addr)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
srv.logger.Trace("Accepted connection", "addr", fd.RemoteAddr())
}
go func() {
defer debug.LogPanic()
@ -1003,7 +997,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro
dialPubkey = new(ecdsa.PublicKey)
if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
err = errors.New("dial destination doesn't have a secp256k1 public key")
srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
srv.logger.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return err
}
}
@ -1011,7 +1005,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro
// Run the RLPx handshake.
remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
if err != nil {
srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
srv.logger.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return err
}
copy(c.pubkey[:], crypto.MarshalPubkey(remotePubkey))
@ -1020,7 +1014,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro
} else {
c.node = nodeFromConn(remotePubkey, c.fd)
}
clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
clog := srv.logger.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
err = srv.checkpoint(c, srv.checkpointPostHandshake)
if err != nil {
clog.Trace("Rejected peer", "err", err)
@ -1069,7 +1063,7 @@ func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error {
}
func (srv *Server) launchPeer(c *conn, pubkey [64]byte) *Peer {
p := newPeer(srv.log, c, srv.Protocols, pubkey, srv.MetricsEnabled)
p := newPeer(srv.logger, c, srv.Protocols, pubkey, srv.MetricsEnabled)
if srv.EnableMsgEvents {
// If message events are enabled, pass the peerFeed
// to the peer.

View File

@ -33,7 +33,6 @@ import (
"github.com/ledgerwatch/erigon/p2p/enode"
"github.com/ledgerwatch/erigon/p2p/enr"
"github.com/ledgerwatch/erigon/p2p/rlpx"
"github.com/ledgerwatch/erigon/turbo/testlog"
"github.com/ledgerwatch/log/v3"
)
@ -68,7 +67,7 @@ func (c *testTransport) close(err error) {
c.closeErr = err
}
func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *Server {
func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer), logger log.Logger) *Server {
config := Config{
Name: "test",
MaxPeers: 10,
@ -76,7 +75,6 @@ func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *
ListenAddr: "127.0.0.1:0",
NoDiscovery: true,
PrivateKey: newkey(),
Log: testlog.Logger(t, log.LvlError),
}
server := &Server{
Config: config,
@ -85,17 +83,18 @@ func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *
return newTestTransport(remoteKey, fd, dialDest)
},
}
if err := server.TestStart(); err != nil {
if err := server.TestStart(logger); err != nil {
t.Fatalf("Could not start server: %v", err)
}
return server
}
func (srv *Server) TestStart() error {
return srv.Start(context.Background())
func (srv *Server) TestStart(logger log.Logger) error {
return srv.Start(context.Background(), logger)
}
func TestServerListen(t *testing.T) {
logger := log.New()
// start the test server
connected := make(chan *Peer)
remid := &newkey().PublicKey
@ -104,7 +103,7 @@ func TestServerListen(t *testing.T) {
t.Error("peer func called with wrong node id")
}
connected <- p
})
}, logger)
defer close(connected)
defer srv.Stop()
@ -131,6 +130,7 @@ func TestServerListen(t *testing.T) {
}
func TestServerDial(t *testing.T) {
logger := log.New()
// run a one-shot TCP server to handle the connection.
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
@ -149,7 +149,7 @@ func TestServerDial(t *testing.T) {
// start the server
connected := make(chan *Peer)
remid := &newkey().PublicKey
srv := startTestServer(t, remid, func(p *Peer) { connected <- p })
srv := startTestServer(t, remid, func(p *Peer) { connected <- p }, logger)
defer close(connected)
defer srv.Stop()
@ -212,12 +212,12 @@ func TestServerDial(t *testing.T) {
// This test checks that RemovePeer disconnects the peer if it is connected.
func TestServerRemovePeerDisconnect(t *testing.T) {
logger := log.New()
srv1 := &Server{Config: Config{
PrivateKey: newkey(),
MaxPeers: 1,
MaxPendingPeers: 1,
NoDiscovery: true,
Log: testlog.Logger(t, log.LvlTrace).New("server", "1"),
}}
srv2 := &Server{Config: Config{
PrivateKey: newkey(),
@ -226,13 +226,12 @@ func TestServerRemovePeerDisconnect(t *testing.T) {
NoDiscovery: true,
NoDial: true,
ListenAddr: "127.0.0.1:0",
Log: testlog.Logger(t, log.LvlTrace).New("server", "2"),
}}
if err := srv1.TestStart(); err != nil {
if err := srv1.TestStart(logger); err != nil {
t.Fatal("cant start srv1")
}
defer srv1.Stop()
if err := srv2.TestStart(); err != nil {
if err := srv2.TestStart(logger); err != nil {
t.Fatal("cant start srv2")
}
defer srv2.Stop()
@ -249,6 +248,7 @@ func TestServerRemovePeerDisconnect(t *testing.T) {
// This test checks that connections are disconnected just after the encryption handshake
// when the server is at capacity. Trusted connections should still be accepted.
func TestServerAtCap(t *testing.T) {
logger := log.New()
trustedNode := newkey()
trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
srv := &Server{
@ -259,10 +259,9 @@ func TestServerAtCap(t *testing.T) {
NoDial: true,
NoDiscovery: true,
TrustedNodes: []*enode.Node{newNode(trustedID, "")},
Log: testlog.Logger(t, log.LvlTrace),
},
}
if err := srv.TestStart(); err != nil {
if err := srv.TestStart(logger); err != nil {
t.Fatalf("could not start: %v", err)
}
defer srv.Stop()
@ -329,6 +328,7 @@ func TestServerAtCap(t *testing.T) {
}
func TestServerPeerLimits(t *testing.T) {
logger := log.New()
srvkey := newkey()
clientkey := newkey()
clientnode := enode.NewV4(&clientkey.PublicKey, nil, 0, 0)
@ -350,11 +350,10 @@ func TestServerPeerLimits(t *testing.T) {
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Log: testlog.Logger(t, log.LvlTrace),
},
newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return tp },
}
if err := srv.TestStart(); err != nil {
if err := srv.TestStart(logger); err != nil {
t.Fatalf("couldn't start server: %v", err)
}
defer srv.Stop()
@ -391,6 +390,7 @@ func TestServerPeerLimits(t *testing.T) {
}
func TestServerSetupConn(t *testing.T) {
logger := log.New()
var (
clientkey, srvkey = newkey(), newkey()
clientpub = &clientkey.PublicKey
@ -454,15 +454,13 @@ func TestServerSetupConn(t *testing.T) {
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Log: testlog.Logger(t, log.LvlTrace),
}
srv := &Server{
Config: cfg,
newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return test.tt }, //nolint:scopelint
log: cfg.Log,
}
if !test.dontstart {
if err := srv.TestStart(); err != nil {
if err := srv.TestStart(logger); err != nil {
t.Fatalf("couldn't start server: %v", err)
}
defer srv.Stop()
@ -540,6 +538,7 @@ func randomID() (id enode.ID) {
// This test checks that inbound connections are throttled by IP.
func TestServerInboundThrottle(t *testing.T) {
logger := log.New()
const timeout = 5 * time.Second
newTransportCalled := make(chan struct{})
srv := &Server{
@ -551,7 +550,6 @@ func TestServerInboundThrottle(t *testing.T) {
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Log: testlog.Logger(t, log.LvlTrace),
},
newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport {
newTransportCalled <- struct{}{}
@ -562,7 +560,7 @@ func TestServerInboundThrottle(t *testing.T) {
return listenFakeAddr(network, laddr, fakeAddr)
},
}
if err := srv.TestStart(); err != nil {
if err := srv.TestStart(logger); err != nil {
t.Fatal("can't start: ", err)
}
defer srv.Stop()

View File

@ -106,7 +106,7 @@ func SetupLoggerCmd(filePrefix string, cmd *cobra.Command) log.Logger {
// SetupLoggerCmd perform the logging using parametrs specifying by `flag` package, and sets it to the root logger
// This is the function which is NOT used by Erigon itself, but instead by utility commans
func SetupLogger(filePrefix string) {
func SetupLogger(filePrefix string) log.Logger {
var logConsoleVerbosity = flag.String(LogConsoleVerbosityFlag.Name, "", LogConsoleVerbosityFlag.Usage)
var logDirVerbosity = flag.String(LogDirVerbosityFlag.Name, "", LogDirVerbosityFlag.Usage)
var logDirPath = flag.String(LogDirPathFlag.Name, "", LogDirPathFlag.Usage)
@ -134,6 +134,7 @@ func SetupLogger(filePrefix string) {
}
initSeparatedLogging(log.Root(), filePrefix, *logDirPath, consoleLevel, dirLevel, consoleJson, *dirJson)
return log.Root()
}
// initSeparatedLogging construct a log handler accrosing to the configuration parameters passed to it