From 0286d092e2a645d65388c7497acdaecf69f54109 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 16 Aug 2021 16:34:12 +0700 Subject: [PATCH 01/12] save --- txpool/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpool/pool.go b/txpool/pool.go index c2a27b107..afe71f662 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -243,7 +243,7 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots) error { protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() if protocolBaseFee == 0 || pendingBaseFee == 0 { - return fmt.Errorf("non-zero base fee") + return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, pendingBaseFee) } if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, newTxs); err != nil { From 1eff0781102b6574b1d6a8ac929744be07eabf70 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 16 Aug 2021 16:35:49 +0700 Subject: [PATCH 02/12] save --- txpool/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpool/pool.go b/txpool/pool.go index afe71f662..8bce454ff 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -230,7 +230,7 @@ func (p *TxPool) Started() bool { defer p.lock.Unlock() protocolBaseFee := p.protocolBaseFee.Load() - return protocolBaseFee == 0 + return protocolBaseFee > 0 } func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots) error { From 6f686f510bb9094ef52bee33d47dfd126d5106de Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 16 Aug 2021 16:40:07 +0700 Subject: [PATCH 03/12] save --- txpool/pool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/txpool/pool.go b/txpool/pool.go index 8bce454ff..0240685d0 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -165,7 +165,8 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { func (p *TxPool) logStats() { p.lock.RLock() defer p.lock.RUnlock() - log.Info(fmt.Sprintf("[txpool] queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.pending.Len(), PendingSubPoolLimit)) + protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() + log.Info(fmt.Sprintf("[txpool] baseFee: protocol=%d,pending=%d; queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", protocolBaseFee, pendingBaseFee, p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.pending.Len(), PendingSubPoolLimit)) } func (p *TxPool) GetRlp(hash []byte) []byte { p.lock.RLock() From de4c2982819f409fe24cc84d49a2c2acf5e22f64 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Tue, 17 Aug 2021 10:10:54 +0700 Subject: [PATCH 04/12] add txpool tables --- kv/tables.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/kv/tables.go b/kv/tables.go index 64853cfe8..5649b320f 100644 --- a/kv/tables.go +++ b/kv/tables.go @@ -414,6 +414,8 @@ var ChaindataTablesCfg = TableCfg{ }, } +var TxpoolTablesCfg = TableCfg{} + func sortBuckets() { sort.SliceStable(ChaindataTables, func(i, j int) bool { return strings.Compare(ChaindataTables[i], ChaindataTables[j]) < 0 @@ -443,4 +445,12 @@ func reinit() { tmp.IsDeprecated = true ChaindataTablesCfg[name] = tmp } + + for _, name := range TxPoolTables { + _, ok := TxpoolTablesCfg[name] + if !ok { + TxpoolTablesCfg[name] = TableCfgItem{} + } + } + } From 601e5fa4438e2faf05d0866065d0db1a8066d71f Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Tue, 17 Aug 2021 10:32:18 +0700 Subject: [PATCH 05/12] add txpool tables --- txpool/fetch.go | 2 +- txpool/pool.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/txpool/fetch.go b/txpool/fetch.go index 24c0d9535..35a710e3b 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -55,7 +55,7 @@ type Timings struct { var DefaultTimings = Timings{ syncToNewPeersEvery: 2 * time.Minute, - logEvery: 30 * time.Second, + logEvery: 10 * time.Second, } // NewFetch creates a new fetch object that will work with given sentry clients. Since the diff --git a/txpool/pool.go b/txpool/pool.go index 0240685d0..a007a4667 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -331,6 +331,7 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin if hasNewVal { p.protocolBaseFee.Store(pendingBaseFee) } + log.Debug("set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee) return protocolBaseFee, p.pendingBaseFee.Load() } From 491370a8ab5667a0a74c2ed4eaf905b62093ce71 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Tue, 17 Aug 2021 10:34:09 +0700 Subject: [PATCH 06/12] add txpool tables --- txpool/pool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/txpool/pool.go b/txpool/pool.go index a007a4667..73fcde3ef 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -347,6 +347,7 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un } p.blockHeight.Store(blockHeight) + log.Debug("before set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee) protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, unwindTxs); err != nil { From 3a2ee52c23955ffc06fd88d16a74fa8083002439 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Tue, 17 Aug 2021 10:36:07 +0700 Subject: [PATCH 07/12] add txpool tables --- txpool/pool.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 73fcde3ef..cbd25ac60 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -339,6 +339,10 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un log.Debug("[txpool.onNewBlock]", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) p.lock.Lock() defer p.lock.Unlock() + log.Debug("before set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee) + p.blockHeight.Store(blockHeight) + protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) + if err := unwindTxs.Valid(); err != nil { return err } @@ -346,10 +350,6 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un return err } - p.blockHeight.Store(blockHeight) - log.Debug("before set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee) - protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) - if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, unwindTxs); err != nil { return err } From 8e26a7eb1385c017191b6e7c9b4864df2cc7c565 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Tue, 17 Aug 2021 11:06:17 +0700 Subject: [PATCH 08/12] fix many nil-pointers --- txpool/fetch.go | 2 +- txpool/pool.go | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/txpool/fetch.go b/txpool/fetch.go index 35a710e3b..24c0d9535 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -55,7 +55,7 @@ type Timings struct { var DefaultTimings = Timings{ syncToNewPeersEvery: 2 * time.Minute, - logEvery: 10 * time.Second, + logEvery: 30 * time.Second, } // NewFetch creates a new fetch object that will work with given sentry clients. Since the diff --git a/txpool/pool.go b/txpool/pool.go index cbd25ac60..a8470ab41 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -150,6 +150,7 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { } return &TxPool{ lock: &sync.RWMutex{}, + senderIDs: map[string]uint64{}, senderInfo: map[uint64]*senderInfo{}, byHash: map[string]*metaTx{}, localsHistory: localsHistory, @@ -327,9 +328,10 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin hasNewVal := pendingBaseFee > 0 if pendingBaseFee < protocolBaseFee { pendingBaseFee = protocolBaseFee + hasNewVal = true } if hasNewVal { - p.protocolBaseFee.Store(pendingBaseFee) + p.pendingBaseFee.Store(pendingBaseFee) } log.Debug("set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee) return protocolBaseFee, p.pendingBaseFee.Load() @@ -339,9 +341,9 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un log.Debug("[txpool.onNewBlock]", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) p.lock.Lock() defer p.lock.Unlock() - log.Debug("before set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee) p.blockHeight.Store(blockHeight) protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) + log.Debug("before set base fee", "protocol", p.protocolBaseFee.Load(), "pending", p.pendingBaseFee.Load()) if err := unwindTxs.Valid(); err != nil { return err @@ -358,7 +360,7 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un } for addr, id := range p.senderIDs { // merge state changes if v, ok := stateChanges[addr]; ok { - p.senderInfo[id] = &v + p.senderInfo[id] = newSenderInfo(v.nonce, v.balance) } } @@ -428,7 +430,7 @@ func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string] if err != nil { return err } - sendersInfo[txs.txs[i].senderID] = &senderInfo{nonce: nonce, balance: balance} + sendersInfo[txs.txs[i].senderID] = newSenderInfo(nonce, balance) } } return nil From 9e67a858e6a0c21854f2cf5f04cea3d1b2e177d1 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Tue, 17 Aug 2021 11:19:14 +0700 Subject: [PATCH 09/12] fix many nil-pointers --- txpool/pool.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index a8470ab41..bb0d769ad 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -160,6 +160,7 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { queued: NewSubPool(), newTxs: newTxs, db: db, + senderID: 1, }, nil } @@ -187,7 +188,7 @@ func (p *TxPool) AppendLocalHashes(buf []byte) { if txn.subPool&IsLocal == 0 { continue } - copy(buf[i*32:(i+1)*32], hash) + buf = append(buf[i*32:], hash...) i++ } } @@ -200,7 +201,7 @@ func (p *TxPool) AppendRemoteHashes(buf []byte) { if txn.subPool&IsLocal != 0 { continue } - copy(buf[i*32:(i+1)*32], hash) + buf = append(buf[i*32:], hash...) i++ } } @@ -343,8 +344,6 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un defer p.lock.Unlock() p.blockHeight.Store(blockHeight) protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) - log.Debug("before set base fee", "protocol", p.protocolBaseFee.Load(), "pending", p.pendingBaseFee.Load()) - if err := unwindTxs.Valid(); err != nil { return err } @@ -415,7 +414,8 @@ func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string] id, ok := senderIDs[addr] if !ok { *senderIDSequence++ - senderIDs[addr] = *senderIDSequence + id = *senderIDSequence + senderIDs[addr] = id } txs.txs[i].senderID = id @@ -629,7 +629,6 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, pendingBaseFee uint64) // baseFee of the currently pending block. Set to 0 otherwise. it.metaTx.subPool &^= EnoughFeeCapBlock if it.metaTx.Tx.feeCap >= pendingBaseFee { - fmt.Printf("setttttt: %d,%d,%d\n", protocolBaseFee, pendingBaseFee, it.metaTx.Tx.feeCap) it.metaTx.subPool |= EnoughFeeCapBlock } From e4acb2ae3af6ab1ac637aefe3362429e9c0bf703 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Tue, 17 Aug 2021 11:22:47 +0700 Subject: [PATCH 10/12] fix many nil-pointers --- txpool/pool.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index bb0d769ad..3a3d799a8 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -183,26 +183,22 @@ func (p *TxPool) GetRlp(hash []byte) []byte { func (p *TxPool) AppendLocalHashes(buf []byte) { p.lock.RLock() defer p.lock.RUnlock() - i := 0 for hash, txn := range p.byHash { if txn.subPool&IsLocal == 0 { continue } - buf = append(buf[i*32:], hash...) - i++ + buf = append(buf, hash...) } } func (p *TxPool) AppendRemoteHashes(buf []byte) { p.lock.RLock() defer p.lock.RUnlock() - i := 0 for hash, txn := range p.byHash { if txn.subPool&IsLocal != 0 { continue } - buf = append(buf[i*32:], hash...) - i++ + buf = append(buf, hash...) } } func (p *TxPool) AppendAllHashes(buf []byte) { From 58e62a50b5e6e7cde5eca91aff3ca111290240c5 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Tue, 17 Aug 2021 12:02:13 +0700 Subject: [PATCH 11/12] fix many nil-pointers --- txpool/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpool/pool.go b/txpool/pool.go index 3a3d799a8..103a915f6 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -168,7 +168,7 @@ func (p *TxPool) logStats() { p.lock.RLock() defer p.lock.RUnlock() protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() - log.Info(fmt.Sprintf("[txpool] baseFee: protocol=%d,pending=%d; queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", protocolBaseFee, pendingBaseFee, p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.pending.Len(), PendingSubPoolLimit)) + log.Info(fmt.Sprintf("[txpool] baseFee: protocol=%d,pending=%d; queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", protocolBaseFee, pendingBaseFee, p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit)) } func (p *TxPool) GetRlp(hash []byte) []byte { p.lock.RLock() From df4c8e2309ce505fc2fdd3252608b392fb269ea8 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Tue, 17 Aug 2021 15:46:59 +0700 Subject: [PATCH 12/12] fix many nil-pointers --- txpool/pool.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 103a915f6..a1ad49dca 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -180,7 +180,7 @@ func (p *TxPool) GetRlp(hash []byte) []byte { } return txn.Tx.rlp } -func (p *TxPool) AppendLocalHashes(buf []byte) { +func (p *TxPool) AppendLocalHashes(buf []byte) []byte { p.lock.RLock() defer p.lock.RUnlock() for hash, txn := range p.byHash { @@ -189,8 +189,9 @@ func (p *TxPool) AppendLocalHashes(buf []byte) { } buf = append(buf, hash...) } + return buf } -func (p *TxPool) AppendRemoteHashes(buf []byte) { +func (p *TxPool) AppendRemoteHashes(buf []byte) []byte { p.lock.RLock() defer p.lock.RUnlock() @@ -200,10 +201,12 @@ func (p *TxPool) AppendRemoteHashes(buf []byte) { } buf = append(buf, hash...) } + return buf } -func (p *TxPool) AppendAllHashes(buf []byte) { - p.AppendLocalHashes(buf) - p.AppendRemoteHashes(buf[len(buf):]) +func (p *TxPool) AppendAllHashes(buf []byte) []byte { + buf = p.AppendLocalHashes(buf) + buf = p.AppendRemoteHashes(buf) + return buf } func (p *TxPool) IdHashKnown(hash []byte) bool { p.lock.RLock() @@ -444,7 +447,9 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ } } + j := 0 removeMined(senderInfo, minedTxs, pending, baseFee, queued, func(i *metaTx) { + j++ delete(byHash, string(i.Tx.idHash[:])) senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) if i.subPool&IsLocal != 0 { @@ -452,6 +457,7 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ localsHistory.Add(i.Tx.idHash, struct{}{}) } }) + log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs)) // This can be thought of a reverse operation from the one described before. // When a block that was deemed "the best" of its height, is no longer deemed "the best", the @@ -525,10 +531,10 @@ func removeMined(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, pending, // delete mined transactions from everywhere sender.txNonce2Tx.Ascend(func(i btree.Item) bool { it := i.(*nonce2TxItem) + fmt.Printf("nonce cmp: %d,%d, senderID=%d\n", it.metaTx.Tx.nonce, sender.nonce, tx.senderID) if it.metaTx.Tx.nonce > sender.nonce { return false } - // TODO: save local transactions to cache with TTL, in case of re-org - to restore isLocal flag of re-injected transactions // del from nonce2tx mapping sender.txNonce2Tx.Delete(i) @@ -904,7 +910,7 @@ func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Sen if len(newPeers) == 0 { continue } - p.AppendAllHashes(remoteTxHashes[:0]) + remoteTxHashes = p.AppendAllHashes(remoteTxHashes[:0]) send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes) } }