This commit is contained in:
alex.sharov 2021-08-21 17:17:43 +07:00
parent 7510505f39
commit 91f148bd1f

View File

@ -89,9 +89,9 @@ const PendingSubPool SubPoolType = 1
const BaseFeeSubPool SubPoolType = 2
const QueuedSubPool SubPoolType = 3
const PendingSubPoolLimit = 1024
const BaseFeeSubPoolLimit = 1024
const QueuedSubPoolLimit = 1024
const PendingSubPoolLimit = 10 * 1024
const BaseFeeSubPoolLimit = 10 * 1024
const QueuedSubPoolLimit = 10 * 1024
const MaxSendersInfoCache = 2 * (PendingSubPoolLimit + BaseFeeSubPoolLimit + QueuedSubPoolLimit)
@ -175,6 +175,9 @@ func (sc *SendersCache) evict() int {
func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error {
sc.ensureSenderIDOnNewTxs(newTxs)
toLoad := sc.setTxSenderID(newTxs)
if len(toLoad) == 0 {
return nil
}
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
@ -190,34 +193,53 @@ func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]sende
//`loadSenders` goes by network to core - and it must be outside of SendersCache lock. But other methods must be locked
sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs)
toLoad := sc.setTxSenderID(unwindTxs)
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
if len(toLoad) > 0 {
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
}
sc.set(diff)
toLoad = sc.setTxSenderID(minedTxs)
diff, err = loadSenders(coreDBTx, toLoad)
if len(toLoad) == 0 {
return nil
}
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
return nil
}
func (sc *SendersCache) set(diff map[uint64]*senderInfo) {
func (sc *SendersCache) set(diff map[uint64]senderInfo) {
sc.lock.Lock()
defer sc.lock.Unlock()
for id := range diff { // merge state changes
sc.senderInfo[id] = diff[id]
a := diff[id]
sc.senderInfo[id] = &a
}
}
func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, unwindedTxs, minedTxs TxSlots) {
sc.lock.Lock()
defer sc.lock.Unlock()
for addr, id := range sc.senderIDs { // merge state changes
if v, ok := stateChanges[addr]; ok {
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
for addr, v := range stateChanges { // merge state changes
id, ok := sc.senderIDs[addr]
if !ok {
sc.senderID++
id = sc.senderID
sc.senderIDs[addr] = id
}
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
}
/*
for addr, id := range sc.senderIDs { // merge state changes
if v, ok := stateChanges[addr]; ok {
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
}
}
*/
for i := 0; i < unwindedTxs.senders.Len(); i++ {
id, ok := sc.senderIDs[string(unwindedTxs.senders.At(i))]
if !ok {
@ -225,8 +247,8 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un
id = sc.senderID
sc.senderIDs[string(unwindedTxs.senders.At(i))] = id
}
if v, ok := stateChanges[string(unwindedTxs.senders.At(i))]; ok {
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
if _, ok := stateChanges[string(unwindedTxs.senders.At(i))]; !ok {
sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0))
}
}
@ -237,9 +259,13 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un
id = sc.senderID
sc.senderIDs[string(minedTxs.senders.At(i))] = id
}
if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok {
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
if _, ok := stateChanges[string(minedTxs.senders.At(i))]; !ok {
sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0))
}
//if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok {
// sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
//}
}
}
@ -280,22 +306,22 @@ func (sc *SendersCache) setTxSenderID(txs TxSlots) map[uint64]string {
return toLoad
}
func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]*senderInfo, error) {
diff := make(map[uint64]*senderInfo, len(toLoad))
func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]senderInfo, error) {
diff := make(map[uint64]senderInfo, len(toLoad))
for id := range toLoad {
encoded, err := coreDB.GetOne(kv.PlainState, []byte(toLoad[id]))
if err != nil {
return nil, err
}
if len(encoded) == 0 {
diff[id] = newSenderInfo(0, *uint256.NewInt(0))
diff[id] = *newSenderInfo(0, *uint256.NewInt(0))
continue
}
nonce, balance, err := DecodeSender(encoded)
if err != nil {
return nil, err
}
diff[id] = newSenderInfo(nonce, balance)
diff[id] = *newSenderInfo(nonce, balance)
}
return diff, nil
}
@ -412,6 +438,7 @@ func (p *TxPool) Started() bool {
}
func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error {
t := time.Now()
if err := senders.onNewTxs(coreDB, newTxs); err != nil {
return err
}
@ -423,7 +450,6 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error
if protocolBaseFee == 0 || pendingBaseFee == 0 {
return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, pendingBaseFee)
}
p.lock.Lock()
defer p.lock.Unlock()
if err := onNewTxs(senders, newTxs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
@ -444,6 +470,7 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error
}
}
log.Info("on new txs", "in", time.Since(t))
return nil
}
func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error {
@ -511,11 +538,11 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin
}
func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error {
t := time.Now()
if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil {
return err
}
log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)
protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee)
//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)
if err := unwindTxs.Valid(); err != nil {
return err
}
@ -543,12 +570,13 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
default:
}
}
//count := senders.evict()
//if count > 0 {
// log.Debug("evicted senders", "amount", count)
//}
count := senders.evict()
if count > 0 {
log.Debug("evicted senders", "amount", count)
}
protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee)
log.Info("on new block", "in", time.Since(t))
return nil
}
func (p *TxPool) flushIsLocalHistory(tx kv.RwTx) error {
@ -1027,6 +1055,7 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, p *TxPool, senders *SendersC
return
case <-logEvery.C:
p.logStats()
log.Info("cache", "size", senders.len())
case <-evictSendersEvery.C:
if db != nil {
if err := db.Update(ctx, func(tx kv.RwTx) error {