2021-07-04 17:24:15 +00:00
/*
   Copyright 2021 Erigon contributors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
package txpool
import (
2021-07-12 16:55:47 +00:00
"context"
2021-07-14 16:14:36 +00:00
"fmt"
2021-07-12 16:55:47 +00:00
"sync"
2021-07-13 19:02:03 +00:00
"time"
2021-07-12 16:55:47 +00:00
2021-09-18 13:58:20 +00:00
"github.com/holiman/uint256"
2021-10-22 02:12:39 +00:00
"github.com/ledgerwatch/erigon-lib/common/dbg"
2021-09-07 02:52:17 +00:00
"github.com/ledgerwatch/erigon-lib/direct"
2022-02-18 02:40:11 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
2021-08-09 02:43:15 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
2021-07-04 17:24:15 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
2021-08-11 04:21:36 +00:00
"github.com/ledgerwatch/erigon-lib/kv"
2021-12-22 07:34:34 +00:00
"github.com/ledgerwatch/erigon-lib/rlp"
2022-04-11 03:02:44 +00:00
types2 "github.com/ledgerwatch/erigon-lib/types"
2021-07-28 04:50:45 +00:00
"github.com/ledgerwatch/log/v3"
2021-07-12 18:37:35 +00:00
"google.golang.org/grpc"
2021-08-14 08:28:05 +00:00
"google.golang.org/protobuf/types/known/emptypb"
2021-07-04 17:24:15 +00:00
)
2022-06-28 11:39:45 +00:00
// Fetch connects to sentry and implements eth/66 protocol regarding the transaction
2021-07-12 16:55:47 +00:00
// messages. It tries to "prime" the sentry with StatusData message containing given
// genesis hash and list of forks, but with zero max block and total difficulty
// Sentry should have a logic not to overwrite statusData with messages from tx pool
2021-07-04 17:24:15 +00:00
type Fetch struct {
2021-11-24 17:13:17 +00:00
ctx context . Context // Context used for cancellation and closing of the fetcher
sentryClients [ ] direct . SentryClient // sentry clients that will be used for accessing the network
pool Pool // Transaction pool implementation
coreDB kv . RoDB
db kv . RwDB
wg * sync . WaitGroup // used for synchronisation in the tests (nil when not in tests)
stateChangesClient StateChangesClient
2022-04-11 03:02:44 +00:00
stateChangesParseCtx * types2 . TxParseContext
2021-11-24 17:13:17 +00:00
stateChangesParseCtxLock sync . Mutex
2022-04-11 03:02:44 +00:00
pooledTxsParseCtx * types2 . TxParseContext
2021-11-24 17:13:17 +00:00
pooledTxsParseCtxLock sync . Mutex
2021-07-25 12:19:33 +00:00
}
2021-09-07 02:52:17 +00:00
// StateChangesClient is the single call Fetch needs in order to subscribe to
// state changes: StateChanges opens a remote.KV_StateChangesClient stream for the
// given request.
type StateChangesClient interface {
	StateChanges(ctx context.Context, in *remote.StateChangeRequest, opts ...grpc.CallOption) (remote.KV_StateChangesClient, error)
}
2021-07-12 16:55:47 +00:00
// NewFetch creates a new fetch object that will work with given sentry clients. Since the
// SentryClient here is an interface, it is suitable for mocking in tests (mock will need
// to implement all the functions of the SentryClient interface).
2021-10-28 03:13:39 +00:00
func NewFetch ( ctx context . Context , sentryClients [ ] direct . SentryClient , pool Pool , stateChangesClient StateChangesClient , coreDB kv . RoDB , db kv . RwDB , chainID uint256 . Int ) * Fetch {
2021-11-05 10:04:17 +00:00
f := & Fetch {
2021-08-21 08:24:03 +00:00
ctx : ctx ,
sentryClients : sentryClients ,
pool : pool ,
2021-08-22 10:06:38 +00:00
coreDB : coreDB ,
db : db ,
2021-08-21 08:24:03 +00:00
stateChangesClient : stateChangesClient ,
2022-07-23 03:11:25 +00:00
stateChangesParseCtx : types2 . NewTxParseContext ( chainID ) . ChainIDRequired ( ) , //TODO: change ctx if rules changed
pooledTxsParseCtx : types2 . NewTxParseContext ( chainID ) . ChainIDRequired ( ) ,
2021-07-04 17:24:15 +00:00
}
2021-11-05 10:04:17 +00:00
f . pooledTxsParseCtx . ValidateRLP ( f . pool . ValidateSerializedTxn )
f . stateChangesParseCtx . ValidateRLP ( f . pool . ValidateSerializedTxn )
return f
2021-07-04 17:24:15 +00:00
}
2021-07-12 16:55:47 +00:00
// SetWaitGroup installs the wait group on which the receive loops call Done after
// each processed message, peer event or state-change batch. It is meant for tests;
// when no wait group is set the loops skip the call.
func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) {
	f.wg = wg
}
2022-04-11 03:02:44 +00:00
func ( f * Fetch ) threadSafeParsePooledTxn ( cb func ( * types2 . TxParseContext ) error ) error {
2021-11-24 17:13:17 +00:00
f . pooledTxsParseCtxLock . Lock ( )
defer f . pooledTxsParseCtxLock . Unlock ( )
return cb ( f . pooledTxsParseCtx )
}
2022-04-11 03:02:44 +00:00
func ( f * Fetch ) threadSafeParseStateChangeTxn ( cb func ( * types2 . TxParseContext ) error ) error {
2021-11-24 17:13:17 +00:00
f . stateChangesParseCtxLock . Lock ( )
defer f . stateChangesParseCtxLock . Unlock ( )
return cb ( f . stateChangesParseCtx )
}
2021-08-09 04:00:36 +00:00
// ConnectSentries initialises connection to the sentry
func ( f * Fetch ) ConnectSentries ( ) {
2021-11-24 17:13:17 +00:00
for i := range f . sentryClients {
go func ( i int ) {
f . receiveMessageLoop ( f . sentryClients [ i ] )
} ( i )
go func ( i int ) {
f . receivePeerLoop ( f . sentryClients [ i ] )
} ( i )
}
2021-08-09 04:00:36 +00:00
}
func ( f * Fetch ) ConnectCore ( ) {
2021-08-09 07:15:30 +00:00
go func ( ) {
for {
select {
case <- f . ctx . Done ( ) :
return
default :
}
2021-08-14 08:28:05 +00:00
if err := f . handleStateChanges ( f . ctx , f . stateChangesClient ) ; err != nil {
2022-02-18 02:40:11 +00:00
if grpcutil . IsRetryLater ( err ) || grpcutil . IsEndOfStream ( err ) {
time . Sleep ( 3 * time . Second )
2021-09-01 08:22:48 +00:00
continue
}
2021-08-14 08:28:05 +00:00
log . Warn ( "[txpool.handleStateChanges]" , "err" , err )
}
2021-08-09 07:15:30 +00:00
}
} ( )
2021-07-12 16:55:47 +00:00
}
func ( f * Fetch ) receiveMessageLoop ( sentryClient sentry . SentryClient ) {
for {
select {
case <- f . ctx . Done ( ) :
return
default :
}
2021-08-14 08:28:05 +00:00
if _ , err := sentryClient . HandShake ( f . ctx , & emptypb . Empty { } , grpc . WaitForReady ( true ) ) ; err != nil {
2022-02-18 02:40:11 +00:00
if grpcutil . IsRetryLater ( err ) || grpcutil . IsEndOfStream ( err ) {
time . Sleep ( 3 * time . Second )
2021-09-01 08:22:48 +00:00
continue
}
2021-07-13 19:02:03 +00:00
// Report error and wait more
2021-08-14 04:33:13 +00:00
log . Warn ( "[txpool.recvMessage] sentry not ready yet" , "err" , err )
2021-07-13 19:02:03 +00:00
continue
2021-07-12 18:37:35 +00:00
}
2021-08-14 04:33:13 +00:00
if err := f . receiveMessage ( f . ctx , sentryClient ) ; err != nil {
2022-02-18 02:40:11 +00:00
if grpcutil . IsRetryLater ( err ) || grpcutil . IsEndOfStream ( err ) {
time . Sleep ( 3 * time . Second )
2021-09-01 08:22:48 +00:00
continue
}
2021-08-14 04:33:13 +00:00
log . Warn ( "[txpool.recvMessage]" , "err" , err )
}
}
}
func ( f * Fetch ) receiveMessage ( ctx context . Context , sentryClient sentry . SentryClient ) error {
streamCtx , cancel := context . WithCancel ( ctx )
defer cancel ( )
stream , err := sentryClient . Messages ( streamCtx , & sentry . MessagesRequest { Ids : [ ] sentry . MessageId {
2022-06-28 11:39:45 +00:00
sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_66 ,
sentry . MessageId_GET_POOLED_TRANSACTIONS_66 ,
sentry . MessageId_TRANSACTIONS_66 ,
sentry . MessageId_POOLED_TRANSACTIONS_66 ,
2021-08-14 04:33:13 +00:00
} } , grpc . WaitForReady ( true ) )
if err != nil {
select {
case <- f . ctx . Done ( ) :
return ctx . Err ( )
default :
}
return err
}
var req * sentry . InboundMessage
for req , err = stream . Recv ( ) ; ; req , err = stream . Recv ( ) {
2021-07-12 18:37:35 +00:00
if err != nil {
select {
case <- f . ctx . Done ( ) :
2021-08-14 04:33:13 +00:00
return ctx . Err ( )
2021-07-12 18:37:35 +00:00
default :
}
2021-08-14 04:33:13 +00:00
return err
2021-07-12 18:37:35 +00:00
}
2021-08-14 04:33:13 +00:00
if req == nil {
return nil
}
2021-08-20 11:16:33 +00:00
if err := f . handleInboundMessage ( streamCtx , req , sentryClient ) ; err != nil {
2022-02-18 02:40:11 +00:00
if grpcutil . IsRetryLater ( err ) || grpcutil . IsEndOfStream ( err ) {
time . Sleep ( 3 * time . Second )
2021-09-01 08:22:48 +00:00
continue
}
2021-12-22 07:34:34 +00:00
if rlp . IsRLPError ( err ) {
log . Debug ( "[txpool.fetch] Handling incoming message" , "msg" , req . Id . String ( ) , "err" , err )
} else {
log . Warn ( "[txpool.fetch] Handling incoming message" , "msg" , req . Id . String ( ) , "err" , err )
}
2021-08-20 11:16:33 +00:00
}
if f . wg != nil {
f . wg . Done ( )
}
2021-07-12 16:55:47 +00:00
}
}
2021-10-22 02:12:39 +00:00
func ( f * Fetch ) handleInboundMessage ( ctx context . Context , req * sentry . InboundMessage , sentryClient sentry . SentryClient ) ( err error ) {
defer func ( ) {
if rec := recover ( ) ; rec != nil {
2021-10-22 02:41:20 +00:00
err = fmt . Errorf ( "%+v, trace: %s, rlp: %x" , rec , dbg . Stack ( ) , req . Data )
2021-10-22 02:12:39 +00:00
}
} ( )
2021-08-22 10:06:38 +00:00
if ! f . pool . Started ( ) {
return nil
}
tx , err := f . db . BeginRo ( ctx )
if err != nil {
return err
}
defer tx . Rollback ( )
2021-07-15 11:23:17 +00:00
switch req . Id {
2022-06-28 11:39:45 +00:00
case sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_66 :
2022-04-11 03:02:44 +00:00
hashCount , pos , err := types2 . ParseHashesCount ( req . Data , 0 )
2021-07-15 11:23:17 +00:00
if err != nil {
return fmt . Errorf ( "parsing NewPooledTransactionHashes: %w" , err )
}
var hashbuf [ 32 ] byte
2022-04-11 03:02:44 +00:00
var unknownHashes types2 . Hashes
2021-07-15 11:23:17 +00:00
for i := 0 ; i < hashCount ; i ++ {
2022-04-11 03:02:44 +00:00
_ , pos , err = types2 . ParseHash ( req . Data , pos , hashbuf [ : 0 ] )
2021-07-15 11:23:17 +00:00
if err != nil {
return fmt . Errorf ( "parsing NewPooledTransactionHashes: %w" , err )
}
2021-08-22 10:06:38 +00:00
known , err := f . pool . IdHashKnown ( tx , hashbuf [ : ] )
if err != nil {
return err
}
if ! known {
2021-07-16 13:29:21 +00:00
unknownHashes = append ( unknownHashes , hashbuf [ : ] ... )
}
}
if len ( unknownHashes ) > 0 {
2021-07-16 18:42:03 +00:00
var encodedRequest [ ] byte
2022-03-19 04:38:37 +00:00
var messageID sentry . MessageId
2021-09-07 02:52:17 +00:00
switch req . Id {
2022-06-28 11:39:45 +00:00
case sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_66 :
2022-04-11 03:02:44 +00:00
if encodedRequest , err = types2 . EncodeGetPooledTransactions66 ( unknownHashes , uint64 ( 1 ) , nil ) ; err != nil {
2021-07-16 18:42:03 +00:00
return err
}
2022-06-28 11:39:45 +00:00
messageID = sentry . MessageId_GET_POOLED_TRANSACTIONS_66
2021-09-07 02:52:17 +00:00
default :
return fmt . Errorf ( "unexpected message: %s" , req . Id . String ( ) )
2021-07-16 18:42:03 +00:00
}
if _ , err = sentryClient . SendMessageById ( f . ctx , & sentry . SendMessageByIdRequest {
2022-03-19 04:38:37 +00:00
Data : & sentry . OutboundMessageData { Id : messageID , Data : encodedRequest } ,
2021-07-16 18:42:03 +00:00
PeerId : req . PeerId ,
} , & grpc . EmptyCallOption { } ) ; err != nil {
2021-07-16 13:29:21 +00:00
return err
}
2021-07-15 11:23:17 +00:00
}
2022-06-28 11:39:45 +00:00
case sentry . MessageId_GET_POOLED_TRANSACTIONS_66 :
2021-08-03 07:50:10 +00:00
//TODO: handleInboundMessage is single-threaded - means it can accept as argument couple buffers (or analog of txParseContext). Protobuf encoding will copy data anyway, but DirectClient doesn't
2021-08-03 07:35:45 +00:00
var encodedRequest [ ] byte
2022-03-19 04:38:37 +00:00
var messageID sentry . MessageId
2021-09-07 02:52:17 +00:00
switch req . Id {
2022-06-28 11:39:45 +00:00
case sentry . MessageId_GET_POOLED_TRANSACTIONS_66 :
messageID = sentry . MessageId_POOLED_TRANSACTIONS_66
2022-04-11 03:02:44 +00:00
requestID , hashes , _ , err := types2 . ParseGetPooledTransactions66 ( req . Data , 0 , nil )
2021-08-03 06:56:46 +00:00
if err != nil {
2021-08-03 07:35:45 +00:00
return err
2021-08-03 06:56:46 +00:00
}
2021-08-03 07:35:45 +00:00
_ = requestID
var txs [ ] [ ] byte
for i := 0 ; i < len ( hashes ) ; i += 32 {
2021-08-22 10:06:38 +00:00
txn , err := f . pool . GetRlp ( tx , hashes [ i : i + 32 ] )
if err != nil {
return err
}
2021-08-03 07:35:45 +00:00
if txn == nil {
continue
}
txs = append ( txs , txn )
2021-08-03 06:56:46 +00:00
}
2021-08-03 07:35:45 +00:00
2022-04-11 03:02:44 +00:00
encodedRequest = types2 . EncodePooledTransactions66 ( txs , requestID , nil )
2021-09-07 02:52:17 +00:00
default :
return fmt . Errorf ( "unexpected message: %s" , req . Id . String ( ) )
2021-08-03 07:35:45 +00:00
}
2021-09-07 02:52:17 +00:00
2021-08-03 07:35:45 +00:00
if _ , err := sentryClient . SendMessageById ( f . ctx , & sentry . SendMessageByIdRequest {
2022-03-19 04:38:37 +00:00
Data : & sentry . OutboundMessageData { Id : messageID , Data : encodedRequest } ,
2021-08-03 07:35:45 +00:00
PeerId : req . PeerId ,
} , & grpc . EmptyCallOption { } ) ; err != nil {
return err
}
2022-06-28 11:39:45 +00:00
case sentry . MessageId_POOLED_TRANSACTIONS_66 , sentry . MessageId_TRANSACTIONS_66 :
2022-04-11 03:02:44 +00:00
txs := types2 . TxSlots { }
if err := f . threadSafeParsePooledTxn ( func ( parseContext * types2 . TxParseContext ) error {
2021-09-20 05:44:29 +00:00
return nil
2021-11-24 17:13:17 +00:00
} ) ; err != nil {
return err
}
2021-11-05 10:04:17 +00:00
2021-09-07 02:52:17 +00:00
switch req . Id {
2022-06-28 11:39:45 +00:00
case sentry . MessageId_TRANSACTIONS_66 :
2022-04-11 03:02:44 +00:00
if err := f . threadSafeParsePooledTxn ( func ( parseContext * types2 . TxParseContext ) error {
if _ , err := types2 . ParseTransactions ( req . Data , 0 , parseContext , & txs , func ( hash [ ] byte ) error {
2022-03-31 08:13:11 +00:00
known , err := f . pool . IdHashKnown ( tx , hash )
if err != nil {
return err
}
if known {
2022-04-11 03:02:44 +00:00
return types2 . ErrRejected
2022-03-31 08:13:11 +00:00
}
return nil
} ) ; err != nil {
2021-11-24 17:13:17 +00:00
return err
}
return nil
} ) ; err != nil {
2021-08-08 12:18:50 +00:00
return err
}
2022-06-28 11:39:45 +00:00
case sentry . MessageId_POOLED_TRANSACTIONS_66 :
2022-04-11 03:02:44 +00:00
if err := f . threadSafeParsePooledTxn ( func ( parseContext * types2 . TxParseContext ) error {
if _ , _ , err := types2 . ParsePooledTransactions66 ( req . Data , 0 , parseContext , & txs , func ( hash [ ] byte ) error {
2022-03-31 08:13:11 +00:00
known , err := f . pool . IdHashKnown ( tx , hash )
if err != nil {
return err
}
if known {
2022-04-11 03:02:44 +00:00
return types2 . ErrRejected
2022-03-31 08:13:11 +00:00
}
return nil
} ) ; err != nil {
2021-11-24 17:13:17 +00:00
return err
}
return nil
} ) ; err != nil {
2021-08-08 12:18:50 +00:00
return err
}
2021-09-07 02:52:17 +00:00
default :
return fmt . Errorf ( "unexpected message: %s" , req . Id . String ( ) )
2021-08-08 12:18:50 +00:00
}
2022-04-11 03:02:44 +00:00
if len ( txs . Txs ) == 0 {
2021-08-16 02:40:43 +00:00
return nil
}
2021-09-02 05:25:34 +00:00
f . pool . AddRemoteTxs ( ctx , txs )
2021-08-20 09:17:34 +00:00
default :
2021-10-19 08:19:54 +00:00
defer log . Trace ( "[txpool] dropped p2p message" , "id" , req . Id )
2021-07-15 11:23:17 +00:00
}
2021-08-03 06:56:46 +00:00
2021-07-12 18:37:35 +00:00
return nil
}
2021-07-12 16:55:47 +00:00
func ( f * Fetch ) receivePeerLoop ( sentryClient sentry . SentryClient ) {
2021-07-12 18:37:35 +00:00
for {
select {
case <- f . ctx . Done ( ) :
return
default :
}
2021-08-14 08:28:05 +00:00
if _ , err := sentryClient . HandShake ( f . ctx , & emptypb . Empty { } , grpc . WaitForReady ( true ) ) ; err != nil {
2022-02-18 02:40:11 +00:00
if grpcutil . IsRetryLater ( err ) || grpcutil . IsEndOfStream ( err ) {
time . Sleep ( 3 * time . Second )
2021-09-01 08:22:48 +00:00
continue
}
2021-07-13 19:02:03 +00:00
// Report error and wait more
2021-08-14 04:33:13 +00:00
log . Warn ( "[txpool.recvPeers] sentry not ready yet" , "err" , err )
2021-07-13 19:02:03 +00:00
time . Sleep ( time . Second )
continue
2021-07-12 18:37:35 +00:00
}
2021-08-14 04:33:13 +00:00
if err := f . receivePeer ( sentryClient ) ; err != nil {
2022-02-18 02:40:11 +00:00
if grpcutil . IsRetryLater ( err ) || grpcutil . IsEndOfStream ( err ) {
time . Sleep ( 3 * time . Second )
2021-09-01 08:22:48 +00:00
continue
}
2021-08-14 04:33:13 +00:00
log . Warn ( "[txpool.recvPeers]" , "err" , err )
2021-07-12 18:37:35 +00:00
}
2021-08-14 04:33:13 +00:00
}
}
2021-07-12 18:37:35 +00:00
2021-08-14 04:33:13 +00:00
func ( f * Fetch ) receivePeer ( sentryClient sentry . SentryClient ) error {
streamCtx , cancel := context . WithCancel ( f . ctx )
defer cancel ( )
2022-04-23 14:43:24 +00:00
stream , err := sentryClient . PeerEvents ( streamCtx , & sentry . PeerEventsRequest { } )
2021-08-14 04:33:13 +00:00
if err != nil {
select {
case <- f . ctx . Done ( ) :
return f . ctx . Err ( )
default :
}
return err
}
2022-04-23 14:43:24 +00:00
var req * sentry . PeerEvent
2021-08-14 04:33:13 +00:00
for req , err = stream . Recv ( ) ; ; req , err = stream . Recv ( ) {
if err != nil {
return err
}
if req == nil {
return nil
}
if err = f . handleNewPeer ( req ) ; err != nil {
return err
}
if f . wg != nil {
f . wg . Done ( )
2021-07-12 18:37:35 +00:00
}
}
2021-07-12 16:55:47 +00:00
}
2021-07-13 19:02:03 +00:00
2022-04-23 14:43:24 +00:00
func ( f * Fetch ) handleNewPeer ( req * sentry . PeerEvent ) error {
2021-07-25 12:19:33 +00:00
if req == nil {
return nil
}
2022-04-23 14:43:24 +00:00
switch req . EventId {
case sentry . PeerEvent_Connect :
2021-08-08 12:18:50 +00:00
f . pool . AddNewGoodPeer ( req . PeerId )
2021-07-25 12:19:33 +00:00
}
2021-07-13 19:02:03 +00:00
return nil
}
2021-08-09 02:43:15 +00:00
2021-09-07 02:52:17 +00:00
func ( f * Fetch ) handleStateChanges ( ctx context . Context , client StateChangesClient ) error {
2021-08-09 02:43:15 +00:00
streamCtx , cancel := context . WithCancel ( ctx )
defer cancel ( )
stream , err := client . StateChanges ( streamCtx , & remote . StateChangeRequest { WithStorage : false , WithTransactions : true } , grpc . WaitForReady ( true ) )
if err != nil {
2021-08-14 08:28:05 +00:00
return err
2021-08-09 02:43:15 +00:00
}
for req , err := stream . Recv ( ) ; ; req , err = stream . Recv ( ) {
if err != nil {
2021-08-14 08:28:05 +00:00
return err
2021-08-09 02:43:15 +00:00
}
if req == nil {
2021-08-14 08:28:05 +00:00
return nil
2021-08-09 02:43:15 +00:00
}
2022-04-11 03:02:44 +00:00
var unwindTxs , minedTxs types2 . TxSlots
2021-09-17 02:56:04 +00:00
for _ , change := range req . ChangeBatch {
if change . Direction == remote . Direction_FORWARD {
minedTxs . Resize ( uint ( len ( change . Txs ) ) )
for i := range change . Txs {
2022-04-11 03:02:44 +00:00
minedTxs . Txs [ i ] = & types2 . TxSlot { }
if err = f . threadSafeParseStateChangeTxn ( func ( parseContext * types2 . TxParseContext ) error {
2022-07-08 14:25:51 +00:00
_ , err := parseContext . ParseTransaction ( change . Txs [ i ] , 0 , minedTxs . Txs [ i ] , minedTxs . Senders . At ( i ) , false /* hasEnvelope */ , nil )
2021-11-24 17:13:17 +00:00
return err
} ) ; err != nil {
2021-09-17 02:56:04 +00:00
log . Warn ( "stream.Recv" , "err" , err )
continue
}
2021-08-09 07:04:22 +00:00
}
}
2021-09-17 02:56:04 +00:00
if change . Direction == remote . Direction_UNWIND {
unwindTxs . Resize ( uint ( len ( change . Txs ) ) )
for i := range change . Txs {
2022-04-11 03:02:44 +00:00
unwindTxs . Txs [ i ] = & types2 . TxSlot { }
if err = f . threadSafeParseStateChangeTxn ( func ( parseContext * types2 . TxParseContext ) error {
2022-07-08 14:25:51 +00:00
_ , err = parseContext . ParseTransaction ( change . Txs [ i ] , 0 , unwindTxs . Txs [ i ] , unwindTxs . Senders . At ( i ) , false /* hasEnvelope */ , nil )
2021-11-24 17:13:17 +00:00
return err
} ) ; err != nil {
2021-09-17 02:56:04 +00:00
log . Warn ( "stream.Recv" , "err" , err )
continue
}
2021-08-09 07:04:22 +00:00
}
}
2021-08-09 02:43:15 +00:00
}
2021-09-07 02:52:17 +00:00
if err := f . db . View ( ctx , func ( tx kv . Tx ) error {
2021-09-20 05:44:29 +00:00
return f . pool . OnNewBlock ( ctx , req , unwindTxs , minedTxs , tx )
2021-09-07 02:52:17 +00:00
} ) ; err != nil {
2021-08-09 02:43:15 +00:00
log . Warn ( "onNewBlock" , "err" , err )
}
2021-08-09 07:04:22 +00:00
if f . wg != nil {
f . wg . Done ( )
}
2021-08-09 02:43:15 +00:00
}
}