add state check assert routine

This commit is contained in:
alex.sharov 2021-08-27 09:49:26 +07:00
parent 624aec385e
commit 993265011f

View File

@ -145,7 +145,7 @@ func (sc *SendersCache) idsCount(tx kv.Tx) (inMem int, inDb int, err error) {
if err != nil {
return 0, 0, err
}
return int(inDB), len(sc.senderIDs), nil
return len(sc.senderIDs), int(inDB), nil
}
//nolint
@ -158,7 +158,7 @@ func (sc *SendersCache) infoCount(tx kv.Tx) (inMem int, inDb int, err error) {
if err != nil {
return 0, 0, err
}
return int(inDB), len(sc.senderInfo), nil
return len(sc.senderInfo), int(inDB), nil
}
func (sc *SendersCache) Id(addr string, tx kv.Tx) (uint64, bool, error) {
sc.lock.RLock() //TODO: it may load mapping from db,maybe just use thread-safe maps to don't worry much
@ -579,7 +579,7 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce) error {
encIDs = append(encIDs, encID...)
}
//TODO: it's very naive eviction of all senders without transactions - and with O(n) complexity. To change in future.
//TODO: it's very naive eviction of all senders without transactions - and with O(n) complexity. We need more soft eviction policy.
if err := tx.ForEach(kv.PooledSenderID, nil, func(addr, id []byte) error {
if byNonce.count(binary.BigEndian.Uint64(id)) > 0 {
return nil
@ -768,15 +768,23 @@ func (p *TxPool) printDebug(prefix string) {
(*p.queued.best)[i].Tx.printDebug(fmt.Sprintf("%s.queued : %b", prefix, (*p.queued.best)[i].subPool))
}
}
func (p *TxPool) logStats(sc *SendersCache) {
func (p *TxPool) logStats(tx kv.Tx) error {
protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load()
idsInMem, idsInDb, err := p.senders.idsCount(tx)
if err != nil {
return err
}
infoInMem, infoInDb, err := p.senders.infoCount(tx)
if err != nil {
return err
}
p.lock.RLock()
defer p.lock.RUnlock()
log.Info(fmt.Sprintf("baseFee: %dm->%dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersCache: id=%d,info=%d",
log.Info(fmt.Sprintf("baseFee: %dm->%dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersCache: id=%d+%d,info=%d+%d",
protocolBaseFee/1_000_000, pendingBaseFee/1_000_000,
p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit,
len(sc.senderIDs), len(sc.senderInfo),
idsInMem, idsInDb, infoInMem, infoInDb,
))
}
func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) {
@ -1615,7 +1623,9 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, s
}); err != nil {
log.Error("restore from db", "err", err)
}
p.logStats(senders)
if err := db.View(ctx, func(tx kv.Tx) error { return p.logStats(tx) }); err != nil {
log.Error("log stats", "err", err)
}
if ASSERT {
go func() {
if err := p.forceCheckState(ctx, db, coreDB); err != nil {
@ -1640,8 +1650,9 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, s
case <-ctx.Done():
return
case <-logEvery.C:
p.logStats(senders)
log.Info("cache", "size", senders.len())
if err := db.View(ctx, func(tx kv.Tx) error { return p.logStats(tx) }); err != nil {
log.Error("log stats", "err", err)
}
case <-commitEvery.C:
if db != nil {
t := time.Now()