From 04498180dcbb66ec5ceb96662cf0ebe73d8c60b3 Mon Sep 17 00:00:00 2001 From: battlmonstr Date: Thu, 11 Jan 2024 16:04:46 +0100 Subject: [PATCH] p2p/discv4: revert gotreply handler change from #8661 (#9119) (#9195) The handler had race conditions in the candidates processing goroutine. --- p2p/discover/v4_udp.go | 79 ++++++++++-------------------------------- 1 file changed, 18 insertions(+), 61 deletions(-) diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index bc665d0a1..deb4427a7 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -29,12 +29,13 @@ import ( "time" lru "github.com/hashicorp/golang-lru/v2" + "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/p2p/discover/v4wire" "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/p2p/netutil" - "github.com/ledgerwatch/log/v3" ) // Errors @@ -610,74 +611,30 @@ func (t *UDPv4) loop() { }() case r := <-t.gotreply: - - type matchCandidate struct { - el *list.Element - errc chan error - } - - var matchCandidates []matchCandidate - - mutex.Lock() - for el := plist.Front(); el != nil; el = el.Next() { - p := el.Value.(*replyMatcher) - if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) { - candidate := matchCandidate{el, p.errc} - p.errc = make(chan error, 1) - matchCandidates = append(matchCandidates, candidate) - } - } - mutex.Unlock() - - if len(matchCandidates) == 0 { - // if there are no matched candidates try again matching against - // ip & port to handle node key changes + func() { mutex.Lock() + defer mutex.Unlock() + + var matched bool // whether any replyMatcher considered the reply acceptable. for el := plist.Front(); el != nil; el = el.Next() { p := el.Value.(*replyMatcher) - if p.ptype == r.data.Kind() && p.ip.Equal(r.ip) && p.port == r.port { - candidate := matchCandidate{el, p.errc} - p.errc = make(chan error, 1) - matchCandidates = append(matchCandidates, candidate) - } - } - mutex.Unlock() - - if len(matchCandidates) == 0 { - r.matched <- false - } - } - - go func(r reply) { - var matched bool // whether any replyMatcher considered the reply acceptable. - for _, candidate := range matchCandidates { - p := candidate.el.Value.(*replyMatcher) - ok, requestDone := p.callback(r.data) - matched = matched || ok - p.reply = r.data - - // Remove the matcher if callback indicates that all replies have been received. - if requestDone { - mutex.Lock() - plist.Remove(candidate.el) - mutex.Unlock() - candidate.errc <- nil - listUpdate <- candidate.el - } else { - select { - case err := <-p.errc: - candidate.errc <- err - default: - p.errc = candidate.errc + if (p.ptype == r.data.Kind()) && p.ip.Equal(r.ip) && (p.port == r.port) { + ok, requestDone := p.callback(r.data) + matched = matched || ok + p.reply = r.data + // Remove the matcher if callback indicates that all replies have been received. + if requestDone { + p.errc <- nil + plist.Remove(el) + listUpdate <- el } + // Reset the continuous timeout counter (time drift detection) + contTimeouts = 0 } } - r.matched <- matched - }(r) + }() - // Reset the continuous timeout counter (time drift detection) - contTimeouts = 0 case key := <-t.gotkey: go func() { if key, err := v4wire.DecodePubkey(crypto.S256(), key); err == nil {