diff --git a/kv/bucket.go b/kv/tables.go similarity index 99% rename from kv/bucket.go rename to kv/tables.go index 729d28660..64853cfe8 100644 --- a/kv/bucket.go +++ b/kv/tables.go @@ -341,7 +341,13 @@ var ChaindataTables = []string{ PendingEpoch, } -var TxPoolTables = []string{} +const ( + RecentLocalTransactions = "RecentLocalTransactions" // sequence_u64 -> tx_hash +) + +var TxPoolTables = []string{ + RecentLocalTransactions, +} var SentryTables = []string{} // ChaindataDeprecatedTables - list of buckets which can be programmatically deleted - for example after migration diff --git a/txpool/pool.go b/txpool/pool.go index ef60ed532..fd8c974ef 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -19,13 +19,14 @@ package txpool import ( "container/heap" "context" + "encoding/binary" "fmt" "math" "sync" "time" "github.com/google/btree" - lru "github.com/hashicorp/golang-lru" + "github.com/hashicorp/golang-lru/simplelru" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/kv" "go.uber.org/atomic" @@ -125,7 +126,9 @@ type TxPool struct { pending, baseFee, queued *SubPool // track isLocal flag of already mined transactions. used at unwind. - localsHistory *lru.Cache + localsHistoryCommited time.Time + localsHistory *simplelru.LRU + db kv.RwDB // fields for transaction propagation recentlyConnectedPeers *recentlyConnectedPeers @@ -133,8 +136,14 @@ type TxPool struct { //lastTxPropagationTimestamp time.Time } -func New(newTxs chan Hashes) *TxPool { - localsHistory, _ := lru.New(1024) +func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { + localsHistory, err := simplelru.NewLRU(1024, nil) + if err != nil { + return nil, err + } + if err = restoreIsLocalHistory(db, localsHistory); err != nil { + return nil, err + } return &TxPool{ lock: &sync.RWMutex{}, senderInfo: map[uint64]*senderInfo{}, @@ -145,7 +154,8 @@ func New(newTxs chan Hashes) *TxPool { baseFee: NewSubPool(), queued: NewSubPool(), newTxs: newTxs, - } + db: db, + }, nil } func (p *TxPool) GetRlp(hash []byte) []byte { @@ -242,7 +252,7 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots) error { return nil } -func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *lru.Cache) error { +func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error { for i := range newTxs.txs { if newTxs.txs[i].senderID == 0 { return fmt.Errorf("senderID can't be zero") @@ -354,9 +364,11 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un } */ + if err := commitIsLocalHistory(p.db, p.localsHistoryCommited, p.localsHistory); err != nil { + return err + } return nil } - func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string]uint64, sendersInfo map[uint64]*senderInfo, txs TxSlots) error { for i := range txs.txs { addr := string(txs.senders.At(i)) @@ -386,7 +398,7 @@ func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string] return nil } -func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *lru.Cache) error { +func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error { for i := range unwindTxs.txs { if unwindTxs.txs[i].senderID == 0 { return fmt.Errorf("onNewBlock.unwindTxs: senderID can't be zero") @@ -865,6 +877,44 @@ func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Sen } } +func commitIsLocalHistory(db kv.RwDB, commited time.Time, localsHistory *simplelru.LRU) error { + if db == nil || time.Since(commited) < 30*time.Second { + return nil + } + txHashes := localsHistory.Keys() + key := make([]byte, 8) + return db.Update(context.Background(), func(tx kv.RwTx) error { + if err := tx.ClearBucket(kv.RecentLocalTransactions); err != nil { + return err + } + for i := range txHashes { + binary.BigEndian.PutUint64(key, uint64(i)) + if err := tx.Append(kv.RecentLocalTransactions, key, txHashes[i].([]byte)); err != nil { + return err + } + } + return nil + }) +} + +func restoreIsLocalHistory(db kv.RwDB, localsHistory *simplelru.LRU) error { + if db == nil { + return nil + } + return db.View(context.Background(), func(tx kv.Tx) error { + return tx.ForPrefix(kv.RecentLocalTransactions, nil, func(k, v []byte) error { + localsHistory.Add(copyBytes(v), struct{}{}) + return nil + }) + }) +} + +func copyBytes(b []byte) (copiedBytes []byte) { + copiedBytes = make([]byte, len(b)) + copy(copiedBytes, b) + return +} + // recentlyConnectedPeers does buffer IDs of recently connected good peers // then sync of pooled Transaction can happen to all of then at once // DoS protection and performance saving diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index d905de78f..3c4e6098a 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -258,7 +258,8 @@ func FuzzOnNewBlocks11(f *testing.F) { var prevTotal int ch := make(chan Hashes, 100) - pool := New(ch) + pool, err := New(ch, nil) + assert.NoError(err) pool.senderInfo = senders pool.senderIDs = senderIDs check := func(unwindTxs, minedTxs TxSlots, msg string) {