Merge pull request #35 from ledgerwatch/pool19

Pool: accept state changes
Alex Sharov 2021-08-17 15:51:27 +07:00 committed by GitHub
commit 67472f8341
2 changed files with 40 additions and 24 deletions


@@ -414,6 +414,8 @@ var ChaindataTablesCfg = TableCfg{
},
}
var TxpoolTablesCfg = TableCfg{}
func sortBuckets() {
sort.SliceStable(ChaindataTables, func(i, j int) bool {
return strings.Compare(ChaindataTables[i], ChaindataTables[j]) < 0
@@ -443,4 +445,12 @@ func reinit() {
tmp.IsDeprecated = true
ChaindataTablesCfg[name] = tmp
}
for _, name := range TxPoolTables {
_, ok := TxpoolTablesCfg[name]
if !ok {
TxpoolTablesCfg[name] = TableCfgItem{}
}
}
}
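
reinit() now also walks TxPoolTables and gives every table without an explicit entry a zero-value TableCfgItem, so later lookups in TxpoolTablesCfg never miss. A minimal, self-contained sketch of that lazy-default pattern; TxPoolTables, TxpoolTablesCfg and TableCfgItem are the names from the diff, while the concrete table names and field set are illustrative only:

    package main

    import "fmt"

    type TableCfgItem struct{ IsDeprecated bool } // illustrative field set
    type TableCfg map[string]TableCfgItem

    var (
        TxPoolTables    = []string{"PooledTransaction", "RecentLocalTransaction"} // made-up names
        TxpoolTablesCfg = TableCfg{}
    )

    // reinit gives every txpool table a default config entry if none was set explicitly,
    // mirroring the loop added in the hunk above.
    func reinit() {
        for _, name := range TxPoolTables {
            if _, ok := TxpoolTablesCfg[name]; !ok {
                TxpoolTablesCfg[name] = TableCfgItem{}
            }
        }
    }

    func main() {
        reinit()
        fmt.Println(len(TxpoolTablesCfg)) // 2
    }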


@@ -150,6 +150,7 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) {
}
return &TxPool{
lock: &sync.RWMutex{},
senderIDs: map[string]uint64{},
senderInfo: map[uint64]*senderInfo{},
byHash: map[string]*metaTx{},
localsHistory: localsHistory,
@@ -159,13 +160,15 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) {
queued: NewSubPool(),
newTxs: newTxs,
db: db,
senderID: 1,
}, nil
}
func (p *TxPool) logStats() {
p.lock.RLock()
defer p.lock.RUnlock()
log.Info(fmt.Sprintf("[txpool] queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.pending.Len(), PendingSubPoolLimit))
protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load()
log.Info(fmt.Sprintf("[txpool] baseFee: protocol=%d,pending=%d; queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", protocolBaseFee, pendingBaseFee, p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit))
}
func (p *TxPool) GetRlp(hash []byte) []byte {
p.lock.RLock()
@@ -177,34 +180,33 @@ func (p *TxPool) GetRlp(hash []byte) []byte {
}
return txn.Tx.rlp
}
func (p *TxPool) AppendLocalHashes(buf []byte) {
func (p *TxPool) AppendLocalHashes(buf []byte) []byte {
p.lock.RLock()
defer p.lock.RUnlock()
i := 0
for hash, txn := range p.byHash {
if txn.subPool&IsLocal == 0 {
continue
}
copy(buf[i*32:(i+1)*32], hash)
i++
buf = append(buf, hash...)
}
return buf
}
func (p *TxPool) AppendRemoteHashes(buf []byte) {
func (p *TxPool) AppendRemoteHashes(buf []byte) []byte {
p.lock.RLock()
defer p.lock.RUnlock()
i := 0
for hash, txn := range p.byHash {
if txn.subPool&IsLocal != 0 {
continue
}
copy(buf[i*32:(i+1)*32], hash)
i++
buf = append(buf, hash...)
}
return buf
}
func (p *TxPool) AppendAllHashes(buf []byte) {
p.AppendLocalHashes(buf)
p.AppendRemoteHashes(buf[len(buf):])
func (p *TxPool) AppendAllHashes(buf []byte) []byte {
buf = p.AppendLocalHashes(buf)
buf = p.AppendRemoteHashes(buf)
return buf
}
func (p *TxPool) IdHashKnown(hash []byte) bool {
p.lock.RLock()
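
AppendLocalHashes and AppendRemoteHashes switch from copying into caller-sized 32-byte slots to the idiomatic append-and-return style, and AppendAllHashes now just chains the two and returns the grown buffer. A sketch of the caller side under that contract, with a hypothetical stand-in type instead of the real *TxPool:

    package main

    import "fmt"

    // hashSource mimics the new append-style API: it appends 32-byte id hashes
    // to the caller's buffer and returns the (possibly reallocated) slice.
    type hashSource struct{ hashes [][32]byte }

    func (s *hashSource) AppendAllHashes(buf []byte) []byte {
        for i := range s.hashes {
            buf = append(buf, s.hashes[i][:]...) // grows buf as needed
        }
        return buf
    }

    func main() {
        src := &hashSource{hashes: make([][32]byte, 3)}
        var remoteTxHashes []byte
        for i := 0; i < 2; i++ {
            // reuse the same backing array across iterations, as BroadcastLoop now does
            remoteTxHashes = src.AppendAllHashes(remoteTxHashes[:0])
            fmt.Println(len(remoteTxHashes)) // 96 on each pass
        }
    }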
@@ -230,7 +232,7 @@ func (p *TxPool) Started() bool {
defer p.lock.Unlock()
protocolBaseFee := p.protocolBaseFee.Load()
return protocolBaseFee == 0
return protocolBaseFee > 0
}
func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots) error {
@@ -243,7 +245,7 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots) error {
protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load()
if protocolBaseFee == 0 || pendingBaseFee == 0 {
return fmt.Errorf("non-zero base fee")
return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, pendingBaseFee)
}
if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, newTxs); err != nil {
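
Started() now reports true once a protocol base fee is known, and Add() keeps rejecting transactions until both base fees are set, including the observed values in the error. A small sketch of that gating, with std sync/atomic counters standing in for the pool's atomic fields:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // baseFeeGate is a hypothetical stand-in for the pool's two base-fee fields.
    type baseFeeGate struct {
        protocolBaseFee atomic.Uint64
        pendingBaseFee  atomic.Uint64
    }

    // Started mirrors the corrected check: started once a protocol base fee is known.
    func (g *baseFeeGate) Started() bool { return g.protocolBaseFee.Load() > 0 }

    // Add mirrors the guard at the top of TxPool.Add: reject until both fees are set.
    func (g *baseFeeGate) Add() error {
        protocol, pending := g.protocolBaseFee.Load(), g.pendingBaseFee.Load()
        if protocol == 0 || pending == 0 {
            return fmt.Errorf("expecting non-zero base fee: %d,%d", protocol, pending)
        }
        return nil
    }

    func main() {
        g := &baseFeeGate{}
        fmt.Println(g.Started(), g.Add()) // false, error
        g.protocolBaseFee.Store(7)
        g.pendingBaseFee.Store(10)
        fmt.Println(g.Started(), g.Add()) // true <nil>
    }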
@@ -326,10 +328,12 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin
hasNewVal := pendingBaseFee > 0
if pendingBaseFee < protocolBaseFee {
pendingBaseFee = protocolBaseFee
hasNewVal = true
}
if hasNewVal {
p.protocolBaseFee.Store(pendingBaseFee)
p.pendingBaseFee.Store(pendingBaseFee)
}
log.Debug("set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee)
return protocolBaseFee, p.pendingBaseFee.Load()
}
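
setBaseFee clamps a pending base fee that is below the protocol base fee up to the protocol value, and with the added hasNewVal = true the clamped result is now stored too. A worked sketch of just the clamping rule, as a hypothetical standalone function:

    package main

    import "fmt"

    // clampPendingBaseFee mirrors the rule in setBaseFee: a pending base fee below
    // the protocol base fee is raised to it, and that counts as a new value to store.
    func clampPendingBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, bool) {
        hasNewVal := pendingBaseFee > 0
        if pendingBaseFee < protocolBaseFee {
            pendingBaseFee = protocolBaseFee
            hasNewVal = true
        }
        return pendingBaseFee, hasNewVal
    }

    func main() {
        fmt.Println(clampPendingBaseFee(7, 0))  // 7 true  (raised; previously hasNewVal stayed false here)
        fmt.Println(clampPendingBaseFee(7, 10)) // 10 true (kept as-is)
    }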
@@ -337,6 +341,8 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
log.Debug("[txpool.onNewBlock]", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)
p.lock.Lock()
defer p.lock.Unlock()
p.blockHeight.Store(blockHeight)
protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee)
if err := unwindTxs.Valid(); err != nil {
return err
}
@@ -344,9 +350,6 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
return err
}
p.blockHeight.Store(blockHeight)
protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee)
if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, unwindTxs); err != nil {
return err
}
@@ -355,7 +358,7 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
}
for addr, id := range p.senderIDs { // merge state changes
if v, ok := stateChanges[addr]; ok {
p.senderInfo[id] = &v
p.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
}
}
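
The merge above now builds the entry with newSenderInfo(v.nonce, v.balance) instead of storing a pointer to the plain struct copy, presumably so internal fields such as the per-sender nonce-to-transaction index (txNonce2Tx, used further down in this diff) come out initialized. A simplified sketch of the merge plus that constructor pattern; the types here are illustrative, not the real pool definitions:

    package main

    import "fmt"

    // Simplified stand-in: the real senderInfo also carries a btree indexing the
    // sender's transactions by nonce, which is why a constructor matters.
    type senderInfo struct {
        nonce      uint64
        balance    uint64
        txsByNonce map[uint64][]byte // placeholder for the nonce -> tx index
    }

    // newSenderInfo initializes internal containers so the entry is usable immediately.
    func newSenderInfo(nonce, balance uint64) *senderInfo {
        return &senderInfo{nonce: nonce, balance: balance, txsByNonce: map[uint64][]byte{}}
    }

    func main() {
        stateChanges := map[string]senderInfo{"0xaaaa": {nonce: 5, balance: 100}}
        senderIDs := map[string]uint64{"0xaaaa": 1, "0xbbbb": 2}
        senders := map[uint64]*senderInfo{}

        // merge state changes delivered with the new block, as in the hunk above
        for addr, id := range senderIDs {
            if v, ok := stateChanges[addr]; ok {
                senders[id] = newSenderInfo(v.nonce, v.balance)
            }
        }
        senders[1].txsByNonce[6] = []byte{0x01} // safe: the map was created by the constructor
        fmt.Println(senders[1].nonce, len(senders))
    }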
@@ -410,7 +413,8 @@ func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string]
id, ok := senderIDs[addr]
if !ok {
*senderIDSequence++
senderIDs[addr] = *senderIDSequence
id = *senderIDSequence
senderIDs[addr] = id
}
txs.txs[i].senderID = id
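
This hunk fixes ID assignment for previously unseen senders: the freshly incremented sequence value is written to id before being stored, so the transaction gets the new ID rather than the zero value from the failed map lookup. A minimal sketch of the corrected pattern as a hypothetical helper:

    package main

    import "fmt"

    // assignSenderID returns the existing ID for addr, or allocates the next one
    // from the sequence and records it, as setTxSenderID now does.
    func assignSenderID(addr string, seq *uint64, ids map[string]uint64) uint64 {
        id, ok := ids[addr]
        if !ok {
            *seq++
            id = *seq // the fix: use the new value for this transaction too
            ids[addr] = id
        }
        return id
    }

    func main() {
        seq := uint64(1) // New() now seeds the pool's sequence at 1 (see the constructor hunk)
        ids := map[string]uint64{}
        fmt.Println(assignSenderID("0xaaaa", &seq, ids)) // 2
        fmt.Println(assignSenderID("0xaaaa", &seq, ids)) // 2, stable on repeat calls
    }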
@@ -425,7 +429,7 @@ func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string]
if err != nil {
return err
}
sendersInfo[txs.txs[i].senderID] = &senderInfo{nonce: nonce, balance: balance}
sendersInfo[txs.txs[i].senderID] = newSenderInfo(nonce, balance)
}
}
return nil
@@ -443,7 +447,9 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [
}
}
j := 0
removeMined(senderInfo, minedTxs, pending, baseFee, queued, func(i *metaTx) {
j++
delete(byHash, string(i.Tx.idHash[:]))
senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i})
if i.subPool&IsLocal != 0 {
@@ -451,6 +457,7 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [
localsHistory.Add(i.Tx.idHash, struct{}{})
}
})
log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs))
// This can be thought of a reverse operation from the one described before.
// When a block that was deemed "the best" of its height, is no longer deemed "the best", the
@@ -524,10 +531,10 @@ func removeMined(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, pending,
// delete mined transactions from everywhere
sender.txNonce2Tx.Ascend(func(i btree.Item) bool {
it := i.(*nonce2TxItem)
fmt.Printf("nonce cmp: %d,%d, senderID=%d\n", it.metaTx.Tx.nonce, sender.nonce, tx.senderID)
if it.metaTx.Tx.nonce > sender.nonce {
return false
}
// TODO: save local transactions to cache with TTL, in case of re-org - to restore isLocal flag of re-injected transactions
// del from nonce2tx mapping
sender.txNonce2Tx.Delete(i)
@@ -624,7 +631,6 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, pendingBaseFee uint64)
// baseFee of the currently pending block. Set to 0 otherwise.
it.metaTx.subPool &^= EnoughFeeCapBlock
if it.metaTx.Tx.feeCap >= pendingBaseFee {
fmt.Printf("setttttt: %d,%d,%d\n", protocolBaseFee, pendingBaseFee, it.metaTx.Tx.feeCap)
it.metaTx.subPool |= EnoughFeeCapBlock
}
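
onSenderChange recomputes the EnoughFeeCapBlock flag by clearing it with Go's AND NOT assignment and setting it again only when the transaction's feeCap covers the pending base fee; the stray debug print is dropped. A tiny sketch of that clear-then-set bitmask pattern, with an illustrative flag layout:

    package main

    import "fmt"

    // Example flag layout; the real bit positions and mask type live in the pool package.
    const (
        IsLocal uint8 = 1 << iota
        EnoughFeeCapBlock
    )

    func main() {
        subPool := IsLocal | EnoughFeeCapBlock
        feeCap, pendingBaseFee := uint64(30), uint64(50)

        subPool &^= EnoughFeeCapBlock // AND NOT: clear the bit regardless of its previous value
        if feeCap >= pendingBaseFee {
            subPool |= EnoughFeeCapBlock // set it again only if the fee cap covers the pending base fee
        }
        fmt.Printf("%08b\n", subPool) // 00000001: only IsLocal remains set
    }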
@@ -904,7 +910,7 @@ func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Sen
if len(newPeers) == 0 {
continue
}
p.AppendAllHashes(remoteTxHashes[:0])
remoteTxHashes = p.AppendAllHashes(remoteTxHashes[:0])
send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes)
}
}