This commit is contained in:
alex.sharov 2021-08-20 18:16:33 +07:00
parent 0d2b0c494f
commit d77258ef47
2 changed files with 51 additions and 46 deletions

View File

@ -177,18 +177,16 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
if req == nil {
return nil
}
go func(req *sentry.InboundMessage) {
if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
s, ok := status.FromError(err)
doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled))
if doLog {
log.Warn("Handling incoming message", "err", err)
}
if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
s, ok := status.FromError(err)
doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled))
if doLog {
log.Warn("Handling incoming message", "err", err)
}
if f.wg != nil {
f.wg.Done()
}
}(req)
}
if f.wg != nil {
f.wg.Done()
}
}
}

View File

@ -173,7 +173,8 @@ func (sc *SendersCache) evict() int {
func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error {
sc.ensureSenderIDOnNewTxs(newTxs)
diff, err := sc.setTxSenderID(coreDBTx, newTxs)
toLoad := sc.setTxSenderID(newTxs)
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
@ -183,12 +184,14 @@ func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error {
func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots) error {
sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs)
diff, err := sc.setTxSenderID(coreDBTx, unwindTxs)
toLoad := sc.setTxSenderID(unwindTxs)
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
diff, err = sc.setTxSenderID(coreDBTx, minedTxs)
toLoad = sc.setTxSenderID(minedTxs)
diff, err = loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
@ -260,10 +263,10 @@ func (sc *SendersCache) ensureSenderIDOnNewTxs(newTxs TxSlots) {
}
}
func (sc *SendersCache) setTxSenderID(coreDB kv.Tx, txs TxSlots) (map[uint64]*senderInfo, error) {
func (sc *SendersCache) setTxSenderID(txs TxSlots) map[uint64]string {
sc.lock.RLock()
defer sc.lock.RUnlock()
diff := map[uint64]*senderInfo{}
toLoad := map[uint64]string{}
for i := range txs.txs {
addr := string(txs.senders.At(i))
@ -272,20 +275,30 @@ func (sc *SendersCache) setTxSenderID(coreDB kv.Tx, txs TxSlots) (map[uint64]*se
// load data from db if need
_, ok := sc.senderInfo[txs.txs[i].senderID]
if !ok {
_, ok = diff[txs.txs[i].senderID]
if !ok {
encoded, err := coreDB.GetOne(kv.PlainState, txs.senders.At(i))
if err != nil {
return nil, err
}
nonce, balance, err := DecodeSender(encoded)
if err != nil {
return nil, err
}
diff[txs.txs[i].senderID] = newSenderInfo(nonce, balance)
}
if ok {
continue
}
_, ok = toLoad[txs.txs[i].senderID]
if ok {
continue
}
toLoad[txs.txs[i].senderID] = addr
}
return toLoad
}
func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]*senderInfo, error) {
diff := make(map[uint64]*senderInfo, len(toLoad))
for id := range toLoad {
encoded, err := coreDB.GetOne(kv.PlainState, []byte(toLoad[id]))
if err != nil {
return nil, err
}
nonce, balance, err := DecodeSender(encoded)
if err != nil {
return nil, err
}
diff[id] = newSenderInfo(nonce, balance)
}
return diff, nil
}
@ -337,9 +350,10 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) {
}
func (p *TxPool) logStats() {
protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load()
p.lock.RLock()
defer p.lock.RUnlock()
protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load()
log.Info(fmt.Sprintf("[txpool] baseFee: protocol=%d,pending=%d; queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", protocolBaseFee, pendingBaseFee, p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit))
}
func (p *TxPool) GetRlp(hash []byte) []byte {
@ -421,7 +435,6 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error
if err := onNewTxs(senders, newTxs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
return err
}
fmt.Printf("len1: %d, %d\n", len(p.byHash), len(newTxs.txs))
notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs))
for i := range newTxs.txs {
_, ok := p.byHash[string(newTxs.txs[i].idHash[:])]
@ -440,20 +453,19 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error
return nil
}
func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error {
defer func(t time.Time) { fmt.Printf("pool.go:444: %s\n", time.Since(t)) }(time.Now())
for i := range newTxs.txs {
if newTxs.txs[i].senderID == 0 {
return fmt.Errorf("senderID can't be zero")
}
}
unsafeAddToPool(senders, newTxs, pending, PendingSubPool, func(i *metaTx) {
unsafeAddToPool(senders, newTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) {
if _, ok := localsHistory.Get(i.Tx.idHash); ok {
//TODO: also check if sender is in list of local-senders
i.subPool |= IsLocal
}
byHash[string(i.Tx.idHash[:])] = i
replaced := senders.get(i.Tx.senderID).txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i})
replaced := sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i})
if replaced != nil {
replacedMT := replaced.(*nonce2TxItem).metaTx
delete(byHash, string(replacedMT.Tx.idHash[:]))
@ -469,13 +481,11 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas
}
}
})
fmt.Printf("aa: %d\n", len(byHash))
senders.forEach(func(sender *senderInfo) {
// TODO: aggregate changed senders before call this func
onSenderChange(sender, protocolBaseFee, pendingBaseFee)
})
defer func(t time.Time) { fmt.Printf("pool.go:478: %s\n", time.Since(t)) }(time.Now())
pending.EnforceInvariants()
baseFee.EnforceInvariants()
@ -489,7 +499,6 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas
localsHistory.Add(i.Tx.idHash, struct{}{})
}
})
fmt.Printf("cc: %d\n", len(byHash))
return nil
}
@ -525,11 +534,9 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
p.lock.Lock()
defer p.lock.Unlock()
//log.Debug("[txpool.onNewBlock]", "senderInfo", len(p.senderInfo))
fmt.Printf("len2: %d\n", len(p.byHash))
if err := onNewBlock(senders, unwindTxs, minedTxs.txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
return err
}
fmt.Printf("len3: %d\n", len(p.byHash))
notifyNewTxs := make(Hashes, 0, 32*len(unwindTxs.txs))
for i := range unwindTxs.txs {
@ -599,14 +606,14 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr
// they effective lose their priority over the "remote" transactions. In order to prevent that,
// somehow the fact that certain transactions were local, needs to be remembered for some
// time (up to some "immutability threshold").
unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, func(i *metaTx) {
unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) {
//fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce)
if _, ok := localsHistory.Get(i.Tx.idHash); ok {
//TODO: also check if sender is in list of local-senders
i.subPool |= IsLocal
}
byHash[string(i.Tx.idHash[:])] = i
replaced := senders.get(i.Tx.senderID).txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i})
replaced := sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i})
if replaced != nil {
replacedMT := replaced.(*nonce2TxItem).metaTx
delete(byHash, string(replacedMT.Tx.idHash[:]))
@ -689,19 +696,19 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu
}
// unwind
func unsafeAddToPool(senders *SendersCache, unwindTxs TxSlots, to *SubPool, subPoolType SubPoolType, beforeAdd func(tx *metaTx)) {
defer func(t time.Time) { fmt.Printf("unsafeAddToPool %s\n", time.Since(t)) }(time.Now())
func unsafeAddToPool(senders *SendersCache, unwindTxs TxSlots, to *SubPool, subPoolType SubPoolType, beforeAdd func(tx *metaTx, sender *senderInfo)) {
for i, tx := range unwindTxs.txs {
mt := newMetaTx(tx, unwindTxs.isLocal[i])
sender := senders.get(tx.senderID)
// Insert to pending pool, if pool doesn't have tx with same Nonce and bigger Tip
if found := senders.getTx(tx.senderID, mt); found != nil {
if tx.tip <= found.Tx.tip {
if found := sender.txNonce2Tx.Get(&nonce2TxItem{mt}); found != nil {
if tx.tip <= found.(*nonce2TxItem).Tx.tip {
continue
}
//mt = found
}
beforeAdd(mt)
beforeAdd(mt, sender)
to.UnsafeAdd(mt, subPoolType)
}
}