2021-07-04 17:24:15 +00:00
/*
   Copyright 2021 Erigon contributors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
package txpool
import (
2021-07-12 16:55:47 +00:00
"context"
2021-07-12 18:37:35 +00:00
"errors"
2021-07-14 16:14:36 +00:00
"fmt"
2021-07-12 18:37:35 +00:00
"io"
2021-07-12 16:55:47 +00:00
"sync"
2021-07-13 19:02:03 +00:00
"time"
2021-07-12 16:55:47 +00:00
2021-09-18 13:58:20 +00:00
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/chain"
2021-10-22 02:12:39 +00:00
"github.com/ledgerwatch/erigon-lib/common/dbg"
2021-09-07 02:52:17 +00:00
"github.com/ledgerwatch/erigon-lib/direct"
2021-08-09 02:43:15 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
2021-07-04 17:24:15 +00:00
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
2021-08-11 04:21:36 +00:00
"github.com/ledgerwatch/erigon-lib/kv"
2021-07-28 04:50:45 +00:00
"github.com/ledgerwatch/log/v3"
2021-07-12 18:37:35 +00:00
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
2021-08-14 08:28:05 +00:00
"google.golang.org/protobuf/types/known/emptypb"
2021-07-04 17:24:15 +00:00
)
2021-08-03 08:22:43 +00:00
// Fetch connects to sentry and implements eth/65 or eth/66 protocol regarding the transaction
2021-07-12 16:55:47 +00:00
// messages. It tries to "prime" the sentry with StatusData message containing given
// genesis hash and list of forks, but with zero max block and total difficulty
// Sentry should have a logic not to overwrite statusData with messages from tx pool
2021-07-04 17:24:15 +00:00
type Fetch struct {
2021-08-21 08:24:03 +00:00
ctx context . Context // Context used for cancellation and closing of the fetcher
2021-09-07 02:52:17 +00:00
sentryClients [ ] direct . SentryClient // sentry clients that will be used for accessing the network
2021-08-21 08:24:03 +00:00
pool Pool // Transaction pool implementation
coreDB kv . RoDB
2021-08-22 10:06:38 +00:00
db kv . RwDB
2021-08-21 08:24:03 +00:00
wg * sync . WaitGroup // used for synchronisation in the tests (nil when not in tests)
2021-09-07 02:52:17 +00:00
stateChangesClient StateChangesClient
2021-08-21 08:24:03 +00:00
stateChangesParseCtx * TxParseContext
pooledTxsParseCtx * TxParseContext
2021-07-25 12:19:33 +00:00
}
2021-09-07 02:52:17 +00:00
// StateChangesClient is the single call Fetch needs from the core node: opening a
// stream of state changes. Fetch.handleStateChanges reads that stream with Recv.
type StateChangesClient interface {
	StateChanges(ctx context.Context, in *remote.StateChangeRequest, opts ...grpc.CallOption) (remote.KV_StateChangesClient, error)
}
2021-07-12 16:55:47 +00:00
// NewFetch creates a new fetch object that will work with given sentry clients. Since the
// SentryClient here is an interface, it is suitable for mocking in tests (mock will need
// to implement all the functions of the SentryClient interface).
2021-09-18 13:58:20 +00:00
func NewFetch ( ctx context . Context , sentryClients [ ] direct . SentryClient , pool Pool , stateChangesClient StateChangesClient , coreDB kv . RoDB , db kv . RwDB , rules chain . Rules , chainID uint256 . Int ) * Fetch {
2021-07-04 17:24:15 +00:00
return & Fetch {
2021-08-21 08:24:03 +00:00
ctx : ctx ,
sentryClients : sentryClients ,
pool : pool ,
2021-08-22 10:06:38 +00:00
coreDB : coreDB ,
db : db ,
2021-08-21 08:24:03 +00:00
stateChangesClient : stateChangesClient ,
2021-10-18 04:22:05 +00:00
stateChangesParseCtx : NewTxParseContext ( chainID ) , //TODO: change ctx if rules changed
pooledTxsParseCtx : NewTxParseContext ( chainID ) ,
2021-07-04 17:24:15 +00:00
}
}
2021-07-12 16:55:47 +00:00
// SetWaitGroup installs the wait group that the receive loops signal (via Done) after
// each processed item. It is meant for tests; when it is left nil nothing is signalled.
func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) {
	f.wg = wg
}
2021-08-09 04:00:36 +00:00
// ConnectSentries initialises connection to the sentry
func ( f * Fetch ) ConnectSentries ( ) {
2021-09-20 13:16:32 +00:00
//TODO: fix race in parse ctx - 2 sentries causing it
go func ( i int ) {
f . receiveMessageLoop ( f . sentryClients [ i ] )
} ( 0 )
go func ( i int ) {
f . receivePeerLoop ( f . sentryClients [ i ] )
} ( 0 )
/ *
for i := range f . sentryClients {
go func ( i int ) {
f . receiveMessageLoop ( f . sentryClients [ i ] )
} ( i )
go func ( i int ) {
f . receivePeerLoop ( f . sentryClients [ i ] )
} ( i )
}
* /
2021-08-09 04:00:36 +00:00
}
func ( f * Fetch ) ConnectCore ( ) {
2021-08-09 07:15:30 +00:00
go func ( ) {
for {
select {
case <- f . ctx . Done ( ) :
return
default :
}
2021-08-14 08:28:05 +00:00
if err := f . handleStateChanges ( f . ctx , f . stateChangesClient ) ; err != nil {
2021-09-01 08:22:48 +00:00
if s , ok := status . FromError ( err ) ; ok && retryLater ( s . Code ( ) ) {
2021-08-14 08:28:05 +00:00
time . Sleep ( time . Second )
continue
}
2021-09-01 08:22:48 +00:00
if errors . Is ( err , io . EOF ) || errors . Is ( err , context . Canceled ) {
continue
}
2021-08-14 08:28:05 +00:00
log . Warn ( "[txpool.handleStateChanges]" , "err" , err )
}
2021-08-09 07:15:30 +00:00
}
} ( )
2021-07-12 16:55:47 +00:00
}
func ( f * Fetch ) receiveMessageLoop ( sentryClient sentry . SentryClient ) {
for {
select {
case <- f . ctx . Done ( ) :
return
default :
}
2021-08-14 08:28:05 +00:00
if _ , err := sentryClient . HandShake ( f . ctx , & emptypb . Empty { } , grpc . WaitForReady ( true ) ) ; err != nil {
2021-09-01 08:22:48 +00:00
if s , ok := status . FromError ( err ) ; ok && retryLater ( s . Code ( ) ) {
2021-08-14 04:33:13 +00:00
time . Sleep ( time . Second )
continue
2021-07-12 18:37:35 +00:00
}
2021-09-01 08:22:48 +00:00
if errors . Is ( err , io . EOF ) || errors . Is ( err , context . Canceled ) {
continue
}
2021-07-13 19:02:03 +00:00
// Report error and wait more
2021-08-14 04:33:13 +00:00
log . Warn ( "[txpool.recvMessage] sentry not ready yet" , "err" , err )
2021-07-13 19:02:03 +00:00
continue
2021-07-12 18:37:35 +00:00
}
2021-08-14 04:33:13 +00:00
if err := f . receiveMessage ( f . ctx , sentryClient ) ; err != nil {
2021-09-01 08:22:48 +00:00
if s , ok := status . FromError ( err ) ; ok && retryLater ( s . Code ( ) ) {
2021-08-14 04:33:13 +00:00
time . Sleep ( time . Second )
continue
}
2021-09-01 08:22:48 +00:00
if errors . Is ( err , io . EOF ) || errors . Is ( err , context . Canceled ) {
continue
}
2021-08-14 04:33:13 +00:00
log . Warn ( "[txpool.recvMessage]" , "err" , err )
}
}
}
func ( f * Fetch ) receiveMessage ( ctx context . Context , sentryClient sentry . SentryClient ) error {
streamCtx , cancel := context . WithCancel ( ctx )
defer cancel ( )
stream , err := sentryClient . Messages ( streamCtx , & sentry . MessagesRequest { Ids : [ ] sentry . MessageId {
sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_65 ,
sentry . MessageId_GET_POOLED_TRANSACTIONS_65 ,
sentry . MessageId_TRANSACTIONS_65 ,
sentry . MessageId_POOLED_TRANSACTIONS_65 ,
sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_66 ,
sentry . MessageId_GET_POOLED_TRANSACTIONS_66 ,
sentry . MessageId_TRANSACTIONS_66 ,
sentry . MessageId_POOLED_TRANSACTIONS_66 ,
} } , grpc . WaitForReady ( true ) )
if err != nil {
select {
case <- f . ctx . Done ( ) :
return ctx . Err ( )
default :
}
return err
}
var req * sentry . InboundMessage
for req , err = stream . Recv ( ) ; ; req , err = stream . Recv ( ) {
2021-07-12 18:37:35 +00:00
if err != nil {
select {
case <- f . ctx . Done ( ) :
2021-08-14 04:33:13 +00:00
return ctx . Err ( )
2021-07-12 18:37:35 +00:00
default :
}
2021-08-14 04:33:13 +00:00
return err
2021-07-12 18:37:35 +00:00
}
2021-08-14 04:33:13 +00:00
if req == nil {
return nil
}
2021-08-20 11:16:33 +00:00
if err := f . handleInboundMessage ( streamCtx , req , sentryClient ) ; err != nil {
2021-09-01 08:22:48 +00:00
if s , ok := status . FromError ( err ) ; ok && retryLater ( s . Code ( ) ) {
time . Sleep ( time . Second )
continue
2021-08-20 09:17:34 +00:00
}
2021-09-01 08:22:48 +00:00
if errors . Is ( err , io . EOF ) || errors . Is ( err , context . Canceled ) {
continue
}
2021-10-22 02:12:39 +00:00
log . Warn ( "[txpool.fetch] Handling incoming message" , "msg" , req . Id . String ( ) , "err" , err )
2021-08-20 11:16:33 +00:00
}
if f . wg != nil {
f . wg . Done ( )
}
2021-07-12 16:55:47 +00:00
}
}
2021-10-22 02:12:39 +00:00
func ( f * Fetch ) handleInboundMessage ( ctx context . Context , req * sentry . InboundMessage , sentryClient sentry . SentryClient ) ( err error ) {
defer func ( ) {
if rec := recover ( ) ; rec != nil {
2021-10-22 02:41:20 +00:00
err = fmt . Errorf ( "%+v, trace: %s, rlp: %x" , rec , dbg . Stack ( ) , req . Data )
2021-10-22 02:12:39 +00:00
}
} ( )
2021-08-22 10:06:38 +00:00
if ! f . pool . Started ( ) {
return nil
}
tx , err := f . db . BeginRo ( ctx )
if err != nil {
return err
}
defer tx . Rollback ( )
2021-07-15 11:23:17 +00:00
switch req . Id {
case sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_66 , sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_65 :
hashCount , pos , err := ParseHashesCount ( req . Data , 0 )
if err != nil {
return fmt . Errorf ( "parsing NewPooledTransactionHashes: %w" , err )
}
var hashbuf [ 32 ] byte
2021-07-26 00:57:49 +00:00
var unknownHashes Hashes
2021-07-15 11:23:17 +00:00
for i := 0 ; i < hashCount ; i ++ {
2021-07-27 10:07:10 +00:00
_ , pos , err = ParseHash ( req . Data , pos , hashbuf [ : 0 ] )
2021-07-15 11:23:17 +00:00
if err != nil {
return fmt . Errorf ( "parsing NewPooledTransactionHashes: %w" , err )
}
2021-08-22 10:06:38 +00:00
known , err := f . pool . IdHashKnown ( tx , hashbuf [ : ] )
if err != nil {
return err
}
if ! known {
2021-07-16 13:29:21 +00:00
unknownHashes = append ( unknownHashes , hashbuf [ : ] ... )
}
}
if len ( unknownHashes ) > 0 {
2021-07-16 18:42:03 +00:00
var encodedRequest [ ] byte
var messageId sentry . MessageId
2021-09-07 02:52:17 +00:00
switch req . Id {
case sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_66 :
2021-07-26 12:13:07 +00:00
if encodedRequest , err = EncodeGetPooledTransactions66 ( unknownHashes , uint64 ( 1 ) , nil ) ; err != nil {
2021-07-16 18:42:03 +00:00
return err
}
messageId = sentry . MessageId_GET_POOLED_TRANSACTIONS_66
2021-09-07 02:52:17 +00:00
case sentry . MessageId_NEW_POOLED_TRANSACTION_HASHES_65 :
2021-07-27 09:44:47 +00:00
encodedRequest = EncodeHashes ( unknownHashes , nil )
2021-07-16 18:42:03 +00:00
messageId = sentry . MessageId_GET_POOLED_TRANSACTIONS_65
2021-09-07 02:52:17 +00:00
default :
return fmt . Errorf ( "unexpected message: %s" , req . Id . String ( ) )
2021-07-16 18:42:03 +00:00
}
if _ , err = sentryClient . SendMessageById ( f . ctx , & sentry . SendMessageByIdRequest {
Data : & sentry . OutboundMessageData { Id : messageId , Data : encodedRequest } ,
PeerId : req . PeerId ,
} , & grpc . EmptyCallOption { } ) ; err != nil {
2021-07-16 13:29:21 +00:00
return err
}
2021-07-15 11:23:17 +00:00
}
2021-08-03 06:56:46 +00:00
case sentry . MessageId_GET_POOLED_TRANSACTIONS_66 , sentry . MessageId_GET_POOLED_TRANSACTIONS_65 :
2021-08-03 07:50:10 +00:00
//TODO: handleInboundMessage is single-threaded - means it can accept as argument couple buffers (or analog of txParseContext). Protobuf encoding will copy data anyway, but DirectClient doesn't
2021-08-03 07:35:45 +00:00
var encodedRequest [ ] byte
2021-09-07 02:52:17 +00:00
var messageId sentry . MessageId
switch req . Id {
case sentry . MessageId_GET_POOLED_TRANSACTIONS_66 :
messageId = sentry . MessageId_POOLED_TRANSACTIONS_66
2021-08-03 07:35:45 +00:00
requestID , hashes , _ , err := ParseGetPooledTransactions66 ( req . Data , 0 , nil )
2021-08-03 06:56:46 +00:00
if err != nil {
2021-08-03 07:35:45 +00:00
return err
2021-08-03 06:56:46 +00:00
}
2021-08-03 07:35:45 +00:00
_ = requestID
var txs [ ] [ ] byte
for i := 0 ; i < len ( hashes ) ; i += 32 {
2021-08-22 10:06:38 +00:00
txn , err := f . pool . GetRlp ( tx , hashes [ i : i + 32 ] )
if err != nil {
return err
}
2021-08-03 07:35:45 +00:00
if txn == nil {
continue
}
txs = append ( txs , txn )
2021-08-03 06:56:46 +00:00
}
2021-08-03 07:35:45 +00:00
encodedRequest = EncodePooledTransactions66 ( txs , requestID , nil )
2021-09-07 02:52:17 +00:00
case sentry . MessageId_GET_POOLED_TRANSACTIONS_65 :
messageId = sentry . MessageId_POOLED_TRANSACTIONS_65
2021-08-03 07:35:45 +00:00
hashes , _ , err := ParseGetPooledTransactions65 ( req . Data , 0 , nil )
2021-08-03 06:56:46 +00:00
if err != nil {
2021-08-03 07:35:45 +00:00
return err
}
var txs [ ] [ ] byte
for i := 0 ; i < len ( hashes ) ; i += 32 {
2021-08-22 10:06:38 +00:00
txn , err := f . pool . GetRlp ( tx , hashes [ i : i + 32 ] )
if err != nil {
return err
}
2021-08-03 07:35:45 +00:00
if txn == nil {
continue
}
txs = append ( txs , txn )
2021-08-03 06:56:46 +00:00
}
2021-08-03 07:35:45 +00:00
encodedRequest = EncodePooledTransactions65 ( txs , nil )
2021-09-07 02:52:17 +00:00
default :
return fmt . Errorf ( "unexpected message: %s" , req . Id . String ( ) )
2021-08-03 07:35:45 +00:00
}
2021-09-07 02:52:17 +00:00
2021-08-03 07:35:45 +00:00
if _ , err := sentryClient . SendMessageById ( f . ctx , & sentry . SendMessageByIdRequest {
Data : & sentry . OutboundMessageData { Id : messageId , Data : encodedRequest } ,
PeerId : req . PeerId ,
} , & grpc . EmptyCallOption { } ) ; err != nil {
return err
}
2021-10-19 08:19:54 +00:00
case sentry . MessageId_POOLED_TRANSACTIONS_65 , sentry . MessageId_POOLED_TRANSACTIONS_66 , sentry . MessageId_TRANSACTIONS_65 , sentry . MessageId_TRANSACTIONS_66 :
2021-08-08 12:18:50 +00:00
txs := TxSlots { }
2021-09-20 05:44:29 +00:00
f . pooledTxsParseCtx . Reject ( func ( hash [ ] byte ) error {
2021-09-20 13:16:32 +00:00
known , err := f . pool . IdHashKnown ( tx , hash )
if err != nil {
return err
}
if known {
2021-09-20 05:44:29 +00:00
return ErrRejected
}
return nil
2021-08-22 10:06:38 +00:00
} )
2021-09-07 02:52:17 +00:00
switch req . Id {
2021-10-19 08:19:54 +00:00
case sentry . MessageId_POOLED_TRANSACTIONS_65 , sentry . MessageId_TRANSACTIONS_65 , sentry . MessageId_TRANSACTIONS_66 :
2021-09-08 12:21:13 +00:00
if _ , err := ParsePooledTransactions65 ( req . Data , 0 , f . pooledTxsParseCtx , & txs ) ; err != nil {
2021-08-08 12:18:50 +00:00
return err
}
2021-09-07 02:52:17 +00:00
case sentry . MessageId_POOLED_TRANSACTIONS_66 :
2021-09-08 12:21:13 +00:00
if _ , _ , err := ParsePooledTransactions66 ( req . Data , 0 , f . pooledTxsParseCtx , & txs ) ; err != nil {
2021-08-08 12:18:50 +00:00
return err
}
2021-09-07 02:52:17 +00:00
default :
return fmt . Errorf ( "unexpected message: %s" , req . Id . String ( ) )
2021-08-08 12:18:50 +00:00
}
2021-08-16 02:40:43 +00:00
if len ( txs . txs ) == 0 {
return nil
}
2021-09-02 05:25:34 +00:00
f . pool . AddRemoteTxs ( ctx , txs )
2021-08-20 09:17:34 +00:00
default :
2021-10-19 08:19:54 +00:00
defer log . Trace ( "[txpool] dropped p2p message" , "id" , req . Id )
2021-07-15 11:23:17 +00:00
}
2021-08-03 06:56:46 +00:00
2021-07-12 18:37:35 +00:00
return nil
}
2021-09-01 08:22:48 +00:00
// retryLater reports whether code is one of the gRPC status codes that the loops in
// this file answer with "pause and try again": Unavailable, Canceled or
// ResourceExhausted.
func retryLater(code codes.Code) bool {
	switch code {
	case codes.Unavailable, codes.Canceled, codes.ResourceExhausted:
		return true
	default:
		return false
	}
}
2021-07-12 16:55:47 +00:00
func ( f * Fetch ) receivePeerLoop ( sentryClient sentry . SentryClient ) {
2021-07-12 18:37:35 +00:00
for {
select {
case <- f . ctx . Done ( ) :
return
default :
}
2021-08-14 08:28:05 +00:00
if _ , err := sentryClient . HandShake ( f . ctx , & emptypb . Empty { } , grpc . WaitForReady ( true ) ) ; err != nil {
2021-09-01 08:22:48 +00:00
if s , ok := status . FromError ( err ) ; ok && retryLater ( s . Code ( ) ) {
2021-08-14 04:33:13 +00:00
time . Sleep ( time . Second )
continue
2021-07-12 18:37:35 +00:00
}
2021-09-01 08:22:48 +00:00
if errors . Is ( err , io . EOF ) || errors . Is ( err , context . Canceled ) {
continue
}
2021-07-13 19:02:03 +00:00
// Report error and wait more
2021-08-14 04:33:13 +00:00
log . Warn ( "[txpool.recvPeers] sentry not ready yet" , "err" , err )
2021-07-13 19:02:03 +00:00
time . Sleep ( time . Second )
continue
2021-07-12 18:37:35 +00:00
}
2021-08-14 04:33:13 +00:00
if err := f . receivePeer ( sentryClient ) ; err != nil {
2021-09-01 08:22:48 +00:00
if s , ok := status . FromError ( err ) ; ok && retryLater ( s . Code ( ) ) {
2021-08-14 04:33:13 +00:00
time . Sleep ( time . Second )
continue
2021-07-12 18:37:35 +00:00
}
2021-09-01 08:22:48 +00:00
if errors . Is ( err , io . EOF ) || errors . Is ( err , context . Canceled ) {
continue
}
2021-08-14 04:33:13 +00:00
log . Warn ( "[txpool.recvPeers]" , "err" , err )
2021-07-12 18:37:35 +00:00
}
2021-08-14 04:33:13 +00:00
}
}
2021-07-12 18:37:35 +00:00
2021-08-14 04:33:13 +00:00
func ( f * Fetch ) receivePeer ( sentryClient sentry . SentryClient ) error {
streamCtx , cancel := context . WithCancel ( f . ctx )
defer cancel ( )
stream , err := sentryClient . Peers ( streamCtx , & sentry . PeersRequest { } )
if err != nil {
select {
case <- f . ctx . Done ( ) :
return f . ctx . Err ( )
default :
}
return err
}
var req * sentry . PeersReply
for req , err = stream . Recv ( ) ; ; req , err = stream . Recv ( ) {
if err != nil {
return err
}
if req == nil {
return nil
}
if err = f . handleNewPeer ( req ) ; err != nil {
return err
}
if f . wg != nil {
f . wg . Done ( )
2021-07-12 18:37:35 +00:00
}
}
2021-07-12 16:55:47 +00:00
}
2021-07-13 19:02:03 +00:00
2021-07-25 12:19:33 +00:00
func ( f * Fetch ) handleNewPeer ( req * sentry . PeersReply ) error {
if req == nil {
return nil
}
switch req . Event {
case sentry . PeersReply_Connect :
2021-08-08 12:18:50 +00:00
f . pool . AddNewGoodPeer ( req . PeerId )
2021-07-25 12:19:33 +00:00
}
2021-07-13 19:02:03 +00:00
return nil
}
2021-08-09 02:43:15 +00:00
2021-09-07 02:52:17 +00:00
func ( f * Fetch ) handleStateChanges ( ctx context . Context , client StateChangesClient ) error {
2021-08-09 02:43:15 +00:00
streamCtx , cancel := context . WithCancel ( ctx )
defer cancel ( )
stream , err := client . StateChanges ( streamCtx , & remote . StateChangeRequest { WithStorage : false , WithTransactions : true } , grpc . WaitForReady ( true ) )
if err != nil {
2021-08-14 08:28:05 +00:00
return err
2021-08-09 02:43:15 +00:00
}
for req , err := stream . Recv ( ) ; ; req , err = stream . Recv ( ) {
if err != nil {
2021-08-14 08:28:05 +00:00
return err
2021-08-09 02:43:15 +00:00
}
if req == nil {
2021-08-14 08:28:05 +00:00
return nil
2021-08-09 02:43:15 +00:00
}
var unwindTxs , minedTxs TxSlots
2021-09-17 02:56:04 +00:00
for _ , change := range req . ChangeBatch {
if change . Direction == remote . Direction_FORWARD {
minedTxs . Resize ( uint ( len ( change . Txs ) ) )
for i := range change . Txs {
minedTxs . txs [ i ] = & TxSlot { }
if _ , err := f . stateChangesParseCtx . ParseTransaction ( change . Txs [ i ] , 0 , minedTxs . txs [ i ] , minedTxs . senders . At ( i ) ) ; err != nil {
log . Warn ( "stream.Recv" , "err" , err )
continue
}
2021-08-09 07:04:22 +00:00
}
}
2021-09-17 02:56:04 +00:00
if change . Direction == remote . Direction_UNWIND {
unwindTxs . Resize ( uint ( len ( change . Txs ) ) )
for i := range change . Txs {
unwindTxs . txs [ i ] = & TxSlot { }
if _ , err := f . stateChangesParseCtx . ParseTransaction ( change . Txs [ i ] , 0 , unwindTxs . txs [ i ] , unwindTxs . senders . At ( i ) ) ; err != nil {
log . Warn ( "stream.Recv" , "err" , err )
continue
}
2021-08-09 07:04:22 +00:00
}
}
2021-08-09 02:43:15 +00:00
}
2021-09-07 02:52:17 +00:00
if err := f . db . View ( ctx , func ( tx kv . Tx ) error {
2021-09-20 05:44:29 +00:00
return f . pool . OnNewBlock ( ctx , req , unwindTxs , minedTxs , tx )
2021-09-07 02:52:17 +00:00
} ) ; err != nil {
2021-08-09 02:43:15 +00:00
log . Warn ( "onNewBlock" , "err" , err )
}
2021-08-09 07:04:22 +00:00
if f . wg != nil {
f . wg . Done ( )
}
2021-08-09 02:43:15 +00:00
}
}