diff --git a/txpool/fetch.go b/txpool/fetch.go index 5e7fa6160..af4a637f4 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -40,13 +40,15 @@ import ( // genesis hash and list of forks, but with zero max block and total difficulty // Sentry should have a logic not to overwrite statusData with messages from tx pool type Fetch struct { - ctx context.Context // Context used for cancellation and closing of the fetcher - sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network - pool Pool // Transaction pool implementation - senders *SendersCache - coreDB kv.RoDB - wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) - stateChangesClient remote.KVClient + ctx context.Context // Context used for cancellation and closing of the fetcher + sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network + pool Pool // Transaction pool implementation + senders *SendersCache + coreDB kv.RoDB + wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) + stateChangesClient remote.KVClient + stateChangesParseCtx *TxParseContext + pooledTxsParseCtx *TxParseContext } type Timings struct { @@ -63,13 +65,17 @@ var DefaultTimings = Timings{ // SentryClient here is an interface, it is suitable for mocking in tests (mock will need // to implement all the functions of the SentryClient interface). func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, pool Pool, senders *SendersCache, stateChangesClient remote.KVClient, db kv.RoDB) *Fetch { + pooledTxsParseCtx := NewTxParseContext() + pooledTxsParseCtx.Reject(func(hash []byte) bool { return pool.IdHashKnown(hash) }) return &Fetch{ - ctx: ctx, - sentryClients: sentryClients, - pool: pool, - senders: senders, - coreDB: db, - stateChangesClient: stateChangesClient, + ctx: ctx, + sentryClients: sentryClients, + pool: pool, + senders: senders, + coreDB: db, + stateChangesClient: stateChangesClient, + stateChangesParseCtx: NewTxParseContext(), + pooledTxsParseCtx: pooledTxsParseCtx, } } @@ -281,19 +287,13 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes if !f.pool.Started() { return nil } - - parseCtx := NewTxParseContext() - parseCtx.Reject(func(hash []byte) bool { - //fmt.Printf("check: %t\n", f.pool.IdHashKnown(hash)) - return f.pool.IdHashKnown(hash) - }) txs := TxSlots{} if req.Id == sentry.MessageId_GET_POOLED_TRANSACTIONS_66 { - if _, err := ParsePooledTransactions65(req.Data, 0, parseCtx, &txs); err != nil { + if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { return err } } else { - if _, _, err := ParsePooledTransactions66(req.Data, 0, parseCtx, &txs); err != nil { + if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { return err } } @@ -403,13 +403,12 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) return nil } - parseCtx := NewTxParseContext() var unwindTxs, minedTxs TxSlots if req.Direction == remote.Direction_FORWARD { minedTxs.Growth(len(req.Txs)) for i := range req.Txs { minedTxs.txs[i] = &TxSlot{} - if _, err := parseCtx.ParseTransaction(req.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil { + if _, err := f.stateChangesParseCtx.ParseTransaction(req.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil { log.Warn("stream.Recv", "err", err) continue } @@ -419,7 +418,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) unwindTxs.Growth(len(req.Txs)) for i := range req.Txs { unwindTxs.txs[i] = &TxSlot{} - if _, err := parseCtx.ParseTransaction(req.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil { + if _, err := f.stateChangesParseCtx.ParseTransaction(req.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil { log.Warn("stream.Recv", "err", err) continue } diff --git a/txpool/pool.go b/txpool/pool.go index 791fd2945..87bf961d0 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -291,6 +291,10 @@ func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]*senderInfo if err != nil { return nil, err } + if len(encoded) == 0 { + diff[id] = newSenderInfo(0, *uint256.NewInt(0)) + continue + } nonce, balance, err := DecodeSender(encoded) if err != nil { return nil, err @@ -586,7 +590,9 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr localsHistory.Add(i.Tx.idHash, struct{}{}) } }) - //log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs)) + if j > 0 { + log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs)) + } // This can be thought of a reverse operation from the one described before. // When a block that was deemed "the best" of its height, is no longer deemed "the best", the @@ -654,13 +660,13 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, queued *SubPool, discard func(tx *metaTx)) { for _, tx := range minedTxs { sender := senders.get(tx.senderID) - if sender.txNonce2Tx.Len() > 0 { - log.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.txNonce2Tx.len()", sender.txNonce2Tx.Len()) - } + //if sender.txNonce2Tx.Len() > 0 { + //log.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.txNonce2Tx.len()", sender.txNonce2Tx.Len()) + //} // delete mined transactions from everywhere sender.txNonce2Tx.Ascend(func(i btree.Item) bool { it := i.(*nonce2TxItem) - log.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce) + //log.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce) if it.metaTx.Tx.nonce > sender.nonce { return false } @@ -679,6 +685,7 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu queued.UnsafeRemove(it.metaTx) discard(it.metaTx) default: + fmt.Printf("aaaaaaa\n") //already removed } return true