Merge pull request #28 from ledgerwatch/pool12

Pool: save isLocalLRU to db
This commit is contained in:
Alex Sharov 2021-08-13 12:32:56 +07:00 committed by GitHub
commit 1b9bcfa1c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 10 deletions

View File

@ -341,7 +341,13 @@ var ChaindataTables = []string{
PendingEpoch,
}
var TxPoolTables = []string{}
const (
RecentLocalTransactions = "RecentLocalTransactions" // sequence_u64 -> tx_hash
)
var TxPoolTables = []string{
RecentLocalTransactions,
}
var SentryTables = []string{}
// ChaindataDeprecatedTables - list of buckets which can be programmatically deleted - for example after migration

View File

@ -19,13 +19,14 @@ package txpool
import (
"container/heap"
"context"
"encoding/binary"
"fmt"
"math"
"sync"
"time"
"github.com/google/btree"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru/simplelru"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv"
"go.uber.org/atomic"
@ -125,7 +126,9 @@ type TxPool struct {
pending, baseFee, queued *SubPool
// track isLocal flag of already mined transactions. used at unwind.
localsHistory *lru.Cache
localsHistoryCommited time.Time
localsHistory *simplelru.LRU
db kv.RwDB
// fields for transaction propagation
recentlyConnectedPeers *recentlyConnectedPeers
@ -133,8 +136,14 @@ type TxPool struct {
//lastTxPropagationTimestamp time.Time
}
func New(newTxs chan Hashes) *TxPool {
localsHistory, _ := lru.New(1024)
func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) {
localsHistory, err := simplelru.NewLRU(1024, nil)
if err != nil {
return nil, err
}
if err = restoreIsLocalHistory(db, localsHistory); err != nil {
return nil, err
}
return &TxPool{
lock: &sync.RWMutex{},
senderInfo: map[uint64]*senderInfo{},
@ -145,7 +154,8 @@ func New(newTxs chan Hashes) *TxPool {
baseFee: NewSubPool(),
queued: NewSubPool(),
newTxs: newTxs,
}
db: db,
}, nil
}
func (p *TxPool) GetRlp(hash []byte) []byte {
@ -242,7 +252,7 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots) error {
return nil
}
func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *lru.Cache) error {
func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error {
for i := range newTxs.txs {
if newTxs.txs[i].senderID == 0 {
return fmt.Errorf("senderID can't be zero")
@ -354,9 +364,11 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
}
*/
if err := commitIsLocalHistory(p.db, p.localsHistoryCommited, p.localsHistory); err != nil {
return err
}
return nil
}
func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string]uint64, sendersInfo map[uint64]*senderInfo, txs TxSlots) error {
for i := range txs.txs {
addr := string(txs.senders.At(i))
@ -386,7 +398,7 @@ func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string]
return nil
}
func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *lru.Cache) error {
func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error {
for i := range unwindTxs.txs {
if unwindTxs.txs[i].senderID == 0 {
return fmt.Errorf("onNewBlock.unwindTxs: senderID can't be zero")
@ -865,6 +877,44 @@ func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Sen
}
}
func commitIsLocalHistory(db kv.RwDB, commited time.Time, localsHistory *simplelru.LRU) error {
if db == nil || time.Since(commited) < 30*time.Second {
return nil
}
txHashes := localsHistory.Keys()
key := make([]byte, 8)
return db.Update(context.Background(), func(tx kv.RwTx) error {
if err := tx.ClearBucket(kv.RecentLocalTransactions); err != nil {
return err
}
for i := range txHashes {
binary.BigEndian.PutUint64(key, uint64(i))
if err := tx.Append(kv.RecentLocalTransactions, key, txHashes[i].([]byte)); err != nil {
return err
}
}
return nil
})
}
func restoreIsLocalHistory(db kv.RwDB, localsHistory *simplelru.LRU) error {
if db == nil {
return nil
}
return db.View(context.Background(), func(tx kv.Tx) error {
return tx.ForPrefix(kv.RecentLocalTransactions, nil, func(k, v []byte) error {
localsHistory.Add(copyBytes(v), struct{}{})
return nil
})
})
}
func copyBytes(b []byte) (copiedBytes []byte) {
copiedBytes = make([]byte, len(b))
copy(copiedBytes, b)
return
}
// recentlyConnectedPeers does buffer IDs of recently connected good peers
// then sync of pooled Transaction can happen to all of then at once
// DoS protection and performance saving

View File

@ -258,7 +258,8 @@ func FuzzOnNewBlocks11(f *testing.F) {
var prevTotal int
ch := make(chan Hashes, 100)
pool := New(ch)
pool, err := New(ch, nil)
assert.NoError(err)
pool.senderInfo = senders
pool.senderIDs = senderIDs
check := func(unwindTxs, minedTxs TxSlots, msg string) {