erigon-pulse/cmd/headers/download/tx_pool.go

175 lines
6.0 KiB
Go

package download
import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"github.com/golang/protobuf/ptypes/empty"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/eth/fetcher"
"github.com/ledgerwatch/turbo-geth/eth/protocols/eth"
"github.com/ledgerwatch/turbo-geth/gointerfaces"
proto_sentry "github.com/ledgerwatch/turbo-geth/gointerfaces/sentry"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/rlp"
"google.golang.org/grpc"
)
type TxPoolServer struct {
sentries []proto_sentry.SentryClient
txPool *core.TxPool
TxFetcher *fetcher.TxFetcher
}
func NewTxPoolServer(sentries []proto_sentry.SentryClient, txPool *core.TxPool) (*TxPoolServer, error) {
cs := &TxPoolServer{
sentries: sentries,
txPool: txPool,
}
return cs, nil
}
func (tp *TxPoolServer) newPooledTransactionHashes(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error {
var query eth.NewPooledTransactionHashesPacket
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding GetBlockHeader: %v, data: %x", err, inreq.Data)
}
return tp.TxFetcher.Notify(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query)
}
func (tp *TxPoolServer) pooledTransactions(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error {
var query eth.PooledTransactionsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding GetBlockHeader: %v, data: %x", err, inreq.Data)
}
return tp.TxFetcher.Enqueue(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query.PooledTransactionsPacket, true)
}
func (tp *TxPoolServer) transactions(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error {
if tp.txPool == nil {
return nil
}
var query eth.TransactionsPacket
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding GetBlockHeader: %v, data: %x", err, inreq.Data)
}
return tp.TxFetcher.Enqueue(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query, false)
}
func (tp *TxPoolServer) getPooledTransactions(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error {
if tp.txPool == nil {
return nil
}
var query eth.GetPooledTransactionsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding GetBlockHeader: %v, data: %x", err, inreq.Data)
}
_, txs := eth.AnswerGetPooledTransactions(tp.txPool, query.GetPooledTransactionsPacket)
b, err := rlp.EncodeToBytes(&eth.PooledTransactionsRLPPacket66{
RequestId: query.RequestId,
PooledTransactionsRLPPacket: txs,
})
if err != nil {
return fmt.Errorf("encode header response: %v", err)
}
// TODO: implement logic from perr.ReplyPooledTransactionsRLP - to remember tx ids
outreq := proto_sentry.SendMessageByIdRequest{
PeerId: inreq.PeerId,
Data: &proto_sentry.OutboundMessageData{Id: proto_sentry.MessageId_PooledTransactions, Data: b},
}
_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err != nil {
return fmt.Errorf("send pooled transactions response: %v", err)
}
return nil
}
func (tp *TxPoolServer) SendTxsRequest(ctx context.Context, peerID string, hashes []common.Hash) []byte {
bytes, err := rlp.EncodeToBytes(&eth.GetPooledTransactionsPacket66{
RequestId: rand.Uint64(), //nolint:gosec
GetPooledTransactionsPacket: hashes,
})
if err != nil {
log.Error("Could not send transactions request", "err", err)
return nil
}
outreq := proto_sentry.SendMessageByIdRequest{
PeerId: gointerfaces.ConvertBytesToH512([]byte(peerID)),
Data: &proto_sentry.OutboundMessageData{Id: proto_sentry.MessageId_GetPooledTransactions, Data: bytes},
}
// if sentry not found peers to send such message, try next one. stop if found.
for i, ok, next := tp.randSentryIndex(); ok; i, ok = next() {
sentPeers, err1 := tp.sentries[i].SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err1 != nil {
log.Error("Could not send block bodies request", "err", err1)
continue
}
if sentPeers == nil || len(sentPeers.Peers) == 0 {
continue
}
return gointerfaces.ConvertH512ToBytes(sentPeers.Peers[0])
}
return nil
}
func (tp *TxPoolServer) randSentryIndex() (int, bool, func() (int, bool)) {
var i int
if len(tp.sentries) > 1 {
i = rand.Intn(len(tp.sentries) - 1)
}
to := i
return i, true, func() (int, bool) {
i = (i + 1) % len(tp.sentries)
return i, i != to
}
}
func (tp *TxPoolServer) HandleInboundMessage(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error {
switch inreq.Id {
case proto_sentry.MessageId_NewPooledTransactionHashes:
return tp.newPooledTransactionHashes(ctx, inreq, sentry)
case proto_sentry.MessageId_PooledTransactions:
return tp.pooledTransactions(ctx, inreq, sentry)
case proto_sentry.MessageId_Transactions:
return tp.transactions(ctx, inreq, sentry)
case proto_sentry.MessageId_GetPooledTransactions:
return tp.getPooledTransactions(ctx, inreq, sentry)
default:
return fmt.Errorf("not implemented for message Id: %s", inreq.Id)
}
}
func RecvTxMessage(ctx context.Context, sentry proto_sentry.SentryClient, handleInboundMessage func(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error) {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
receiveClient, err2 := sentry.ReceiveTxMessages(streamCtx, &empty.Empty{}, &grpc.EmptyCallOption{})
if err2 != nil {
log.Error("Receive messages failed", "error", err2)
return
}
for req, err := receiveClient.Recv(); ; req, err = receiveClient.Recv() {
if err != nil {
if !errors.Is(err, io.EOF) {
log.Error("Receive loop terminated", "error", err)
return
}
return
}
if req == nil {
return
}
if err = handleInboundMessage(ctx, req, sentry); err != nil {
log.Error("Handling incoming message", "error", err)
}
}
}