diff --git a/cl/sentinel/sentinel.go b/cl/sentinel/sentinel.go index 51fab5101..839906fb1 100644 --- a/cl/sentinel/sentinel.go +++ b/cl/sentinel/sentinel.go @@ -168,7 +168,7 @@ func (s *Sentinel) createListener() (*discover.UDPv5, error) { // Start stream handlers handlers.NewConsensusHandlers(s.ctx, s.db, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2).Start() - net, err := discover.ListenV5(s.ctx, conn, localNode, discCfg) + net, err := discover.ListenV5(s.ctx, "any", conn, localNode, discCfg) if err != nil { return nil, err } diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index 7339cf06a..eedde266a 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -131,11 +131,11 @@ func main() { } if *runv5 { - if _, err := discover.ListenV5(ctx, conn, ln, cfg); err != nil { + if _, err := discover.ListenV5(ctx, "any", conn, ln, cfg); err != nil { utils.Fatalf("%v", err) } } else { - if _, err := discover.ListenUDP(ctx, conn, ln, cfg); err != nil { + if _, err := discover.ListenUDP(ctx, "any", conn, ln, cfg); err != nil { utils.Fatalf("%v", err) } } diff --git a/cmd/observer/observer/server.go b/cmd/observer/observer/server.go index 4c017f337..99c2cb4bb 100644 --- a/cmd/observer/observer/server.go +++ b/cmd/observer/observer/server.go @@ -183,5 +183,5 @@ func (server *Server) Listen(ctx context.Context) (*discover.UDPv4, error) { server.logger.Debug("Discovery UDP listener is up", "addr", realAddr) - return discover.ListenV4(ctx, conn, server.localNode, server.discConfig) + return discover.ListenV4(ctx, "any", conn, server.localNode, server.discConfig) } diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 08bfaed8d..7436a4b91 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -16,8 +16,6 @@ package diagnostics -import "reflect" - func (p PeerStatistics) Type() Type { - return Type(reflect.TypeOf(p)) + return TypeOf(p) } diff --git a/erigon-lib/diagnostics/provider.go b/erigon-lib/diagnostics/provider.go index c1c2ae756..2982df163 100644 --- a/erigon-lib/diagnostics/provider.go +++ b/erigon-lib/diagnostics/provider.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "sync" + "sync/atomic" "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/log/v3" @@ -17,7 +18,35 @@ const ( ckChan ctxKey = iota ) -type Type reflect.Type +type Type interface { + reflect.Type + Context() context.Context + Err() error +} + +type diagType struct { + reflect.Type +} + +var cancelled = func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx +}() + +func (t diagType) Context() context.Context { + providerMutex.Lock() + defer providerMutex.Unlock() + if reg := providers[t]; reg != nil { + return reg.context + } + + return cancelled +} + +func (t diagType) Err() error { + return t.Context().Err() +} type Info interface { Type() Type @@ -25,7 +54,7 @@ type Info interface { func TypeOf(i Info) Type { t := reflect.TypeOf(i) - return Type(t) + return diagType{t} } type Provider interface { @@ -50,7 +79,7 @@ func RegisterProvider(provider Provider, infoType Type, logger log.Logger) { providerMutex.Lock() defer providerMutex.Unlock() - reg, _ := providers[infoType] + reg := providers[infoType] if reg != nil { for _, p := range reg.providers { @@ -73,13 +102,15 @@ func RegisterProvider(provider Provider, infoType Type, logger log.Logger) { func StartProviders(ctx context.Context, infoType Type, logger log.Logger) { providerMutex.Lock() - reg, _ := 
providers[infoType] + reg := providers[infoType] + + if reg == nil { + reg = &registry{} + providers[infoType] = reg + } toStart := make([]Provider, len(reg.providers)) - - for i, provider := range reg.providers { - toStart[i] = provider - } + copy(toStart, reg.providers) reg.context = ctx @@ -105,18 +136,29 @@ func startProvider(ctx context.Context, infoType Type, provider Provider, logger } } -func Send[I Info](ctx context.Context, info I) error { +func Send[I Info](info I) error { + ctx := info.Type().Context() + if ctx.Err() != nil { + if !errors.Is(ctx.Err(), context.Canceled) { + // drop the diagnostic message if there is + // no active diagnostic context for the type + return nil + } + return ctx.Err() } cval := ctx.Value(ckChan) - if c, ok := cval.(chan I); ok { - select { - case c <- info: - default: - // drop the diagnostic message if the receiver is busy - // so the sender is not blocked on non critcal actions + + if cp, ok := cval.(*atomic.Pointer[chan I]); ok { + if c := (*cp).Load(); c != nil { + select { + case *c <- info: + default: + // drop the diagnostic message if the receiver is busy + // so the sender is not blocked on non critcal actions + } } } else { return fmt.Errorf("unexpected channel type: %T", cval) @@ -126,16 +168,20 @@ func Send[I Info](ctx context.Context, info I) error { } func Context[I Info](ctx context.Context, buffer int) (context.Context, <-chan I, context.CancelFunc) { - ch := make(chan I, buffer) - ctx = context.WithValue(ctx, ckChan, ch) + c := make(chan I, buffer) + cp := atomic.Pointer[chan I]{} + cp.Store(&c) + + ctx = context.WithValue(ctx, ckChan, &cp) ctx, cancel := context.WithCancel(ctx) - return ctx, ch, func() { - if ch != nil { - toClose := ch - ch = nil - close(toClose) - } + return ctx, *cp.Load(), func() { cancel() + + if cp.CompareAndSwap(&c, nil) { + ch := c + c = nil + close(ch) + } } } diff --git a/erigon-lib/diagnostics/provider_test.go b/erigon-lib/diagnostics/provider_test.go index 7d8ea6b10..b5f2fefc7 100644 --- a/erigon-lib/diagnostics/provider_test.go +++ b/erigon-lib/diagnostics/provider_test.go @@ -31,7 +31,7 @@ func (t *testProvider) StartDiagnostics(ctx context.Context) error { case <-ctx.Done(): return nil case <-timer.C: - diagnostics.Send(ctx, testInfo{count}) + diagnostics.Send(testInfo{count}) count++ } } @@ -54,6 +54,25 @@ func TestProviderRegistration(t *testing.T) { } } +func TestDelayedProviderRegistration(t *testing.T) { + + time.AfterFunc(1*time.Second, func() { + // diagnostics provider + provider := &testProvider{} + diagnostics.RegisterProvider(provider, diagnostics.TypeOf(testInfo{}), log.Root()) + }) + + // diagnostics receiver + ctx, ch, cancel := diagnostics.Context[testInfo](context.Background(), 1) + diagnostics.StartProviders(ctx, diagnostics.TypeOf(testInfo{}), log.Root()) + + for info := range ch { + if info.count == 3 { + cancel() + } + } +} + func TestProviderFuncRegistration(t *testing.T) { // diagnostics provider @@ -68,7 +87,7 @@ func TestProviderFuncRegistration(t *testing.T) { case <-ctx.Done(): return nil case <-timer.C: - diagnostics.Send(ctx, testInfo{count}) + diagnostics.Send(testInfo{count}) count++ } } diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 5126aa9a5..6dc0a9a3b 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -204,6 +204,7 @@ Loop: if req != nil { peer, sentToPeer = cfg.headerReqSend(ctx, req) if sentToPeer { + logger.Debug(fmt.Sprintf("[%s] Requested header", logPrefix), "from", req.Number, "length",
req.Length) cfg.hd.UpdateStats(req, false /* skeleton */, peer) cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */) } @@ -233,6 +234,7 @@ Loop: if req != nil { peer, sentToPeer = cfg.headerReqSend(ctx, req) if sentToPeer { + logger.Debug(fmt.Sprintf("[%s] Requested skeleton", logPrefix), "from", req.Number, "length", req.Length) cfg.hd.UpdateStats(req, true /* skeleton */, peer) lastSkeletonTime = time.Now() } diff --git a/p2p/dial.go b/p2p/dial.go index cadb821d5..8bb3934eb 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -43,7 +43,6 @@ const ( // Config for the "Looking for peers" message. dialStatsLogInterval = 60 * time.Second // printed at most this often - dialStatsPeerLimit = 20 // but not if more than this many dialed peers // Endpoint resolution is throttled with bounded backoff. initialResolveDelay = 60 * time.Second @@ -94,6 +93,7 @@ var ( // to create peer connections to nodes arriving through the iterator. type dialScheduler struct { dialConfig + mutex sync.Mutex setupFunc dialSetupFunc wg sync.WaitGroup cancel context.CancelFunc @@ -126,8 +126,8 @@ type dialScheduler struct { historyTimerTime mclock.AbsTime // for logStats - lastStatsLog mclock.AbsTime - doneSinceLastLog int + dialed int + errors map[string]uint } type dialSetupFunc func(net.Conn, connFlag, *enode.Node) error @@ -177,8 +177,9 @@ func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupF remPeerCh: make(chan *conn), subProtocolVersion: subProtocolVersion, + errors: map[string]uint{}, } - d.lastStatsLog = d.clock.Now() + d.ctx, d.cancel = context.WithCancel(context.Background()) d.wg.Add(2) go d.readNodes(it) @@ -232,6 +233,9 @@ func (d *dialScheduler) loop(it enode.Iterator) { historyExp = make(chan struct{}, 1) ) + logTimer := time.NewTicker(dialStatsLogInterval) + defer logTimer.Stop() + loop: for { // Launch new dials if slots are available. @@ -243,13 +247,15 @@ loop: nodesCh = nil } d.rearmHistoryTimer(historyExp) - //d.logStats() select { case <-d.ctx.Done(): it.Close() break loop + case <-logTimer.C: + d.logStats() + case node := <-nodesCh: if err := d.checkDial(node); err != nil { d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err) @@ -261,7 +267,7 @@ loop: id := task.dest.ID() delete(d.dialing, id) d.updateStaticPool(id) - d.doneSinceLastLog++ + d.dialed++ case c := <-d.addPeerCh: if c.is(dynDialedConn) || c.is(staticDialedConn) { @@ -337,15 +343,16 @@ func (d *dialScheduler) readNodes(it enode.Iterator) { // or comes back online. // nolint func (d *dialScheduler) logStats() { - now := d.clock.Now() - if d.lastStatsLog.Add(dialStatsLogInterval) > now { - return + vals := []interface{}{"protocol", d.subProtocolVersion, + "peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.dialed, "static", len(d.static)} + + d.mutex.Lock() + for err, count := range d.errors { + vals = append(vals, err, count) } - if d.dialPeers < dialStatsPeerLimit && d.dialPeers < d.maxDialPeers { - d.log.Info("[p2p] Looking for peers", "protocol", d.subProtocolVersion, "peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.doneSinceLastLog, "static", len(d.static)) - } - d.doneSinceLastLog = 0 - d.lastStatsLog = now + d.mutex.Unlock() + + d.log.Debug("[p2p] Dial scheduler", vals...) 
} // rearmHistoryTimer configures d.historyTimer to fire when the @@ -543,7 +550,12 @@ func (t *dialTask) resolve(d *dialScheduler) bool { func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error { fd, err := d.dialer.Dial(d.ctx, t.dest) if err != nil { - d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err)) + cleanErr := cleanupDialErr(err) + d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanErr) + + d.mutex.Lock() + d.errors[cleanErr.Error()] = d.errors[cleanErr.Error()] + 1 + d.mutex.Unlock() return &dialError{err} } mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}) diff --git a/p2p/discover/common.go b/p2p/discover/common.go index 6ee5c4c0b..da45e7b6d 100644 --- a/p2p/discover/common.go +++ b/p2p/discover/common.go @@ -86,8 +86,8 @@ func (cfg Config) withDefaults(defaultReplyTimeout time.Duration) Config { } // ListenUDP starts listening for discovery packets on the given UDP socket. -func ListenUDP(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { - return ListenV4(ctx, c, ln, cfg) +func ListenUDP(ctx context.Context, protocol string, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { + return ListenV4(ctx, protocol, c, ln, cfg) } // ReadPacket is a packet that couldn't be handled. Those packets are sent to the unhandled @@ -96,3 +96,8 @@ type ReadPacket struct { Data []byte Addr *net.UDPAddr } + +type UnhandledPacket struct { + ReadPacket + Reason error +} diff --git a/p2p/discover/lookup.go b/p2p/discover/lookup.go index 0e03daa30..87ba2c2d5 100644 --- a/p2p/discover/lookup.go +++ b/p2p/discover/lookup.go @@ -155,6 +155,7 @@ func (it *lookup) slowdown() { func (it *lookup) query(n *node, reply chan<- []*node) { fails := it.tab.db.FindFails(n.ID(), n.IP()) r, err := it.queryfunc(n) + if err == errClosed { // Avoid recording failures on shutdown. reply <- nil @@ -180,6 +181,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) { for _, n := range r { it.tab.addSeenNode(n) } + reply <- r } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index eaa794034..feaf5d397 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -55,12 +55,13 @@ const ( bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24 tableIPLimit, tableSubnet = 10, 24 - refreshInterval = 30 * time.Minute - revalidateInterval = 5 * time.Second - copyNodesInterval = 30 * time.Second - seedMinTableTime = 5 * time.Minute - seedCount = 30 - seedMaxAge = 5 * 24 * time.Hour + minRefreshInterval = 30 * time.Second + refreshInterval = 30 * time.Minute + revalidateInterval = 5 * time.Second + maintenanceInterval = 60 * time.Second + seedMinTableTime = 5 * time.Minute + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) // Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps @@ -84,6 +85,12 @@ type Table struct { closed chan struct{} nodeAddedHook func(*node) // for testing + + // diagnostics + errors map[string]uint + dbseeds int + revalidates int + protocol string } // transport is implemented by the UDP transports. @@ -93,6 +100,9 @@ type transport interface { lookupRandom() []*enode.Node lookupSelf() []*enode.Node ping(*enode.Node) (seq uint64, err error) + Version() string + Errors() map[string]uint + LenUnsolicited() int } // bucket contains nodes, ordered by their last activity. 
the entry @@ -105,24 +115,25 @@ type bucket struct { func newTable( t transport, + protocol string, db *enode.DB, bootnodes []*enode.Node, revalidateInterval time.Duration, logger log.Logger, ) (*Table, error) { tab := &Table{ - net: t, - db: db, - refreshReq: make(chan chan struct{}), - initDone: make(chan struct{}), - closeReq: make(chan struct{}), - closed: make(chan struct{}), - rand: mrand.New(mrand.NewSource(0)), // nolint: gosec - ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, - + net: t, + db: db, + refreshReq: make(chan chan struct{}), + initDone: make(chan struct{}), + closeReq: make(chan struct{}), + closed: make(chan struct{}), + rand: mrand.New(mrand.NewSource(0)), // nolint: gosec + ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, + errors: map[string]uint{}, revalidateInterval: revalidateInterval, - - log: logger, + protocol: protocol, + log: logger, } if err := tab.setFallbackNodes(bootnodes); err != nil { return nil, err @@ -147,8 +158,8 @@ func (tab *Table) seedRand() { crand.Read(b[:]) tab.mutex.Lock() + defer tab.mutex.Unlock() tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:]))) - tab.mutex.Unlock() } // ReadRandomNodes fills the given slice with random nodes from the table. The results @@ -157,6 +168,7 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) { if !tab.isInitDone() { return 0 } + tab.mutex.Lock() defer tab.mutex.Unlock() @@ -230,21 +242,29 @@ func (tab *Table) refresh() <-chan struct{} { // loop schedules runs of doRefresh, doRevalidate and copyLiveNodes. func (tab *Table) loop() { var ( - revalidate = time.NewTimer(tab.revalidateInterval) - refresh = time.NewTicker(refreshInterval) - copyNodes = time.NewTicker(copyNodesInterval) - refreshDone = make(chan struct{}) // where doRefresh reports completion - revalidateDone chan struct{} // where doRevalidate reports completion - waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs + revalidate = time.NewTimer(tab.revalidateInterval) + refresh = time.NewTicker(refreshInterval) + tableMainenance = time.NewTicker(maintenanceInterval) + refreshDone = make(chan struct{}) // where doRefresh reports completion + revalidateDone chan struct{} // where doRevalidate reports completion + waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs ) defer debug.LogPanic() defer refresh.Stop() defer revalidate.Stop() - defer copyNodes.Stop() + defer tableMainenance.Stop() // Start initial refresh. 
go tab.doRefresh(refreshDone) + var minRefreshTimer *time.Timer + + defer func() { + if minRefreshTimer != nil { + minRefreshTimer.Stop() + } + }() + loop: for { select { @@ -266,13 +286,49 @@ loop: } waiting, refreshDone = nil, nil case <-revalidate.C: - revalidateDone = make(chan struct{}) - go tab.doRevalidate(revalidateDone) + if revalidateDone == nil { + revalidateDone = make(chan struct{}) + go tab.doRevalidate(revalidateDone) + } case <-revalidateDone: revalidate.Reset(tab.revalidateInterval) + if tab.live() == 0 && len(waiting) == 0 && minRefreshTimer == nil { + minRefreshTimer = time.AfterFunc(minRefreshInterval, func() { + minRefreshTimer = nil + tab.net.lookupRandom() + tab.refresh() + }) + } revalidateDone = nil - case <-copyNodes.C: - go tab.copyLiveNodes() + case <-tableMainenance.C: + live := tab.live() + + vals := []interface{}{"protocol", tab.protocol, "version", tab.net.Version(), + "len", tab.len(), "live", tab.live(), "unsol", tab.net.LenUnsolicited(), "ips", tab.ips.Len(), "db", tab.dbseeds, "reval", tab.revalidates} + + func() { + tab.mutex.Lock() + defer tab.mutex.Unlock() + + for err, count := range tab.errors { + vals = append(vals, err, count) + } + + for err, count := range tab.net.Errors() { + vals = append(vals, err, count) + } + }() + + tab.log.Debug("[p2p] Discovery table", vals...) + + if live != 0 { + if revalidateDone == nil { + revalidateDone = make(chan struct{}) + go tab.doRevalidate(revalidateDone) + } + } else { + go tab.copyLiveNodes() + } case <-tab.closeReq: break loop } @@ -316,7 +372,10 @@ func (tab *Table) doRefresh(done chan struct{}) { } func (tab *Table) loadSeedNodes() { - seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge)) + dbseeds := tab.db.QuerySeeds(seedCount, seedMaxAge) + tab.dbseeds = len(dbseeds) + + seeds := wrapNodes(dbseeds) tab.log.Debug("QuerySeeds read nodes from the node DB", "count", len(seeds)) seeds = append(seeds, tab.nursery...) for i := range seeds { @@ -333,6 +392,8 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { defer debug.LogPanic() defer func() { done <- struct{}{} }() + tab.revalidates++ + last, bi := tab.nodeToRevalidate() if last == nil { // No non-empty bucket found. @@ -343,11 +404,14 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { remoteSeq, rErr := tab.net.ping(unwrapNode(last)) // Also fetch record if the node replied and returned a higher sequence number. - if last.Seq() < remoteSeq { - if n, err := tab.net.RequestENR(unwrapNode(last)); err != nil { - tab.log.Trace("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) - } else { - last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks} + if rErr == nil { + if last.Seq() < remoteSeq { + if n, err := tab.net.RequestENR(unwrapNode(last)); err != nil { + rErr = err + tab.log.Trace("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) + } else { + last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks} + } } } @@ -360,7 +424,10 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { tab.log.Trace("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks) tab.bumpInBucket(b, last) return + } else { + tab.addError(rErr) } + // No reply received, pick a replacement or delete the node if there aren't // any replacements. 
if r := tab.replace(b, last); r != nil { @@ -444,6 +511,26 @@ func (tab *Table) len() (n int) { return n } +func (tab *Table) live() (n int) { + tab.mutex.Lock() + defer tab.mutex.Unlock() + + for _, b := range &tab.buckets { + for _, e := range b.entries { + if e.livenessChecks > 0 { + n++ + } + } + } + + return n +} + +func (tab *Table) addError(err error) { + str := err.Error() + tab.errors[str] = tab.errors[str] + 1 +} + // bucketLen returns the number of nodes in the bucket for the given ID. func (tab *Table) bucketLen(id enode.ID) int { tab.mutex.Lock() @@ -477,6 +564,7 @@ func (tab *Table) addSeenNode(n *node) { tab.mutex.Lock() defer tab.mutex.Unlock() + b := tab.bucket(n.ID()) if contains(b.entries, n.ID()) { // Already in bucket, don't add. @@ -519,6 +607,7 @@ func (tab *Table) addVerifiedNode(n *node) { tab.mutex.Lock() defer tab.mutex.Unlock() + b := tab.bucket(n.ID()) if tab.bumpInBucket(b, n) { // Already in bucket, moved to front. diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 50cff8aeb..e46131928 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -48,7 +48,7 @@ func newTestTable(t transport, tmpDir string) (*Table, *enode.DB) { if err != nil { panic(err) } - tab, _ := newTable(t, db, nil, time.Hour, log.Root()) + tab, _ := newTable(t, "test", db, nil, time.Hour, log.Root()) go tab.loop() return tab, db } @@ -156,6 +156,9 @@ func (t *pingRecorder) updateRecord(n *enode.Node) { // Stubs to satisfy the transport interface. func (t *pingRecorder) Self() *enode.Node { return nullNode } +func (t *pingRecorder) Version() string { return "none" } +func (t *pingRecorder) Errors() map[string]uint { return nil } +func (t *pingRecorder) LenUnsolicited() int { return 0 } func (t *pingRecorder) lookupSelf() []*enode.Node { return nil } func (t *pingRecorder) lookupRandom() []*enode.Node { return nil } diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 38687292d..9d962df04 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -28,6 +28,7 @@ import ( "sync" "time" + lru "github.com/hashicorp/golang-lru/v2" "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/p2p/discover/v4wire" @@ -47,8 +48,14 @@ var ( errLowPort = errors.New("low port") ) +var ( + errExpiredStr = errExpired.Error() + errUnsolicitedReplyStr = errUnsolicitedReply.Error() + errUnknownNodeStr = errUnknownNode.Error() +) + const ( - respTimeout = 500 * time.Millisecond + respTimeout = 750 * time.Millisecond expiration = 20 * time.Second bondExpiration = 24 * time.Hour @@ -65,6 +72,7 @@ const ( // UDPv4 implements the v4 wire protocol. type UDPv4 struct { + mutex sync.Mutex conn UDPConn log log.Logger netrestrict *netutil.Netlist @@ -75,13 +83,16 @@ type UDPv4 struct { closeOnce sync.Once wg sync.WaitGroup - addReplyMatcher chan *replyMatcher - gotreply chan reply - replyTimeout time.Duration - pingBackDelay time.Duration - closeCtx context.Context - cancelCloseCtx context.CancelFunc - + addReplyMatcher chan *replyMatcher + gotreply chan reply + gotkey chan v4wire.Pubkey + gotnodes chan nodes + replyTimeout time.Duration + pingBackDelay time.Duration + closeCtx context.Context + cancelCloseCtx context.CancelFunc + errors map[string]uint + unsolicitedNodes *lru.Cache[enode.ID, *enode.Node] privateKeyGenerator func() (*ecdsa.PrivateKey, error) } @@ -98,6 +109,7 @@ type replyMatcher struct { // these fields must match in the reply. 
from enode.ID ip net.IP + port int ptype byte // time when the request must complete @@ -124,33 +136,44 @@ type replyMatchFunc func(v4wire.Packet) (matched bool, requestDone bool) type reply struct { from enode.ID ip net.IP + port int data v4wire.Packet // loop indicates whether there was // a matching request by sending on this channel. matched chan<- bool } -func ListenV4(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { +type nodes struct { + addr *net.UDPAddr + nodes []v4wire.Node +} + +func ListenV4(ctx context.Context, protocol string, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { cfg = cfg.withDefaults(respTimeout) closeCtx, cancel := context.WithCancel(ctx) - t := &UDPv4{ - conn: c, - priv: cfg.PrivateKey, - netrestrict: cfg.NetRestrict, - localNode: ln, - db: ln.Database(), - gotreply: make(chan reply), - addReplyMatcher: make(chan *replyMatcher), - replyTimeout: cfg.ReplyTimeout, - pingBackDelay: cfg.PingBackDelay, - closeCtx: closeCtx, - cancelCloseCtx: cancel, - log: cfg.Log, + unsolicitedNodes, _ := lru.New[enode.ID, *enode.Node](500) + t := &UDPv4{ + conn: c, + priv: cfg.PrivateKey, + netrestrict: cfg.NetRestrict, + localNode: ln, + db: ln.Database(), + gotreply: make(chan reply, 10), + addReplyMatcher: make(chan *replyMatcher, 10), + gotkey: make(chan v4wire.Pubkey, 10), + gotnodes: make(chan nodes, 10), + replyTimeout: cfg.ReplyTimeout, + pingBackDelay: cfg.PingBackDelay, + closeCtx: closeCtx, + cancelCloseCtx: cancel, + log: cfg.Log, + errors: map[string]uint{}, + unsolicitedNodes: unsolicitedNodes, privateKeyGenerator: cfg.PrivateKeyGenerator, } - tab, err := newTable(t, ln.Database(), cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) + tab, err := newTable(t, protocol, ln.Database(), cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) if err != nil { return nil, err } @@ -168,6 +191,28 @@ func (t *UDPv4) Self() *enode.Node { return t.localNode.Node() } +func (t *UDPv4) Version() string { + return "v4" +} + +func (t *UDPv4) Errors() map[string]uint { + errors := map[string]uint{} + + t.mutex.Lock() + for key, value := range t.errors { + errors[key] = value + } + t.mutex.Unlock() + + return errors +} + +func (t *UDPv4) LenUnsolicited() int { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.unsolicitedNodes.Len() +} + // Close shuts down the socket and aborts any running queries. func (t *UDPv4) Close() { t.closeOnce.Do(func() { @@ -241,7 +286,7 @@ func (t *UDPv4) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) *r } // Add a matcher for the reply to the pending reply queue. Pongs are matched if they // reference the ping we're about to send. - rm := t.pending(toid, toaddr.IP, v4wire.PongPacket, func(p v4wire.Packet) (matched bool, requestDone bool) { + rm := t.pending(toid, toaddr.IP, toaddr.Port, v4wire.PongPacket, func(p v4wire.Packet) (matched bool, requestDone bool) { matched = bytes.Equal(p.(*v4wire.Pong).ReplyTok, hash) if matched && callback != nil { callback() @@ -301,6 +346,7 @@ func (t *UDPv4) newRandomLookup(ctx context.Context) *lookup { func (t *UDPv4) newLookup(ctx context.Context, targetKey *ecdsa.PublicKey) *lookup { targetKeyEnc := v4wire.EncodePubkey(targetKey) target := enode.PubkeyEncoded(targetKeyEnc).ID() + it := newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) { return t.findnode(n.ID(), n.addr(), targetKeyEnc) }) @@ -322,7 +368,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke // active until enough nodes have been received. 
nodes := make([]*node, 0, bucketSize) nreceived := 0 - rm := t.pending(toid, toaddr.IP, v4wire.NeighborsPacket, func(r v4wire.Packet) (matched bool, requestDone bool) { + rm := t.pending(toid, toaddr.IP, toaddr.Port, v4wire.NeighborsPacket, func(r v4wire.Packet) (matched bool, requestDone bool) { reply := r.(*v4wire.Neighbors) for _, rn := range reply.Nodes { nreceived++ @@ -374,7 +420,7 @@ func (t *UDPv4) RequestENR(n *enode.Node) (*enode.Node, error) { // Add a matcher for the reply to the pending reply queue. Responses are matched if // they reference the request we're about to send. - rm := t.pending(n.ID(), addr.IP, v4wire.ENRResponsePacket, func(r v4wire.Packet) (matched bool, requestDone bool) { + rm := t.pending(n.ID(), addr.IP, addr.Port, v4wire.ENRResponsePacket, func(r v4wire.Packet) (matched bool, requestDone bool) { matched = bytes.Equal(r.(*v4wire.ENRResponse).ReplyTok, hash) return matched, matched }) @@ -406,9 +452,10 @@ func (t *UDPv4) RequestENR(n *enode.Node) (*enode.Node, error) { // pending adds a reply matcher to the pending reply queue. // see the documentation of type replyMatcher for a detailed explanation. -func (t *UDPv4) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchFunc) *replyMatcher { +func (t *UDPv4) pending(id enode.ID, ip net.IP, port int, ptype byte, callback replyMatchFunc) *replyMatcher { ch := make(chan error, 1) - p := &replyMatcher{from: id, ip: ip, ptype: ptype, callback: callback, errc: ch} + p := &replyMatcher{from: id, ip: ip, port: port, ptype: ptype, callback: callback, errc: ch} + select { case t.addReplyMatcher <- p: // loop will handle it @@ -420,10 +467,10 @@ func (t *UDPv4) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchF // handleReply dispatches a reply packet, invoking reply matchers. It returns // whether any matcher considered the packet acceptable. -func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, req v4wire.Packet) bool { +func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, port int, req v4wire.Packet) bool { matched := make(chan bool, 1) select { - case t.gotreply <- reply{from, fromIP, req, matched}: + case t.gotreply <- reply{from, fromIP, port, req, matched}: // loop will handle it return <-matched case <-t.closeCtx.Done(): @@ -439,89 +486,208 @@ func (t *UDPv4) loop() { var ( plist = list.New() - timeout = time.NewTimer(0) - nextTimeout *replyMatcher // head of plist when timeout was last reset - contTimeouts = 0 // number of continuous timeouts to do NTP checks + mutex = sync.Mutex{} + contTimeouts = 0 // number of continuous timeouts to do NTP checks ntpWarnTime = time.Unix(0, 0) ) - <-timeout.C // ignore first timeout - defer timeout.Stop() - resetTimeout := func() { - if plist.Front() == nil || nextTimeout == plist.Front().Value { - return - } - // Start the timer so it fires when the next pending reply has expired. - now := time.Now() - for el := plist.Front(); el != nil; el = el.Next() { - nextTimeout = el.Value.(*replyMatcher) - if dist := nextTimeout.deadline.Sub(now); dist < 2*t.replyTimeout { - timeout.Reset(dist) + listUpdate := make(chan *list.Element, 10) + + go func() { + var ( + timeout = time.NewTimer(0) + nextTimeout *replyMatcher // head of plist when timeout was last reset + ) + + <-timeout.C // ignore first timeout + defer timeout.Stop() + + resetTimeout := func() { + mutex.Lock() + defer mutex.Unlock() + + if plist.Front() == nil || nextTimeout == plist.Front().Value { return } - // Remove pending replies whose deadline is too far in the - // future. 
These can occur if the system clock jumped - // backwards after the deadline was assigned. - nextTimeout.errc <- errClockWarp - plist.Remove(el) + + // Start the timer so it fires when the next pending reply has expired. + now := time.Now() + for el := plist.Front(); el != nil; el = el.Next() { + nextTimeout = el.Value.(*replyMatcher) + if dist := nextTimeout.deadline.Sub(now); dist < 2*t.replyTimeout { + timeout.Reset(dist) + return + } + // Remove pending replies whose deadline is too far in the + // future. These can occur if the system clock jumped + // backwards after the deadline was assigned. + nextTimeout.errc <- errClockWarp + plist.Remove(el) + } + + nextTimeout = nil + timeout.Stop() } - nextTimeout = nil - timeout.Stop() - } + + for { + select { + case <-t.closeCtx.Done(): + return + + case now := <-timeout.C: + func() { + mutex.Lock() + defer mutex.Unlock() + + nextTimeout = nil + // Notify and remove callbacks whose deadline is in the past. + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*replyMatcher) + if !now.Before(p.deadline) { + p.errc <- errTimeout + plist.Remove(el) + contTimeouts++ + } + } + // If we've accumulated too many timeouts, do an NTP time sync check + if contTimeouts > ntpFailureThreshold { + if time.Since(ntpWarnTime) >= ntpWarningCooldown { + ntpWarnTime = time.Now() + go checkClockDrift() + } + contTimeouts = 0 + } + }() + + resetTimeout() + + case el := <-listUpdate: + if el == nil { + return + } + + resetTimeout() + } + } + }() for { - resetTimeout() - select { case <-t.closeCtx.Done(): - for el := plist.Front(); el != nil; el = el.Next() { - el.Value.(*replyMatcher).errc <- errClosed - } + listUpdate <- nil + func() { + mutex.Lock() + defer mutex.Unlock() + for el := plist.Front(); el != nil; el = el.Next() { + el.Value.(*replyMatcher).errc <- errClosed + } + }() return case p := <-t.addReplyMatcher: - p.deadline = time.Now().Add(t.replyTimeout) - plist.PushBack(p) + func() { + mutex.Lock() + defer mutex.Unlock() + p.deadline = time.Now().Add(t.replyTimeout) + listUpdate <- plist.PushBack(p) + }() case r := <-t.gotreply: - var matched bool // whether any replyMatcher considered the reply acceptable. + + type matchCandidate struct { + el *list.Element + errc chan error + } + + var matchCandidates []matchCandidate + + mutex.Lock() for el := plist.Front(); el != nil; el = el.Next() { p := el.Value.(*replyMatcher) if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) { + candidate := matchCandidate{el, p.errc} + p.errc = make(chan error, 1) + matchCandidates = append(matchCandidates, candidate) + } + } + mutex.Unlock() + + if len(matchCandidates) == 0 { + // if there are no matched candidates try again matching against + // ip & port to handle node key changes + mutex.Lock() + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*replyMatcher) + if p.ptype == r.data.Kind() && p.ip.Equal(r.ip) && p.port == r.port { + candidate := matchCandidate{el, p.errc} + p.errc = make(chan error, 1) + matchCandidates = append(matchCandidates, candidate) + } + } + mutex.Unlock() + + if len(matchCandidates) == 0 { + r.matched <- false + } + } + + go func(r reply) { + var matched bool // whether any replyMatcher considered the reply acceptable. + for _, candidate := range matchCandidates { + p := candidate.el.Value.(*replyMatcher) ok, requestDone := p.callback(r.data) matched = matched || ok p.reply = r.data + // Remove the matcher if callback indicates that all replies have been received. 
if requestDone { - p.errc <- nil - plist.Remove(el) + mutex.Lock() + plist.Remove(candidate.el) + mutex.Unlock() + candidate.errc <- nil + listUpdate <- candidate.el + } else { + select { + case err := <-p.errc: + candidate.errc <- err + default: + p.errc = candidate.errc + } } - // Reset the continuous timeout counter (time drift detection) - contTimeouts = 0 } - } - r.matched <- matched - case now := <-timeout.C: - nextTimeout = nil + r.matched <- matched + }(r) - // Notify and remove callbacks whose deadline is in the past. - for el := plist.Front(); el != nil; el = el.Next() { - p := el.Value.(*replyMatcher) - if now.After(p.deadline) || now.Equal(p.deadline) { - p.errc <- errTimeout - plist.Remove(el) - contTimeouts++ + // Reset the continuous timeout counter (time drift detection) + contTimeouts = 0 + case key := <-t.gotkey: + go func() { + if key, err := v4wire.DecodePubkey(crypto.S256(), key); err == nil { + nodes := t.LookupPubkey(key) + mutex.Lock() + defer mutex.Unlock() + + for _, n := range nodes { + t.unsolicitedNodes.Add(n.ID(), n) + } } - } - // If we've accumulated too many timeouts, do an NTP time sync check - if contTimeouts > ntpFailureThreshold { - if time.Since(ntpWarnTime) >= ntpWarningCooldown { - ntpWarnTime = time.Now() - go checkClockDrift() + }() + + case nodes := <-t.gotnodes: + + func() { + mutex.Lock() + defer mutex.Unlock() + for _, rn := range nodes.nodes { + n, err := t.nodeFromRPC(nodes.addr, rn) + if err != nil { + t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", nodes.addr, "err", err) + continue + } + t.unsolicitedNodes.Add(n.ID(), &n.Node) } - contTimeouts = 0 - } + }() } } } @@ -545,10 +711,13 @@ func (t *UDPv4) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet [] func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) { defer t.wg.Done() defer debug.LogPanic() + if unhandled != nil { defer close(unhandled) } + unknownKeys, _ := lru.New[v4wire.Pubkey, any](100) + buf := make([]byte, maxPacketSize) for { nbytes, from, err := t.conn.ReadFromUDP(buf) @@ -563,11 +732,35 @@ func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) { } return } - if t.handlePacket(from, buf[:nbytes]) != nil && unhandled != nil { - select { - case unhandled <- ReadPacket{buf[:nbytes], from}: - default: - } + if err := t.handlePacket(from, buf[:nbytes]); err != nil { + func() { + switch { + case errors.Is(err, errUnsolicitedReply): + if packet, fromKey, _, err := v4wire.Decode(buf[:nbytes]); err == nil { + switch packet.Kind() { + case v4wire.PongPacket: + if _, ok := unknownKeys.Get(fromKey); !ok { + fromId := enode.PubkeyEncoded(fromKey).ID() + t.log.Trace("Unsolicited packet", "type", packet.Name(), "from", fromId, "addr", from) + unknownKeys.Add(fromKey, nil) + t.gotkey <- fromKey + } + case v4wire.NeighborsPacket: + neighbors := packet.(*v4wire.Neighbors) + t.gotnodes <- nodes{from, neighbors.Nodes} + default: + fromId := enode.PubkeyEncoded(fromKey).ID() + t.log.Trace("Unsolicited packet", "type", packet.Name(), "from", fromId, "addr", from) + } + } else { + t.log.Trace("Unsolicited packet handling failed", "addr", from, "err", err) + } + default: + if unhandled != nil { + unhandled <- ReadPacket{buf[:nbytes], from} + } + } + }() } } } @@ -580,6 +773,7 @@ func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error { } packet := t.wrapPacket(rawpacket) fromID := enode.PubkeyEncoded(fromKey).ID() + if packet.preverify != nil { err = packet.preverify(packet, from, fromID, fromKey) } @@ -677,9 +871,15 @@ func (t *UDPv4) verifyPing(h 
*packetHandlerV4, from *net.UDPAddr, fromID enode.I senderKey, err := v4wire.DecodePubkey(crypto.S256(), fromKey) if err != nil { + t.mutex.Lock() + t.errors[err.Error()] = t.errors[err.Error()] + 1 + t.mutex.Unlock() return err } if v4wire.Expired(req.Expiration) { + t.mutex.Lock() + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } h.senderKey = senderKey @@ -719,9 +919,15 @@ func (t *UDPv4) verifyPong(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I req := h.Packet.(*v4wire.Pong) if v4wire.Expired(req.Expiration) { + t.mutex.Lock() + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } - if !t.handleReply(fromID, from.IP, req) { + if !t.handleReply(fromID, from.IP, from.Port, req) { + t.mutex.Lock() + t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 + t.mutex.Unlock() return errUnsolicitedReply } t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)}) @@ -735,6 +941,9 @@ func (t *UDPv4) verifyFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno req := h.Packet.(*v4wire.Findnode) if v4wire.Expired(req.Expiration) { + t.mutex.Lock() + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } if !t.checkBond(fromID, from.IP) { @@ -744,6 +953,9 @@ func (t *UDPv4) verifyFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno // and UDP port of the target as the source address. The recipient of the findnode // packet would then send a neighbors packet (which is a much bigger packet than // findnode) to the victim. + t.mutex.Lock() + t.errors[errUnknownNodeStr] = t.errors[errUnknownNodeStr] + 1 + t.mutex.Unlock() return errUnknownNode } return nil @@ -781,9 +993,15 @@ func (t *UDPv4) verifyNeighbors(h *packetHandlerV4, from *net.UDPAddr, fromID en req := h.Packet.(*v4wire.Neighbors) if v4wire.Expired(req.Expiration) { + t.mutex.Lock() + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } - if !t.handleReply(fromID, from.IP, h.Packet) { + if !t.handleReply(fromID, from.IP, from.Port, h.Packet) { + t.mutex.Lock() + t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 + t.mutex.Unlock() return errUnsolicitedReply } return nil @@ -795,26 +1013,40 @@ func (t *UDPv4) verifyENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID e req := h.Packet.(*v4wire.ENRRequest) if v4wire.Expired(req.Expiration) { + t.mutex.Lock() + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } if !t.checkBond(fromID, from.IP) { + t.mutex.Lock() + t.errors[errUnknownNodeStr] = t.errors[errUnknownNodeStr] + 1 + t.mutex.Unlock() return errUnknownNode } return nil } func (t *UDPv4) handleENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, mac []byte) { - //nolint:errcheck - t.send(from, fromID, &v4wire.ENRResponse{ + _, err := t.send(from, fromID, &v4wire.ENRResponse{ ReplyTok: mac, Record: *t.localNode.Node().Record(), }) + + if err != nil { + t.mutex.Lock() + t.errors[err.Error()] = t.errors[err.Error()] + 1 + t.mutex.Unlock() + } } // ENRRESPONSE/v4 func (t *UDPv4) verifyENRResponse(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { - if !t.handleReply(fromID, from.IP, h.Packet) { + if !t.handleReply(fromID, from.IP, from.Port, h.Packet) { + t.mutex.Lock() + t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 + t.mutex.Unlock() return errUnsolicitedReply } return nil 
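Note: every ListenV4 / ListenV5 / ListenUDP call site in this change gains a protocol label as the new second argument ("any" in cmd/bootnode and cmd/observer, "test" in the unit tests, the eth protocol version in p2p/server.go); the label is only used for the periodic discovery diagnostics logging introduced in table.go. A minimal sketch of the updated call shape, assuming an already-bound socket and local node (the names startV4, conn, ln and cfg are placeholders, not part of this diff):

package example

import (
	"context"
	"net"

	"github.com/ledgerwatch/erigon/p2p/discover"
	"github.com/ledgerwatch/erigon/p2p/enode"
)

// startV4 shows the call shape only; conn, ln and cfg are assumed to be an
// already-bound UDP socket, a populated enode.LocalNode and a discover.Config.
func startV4(ctx context.Context, conn *net.UDPConn, ln *enode.LocalNode, cfg discover.Config) (*discover.UDPv4, error) {
	// The new second argument is a protocol label that only feeds the
	// "[p2p] Discovery table" debug log; cmd/bootnode passes "any",
	// p2p/server.go passes the eth protocol version.
	return discover.ListenV4(ctx, "any", conn, ln, cfg)
}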
diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 289bd2715..5e2a9df92 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -87,7 +87,7 @@ func newUDPTestContext(ctx context.Context, t *testing.T, logger log.Logger) *ud panic(err) } ln := enode.NewLocalNode(test.db, test.localkey, logger) - test.udp, err = ListenV4(ctx, test.pipe, ln, Config{ + test.udp, err = ListenV4(ctx, "test", test.pipe, ln, Config{ PrivateKey: test.localkey, Log: testlog.Logger(t, log.LvlError), @@ -237,7 +237,7 @@ func TestUDPv4_responseTimeouts(t *testing.T) { p.errc = nilErr test.udp.addReplyMatcher <- p time.AfterFunc(randomDuration(60*time.Millisecond), func() { - if !test.udp.handleReply(p.from, p.ip, testPacket(p.ptype)) { + if !test.udp.handleReply(p.from, p.ip, p.port, testPacket(p.ptype)) { t.Logf("not matched: %v", p) } }) @@ -643,7 +643,7 @@ func startLocalhostV4(ctx context.Context, t *testing.T, cfg Config, logger log. realaddr := socket.LocalAddr().(*net.UDPAddr) ln.SetStaticIP(realaddr.IP) ln.SetFallbackUDP(realaddr.Port) - udp, err := ListenV4(ctx, socket, ln, cfg) + udp, err := ListenV4(ctx, "test", socket, ln, cfg) if err != nil { t.Fatal(err) } diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 686bd2678..d66d44e36 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -97,6 +97,7 @@ type UDPv5 struct { closeCtx context.Context cancelCloseCtx context.CancelFunc wg sync.WaitGroup + errors map[string]uint } // TalkRequestHandler callback processes a talk request and optionally returns a reply @@ -125,8 +126,8 @@ type callTimeout struct { } // ListenV5 listens on the given connection. -func ListenV5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { - t, err := newUDPv5(ctx, conn, ln, cfg) +func ListenV5(ctx context.Context, protocol string, conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { + t, err := newUDPv5(ctx, protocol, conn, ln, cfg) if err != nil { return nil, err } @@ -138,7 +139,7 @@ func ListenV5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config } // newUDPv5 creates a UDPv5 transport, but doesn't start any goroutines. -func newUDPv5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { +func newUDPv5(ctx context.Context, protocol string, conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { closeCtx, cancelCloseCtx := context.WithCancel(ctx) cfg = cfg.withDefaults(respTimeoutV5) t := &UDPv5{ @@ -167,8 +168,9 @@ func newUDPv5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config // shutdown closeCtx: closeCtx, cancelCloseCtx: cancelCloseCtx, + errors: map[string]uint{}, } - tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) + tab, err := newTable(t, protocol, t.db, cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) if err != nil { return nil, err } @@ -181,6 +183,18 @@ func (t *UDPv5) Self() *enode.Node { return t.localNode.Node() } +func (t *UDPv5) Version() string { + return "v5" +} + +func (t *UDPv5) Errors() map[string]uint { + return t.errors +} + +func (t *UDPv5) LenUnsolicited() int { + return 0 +} + // Close shuts down packet processing. 
func (t *UDPv5) Close() { t.closeOnce.Do(func() { diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index 09c8a2110..5ca080e04 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -67,7 +67,7 @@ func startLocalhostV5(t *testing.T, cfg Config, logger log.Logger) *UDPv5 { ln.SetFallbackUDP(realaddr.Port) ctx := context.Background() ctx = disableLookupSlowdown(ctx) - udp, err := ListenV5(ctx, socket, ln, cfg) + udp, err := ListenV5(ctx, "test", socket, ln, cfg) if err != nil { t.Fatal(err) } @@ -581,7 +581,7 @@ func newUDPV5TestContext(ctx context.Context, t *testing.T, logger log.Logger) * ln := enode.NewLocalNode(test.db, test.localkey, logger) ln.SetStaticIP(net.IP{10, 0, 0, 1}) ln.Set(enr.UDP(30303)) - test.udp, err = ListenV5(ctx, test.pipe, ln, Config{ + test.udp, err = ListenV5(ctx, "test", test.pipe, ln, Config{ PrivateKey: test.localkey, Log: testlog.Logger(t, log.LvlError), ValidSchemes: enode.ValidSchemesForTesting, diff --git a/p2p/peer.go b/p2p/peer.go index 0adf711d7..43767f427 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -223,7 +223,9 @@ func (p *Peer) Inbound() bool { } func newPeer(logger log.Logger, conn *conn, protocols []Protocol, pubkey [64]byte, metricsEnabled bool) *Peer { - protomap := matchProtocols(protocols, conn.caps, conn) + log := logger.New("id", conn.node.ID(), "conn", conn.flags) + + protomap := matchProtocols(protocols, conn.caps, conn, log) p := &Peer{ rw: conn, running: protomap, @@ -232,7 +234,7 @@ func newPeer(logger log.Logger, conn *conn, protocols []Protocol, pubkey [64]byt protoErr: make(chan *PeerError, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), pingRecv: make(chan struct{}, 16), - log: logger.New("id", conn.node.ID(), "conn", conn.flags), + log: log, pubkey: pubkey, metricsEnabled: metricsEnabled, CapBytesIn: make(map[string]uint64), @@ -438,7 +440,7 @@ func countMatchingProtocols(protocols []Protocol, caps []Cap) int { } // matchProtocols creates structures for matching named subprotocols. -func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW { +func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter, logger log.Logger) map[string]*protoRW { sort.Sort(capsByNameAndVersion(caps)) offset := baseProtocolLength result := make(map[string]*protoRW) @@ -452,7 +454,7 @@ outer: offset -= old.Length } // Assign the new match - result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw} + result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw, logger: logger} offset += proto.Length continue outer @@ -506,8 +508,11 @@ type protoRW struct { werr chan<- error // for write results offset uint64 w MsgWriter + logger log.Logger } +var traceMsg = false + func (rw *protoRW) WriteMsg(msg Msg) (err error) { if msg.Code >= rw.Length { return NewPeerError(PeerErrorInvalidMessageCode, DiscProtocolError, nil, fmt.Sprintf("not handled code=%d", msg.Code)) @@ -520,6 +525,15 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) { select { case <-rw.wstart: err = rw.w.WriteMsg(msg) + + if traceMsg { + if err != nil { + rw.logger.Trace("Write failed", "cap", rw.cap(), "msg", msg.Code-rw.offset, "size", msg.Size, "err", err) + } else { + rw.logger.Trace("Wrote", "cap", rw.cap(), "msg", msg.Code-rw.offset, "size", msg.Size) + } + } + // Report write status back to Peer.run. It will initiate // shutdown if the error is non-nil and unblock the next write // otherwise. 
The calling protocol code should exit for errors @@ -536,6 +550,9 @@ func (rw *protoRW) ReadMsg() (Msg, error) { select { case msg := <-rw.in: msg.Code -= rw.offset + if traceMsg { + rw.logger.Trace("Read", "cap", rw.cap(), "msg", msg.Code, "size", msg.Size) + } return msg, nil case <-rw.closed: return Msg{}, io.EOF diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 45b0e89f6..c84097645 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -331,7 +331,7 @@ func TestMatchProtocols(t *testing.T) { } for i, tt := range tests { - result := matchProtocols(tt.Local, tt.Remote, nil) + result := matchProtocols(tt.Local, tt.Remote, nil, log.Root()) if len(result) != len(tt.Match) { t.Errorf("test %d: negotiation mismatch: have %v, want %v", i, len(result), len(tt.Match)) continue diff --git a/p2p/server.go b/p2p/server.go index 4a32e45b3..7ba83014a 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -27,6 +27,7 @@ import ( "net" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -68,6 +69,8 @@ const ( // Maximum amount of time allowed for writing a complete message. frameWriteTimeout = 20 * time.Second + + serverStatsLogInterval = 60 * time.Second ) var errServerStopped = errors.New("server stopped") @@ -232,6 +235,7 @@ type Server struct { // State of run loop and listenLoop. inboundHistory expHeap + errors map[string]uint } type peerOpFunc func(map[enode.ID]*Peer) @@ -654,7 +658,7 @@ func (srv *Server) setupDiscovery(ctx context.Context) error { Unhandled: unhandled, Log: srv.logger, } - ntab, err := discover.ListenV4(ctx, conn, srv.localnode, cfg) + ntab, err := discover.ListenV4(ctx, fmt.Sprint(srv.Config.Protocols[0].Version), conn, srv.localnode, cfg) if err != nil { return err } @@ -672,9 +676,9 @@ func (srv *Server) setupDiscovery(ctx context.Context) error { } var err error if sconn != nil { - srv.DiscV5, err = discover.ListenV5(ctx, sconn, srv.localnode, cfg) + srv.DiscV5, err = discover.ListenV5(ctx, fmt.Sprint(srv.Config.Protocols[0].Version), sconn, srv.localnode, cfg) } else { - srv.DiscV5, err = discover.ListenV5(ctx, conn, srv.localnode, cfg) + srv.DiscV5, err = discover.ListenV5(ctx, fmt.Sprint(srv.Config.Protocols[0].Version), conn, srv.localnode, cfg) } if err != nil { return err @@ -792,6 +796,9 @@ func (srv *Server) run() { trusted[n.ID()] = true } + logTimer := time.NewTicker(serverStatsLogInterval) + defer logTimer.Stop() + running: for { select { @@ -855,6 +862,18 @@ running: if pd.Inbound() { inboundCount-- } + case <-logTimer.C: + vals := []interface{}{"protocol", srv.Config.Protocols[0].Version, "peers", len(peers), "trusted", len(trusted), "inbound", inboundCount} + + func() { + srv.lock.Lock() + defer srv.lock.Unlock() + for err, count := range srv.errors { + vals = append(vals, err, count) + } + }() + + srv.logger.Debug("[p2p] Server", vals...) } } @@ -906,6 +925,8 @@ func (srv *Server) listenLoop(ctx context.Context) { // The slots limit accepts of new connections. slots := semaphore.NewWeighted(int64(srv.MaxPendingPeers)) + srv.errors = map[string]uint{} + // Wait for slots to be returned on exit. This ensures all connection goroutines // are down before listenLoop returns. 
defer func() { @@ -1008,10 +1029,25 @@ func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) return err } +func cleanError(err string) string { + switch { + case strings.HasSuffix(err, "i/o timeout"): + return "i/o timeout" + case strings.HasSuffix(err, "closed by the remote host."): + return "closed by remote" + case strings.HasSuffix(err, "connection reset by peer"): + return "closed by remote" + default: + return err + } +} + func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error { // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running + // reset error counts + srv.errors = map[string]uint{} srv.lock.Unlock() if !running { return errServerStopped @@ -1031,6 +1067,10 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro // Run the RLPx handshake. remotePubkey, err := c.doEncHandshake(srv.PrivateKey) if err != nil { + errStr := cleanError(err.Error()) + srv.lock.Lock() + srv.errors[errStr] = srv.errors[errStr] + 1 + srv.lock.Unlock() srv.logger.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return err } @@ -1050,6 +1090,10 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro // Run the capability negotiation handshake. phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { + errStr := cleanError(err.Error()) + srv.lock.Lock() + srv.errors[errStr] = srv.errors[errStr] + 1 + srv.lock.Unlock() clog.Trace("Failed p2p handshake", "err", err) return err } diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index e7b8a29e8..c98b0a969 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -8,13 +8,14 @@ import ( "encoding/base64" "errors" "fmt" - "github.com/ledgerwatch/erigon-lib/kv/dbutils" "io" "math/big" "sort" "strings" "time" + "github.com/ledgerwatch/erigon-lib/kv/dbutils" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" @@ -432,6 +433,8 @@ func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeo return } + hd.logger.Debug("[downloader] Request header", "numer", anchor.blockHeight-1, "length", 192) + // Request ancestors request = &HeaderRequest{ Anchor: anchor, @@ -482,7 +485,7 @@ func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.T func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest { hd.lock.RLock() defer hd.lock.RUnlock() - hd.logger.Debug("[downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb) + var stride uint64 if hd.initialCycle { stride = 192 @@ -495,6 +498,7 @@ func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest { } else { from-- } + return &HeaderRequest{Number: from, Length: length, Skip: stride, Reverse: false} }
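Note: after this change diagnostics.Send no longer takes a context; it resolves the active receiver from the provider registry via the message's Type() and silently drops the message when no receiver is attached. A rough usage sketch mirroring the flow exercised by TestDelayedProviderRegistration above (peerInfo and run are hypothetical names, not part of the diff):

package example

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/diagnostics"
	"github.com/ledgerwatch/log/v3"
)

// peerInfo is a hypothetical diagnostic payload; any type whose Type() method
// returns diagnostics.TypeOf of itself satisfies the Info interface.
type peerInfo struct{ count int }

func (p peerInfo) Type() diagnostics.Type { return diagnostics.TypeOf(p) }

func run() {
	// Receiver side: create a typed, buffered diagnostics context and start
	// any providers registered (now or later) for this Info type.
	ctx, ch, cancel := diagnostics.Context[peerInfo](context.Background(), 16)
	diagnostics.StartProviders(ctx, diagnostics.TypeOf(peerInfo{}), log.Root())

	// Provider side: Send no longer takes a ctx; the message is routed to the
	// channel registered for peerInfo, or dropped if there is no receiver.
	go func() {
		_ = diagnostics.Send(peerInfo{count: 1})
	}()

	<-ch     // consume one message
	cancel() // cancels the per-type context and closes ch
}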