diff --git a/txpool/fetch.go b/txpool/fetch.go index af4a637f4..8de696dfa 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -300,11 +300,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes if len(txs.txs) == 0 { return nil } - if err := f.coreDB.View(ctx, func(tx kv.Tx) error { - return f.pool.Add(tx, txs, f.senders) - }); err != nil { - return err - } + return f.pool.Add(f.coreDB, txs, f.senders) default: //defer log.Info("dropped", "id", req.Id) } @@ -434,10 +430,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) addr := gointerfaces.ConvertH160toAddress(change.Address) diff[string(addr[:])] = senderInfo{nonce: nonce, balance: balance} } - - if err := f.coreDB.View(ctx, func(tx kv.Tx) error { - return f.pool.OnNewBlock(tx, diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, f.senders) - }); err != nil { + if err := f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, f.senders); err != nil { log.Warn("onNewBlock", "err", err) } if f.wg != nil { diff --git a/txpool/pool.go b/txpool/pool.go index 94456ac59..ed3f6837a 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -41,8 +41,8 @@ type Pool interface { IdHashKnown(hash []byte) bool Started() bool GetRlp(hash []byte) []byte - Add(db kv.Tx, newTxs TxSlots, senders *SendersCache) error - OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error + Add(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error + OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error AddNewGoodPeer(peerID PeerID) } @@ -172,7 +172,7 @@ func (sc *SendersCache) evict() int { return count } -func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error { +func (sc *SendersCache) onNewTxs(coreDBTx kv.RoDB, newTxs TxSlots) error { sc.ensureSenderIDOnNewTxs(newTxs) toLoad := sc.setTxSenderID(newTxs) if len(toLoad) == 0 { @@ -186,29 +186,33 @@ func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error { return nil } -func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, blockHeight uint64) error { +func (sc *SendersCache) onNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, blockHeight uint64) error { //TODO: if see non-continuous block heigh - drop cache and reload from db sc.blockHeight.Store(blockHeight) //`loadSenders` goes by network to core - and it must be outside of SendersCache lock. But other methods must be locked sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs) toLoad := sc.setTxSenderID(unwindTxs) - if len(toLoad) > 0 { + /* + if len(toLoad) > 0 { + diff, err := loadSenders(coreDBTx, toLoad) + if err != nil { + return err + } + sc.set(diff) + } + */ + toLoad = sc.setTxSenderID(minedTxs) + if len(toLoad) == 0 { + return nil + } + /* diff, err := loadSenders(coreDBTx, toLoad) if err != nil { return err } sc.set(diff) - } - toLoad = sc.setTxSenderID(minedTxs) - if len(toLoad) == 0 { - return nil - } - diff, err := loadSenders(coreDBTx, toLoad) - if err != nil { - return err - } - sc.set(diff) + */ return nil } func (sc *SendersCache) set(diff map[uint64]senderInfo) { @@ -306,22 +310,27 @@ func (sc *SendersCache) setTxSenderID(txs TxSlots) map[uint64]string { return toLoad } -func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]senderInfo, error) { +func loadSenders(coreDB kv.RoDB, toLoad map[uint64]string) (map[uint64]senderInfo, error) { diff := make(map[uint64]senderInfo, len(toLoad)) - for id := range toLoad { - encoded, err := coreDB.GetOne(kv.PlainState, []byte(toLoad[id])) - if err != nil { - return nil, err + if err := coreDB.View(context.Background(), func(tx kv.Tx) error { + for id := range toLoad { + encoded, err := tx.GetOne(kv.PlainState, []byte(toLoad[id])) + if err != nil { + return err + } + if len(encoded) == 0 { + diff[id] = *newSenderInfo(0, *uint256.NewInt(0)) + continue + } + nonce, balance, err := DecodeSender(encoded) + if err != nil { + return err + } + diff[id] = *newSenderInfo(nonce, balance) } - if len(encoded) == 0 { - diff[id] = *newSenderInfo(0, *uint256.NewInt(0)) - continue - } - nonce, balance, err := DecodeSender(encoded) - if err != nil { - return nil, err - } - diff[id] = *newSenderInfo(nonce, balance) + return nil + }); err != nil { + return nil, err } return diff, nil } @@ -437,7 +446,7 @@ func (p *TxPool) Started() bool { return p.protocolBaseFee.Load() > 0 } -func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error { +func (p *TxPool) Add(coreDB kv.RoDB, newTxs TxSlots, senders *SendersCache) error { t := time.Now() if err := senders.onNewTxs(coreDB, newTxs); err != nil { return err @@ -537,9 +546,9 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin return protocolBaseFee, p.pendingBaseFee.Load() } -func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error { +func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error { t := time.Now() - if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { + if err := senders.onNewBlock(stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { return err } //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index b4df7abb2..0f4f796bd 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -434,13 +434,13 @@ func FuzzOnNewBlocks11(f *testing.F) { // go to first fork //fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len()) unwindTxs, minedTxs1, p2pReceived, minedTxs2 := splitDataset(txs) - err = pool.OnNewBlock(nil, map[string]senderInfo{}, unwindTxs, minedTxs1, protocolBaseFee, pendingBaseFee, 1, sendersCache) + err = pool.OnNewBlock(map[string]senderInfo{}, unwindTxs, minedTxs1, protocolBaseFee, pendingBaseFee, 1, sendersCache) assert.NoError(err) check(unwindTxs, minedTxs1, "fork1") checkNotify(unwindTxs, minedTxs1, "fork1") // unwind everything and switch to new fork (need unwind mined now) - err = pool.OnNewBlock(nil, map[string]senderInfo{}, minedTxs1, minedTxs2, protocolBaseFee, pendingBaseFee, 2, sendersCache) + err = pool.OnNewBlock(map[string]senderInfo{}, minedTxs1, minedTxs2, protocolBaseFee, pendingBaseFee, 2, sendersCache) assert.NoError(err) check(minedTxs1, minedTxs2, "fork2") checkNotify(minedTxs1, minedTxs2, "fork2")