go-pulse/p2p/discover/v5wire/msg.go
Felix Lange 47cdea5ac5
p2p/discover: concurrent TALKREQ handling (#27112)
This changes TALKREQ message processing to run the handler on a separate goroutine,
instead of running on the main discv5 dispatcher goroutine. It's better this way because
it allows the handler to perform blocking actions.

I'm also adding a new method TalkRequestToID here. The method allows implementing
a request flow where one node A sends TALKREQ to another node B, and node B later
sends a TALKREQ back. With TalkRequestToID, node B does not need the ENR of A to
send its request.
2023-04-28 11:03:43 +02:00

230 lines
6.9 KiB
Go

// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package v5wire
import (
"fmt"
"net"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
)
// Packet is the common interface implemented by all message types.
type Packet interface {
	// Name returns a string corresponding to the message type.
	Name() string

	// Kind returns the message type.
	Kind() byte

	// RequestID returns the request ID.
	RequestID() []byte

	// SetRequestID sets the request ID.
	SetRequestID([]byte)

	// AppendLogInfo returns its argument 'ctx' with additional fields
	// appended for logging purposes.
	AppendLogInfo(ctx []interface{}) []interface{}
}
// Message types. The values are assigned sequentially, starting at 1.
const (
	PingMsg byte = iota + 1
	PongMsg
	FindnodeMsg
	NodesMsg
	TalkRequestMsg
	TalkResponseMsg
	RequestTicketMsg
	TicketMsg
)

// Packet kinds which are not regular message types.
const (
	// UnknownPacket is the kind of any non-decryptable packet.
	UnknownPacket byte = 255
	// WhoareyouPacket is the kind of the WHOAREYOU packet.
	WhoareyouPacket byte = 254
)
// Protocol messages.

// Unknown represents any packet that can't be decrypted.
type Unknown struct {
	Nonce Nonce
}

// Whoareyou is the WHOAREYOU packet. It contains the handshake challenge.
type Whoareyou struct {
	// ChallengeData is the encoded challenge.
	ChallengeData []byte
	// Nonce is the nonce of the request packet.
	Nonce Nonce
	// IDNonce is the identity proof data.
	IDNonce [16]byte
	// RecordSeq is the ENR sequence number of the recipient.
	RecordSeq uint64

	// Node is the locally known node record of recipient.
	// This must be set by the caller of Encode.
	Node *enode.Node

	// sent is used for handshake GC.
	sent mclock.AbsTime
}

// Ping is the PING message, sent during liveness checks.
type Ping struct {
	ReqID  []byte
	ENRSeq uint64
}

// Pong is the PONG message, the reply to PING.
type Pong struct {
	ReqID  []byte
	ENRSeq uint64

	// ToIP and ToPort should mirror the UDP envelope address of the ping
	// packet, which provides a way to discover the external address (after NAT).
	ToIP   net.IP
	ToPort uint16
}

// Findnode is the FINDNODE message, a query for nodes in the given bucket.
type Findnode struct {
	ReqID     []byte
	Distances []uint

	// OpID is for debugging purposes and is not part of the packet encoding.
	// It identifies the 'operation' on behalf of which the request was sent.
	OpID uint64 `rlp:"-"`
}

// Nodes is the NODES message, a response to FINDNODE.
type Nodes struct {
	ReqID []byte
	// RespCount is the total number of responses to the request.
	RespCount uint8
	Nodes     []*enr.Record
}

// TalkRequest is the TALKREQ message, an application-level request.
type TalkRequest struct {
	ReqID    []byte
	Protocol string
	Message  []byte
}

// TalkResponse is the TALKRESP message, the reply to TALKREQ.
type TalkResponse struct {
	ReqID   []byte
	Message []byte
}
// DecodeMessage decodes the message body of a packet.
//
// ptype selects the message struct that body is RLP-decoded into. An error is
// returned when ptype is not one of the message types handled below, when RLP
// decoding fails, or (ErrInvalidReqID) when the decoded request ID is longer
// than 8 bytes.
func DecodeMessage(ptype byte, body []byte) (Packet, error) {
	var msg Packet
	switch ptype {
	case PingMsg:
		msg = &Ping{}
	case PongMsg:
		msg = &Pong{}
	case FindnodeMsg:
		msg = &Findnode{}
	case NodesMsg:
		msg = &Nodes{}
	case TalkRequestMsg:
		msg = &TalkRequest{}
	case TalkResponseMsg:
		msg = &TalkResponse{}
	default:
		return nil, fmt.Errorf("unknown packet type %d", ptype)
	}

	err := rlp.DecodeBytes(body, msg)
	if err != nil {
		return nil, err
	}
	// The length of a nil slice is zero, so an absent request ID passes this check.
	if id := msg.RequestID(); len(id) > 8 {
		return nil, ErrInvalidReqID
	}
	return msg, nil
}
// Name returns the name of the WHOAREYOU packet.
func (*Whoareyou) Name() string {
	return "WHOAREYOU/v5"
}

// Kind returns WhoareyouPacket.
func (*Whoareyou) Kind() byte {
	return WhoareyouPacket
}

// RequestID always returns nil for WHOAREYOU.
func (*Whoareyou) RequestID() []byte {
	return nil
}

// SetRequestID does nothing for WHOAREYOU.
func (*Whoareyou) SetRequestID([]byte) {
}

// AppendLogInfo returns ctx unchanged.
func (*Whoareyou) AppendLogInfo(ctx []interface{}) []interface{} {
	return ctx
}
// Name returns the name used for undecryptable packets.
func (*Unknown) Name() string {
	return "UNKNOWN/v5"
}

// Kind returns UnknownPacket.
func (*Unknown) Kind() byte {
	return UnknownPacket
}

// RequestID always returns nil for unknown packets.
func (*Unknown) RequestID() []byte {
	return nil
}

// SetRequestID does nothing for unknown packets.
func (*Unknown) SetRequestID([]byte) {
}

// AppendLogInfo returns ctx unchanged.
func (*Unknown) AppendLogInfo(ctx []interface{}) []interface{} {
	return ctx
}
// Name returns the name of the PING message.
func (*Ping) Name() string {
	return "PING/v5"
}

// Kind returns PingMsg.
func (*Ping) Kind() byte {
	return PingMsg
}

// RequestID returns the request ID stored in the message.
func (p *Ping) RequestID() []byte {
	return p.ReqID
}

// SetRequestID stores id as the request ID of the message.
func (p *Ping) SetRequestID(id []byte) {
	p.ReqID = id
}

// AppendLogInfo appends the request ID and the ENR sequence number to ctx.
func (p *Ping) AppendLogInfo(ctx []interface{}) []interface{} {
	reqID := hexutil.Bytes(p.ReqID)
	return append(ctx, "req", reqID, "enrseq", p.ENRSeq)
}
// Name returns the name of the PONG message.
func (*Pong) Name() string {
	return "PONG/v5"
}

// Kind returns PongMsg.
func (*Pong) Kind() byte {
	return PongMsg
}

// RequestID returns the request ID stored in the message.
func (p *Pong) RequestID() []byte {
	return p.ReqID
}

// SetRequestID stores id as the request ID of the message.
func (p *Pong) SetRequestID(id []byte) {
	p.ReqID = id
}

// AppendLogInfo appends the request ID and the ENR sequence number to ctx.
func (p *Pong) AppendLogInfo(ctx []interface{}) []interface{} {
	reqID := hexutil.Bytes(p.ReqID)
	return append(ctx, "req", reqID, "enrseq", p.ENRSeq)
}
// Name returns the name of the FINDNODE message.
func (*Findnode) Name() string {
	return "FINDNODE/v5"
}

// Kind returns FindnodeMsg.
func (*Findnode) Kind() byte {
	return FindnodeMsg
}

// RequestID returns the request ID stored in the message.
func (p *Findnode) RequestID() []byte {
	return p.ReqID
}

// SetRequestID stores id as the request ID of the message.
func (p *Findnode) SetRequestID(id []byte) {
	p.ReqID = id
}

// AppendLogInfo appends the request ID to ctx, followed by the operation ID
// when that is non-zero.
func (p *Findnode) AppendLogInfo(ctx []interface{}) []interface{} {
	ctx = append(ctx, "req", hexutil.Bytes(p.ReqID))
	if p.OpID == 0 {
		return ctx
	}
	return append(ctx, "opid", p.OpID)
}
// Name returns the name of the NODES message.
func (*Nodes) Name() string {
	return "NODES/v5"
}

// Kind returns NodesMsg.
func (*Nodes) Kind() byte {
	return NodesMsg
}

// RequestID returns the request ID stored in the message.
func (p *Nodes) RequestID() []byte {
	return p.ReqID
}

// SetRequestID stores id as the request ID of the message.
func (p *Nodes) SetRequestID(id []byte) {
	p.ReqID = id
}

// AppendLogInfo appends the request ID, the RespCount field and the number of
// records carried by this message to ctx.
func (p *Nodes) AppendLogInfo(ctx []interface{}) []interface{} {
	reqID := hexutil.Bytes(p.ReqID)
	recordCount := len(p.Nodes)
	return append(ctx, "req", reqID, "tot", p.RespCount, "n", recordCount)
}
// Name returns the name of the TALKREQ message.
func (*TalkRequest) Name() string {
	return "TALKREQ/v5"
}

// Kind returns TalkRequestMsg.
func (*TalkRequest) Kind() byte {
	return TalkRequestMsg
}

// RequestID returns the request ID stored in the message.
func (p *TalkRequest) RequestID() []byte {
	return p.ReqID
}

// SetRequestID stores id as the request ID of the message.
func (p *TalkRequest) SetRequestID(id []byte) {
	p.ReqID = id
}

// AppendLogInfo appends the protocol name, the request ID and the length of
// the message payload to ctx.
func (p *TalkRequest) AppendLogInfo(ctx []interface{}) []interface{} {
	reqID := hexutil.Bytes(p.ReqID)
	size := len(p.Message)
	return append(ctx, "proto", p.Protocol, "req", reqID, "len", size)
}
// Name returns the name of the TALKRESP message.
func (*TalkResponse) Name() string {
	return "TALKRESP/v5"
}

// Kind returns TalkResponseMsg.
func (*TalkResponse) Kind() byte {
	return TalkResponseMsg
}

// RequestID returns the request ID stored in the message.
func (p *TalkResponse) RequestID() []byte {
	return p.ReqID
}

// SetRequestID stores id as the request ID of the message.
func (p *TalkResponse) SetRequestID(id []byte) {
	p.ReqID = id
}

// AppendLogInfo appends the request ID and the length of the message payload
// to ctx.
func (p *TalkResponse) AppendLogInfo(ctx []interface{}) []interface{} {
	reqID := hexutil.Bytes(p.ReqID)
	size := len(p.Message)
	return append(ctx, "req", reqID, "len", size)
}