go-pulse/cmd/devp2p/internal/ethtest/transaction.go
Felix Lange 1bdf8b9b2d
cmd/devp2p/internal/ethtest: some fixes for the eth test suite (#28996)
Improving two things here:

On hive, where we look at these tests, the Go code comment above the test
is not visible. When there is a failure, it's not obvious what the test is actually
expecting. I have converted the comments in to printed log messages to
explain the test more.

Second, I noticed that besu is failing some tests because it happens to request
a header when we want it to send transactions. Trying the minimal fix here to
serve the headers.

Co-authored-by: lightclient <14004106+lightclient@users.noreply.github.com>
2024-02-15 19:43:37 +01:00

179 lines
4.8 KiB
Go

// Copyright 2020 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package ethtest
import (
"errors"
"fmt"
"os"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/internal/utesting"
)
// sendTxs sends the given transactions to the node and
// expects the node to accept and propagate them.
func (s *Suite) sendTxs(t *utesting.T, txs []*types.Transaction) error {
// Open sending conn.
sendConn, err := s.dial()
if err != nil {
return err
}
defer sendConn.Close()
if err = sendConn.peer(s.chain, nil); err != nil {
return fmt.Errorf("peering failed: %v", err)
}
// Open receiving conn.
recvConn, err := s.dial()
if err != nil {
return err
}
defer recvConn.Close()
if err = recvConn.peer(s.chain, nil); err != nil {
return fmt.Errorf("peering failed: %v", err)
}
if err = sendConn.Write(ethProto, eth.TransactionsMsg, eth.TransactionsPacket(txs)); err != nil {
return fmt.Errorf("failed to write message to connection: %v", err)
}
var (
got = make(map[common.Hash]bool)
end = time.Now().Add(timeout)
)
// Wait for the transaction announcements, make sure all txs ar propagated.
for time.Now().Before(end) {
msg, err := recvConn.ReadEth()
if err != nil {
return fmt.Errorf("failed to read from connection: %w", err)
}
switch msg := msg.(type) {
case *eth.TransactionsPacket:
for _, tx := range *msg {
got[tx.Hash()] = true
}
case *eth.NewPooledTransactionHashesPacket:
for _, hash := range msg.Hashes {
got[hash] = true
}
case *eth.GetBlockHeadersPacket:
headers, err := s.chain.GetHeaders(msg)
if err != nil {
t.Logf("invalid GetBlockHeaders request: %v", err)
}
recvConn.Write(ethProto, eth.BlockHeadersMsg, &eth.BlockHeadersPacket{
RequestId: msg.RequestId,
BlockHeadersRequest: headers,
})
default:
return fmt.Errorf("unexpected eth wire msg: %s", pretty.Sdump(msg))
}
// Check if all txs received.
allReceived := func() bool {
for _, tx := range txs {
if !got[tx.Hash()] {
return false
}
}
return true
}
if allReceived() {
return nil
}
}
return fmt.Errorf("timed out waiting for txs")
}
func (s *Suite) sendInvalidTxs(t *utesting.T, txs []*types.Transaction) error {
// Open sending conn.
sendConn, err := s.dial()
if err != nil {
return err
}
defer sendConn.Close()
if err = sendConn.peer(s.chain, nil); err != nil {
return fmt.Errorf("peering failed: %v", err)
}
sendConn.SetDeadline(time.Now().Add(timeout))
// Open receiving conn.
recvConn, err := s.dial()
if err != nil {
return err
}
defer recvConn.Close()
if err = recvConn.peer(s.chain, nil); err != nil {
return fmt.Errorf("peering failed: %v", err)
}
recvConn.SetDeadline(time.Now().Add(timeout))
if err = sendConn.Write(ethProto, eth.TransactionsMsg, txs); err != nil {
return fmt.Errorf("failed to write message to connection: %w", err)
}
// Make map of invalid txs.
invalids := make(map[common.Hash]struct{})
for _, tx := range txs {
invalids[tx.Hash()] = struct{}{}
}
// Get responses.
recvConn.SetReadDeadline(time.Now().Add(timeout))
for {
msg, err := recvConn.ReadEth()
if errors.Is(err, os.ErrDeadlineExceeded) {
// Successful if no invalid txs are propagated before timeout.
return nil
} else if err != nil {
return fmt.Errorf("failed to read from connection: %w", err)
}
switch msg := msg.(type) {
case *eth.TransactionsPacket:
for _, tx := range txs {
if _, ok := invalids[tx.Hash()]; ok {
return fmt.Errorf("received bad tx: %s", tx.Hash())
}
}
case *eth.NewPooledTransactionHashesPacket:
for _, hash := range msg.Hashes {
if _, ok := invalids[hash]; ok {
return fmt.Errorf("received bad tx: %s", hash)
}
}
case *eth.GetBlockHeadersPacket:
headers, err := s.chain.GetHeaders(msg)
if err != nil {
t.Logf("invalid GetBlockHeaders request: %v", err)
}
recvConn.Write(ethProto, eth.BlockHeadersMsg, &eth.BlockHeadersPacket{
RequestId: msg.RequestId,
BlockHeadersRequest: headers,
})
default:
return fmt.Errorf("unexpected eth message: %v", pretty.Sdump(msg))
}
}
}