diff --git a/txpool/pool.go b/txpool/pool.go index 7a01d1157..94456ac59 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -89,9 +89,9 @@ const PendingSubPool SubPoolType = 1 const BaseFeeSubPool SubPoolType = 2 const QueuedSubPool SubPoolType = 3 -const PendingSubPoolLimit = 1024 -const BaseFeeSubPoolLimit = 1024 -const QueuedSubPoolLimit = 1024 +const PendingSubPoolLimit = 10 * 1024 +const BaseFeeSubPoolLimit = 10 * 1024 +const QueuedSubPoolLimit = 10 * 1024 const MaxSendersInfoCache = 2 * (PendingSubPoolLimit + BaseFeeSubPoolLimit + QueuedSubPoolLimit) @@ -175,6 +175,9 @@ func (sc *SendersCache) evict() int { func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error { sc.ensureSenderIDOnNewTxs(newTxs) toLoad := sc.setTxSenderID(newTxs) + if len(toLoad) == 0 { + return nil + } diff, err := loadSenders(coreDBTx, toLoad) if err != nil { return err @@ -190,34 +193,53 @@ func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]sende //`loadSenders` goes by network to core - and it must be outside of SendersCache lock. But other methods must be locked sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs) toLoad := sc.setTxSenderID(unwindTxs) - diff, err := loadSenders(coreDBTx, toLoad) - if err != nil { - return err + if len(toLoad) > 0 { + diff, err := loadSenders(coreDBTx, toLoad) + if err != nil { + return err + } + sc.set(diff) } - sc.set(diff) toLoad = sc.setTxSenderID(minedTxs) - diff, err = loadSenders(coreDBTx, toLoad) + if len(toLoad) == 0 { + return nil + } + diff, err := loadSenders(coreDBTx, toLoad) if err != nil { return err } sc.set(diff) return nil } -func (sc *SendersCache) set(diff map[uint64]*senderInfo) { +func (sc *SendersCache) set(diff map[uint64]senderInfo) { sc.lock.Lock() defer sc.lock.Unlock() for id := range diff { // merge state changes - sc.senderInfo[id] = diff[id] + a := diff[id] + sc.senderInfo[id] = &a } } func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, unwindedTxs, minedTxs TxSlots) { sc.lock.Lock() defer sc.lock.Unlock() - for addr, id := range sc.senderIDs { // merge state changes - if v, ok := stateChanges[addr]; ok { - sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + for addr, v := range stateChanges { // merge state changes + id, ok := sc.senderIDs[addr] + if !ok { + sc.senderID++ + id = sc.senderID + sc.senderIDs[addr] = id } + sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) } + + /* + for addr, id := range sc.senderIDs { // merge state changes + if v, ok := stateChanges[addr]; ok { + sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + } + } + */ + for i := 0; i < unwindedTxs.senders.Len(); i++ { id, ok := sc.senderIDs[string(unwindedTxs.senders.At(i))] if !ok { @@ -225,8 +247,8 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un id = sc.senderID sc.senderIDs[string(unwindedTxs.senders.At(i))] = id } - if v, ok := stateChanges[string(unwindedTxs.senders.At(i))]; ok { - sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + if _, ok := stateChanges[string(unwindedTxs.senders.At(i))]; !ok { + sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) } } @@ -237,9 +259,13 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un id = sc.senderID sc.senderIDs[string(minedTxs.senders.At(i))] = id } - if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok { - sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + if _, ok := stateChanges[string(minedTxs.senders.At(i))]; !ok { + sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) } + + //if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok { + // sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + //} } } @@ -280,22 +306,22 @@ func (sc *SendersCache) setTxSenderID(txs TxSlots) map[uint64]string { return toLoad } -func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]*senderInfo, error) { - diff := make(map[uint64]*senderInfo, len(toLoad)) +func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]senderInfo, error) { + diff := make(map[uint64]senderInfo, len(toLoad)) for id := range toLoad { encoded, err := coreDB.GetOne(kv.PlainState, []byte(toLoad[id])) if err != nil { return nil, err } if len(encoded) == 0 { - diff[id] = newSenderInfo(0, *uint256.NewInt(0)) + diff[id] = *newSenderInfo(0, *uint256.NewInt(0)) continue } nonce, balance, err := DecodeSender(encoded) if err != nil { return nil, err } - diff[id] = newSenderInfo(nonce, balance) + diff[id] = *newSenderInfo(nonce, balance) } return diff, nil } @@ -412,6 +438,7 @@ func (p *TxPool) Started() bool { } func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error { + t := time.Now() if err := senders.onNewTxs(coreDB, newTxs); err != nil { return err } @@ -423,7 +450,6 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error if protocolBaseFee == 0 || pendingBaseFee == 0 { return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, pendingBaseFee) } - p.lock.Lock() defer p.lock.Unlock() if err := onNewTxs(senders, newTxs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { @@ -444,6 +470,7 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error } } + log.Info("on new txs", "in", time.Since(t)) return nil } func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error { @@ -511,11 +538,11 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin } func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error { + t := time.Now() if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { return err } - log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) - protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) + //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) if err := unwindTxs.Valid(); err != nil { return err } @@ -543,12 +570,13 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un default: } } + //count := senders.evict() + //if count > 0 { + // log.Debug("evicted senders", "amount", count) + //} - count := senders.evict() - if count > 0 { - log.Debug("evicted senders", "amount", count) - } - + protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) + log.Info("on new block", "in", time.Since(t)) return nil } func (p *TxPool) flushIsLocalHistory(tx kv.RwTx) error { @@ -1027,6 +1055,7 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, p *TxPool, senders *SendersC return case <-logEvery.C: p.logStats() + log.Info("cache", "size", senders.len()) case <-evictSendersEvery.C: if db != nil { if err := db.Update(ctx, func(tx kv.RwTx) error {