2021-07-04 17:24:15 +00:00
/*
   Copyright 2021 Erigon contributors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
package txpool
import (
2021-07-12 16:55:47 +00:00
"context"
2021-07-12 18:37:35 +00:00
"errors"
2021-07-14 16:14:36 +00:00
"fmt"
2021-07-12 18:37:35 +00:00
"io"
2021-07-12 16:55:47 +00:00
"sync"
2021-07-13 19:02:03 +00:00
"time"
2021-07-12 16:55:47 +00:00
2021-07-12 18:37:35 +00:00
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
2021-07-04 17:24:15 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
2021-07-28 04:50:45 +00:00
"github.com/ledgerwatch/log/v3"
2021-07-12 18:37:35 +00:00
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
2021-07-04 17:24:15 +00:00
)
2021-08-03 08:22:43 +00:00
// Fetch connects to sentry and implements eth/65 or eth/66 protocol regarding the transaction
2021-07-12 16:55:47 +00:00
// messages. It tries to "prime" the sentry with StatusData message containing given
// genesis hash and list of forks, but with zero max block and total difficulty
// Sentry should have a logic not to overwrite statusData with messages from tx pool
2021-07-04 17:24:15 +00:00
type Fetch struct {
2021-07-12 16:55:47 +00:00
ctx context . Context // Context used for cancellation and closing of the fetcher
sentryClients [ ] sentry . SentryClient // sentry clients that will be used for accessing the network
2021-07-12 18:37:35 +00:00
statusData * sentry . StatusData // Status data used for "handshaking" with sentries
2021-07-16 13:29:21 +00:00
pool Pool // Transaction pool implementation
2021-08-03 08:22:43 +00:00
wg * sync . WaitGroup // used for synchronisation in the tests (nil when not in tests)
2021-07-26 03:38:01 +00:00
logger log . Logger
2021-07-25 12:19:33 +00:00
}
// Timings is a set of time.Duration intervals. The code consuming these
// values is not part of this section of the file; judging by the field names
// they are the periods of recurring propagation, peer-sync and local-broadcast
// tasks (NOTE(review): confirm against the consumer).
type Timings struct {
	propagateAllNewTxsEvery,
	syncToNewPeersEvery,
	broadcastLocalTransactionsEvery time.Duration
}
var DefaultTimings = Timings {
propagateAllNewTxsEvery : 5 * time . Second ,
broadcastLocalTransactionsEvery : 2 * time . Minute ,
syncToNewPeersEvery : 2 * time . Minute ,
2021-07-04 17:24:15 +00:00
}
2021-07-12 16:55:47 +00:00
// NewFetch creates a new fetch object that will work with given sentry clients. Since the
// SentryClient here is an interface, it is suitable for mocking in tests (mock will need
// to implement all the functions of the SentryClient interface).
func NewFetch ( ctx context . Context ,
sentryClients [ ] sentry . SentryClient ,
genesisHash [ 32 ] byte ,
networkId uint64 ,
forks [ ] uint64 ,
2021-07-16 13:29:21 +00:00
pool Pool ,
2021-07-26 03:38:01 +00:00
logger log . Logger ,
2021-07-12 16:55:47 +00:00
) * Fetch {
2021-07-12 18:37:35 +00:00
statusData := & sentry . StatusData {
NetworkId : networkId ,
TotalDifficulty : gointerfaces . ConvertUint256IntToH256 ( uint256 . NewInt ( 0 ) ) ,
BestHash : gointerfaces . ConvertHashToH256 ( genesisHash ) ,
MaxBlock : 0 ,
ForkData : & sentry . Forks {
Genesis : gointerfaces . ConvertHashToH256 ( genesisHash ) ,
Forks : forks ,
} ,
}
2021-07-04 17:24:15 +00:00
return & Fetch {
2021-07-12 16:55:47 +00:00
ctx : ctx ,
sentryClients : sentryClients ,
2021-07-12 18:37:35 +00:00
statusData : statusData ,
2021-07-16 13:29:21 +00:00
pool : pool ,
2021-07-28 04:50:45 +00:00
logger : logger ,
2021-07-04 17:24:15 +00:00
}
}
2021-07-12 16:55:47 +00:00
// SetWaitGroup stores wg in the fetcher. When it is non-nil, the receive
// loops call wg.Done() once after every inbound message and every peer event
// they have processed, which lets tests wait for processing to finish.
func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) {
	f.wg = wg
}
// Start initialises the connection to the sentries: for every configured
// sentry client it launches two goroutines, one running receiveMessageLoop
// and one running receivePeerLoop. It returns without waiting for them.
func (f *Fetch) Start() {
	for _, client := range f.sentryClients {
		go f.receiveMessageLoop(client)
		go f.receivePeerLoop(client)
	}
}
func ( f * Fetch ) receiveMessageLoop ( sentryClient sentry . SentryClient ) {
2021-07-28 04:50:45 +00:00
logger := f . logger
2021-07-12 16:55:47 +00:00
for {
select {
case <- f . ctx . Done ( ) :
return
default :
}
2021-07-12 18:37:35 +00:00
_ , err := sentryClient . SetStatus ( f . ctx , f . statusData , grpc . WaitForReady ( true ) )
if err != nil {
if s , ok := status . FromError ( err ) ; ok && s . Code ( ) == codes . Canceled {
return
}
2021-07-13 19:02:03 +00:00
// Report error and wait more
2021-07-28 04:50:45 +00:00
logger . Warn ( "sentry not ready yet" , "err" , err )
2021-07-13 19:02:03 +00:00
time . Sleep ( time . Second )
continue
2021-07-12 18:37:35 +00:00
}
streamCtx , cancel := context . WithCancel ( f . ctx )
defer cancel ( )
stream , err := sentryClient . Messages ( streamCtx , & sentry . MessagesRequest { Ids : [ ] sentry . MessageId {
sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_65 ,
sentry . MessageId_GET_POOLED_TRANSACTIONS_65 ,
sentry . MessageId_TRANSACTIONS_65 ,
sentry . MessageId_POOLED_TRANSACTIONS_65 ,
sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_66 ,
sentry . MessageId_GET_POOLED_TRANSACTIONS_66 ,
sentry . MessageId_TRANSACTIONS_66 ,
sentry . MessageId_POOLED_TRANSACTIONS_66 ,
} } , grpc . WaitForReady ( true ) )
if err != nil {
select {
case <- f . ctx . Done ( ) :
return
default :
}
if s , ok := status . FromError ( err ) ; ok && s . Code ( ) == codes . Canceled {
return
}
if errors . Is ( err , io . EOF ) {
return
}
2021-07-28 04:50:45 +00:00
logger . Warn ( "messages" , "err" , err )
2021-07-12 18:37:35 +00:00
return
}
2021-07-12 16:55:47 +00:00
2021-07-12 18:37:35 +00:00
var req * sentry . InboundMessage
for req , err = stream . Recv ( ) ; ; req , err = stream . Recv ( ) {
if err != nil {
select {
case <- f . ctx . Done ( ) :
return
default :
}
if s , ok := status . FromError ( err ) ; ok && s . Code ( ) == codes . Canceled {
return
}
if errors . Is ( err , io . EOF ) {
return
}
2021-07-28 04:50:45 +00:00
logger . Warn ( "stream.Recv" , "err" , err )
2021-07-12 18:37:35 +00:00
return
}
if req == nil {
return
}
if err = f . handleInboundMessage ( req , sentryClient ) ; err != nil {
2021-07-28 04:50:45 +00:00
logger . Warn ( "Handling incoming message: %s" , "err" , err )
2021-07-12 18:37:35 +00:00
}
if f . wg != nil {
f . wg . Done ( )
}
}
2021-07-12 16:55:47 +00:00
}
}
2021-07-12 18:37:35 +00:00
func ( f * Fetch ) handleInboundMessage ( req * sentry . InboundMessage , sentryClient sentry . SentryClient ) error {
2021-07-15 11:23:17 +00:00
switch req . Id {
case sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_66 , sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_65 :
hashCount , pos , err := ParseHashesCount ( req . Data , 0 )
if err != nil {
return fmt . Errorf ( "parsing NewPooledTransactionHashes: %w" , err )
}
var hashbuf [ 32 ] byte
2021-07-26 00:57:49 +00:00
var unknownHashes Hashes
2021-07-15 11:23:17 +00:00
for i := 0 ; i < hashCount ; i ++ {
2021-07-27 10:07:10 +00:00
_ , pos , err = ParseHash ( req . Data , pos , hashbuf [ : 0 ] )
2021-07-15 11:23:17 +00:00
if err != nil {
return fmt . Errorf ( "parsing NewPooledTransactionHashes: %w" , err )
}
2021-07-16 13:29:21 +00:00
if ! f . pool . IdHashKnown ( hashbuf [ : ] ) {
unknownHashes = append ( unknownHashes , hashbuf [ : ] ... )
}
}
if len ( unknownHashes ) > 0 {
2021-07-16 18:42:03 +00:00
var encodedRequest [ ] byte
var messageId sentry . MessageId
if req . Id == sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_66 {
2021-07-26 12:13:07 +00:00
if encodedRequest , err = EncodeGetPooledTransactions66 ( unknownHashes , uint64 ( 1 ) , nil ) ; err != nil {
2021-07-16 18:42:03 +00:00
return err
}
messageId = sentry . MessageId_GET_POOLED_TRANSACTIONS_66
} else {
2021-07-27 09:44:47 +00:00
encodedRequest = EncodeHashes ( unknownHashes , nil )
2021-07-16 18:42:03 +00:00
messageId = sentry . MessageId_GET_POOLED_TRANSACTIONS_65
}
if _ , err = sentryClient . SendMessageById ( f . ctx , & sentry . SendMessageByIdRequest {
Data : & sentry . OutboundMessageData { Id : messageId , Data : encodedRequest } ,
PeerId : req . PeerId ,
} , & grpc . EmptyCallOption { } ) ; err != nil {
2021-07-16 13:29:21 +00:00
return err
}
2021-07-15 11:23:17 +00:00
}
2021-08-03 06:56:46 +00:00
case sentry . MessageId_GET_POOLED_TRANSACTIONS_66 , sentry . MessageId_GET_POOLED_TRANSACTIONS_65 :
2021-08-03 07:50:10 +00:00
//TODO: handleInboundMessage is single-threaded - means it can accept as argument couple buffers (or analog of txParseContext). Protobuf encoding will copy data anyway, but DirectClient doesn't
2021-08-03 07:35:45 +00:00
var encodedRequest [ ] byte
messageId := sentry . MessageId_POOLED_TRANSACTIONS_66
if req . Id == sentry . MessageId_GET_POOLED_TRANSACTIONS_65 {
messageId = sentry . MessageId_POOLED_TRANSACTIONS_65
}
if req . Id == sentry . MessageId_GET_POOLED_TRANSACTIONS_66 {
requestID , hashes , _ , err := ParseGetPooledTransactions66 ( req . Data , 0 , nil )
2021-08-03 06:56:46 +00:00
if err != nil {
2021-08-03 07:35:45 +00:00
return err
2021-08-03 06:56:46 +00:00
}
2021-08-03 07:35:45 +00:00
_ = requestID
var txs [ ] [ ] byte
for i := 0 ; i < len ( hashes ) ; i += 32 {
txn := f . pool . GetRlp ( hashes [ i : i + 32 ] )
if txn == nil {
continue
}
txs = append ( txs , txn )
2021-08-03 06:56:46 +00:00
}
2021-08-03 07:35:45 +00:00
encodedRequest = EncodePooledTransactions66 ( txs , requestID , nil )
} else {
hashes , _ , err := ParseGetPooledTransactions65 ( req . Data , 0 , nil )
2021-08-03 06:56:46 +00:00
if err != nil {
2021-08-03 07:35:45 +00:00
return err
}
var txs [ ] [ ] byte
for i := 0 ; i < len ( hashes ) ; i += 32 {
txn := f . pool . GetRlp ( hashes [ i : i + 32 ] )
if txn == nil {
continue
}
txs = append ( txs , txn )
2021-08-03 06:56:46 +00:00
}
2021-08-03 07:35:45 +00:00
encodedRequest = EncodePooledTransactions65 ( txs , nil )
}
if _ , err := sentryClient . SendMessageById ( f . ctx , & sentry . SendMessageByIdRequest {
Data : & sentry . OutboundMessageData { Id : messageId , Data : encodedRequest } ,
PeerId : req . PeerId ,
} , & grpc . EmptyCallOption { } ) ; err != nil {
return err
}
2021-07-15 11:23:17 +00:00
}
2021-08-03 06:56:46 +00:00
2021-07-12 18:37:35 +00:00
return nil
}
2021-07-12 16:55:47 +00:00
func ( f * Fetch ) receivePeerLoop ( sentryClient sentry . SentryClient ) {
2021-07-28 04:50:45 +00:00
logger := f . logger
2021-07-12 18:37:35 +00:00
for {
select {
case <- f . ctx . Done ( ) :
return
default :
}
_ , err := sentryClient . SetStatus ( f . ctx , f . statusData , grpc . WaitForReady ( true ) )
if err != nil {
if s , ok := status . FromError ( err ) ; ok && s . Code ( ) == codes . Canceled {
return
}
2021-07-13 19:02:03 +00:00
// Report error and wait more
2021-07-28 04:50:45 +00:00
logger . Warn ( "sentry not ready yet" , "err" , err )
2021-07-13 19:02:03 +00:00
time . Sleep ( time . Second )
continue
2021-07-12 18:37:35 +00:00
}
streamCtx , cancel := context . WithCancel ( f . ctx )
defer cancel ( )
2021-07-12 16:55:47 +00:00
2021-07-12 18:37:35 +00:00
stream , err := sentryClient . Peers ( streamCtx , & sentry . PeersRequest { } )
if err != nil {
select {
case <- f . ctx . Done ( ) :
return
default :
}
if s , ok := status . FromError ( err ) ; ok && s . Code ( ) == codes . Canceled {
return
}
if errors . Is ( err , io . EOF ) {
return
}
2021-07-28 04:50:45 +00:00
logger . Warn ( "peers" , "err" , err )
2021-07-12 18:37:35 +00:00
return
}
var req * sentry . PeersReply
for req , err = stream . Recv ( ) ; ; req , err = stream . Recv ( ) {
if err != nil {
select {
case <- f . ctx . Done ( ) :
return
default :
}
if s , ok := status . FromError ( err ) ; ok && s . Code ( ) == codes . Canceled {
return
}
if errors . Is ( err , io . EOF ) {
return
}
2021-07-28 04:50:45 +00:00
logger . Warn ( "stream.Recv" , "err" , err )
2021-07-12 18:37:35 +00:00
return
}
if req == nil {
return
}
2021-07-25 12:19:33 +00:00
if err = f . handleNewPeer ( req ) ; err != nil {
2021-07-28 04:50:45 +00:00
logger . Warn ( "Handling new peer" , "err" , err )
2021-07-12 18:37:35 +00:00
}
if f . wg != nil {
f . wg . Done ( )
}
}
}
2021-07-12 16:55:47 +00:00
}
2021-07-13 19:02:03 +00:00
2021-07-25 12:19:33 +00:00
func ( f * Fetch ) handleNewPeer ( req * sentry . PeersReply ) error {
if req == nil {
return nil
}
switch req . Event {
case sentry . PeersReply_Connect :
f . pool . NotifyNewPeer ( req . PeerId )
}
2021-07-13 19:02:03 +00:00
return nil
}