p2p/discv4: revert gotreply handler change from #8661 (#9119) (#9195)

The handler had race conditions in the candidates processing goroutine.
This commit is contained in:
battlmonstr 2024-01-11 16:04:46 +01:00 committed by GitHub
parent 9a9808f715
commit 04498180dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -29,12 +29,13 @@ import (
"time" "time"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/common/debug"
"github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/p2p/discover/v4wire" "github.com/ledgerwatch/erigon/p2p/discover/v4wire"
"github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/p2p/enode"
"github.com/ledgerwatch/erigon/p2p/netutil" "github.com/ledgerwatch/erigon/p2p/netutil"
"github.com/ledgerwatch/log/v3"
) )
// Errors // Errors
@ -610,74 +611,30 @@ func (t *UDPv4) loop() {
}() }()
case r := <-t.gotreply: case r := <-t.gotreply:
func() {
type matchCandidate struct {
el *list.Element
errc chan error
}
var matchCandidates []matchCandidate
mutex.Lock()
for el := plist.Front(); el != nil; el = el.Next() {
p := el.Value.(*replyMatcher)
if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) {
candidate := matchCandidate{el, p.errc}
p.errc = make(chan error, 1)
matchCandidates = append(matchCandidates, candidate)
}
}
mutex.Unlock()
if len(matchCandidates) == 0 {
// if there are no matched candidates try again matching against
// ip & port to handle node key changes
mutex.Lock() mutex.Lock()
defer mutex.Unlock()
var matched bool // whether any replyMatcher considered the reply acceptable.
for el := plist.Front(); el != nil; el = el.Next() { for el := plist.Front(); el != nil; el = el.Next() {
p := el.Value.(*replyMatcher) p := el.Value.(*replyMatcher)
if p.ptype == r.data.Kind() && p.ip.Equal(r.ip) && p.port == r.port { if (p.ptype == r.data.Kind()) && p.ip.Equal(r.ip) && (p.port == r.port) {
candidate := matchCandidate{el, p.errc} ok, requestDone := p.callback(r.data)
p.errc = make(chan error, 1) matched = matched || ok
matchCandidates = append(matchCandidates, candidate) p.reply = r.data
} // Remove the matcher if callback indicates that all replies have been received.
} if requestDone {
mutex.Unlock() p.errc <- nil
plist.Remove(el)
if len(matchCandidates) == 0 { listUpdate <- el
r.matched <- false
}
}
go func(r reply) {
var matched bool // whether any replyMatcher considered the reply acceptable.
for _, candidate := range matchCandidates {
p := candidate.el.Value.(*replyMatcher)
ok, requestDone := p.callback(r.data)
matched = matched || ok
p.reply = r.data
// Remove the matcher if callback indicates that all replies have been received.
if requestDone {
mutex.Lock()
plist.Remove(candidate.el)
mutex.Unlock()
candidate.errc <- nil
listUpdate <- candidate.el
} else {
select {
case err := <-p.errc:
candidate.errc <- err
default:
p.errc = candidate.errc
} }
// Reset the continuous timeout counter (time drift detection)
contTimeouts = 0
} }
} }
r.matched <- matched r.matched <- matched
}(r) }()
// Reset the continuous timeout counter (time drift detection)
contTimeouts = 0
case key := <-t.gotkey: case key := <-t.gotkey:
go func() { go func() {
if key, err := v4wire.DecodePubkey(crypto.S256(), key); err == nil { if key, err := v4wire.DecodePubkey(crypto.S256(), key); err == nil {