From 4c765516ba903c3670b8cad83e23fcc9dd2c3df6 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 21 Aug 2021 08:49:05 +0700 Subject: [PATCH 1/8] save --- txpool/pool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/txpool/pool.go b/txpool/pool.go index a1dce3cb6..ffdf541a9 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -185,6 +185,8 @@ func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error { func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, blockHeight uint64) error { //TODO: if see non-continuous block heigh - drop cache and reload from db sc.blockHeight.Store(blockHeight) + + //`loadSenders` goes by network to core - and it must be outside of SendersCache lock. But other methods must be locked sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs) toLoad := sc.setTxSenderID(unwindTxs) diff, err := loadSenders(coreDBTx, toLoad) @@ -312,7 +314,6 @@ type TxPool struct { // fields for transaction propagation recentlyConnectedPeers *recentlyConnectedPeers newTxs chan Hashes - //lastTxPropagationTimestamp time.Time } func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { From 256dfd818a9597e5955ac76cb17621deafb659b3 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 21 Aug 2021 15:08:03 +0700 Subject: [PATCH 2/8] save --- txpool/fetch.go | 6 ++++++ txpool/packets.go | 2 -- txpool/pool.go | 13 +++++++++---- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/txpool/fetch.go b/txpool/fetch.go index 402312c5b..5e7fa6160 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -193,6 +193,9 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMessage, sentryClient sentry.SentryClient) error { switch req.Id { case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65: + if !f.pool.Started() { + return nil + } hashCount, pos, err := ParseHashesCount(req.Data, 0) if err != nil { return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err) @@ -228,6 +231,9 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes } } case sentry.MessageId_GET_POOLED_TRANSACTIONS_66, sentry.MessageId_GET_POOLED_TRANSACTIONS_65: + if !f.pool.Started() { + return nil + } //TODO: handleInboundMessage is single-threaded - means it can accept as argument couple buffers (or analog of txParseContext). Protobuf encoding will copy data anyway, but DirectClient doesn't var encodedRequest []byte messageId := sentry.MessageId_POOLED_TRANSACTIONS_66 diff --git a/txpool/packets.go b/txpool/packets.go index 44b7af7fb..63cfa1da0 100644 --- a/txpool/packets.go +++ b/txpool/packets.go @@ -188,7 +188,6 @@ func ParsePooledTransactions65(payload []byte, pos int, ctx *TxParseContext, txS pos, err = ctx.ParseTransaction(payload, pos, txSlots.txs[i], txSlots.senders.At(i)) if err != nil { if errors.Is(err, ErrRejected) { - fmt.Printf("rejected\n") continue } return 0, err @@ -217,7 +216,6 @@ func ParsePooledTransactions66(payload []byte, pos int, ctx *TxParseContext, txS pos, err = ctx.ParseTransaction(payload, pos, txSlots.txs[i], txSlots.senders.At(i)) if err != nil { if errors.Is(err, ErrRejected) { - fmt.Printf("rejected\n") continue } return requestID, 0, err diff --git a/txpool/pool.go b/txpool/pool.go index ffdf541a9..791fd2945 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -146,6 +146,11 @@ func (sc *SendersCache) forEach(f func(info *senderInfo)) { f(sc.senderInfo[i]) } } +func (sc *SendersCache) len() int { + sc.lock.RLock() + defer sc.lock.RUnlock() + return len(sc.senderInfo) +} func (sc *SendersCache) evict() int { sc.lock.Lock() @@ -502,7 +507,6 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin if hasNewVal { p.pendingBaseFee.Store(pendingBaseFee) } - log.Debug("set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee) return protocolBaseFee, p.pendingBaseFee.Load() } @@ -510,7 +514,7 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { return err } - log.Debug("[txpool.onNewBlock]", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) + log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) if err := unwindTxs.Valid(); err != nil { return err @@ -521,7 +525,6 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un p.lock.Lock() defer p.lock.Unlock() - //log.Debug("[txpool.onNewBlock]", "senderInfo", len(p.senderInfo)) if err := onNewBlock(senders, unwindTxs, minedTxs.txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { return err } @@ -1019,7 +1022,9 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, p *TxPool, senders *SendersC case <-evictSendersEvery.C: // evict sendersInfo without txs count := senders.evict() - log.Debug("evicted senders", "amount", count) + if count > 0 { + log.Debug("evicted senders", "amount", count) + } if db != nil { if err := db.Update(ctx, func(tx kv.RwTx) error { return p.flushIsLocalHistory(tx) From 8170635b5c78b24104a6256a18bab0d9517da416 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 21 Aug 2021 15:24:03 +0700 Subject: [PATCH 3/8] save --- txpool/fetch.go | 47 +++++++++++++++++++++++------------------------ txpool/pool.go | 17 ++++++++++++----- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/txpool/fetch.go b/txpool/fetch.go index 5e7fa6160..af4a637f4 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -40,13 +40,15 @@ import ( // genesis hash and list of forks, but with zero max block and total difficulty // Sentry should have a logic not to overwrite statusData with messages from tx pool type Fetch struct { - ctx context.Context // Context used for cancellation and closing of the fetcher - sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network - pool Pool // Transaction pool implementation - senders *SendersCache - coreDB kv.RoDB - wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) - stateChangesClient remote.KVClient + ctx context.Context // Context used for cancellation and closing of the fetcher + sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network + pool Pool // Transaction pool implementation + senders *SendersCache + coreDB kv.RoDB + wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) + stateChangesClient remote.KVClient + stateChangesParseCtx *TxParseContext + pooledTxsParseCtx *TxParseContext } type Timings struct { @@ -63,13 +65,17 @@ var DefaultTimings = Timings{ // SentryClient here is an interface, it is suitable for mocking in tests (mock will need // to implement all the functions of the SentryClient interface). func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, pool Pool, senders *SendersCache, stateChangesClient remote.KVClient, db kv.RoDB) *Fetch { + pooledTxsParseCtx := NewTxParseContext() + pooledTxsParseCtx.Reject(func(hash []byte) bool { return pool.IdHashKnown(hash) }) return &Fetch{ - ctx: ctx, - sentryClients: sentryClients, - pool: pool, - senders: senders, - coreDB: db, - stateChangesClient: stateChangesClient, + ctx: ctx, + sentryClients: sentryClients, + pool: pool, + senders: senders, + coreDB: db, + stateChangesClient: stateChangesClient, + stateChangesParseCtx: NewTxParseContext(), + pooledTxsParseCtx: pooledTxsParseCtx, } } @@ -281,19 +287,13 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes if !f.pool.Started() { return nil } - - parseCtx := NewTxParseContext() - parseCtx.Reject(func(hash []byte) bool { - //fmt.Printf("check: %t\n", f.pool.IdHashKnown(hash)) - return f.pool.IdHashKnown(hash) - }) txs := TxSlots{} if req.Id == sentry.MessageId_GET_POOLED_TRANSACTIONS_66 { - if _, err := ParsePooledTransactions65(req.Data, 0, parseCtx, &txs); err != nil { + if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { return err } } else { - if _, _, err := ParsePooledTransactions66(req.Data, 0, parseCtx, &txs); err != nil { + if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { return err } } @@ -403,13 +403,12 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) return nil } - parseCtx := NewTxParseContext() var unwindTxs, minedTxs TxSlots if req.Direction == remote.Direction_FORWARD { minedTxs.Growth(len(req.Txs)) for i := range req.Txs { minedTxs.txs[i] = &TxSlot{} - if _, err := parseCtx.ParseTransaction(req.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil { + if _, err := f.stateChangesParseCtx.ParseTransaction(req.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil { log.Warn("stream.Recv", "err", err) continue } @@ -419,7 +418,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) unwindTxs.Growth(len(req.Txs)) for i := range req.Txs { unwindTxs.txs[i] = &TxSlot{} - if _, err := parseCtx.ParseTransaction(req.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil { + if _, err := f.stateChangesParseCtx.ParseTransaction(req.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil { log.Warn("stream.Recv", "err", err) continue } diff --git a/txpool/pool.go b/txpool/pool.go index 791fd2945..87bf961d0 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -291,6 +291,10 @@ func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]*senderInfo if err != nil { return nil, err } + if len(encoded) == 0 { + diff[id] = newSenderInfo(0, *uint256.NewInt(0)) + continue + } nonce, balance, err := DecodeSender(encoded) if err != nil { return nil, err @@ -586,7 +590,9 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr localsHistory.Add(i.Tx.idHash, struct{}{}) } }) - //log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs)) + if j > 0 { + log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs)) + } // This can be thought of a reverse operation from the one described before. // When a block that was deemed "the best" of its height, is no longer deemed "the best", the @@ -654,13 +660,13 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, queued *SubPool, discard func(tx *metaTx)) { for _, tx := range minedTxs { sender := senders.get(tx.senderID) - if sender.txNonce2Tx.Len() > 0 { - log.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.txNonce2Tx.len()", sender.txNonce2Tx.Len()) - } + //if sender.txNonce2Tx.Len() > 0 { + //log.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.txNonce2Tx.len()", sender.txNonce2Tx.Len()) + //} // delete mined transactions from everywhere sender.txNonce2Tx.Ascend(func(i btree.Item) bool { it := i.(*nonce2TxItem) - log.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce) + //log.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce) if it.metaTx.Tx.nonce > sender.nonce { return false } @@ -679,6 +685,7 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu queued.UnsafeRemove(it.metaTx) discard(it.metaTx) default: + fmt.Printf("aaaaaaa\n") //already removed } return true From ee04be5cf3a4c030d9c38d7a4eccc0f1f58f40eb Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 21 Aug 2021 15:25:53 +0700 Subject: [PATCH 4/8] save --- txpool/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpool/pool.go b/txpool/pool.go index 87bf961d0..64cbdfeb6 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -93,7 +93,7 @@ const PendingSubPoolLimit = 1024 const BaseFeeSubPoolLimit = 1024 const QueuedSubPoolLimit = 1024 -const MaxSendersInfoCache = 1024 +const MaxSendersInfoCache = 2 * (PendingSubPoolLimit + BaseFeeSubPoolLimit + QueuedSubPoolLimit) type nonce2Tx struct{ *btree.BTree } From 7510505f39b2270431692cde622b314dbb86b2d7 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 21 Aug 2021 15:31:53 +0700 Subject: [PATCH 5/8] save --- txpool/pool.go | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 64cbdfeb6..7a01d1157 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -161,16 +161,12 @@ func (sc *SendersCache) evict() int { } count := 0 - for i := range sc.senderInfo { - if sc.senderInfo[i].txNonce2Tx.Len() > 0 { + for addr, id := range sc.senderIDs { + if sc.senderInfo[id].txNonce2Tx.Len() > 0 { continue } - for addr, id := range sc.senderIDs { - if id == i { - delete(sc.senderIDs, addr) - } - } - delete(sc.senderInfo, i) + delete(sc.senderInfo, id) + delete(sc.senderIDs, addr) count++ } return count @@ -548,6 +544,11 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un } } + count := senders.evict() + if count > 0 { + log.Debug("evicted senders", "amount", count) + } + return nil } func (p *TxPool) flushIsLocalHistory(tx kv.RwTx) error { @@ -1027,11 +1028,6 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, p *TxPool, senders *SendersC case <-logEvery.C: p.logStats() case <-evictSendersEvery.C: - // evict sendersInfo without txs - count := senders.evict() - if count > 0 { - log.Debug("evicted senders", "amount", count) - } if db != nil { if err := db.Update(ctx, func(tx kv.RwTx) error { return p.flushIsLocalHistory(tx) From 91f148bd1fa6a72d920e62ac559cc912a6c83dfd Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 21 Aug 2021 17:17:43 +0700 Subject: [PATCH 6/8] save --- txpool/pool.go | 87 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 29 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 7a01d1157..94456ac59 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -89,9 +89,9 @@ const PendingSubPool SubPoolType = 1 const BaseFeeSubPool SubPoolType = 2 const QueuedSubPool SubPoolType = 3 -const PendingSubPoolLimit = 1024 -const BaseFeeSubPoolLimit = 1024 -const QueuedSubPoolLimit = 1024 +const PendingSubPoolLimit = 10 * 1024 +const BaseFeeSubPoolLimit = 10 * 1024 +const QueuedSubPoolLimit = 10 * 1024 const MaxSendersInfoCache = 2 * (PendingSubPoolLimit + BaseFeeSubPoolLimit + QueuedSubPoolLimit) @@ -175,6 +175,9 @@ func (sc *SendersCache) evict() int { func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error { sc.ensureSenderIDOnNewTxs(newTxs) toLoad := sc.setTxSenderID(newTxs) + if len(toLoad) == 0 { + return nil + } diff, err := loadSenders(coreDBTx, toLoad) if err != nil { return err @@ -190,34 +193,53 @@ func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]sende //`loadSenders` goes by network to core - and it must be outside of SendersCache lock. But other methods must be locked sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs) toLoad := sc.setTxSenderID(unwindTxs) - diff, err := loadSenders(coreDBTx, toLoad) - if err != nil { - return err + if len(toLoad) > 0 { + diff, err := loadSenders(coreDBTx, toLoad) + if err != nil { + return err + } + sc.set(diff) } - sc.set(diff) toLoad = sc.setTxSenderID(minedTxs) - diff, err = loadSenders(coreDBTx, toLoad) + if len(toLoad) == 0 { + return nil + } + diff, err := loadSenders(coreDBTx, toLoad) if err != nil { return err } sc.set(diff) return nil } -func (sc *SendersCache) set(diff map[uint64]*senderInfo) { +func (sc *SendersCache) set(diff map[uint64]senderInfo) { sc.lock.Lock() defer sc.lock.Unlock() for id := range diff { // merge state changes - sc.senderInfo[id] = diff[id] + a := diff[id] + sc.senderInfo[id] = &a } } func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, unwindedTxs, minedTxs TxSlots) { sc.lock.Lock() defer sc.lock.Unlock() - for addr, id := range sc.senderIDs { // merge state changes - if v, ok := stateChanges[addr]; ok { - sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + for addr, v := range stateChanges { // merge state changes + id, ok := sc.senderIDs[addr] + if !ok { + sc.senderID++ + id = sc.senderID + sc.senderIDs[addr] = id } + sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) } + + /* + for addr, id := range sc.senderIDs { // merge state changes + if v, ok := stateChanges[addr]; ok { + sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + } + } + */ + for i := 0; i < unwindedTxs.senders.Len(); i++ { id, ok := sc.senderIDs[string(unwindedTxs.senders.At(i))] if !ok { @@ -225,8 +247,8 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un id = sc.senderID sc.senderIDs[string(unwindedTxs.senders.At(i))] = id } - if v, ok := stateChanges[string(unwindedTxs.senders.At(i))]; ok { - sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + if _, ok := stateChanges[string(unwindedTxs.senders.At(i))]; !ok { + sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) } } @@ -237,9 +259,13 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un id = sc.senderID sc.senderIDs[string(minedTxs.senders.At(i))] = id } - if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok { - sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + if _, ok := stateChanges[string(minedTxs.senders.At(i))]; !ok { + sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) } + + //if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok { + // sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + //} } } @@ -280,22 +306,22 @@ func (sc *SendersCache) setTxSenderID(txs TxSlots) map[uint64]string { return toLoad } -func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]*senderInfo, error) { - diff := make(map[uint64]*senderInfo, len(toLoad)) +func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]senderInfo, error) { + diff := make(map[uint64]senderInfo, len(toLoad)) for id := range toLoad { encoded, err := coreDB.GetOne(kv.PlainState, []byte(toLoad[id])) if err != nil { return nil, err } if len(encoded) == 0 { - diff[id] = newSenderInfo(0, *uint256.NewInt(0)) + diff[id] = *newSenderInfo(0, *uint256.NewInt(0)) continue } nonce, balance, err := DecodeSender(encoded) if err != nil { return nil, err } - diff[id] = newSenderInfo(nonce, balance) + diff[id] = *newSenderInfo(nonce, balance) } return diff, nil } @@ -412,6 +438,7 @@ func (p *TxPool) Started() bool { } func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error { + t := time.Now() if err := senders.onNewTxs(coreDB, newTxs); err != nil { return err } @@ -423,7 +450,6 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error if protocolBaseFee == 0 || pendingBaseFee == 0 { return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, pendingBaseFee) } - p.lock.Lock() defer p.lock.Unlock() if err := onNewTxs(senders, newTxs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { @@ -444,6 +470,7 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error } } + log.Info("on new txs", "in", time.Since(t)) return nil } func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error { @@ -511,11 +538,11 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin } func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error { + t := time.Now() if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { return err } - log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) - protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) + //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) if err := unwindTxs.Valid(); err != nil { return err } @@ -543,12 +570,13 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un default: } } + //count := senders.evict() + //if count > 0 { + // log.Debug("evicted senders", "amount", count) + //} - count := senders.evict() - if count > 0 { - log.Debug("evicted senders", "amount", count) - } - + protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) + log.Info("on new block", "in", time.Since(t)) return nil } func (p *TxPool) flushIsLocalHistory(tx kv.RwTx) error { @@ -1027,6 +1055,7 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, p *TxPool, senders *SendersC return case <-logEvery.C: p.logStats() + log.Info("cache", "size", senders.len()) case <-evictSendersEvery.C: if db != nil { if err := db.Update(ctx, func(tx kv.RwTx) error { From 6db3e60403e72695f22ab8ef25eaf5b34123cf1f Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 21 Aug 2021 17:23:09 +0700 Subject: [PATCH 7/8] save --- txpool/fetch.go | 11 ++---- txpool/pool.go | 73 ++++++++++++++++++++++------------------ txpool/pool_fuzz_test.go | 4 +-- 3 files changed, 45 insertions(+), 43 deletions(-) diff --git a/txpool/fetch.go b/txpool/fetch.go index af4a637f4..8de696dfa 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -300,11 +300,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes if len(txs.txs) == 0 { return nil } - if err := f.coreDB.View(ctx, func(tx kv.Tx) error { - return f.pool.Add(tx, txs, f.senders) - }); err != nil { - return err - } + return f.pool.Add(f.coreDB, txs, f.senders) default: //defer log.Info("dropped", "id", req.Id) } @@ -434,10 +430,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) addr := gointerfaces.ConvertH160toAddress(change.Address) diff[string(addr[:])] = senderInfo{nonce: nonce, balance: balance} } - - if err := f.coreDB.View(ctx, func(tx kv.Tx) error { - return f.pool.OnNewBlock(tx, diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, f.senders) - }); err != nil { + if err := f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, f.senders); err != nil { log.Warn("onNewBlock", "err", err) } if f.wg != nil { diff --git a/txpool/pool.go b/txpool/pool.go index 94456ac59..ed3f6837a 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -41,8 +41,8 @@ type Pool interface { IdHashKnown(hash []byte) bool Started() bool GetRlp(hash []byte) []byte - Add(db kv.Tx, newTxs TxSlots, senders *SendersCache) error - OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error + Add(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error + OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error AddNewGoodPeer(peerID PeerID) } @@ -172,7 +172,7 @@ func (sc *SendersCache) evict() int { return count } -func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error { +func (sc *SendersCache) onNewTxs(coreDBTx kv.RoDB, newTxs TxSlots) error { sc.ensureSenderIDOnNewTxs(newTxs) toLoad := sc.setTxSenderID(newTxs) if len(toLoad) == 0 { @@ -186,29 +186,33 @@ func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error { return nil } -func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, blockHeight uint64) error { +func (sc *SendersCache) onNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, blockHeight uint64) error { //TODO: if see non-continuous block heigh - drop cache and reload from db sc.blockHeight.Store(blockHeight) //`loadSenders` goes by network to core - and it must be outside of SendersCache lock. But other methods must be locked sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs) toLoad := sc.setTxSenderID(unwindTxs) - if len(toLoad) > 0 { + /* + if len(toLoad) > 0 { + diff, err := loadSenders(coreDBTx, toLoad) + if err != nil { + return err + } + sc.set(diff) + } + */ + toLoad = sc.setTxSenderID(minedTxs) + if len(toLoad) == 0 { + return nil + } + /* diff, err := loadSenders(coreDBTx, toLoad) if err != nil { return err } sc.set(diff) - } - toLoad = sc.setTxSenderID(minedTxs) - if len(toLoad) == 0 { - return nil - } - diff, err := loadSenders(coreDBTx, toLoad) - if err != nil { - return err - } - sc.set(diff) + */ return nil } func (sc *SendersCache) set(diff map[uint64]senderInfo) { @@ -306,22 +310,27 @@ func (sc *SendersCache) setTxSenderID(txs TxSlots) map[uint64]string { return toLoad } -func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]senderInfo, error) { +func loadSenders(coreDB kv.RoDB, toLoad map[uint64]string) (map[uint64]senderInfo, error) { diff := make(map[uint64]senderInfo, len(toLoad)) - for id := range toLoad { - encoded, err := coreDB.GetOne(kv.PlainState, []byte(toLoad[id])) - if err != nil { - return nil, err + if err := coreDB.View(context.Background(), func(tx kv.Tx) error { + for id := range toLoad { + encoded, err := tx.GetOne(kv.PlainState, []byte(toLoad[id])) + if err != nil { + return err + } + if len(encoded) == 0 { + diff[id] = *newSenderInfo(0, *uint256.NewInt(0)) + continue + } + nonce, balance, err := DecodeSender(encoded) + if err != nil { + return err + } + diff[id] = *newSenderInfo(nonce, balance) } - if len(encoded) == 0 { - diff[id] = *newSenderInfo(0, *uint256.NewInt(0)) - continue - } - nonce, balance, err := DecodeSender(encoded) - if err != nil { - return nil, err - } - diff[id] = *newSenderInfo(nonce, balance) + return nil + }); err != nil { + return nil, err } return diff, nil } @@ -437,7 +446,7 @@ func (p *TxPool) Started() bool { return p.protocolBaseFee.Load() > 0 } -func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error { +func (p *TxPool) Add(coreDB kv.RoDB, newTxs TxSlots, senders *SendersCache) error { t := time.Now() if err := senders.onNewTxs(coreDB, newTxs); err != nil { return err @@ -537,9 +546,9 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin return protocolBaseFee, p.pendingBaseFee.Load() } -func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error { +func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error { t := time.Now() - if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { + if err := senders.onNewBlock(stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { return err } //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index b4df7abb2..0f4f796bd 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -434,13 +434,13 @@ func FuzzOnNewBlocks11(f *testing.F) { // go to first fork //fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len()) unwindTxs, minedTxs1, p2pReceived, minedTxs2 := splitDataset(txs) - err = pool.OnNewBlock(nil, map[string]senderInfo{}, unwindTxs, minedTxs1, protocolBaseFee, pendingBaseFee, 1, sendersCache) + err = pool.OnNewBlock(map[string]senderInfo{}, unwindTxs, minedTxs1, protocolBaseFee, pendingBaseFee, 1, sendersCache) assert.NoError(err) check(unwindTxs, minedTxs1, "fork1") checkNotify(unwindTxs, minedTxs1, "fork1") // unwind everything and switch to new fork (need unwind mined now) - err = pool.OnNewBlock(nil, map[string]senderInfo{}, minedTxs1, minedTxs2, protocolBaseFee, pendingBaseFee, 2, sendersCache) + err = pool.OnNewBlock(map[string]senderInfo{}, minedTxs1, minedTxs2, protocolBaseFee, pendingBaseFee, 2, sendersCache) assert.NoError(err) check(minedTxs1, minedTxs2, "fork2") checkNotify(minedTxs1, minedTxs2, "fork2") From d8b5d8dc2ed2d10963d1dd2adec90a0d59ef256b Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 21 Aug 2021 18:18:29 +0700 Subject: [PATCH 8/8] save --- txpool/mocks_test.go | 31 ++++++++++++------------------- txpool/pool.go | 14 ++++++++------ 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go index 2b9d98438..198bca435 100644 --- a/txpool/mocks_test.go +++ b/txpool/mocks_test.go @@ -4,9 +4,8 @@ package txpool import ( - "sync" - "github.com/ledgerwatch/erigon-lib/kv" + "sync" ) // Ensure, that PoolMock does implement Pool. @@ -19,7 +18,7 @@ var _ Pool = &PoolMock{} // // // make and configure a mocked Pool // mockedPool := &PoolMock{ -// AddFunc: func(db kv.Tx, newTxs TxSlots, senders *SendersCache) error { +// AddFunc: func(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error { // panic("mock out the Add method") // }, // AddNewGoodPeerFunc: func(peerID PeerID) { @@ -31,7 +30,7 @@ var _ Pool = &PoolMock{} // IdHashKnownFunc: func(hash []byte) bool { // panic("mock out the IdHashKnown method") // }, -// OnNewBlockFunc: func(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error { +// OnNewBlockFunc: func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error { // panic("mock out the OnNewBlock method") // }, // StartedFunc: func() bool { @@ -45,7 +44,7 @@ var _ Pool = &PoolMock{} // } type PoolMock struct { // AddFunc mocks the Add method. - AddFunc func(db kv.Tx, newTxs TxSlots, senders *SendersCache) error + AddFunc func(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error // AddNewGoodPeerFunc mocks the AddNewGoodPeer method. AddNewGoodPeerFunc func(peerID PeerID) @@ -57,7 +56,7 @@ type PoolMock struct { IdHashKnownFunc func(hash []byte) bool // OnNewBlockFunc mocks the OnNewBlock method. - OnNewBlockFunc func(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error + OnNewBlockFunc func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error // StartedFunc mocks the Started method. StartedFunc func() bool @@ -67,7 +66,7 @@ type PoolMock struct { // Add holds details about calls to the Add method. Add []struct { // Db is the db argument value. - Db kv.Tx + Db kv.RoDB // NewTxs is the newTxs argument value. NewTxs TxSlots // Senders is the senders argument value. @@ -90,8 +89,6 @@ type PoolMock struct { } // OnNewBlock holds details about calls to the OnNewBlock method. OnNewBlock []struct { - // Db is the db argument value. - Db kv.Tx // StateChanges is the stateChanges argument value. StateChanges map[string]senderInfo // UnwindTxs is the unwindTxs argument value. @@ -120,9 +117,9 @@ type PoolMock struct { } // Add calls AddFunc. -func (mock *PoolMock) Add(db kv.Tx, newTxs TxSlots, senders *SendersCache) error { +func (mock *PoolMock) Add(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error { callInfo := struct { - Db kv.Tx + Db kv.RoDB NewTxs TxSlots Senders *SendersCache }{ @@ -146,12 +143,12 @@ func (mock *PoolMock) Add(db kv.Tx, newTxs TxSlots, senders *SendersCache) error // Check the length with: // len(mockedPool.AddCalls()) func (mock *PoolMock) AddCalls() []struct { - Db kv.Tx + Db kv.RoDB NewTxs TxSlots Senders *SendersCache } { var calls []struct { - Db kv.Tx + Db kv.RoDB NewTxs TxSlots Senders *SendersCache } @@ -261,9 +258,8 @@ func (mock *PoolMock) IdHashKnownCalls() []struct { } // OnNewBlock calls OnNewBlockFunc. -func (mock *PoolMock) OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error { +func (mock *PoolMock) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error { callInfo := struct { - Db kv.Tx StateChanges map[string]senderInfo UnwindTxs TxSlots MinedTxs TxSlots @@ -272,7 +268,6 @@ func (mock *PoolMock) OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, u BlockHeight uint64 Senders *SendersCache }{ - Db: db, StateChanges: stateChanges, UnwindTxs: unwindTxs, MinedTxs: minedTxs, @@ -290,14 +285,13 @@ func (mock *PoolMock) OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, u ) return errOut } - return mock.OnNewBlockFunc(db, stateChanges, unwindTxs, minedTxs, protocolBaseFee, pendingBaseFee, blockHeight, senders) + return mock.OnNewBlockFunc(stateChanges, unwindTxs, minedTxs, protocolBaseFee, pendingBaseFee, blockHeight, senders) } // OnNewBlockCalls gets all the calls that were made to OnNewBlock. // Check the length with: // len(mockedPool.OnNewBlockCalls()) func (mock *PoolMock) OnNewBlockCalls() []struct { - Db kv.Tx StateChanges map[string]senderInfo UnwindTxs TxSlots MinedTxs TxSlots @@ -307,7 +301,6 @@ func (mock *PoolMock) OnNewBlockCalls() []struct { Senders *SendersCache } { var calls []struct { - Db kv.Tx StateChanges map[string]senderInfo UnwindTxs TxSlots MinedTxs TxSlots diff --git a/txpool/pool.go b/txpool/pool.go index ed3f6837a..cadcf68e9 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -152,6 +152,7 @@ func (sc *SendersCache) len() int { return len(sc.senderInfo) } +/* func (sc *SendersCache) evict() int { sc.lock.Lock() defer sc.lock.Unlock() @@ -171,6 +172,7 @@ func (sc *SendersCache) evict() int { } return count } +*/ func (sc *SendersCache) onNewTxs(coreDBTx kv.RoDB, newTxs TxSlots) error { sc.ensureSenderIDOnNewTxs(newTxs) @@ -192,7 +194,7 @@ func (sc *SendersCache) onNewBlock(stateChanges map[string]senderInfo, unwindTxs //`loadSenders` goes by network to core - and it must be outside of SendersCache lock. But other methods must be locked sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs) - toLoad := sc.setTxSenderID(unwindTxs) + _ = sc.setTxSenderID(unwindTxs) /* if len(toLoad) > 0 { diff, err := loadSenders(coreDBTx, toLoad) @@ -202,11 +204,11 @@ func (sc *SendersCache) onNewBlock(stateChanges map[string]senderInfo, unwindTxs sc.set(diff) } */ - toLoad = sc.setTxSenderID(minedTxs) - if len(toLoad) == 0 { - return nil - } + _ = sc.setTxSenderID(minedTxs) /* + if len(toLoad) == 0 { + return nil + } diff, err := loadSenders(coreDBTx, toLoad) if err != nil { return err @@ -552,6 +554,7 @@ func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, mined return err } //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) + protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) if err := unwindTxs.Valid(); err != nil { return err } @@ -584,7 +587,6 @@ func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, mined // log.Debug("evicted senders", "amount", count) //} - protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) log.Info("on new block", "in", time.Since(t)) return nil }