This commit is contained in:
alex.sharov 2021-08-21 17:23:09 +07:00
parent 91f148bd1f
commit 6db3e60403
3 changed files with 45 additions and 43 deletions

View File

@ -300,11 +300,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
if len(txs.txs) == 0 {
return nil
}
if err := f.coreDB.View(ctx, func(tx kv.Tx) error {
return f.pool.Add(tx, txs, f.senders)
}); err != nil {
return err
}
return f.pool.Add(f.coreDB, txs, f.senders)
default:
//defer log.Info("dropped", "id", req.Id)
}
@ -434,10 +430,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
addr := gointerfaces.ConvertH160toAddress(change.Address)
diff[string(addr[:])] = senderInfo{nonce: nonce, balance: balance}
}
if err := f.coreDB.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(tx, diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, f.senders)
}); err != nil {
if err := f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, f.senders); err != nil {
log.Warn("onNewBlock", "err", err)
}
if f.wg != nil {

View File

@ -41,8 +41,8 @@ type Pool interface {
IdHashKnown(hash []byte) bool
Started() bool
GetRlp(hash []byte) []byte
Add(db kv.Tx, newTxs TxSlots, senders *SendersCache) error
OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error
Add(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error
OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error
AddNewGoodPeer(peerID PeerID)
}
@ -172,7 +172,7 @@ func (sc *SendersCache) evict() int {
return count
}
func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error {
func (sc *SendersCache) onNewTxs(coreDBTx kv.RoDB, newTxs TxSlots) error {
sc.ensureSenderIDOnNewTxs(newTxs)
toLoad := sc.setTxSenderID(newTxs)
if len(toLoad) == 0 {
@ -186,29 +186,33 @@ func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error {
return nil
}
func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, blockHeight uint64) error {
func (sc *SendersCache) onNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, blockHeight uint64) error {
//TODO: if see non-continuous block heigh - drop cache and reload from db
sc.blockHeight.Store(blockHeight)
//`loadSenders` goes by network to core - and it must be outside of SendersCache lock. But other methods must be locked
sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs)
toLoad := sc.setTxSenderID(unwindTxs)
if len(toLoad) > 0 {
/*
if len(toLoad) > 0 {
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
}
*/
toLoad = sc.setTxSenderID(minedTxs)
if len(toLoad) == 0 {
return nil
}
/*
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
}
toLoad = sc.setTxSenderID(minedTxs)
if len(toLoad) == 0 {
return nil
}
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
*/
return nil
}
func (sc *SendersCache) set(diff map[uint64]senderInfo) {
@ -306,22 +310,27 @@ func (sc *SendersCache) setTxSenderID(txs TxSlots) map[uint64]string {
return toLoad
}
func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]senderInfo, error) {
func loadSenders(coreDB kv.RoDB, toLoad map[uint64]string) (map[uint64]senderInfo, error) {
diff := make(map[uint64]senderInfo, len(toLoad))
for id := range toLoad {
encoded, err := coreDB.GetOne(kv.PlainState, []byte(toLoad[id]))
if err != nil {
return nil, err
if err := coreDB.View(context.Background(), func(tx kv.Tx) error {
for id := range toLoad {
encoded, err := tx.GetOne(kv.PlainState, []byte(toLoad[id]))
if err != nil {
return err
}
if len(encoded) == 0 {
diff[id] = *newSenderInfo(0, *uint256.NewInt(0))
continue
}
nonce, balance, err := DecodeSender(encoded)
if err != nil {
return err
}
diff[id] = *newSenderInfo(nonce, balance)
}
if len(encoded) == 0 {
diff[id] = *newSenderInfo(0, *uint256.NewInt(0))
continue
}
nonce, balance, err := DecodeSender(encoded)
if err != nil {
return nil, err
}
diff[id] = *newSenderInfo(nonce, balance)
return nil
}); err != nil {
return nil, err
}
return diff, nil
}
@ -437,7 +446,7 @@ func (p *TxPool) Started() bool {
return p.protocolBaseFee.Load() > 0
}
func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error {
func (p *TxPool) Add(coreDB kv.RoDB, newTxs TxSlots, senders *SendersCache) error {
t := time.Now()
if err := senders.onNewTxs(coreDB, newTxs); err != nil {
return err
@ -537,9 +546,9 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin
return protocolBaseFee, p.pendingBaseFee.Load()
}
func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error {
func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error {
t := time.Now()
if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil {
if err := senders.onNewBlock(stateChanges, unwindTxs, minedTxs, blockHeight); err != nil {
return err
}
//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)

View File

@ -434,13 +434,13 @@ func FuzzOnNewBlocks11(f *testing.F) {
// go to first fork
//fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len())
unwindTxs, minedTxs1, p2pReceived, minedTxs2 := splitDataset(txs)
err = pool.OnNewBlock(nil, map[string]senderInfo{}, unwindTxs, minedTxs1, protocolBaseFee, pendingBaseFee, 1, sendersCache)
err = pool.OnNewBlock(map[string]senderInfo{}, unwindTxs, minedTxs1, protocolBaseFee, pendingBaseFee, 1, sendersCache)
assert.NoError(err)
check(unwindTxs, minedTxs1, "fork1")
checkNotify(unwindTxs, minedTxs1, "fork1")
// unwind everything and switch to new fork (need unwind mined now)
err = pool.OnNewBlock(nil, map[string]senderInfo{}, minedTxs1, minedTxs2, protocolBaseFee, pendingBaseFee, 2, sendersCache)
err = pool.OnNewBlock(map[string]senderInfo{}, minedTxs1, minedTxs2, protocolBaseFee, pendingBaseFee, 2, sendersCache)
assert.NoError(err)
check(minedTxs1, minedTxs2, "fork2")
checkNotify(minedTxs1, minedTxs2, "fork2")