Merge pull request #38 from ledgerwatch/pool22

Pool: lazy db tx, don't go to db onNewBlock, add pool.Started check to couple more places, start pool after first onNewBlock (not before)
This commit is contained in:
Alex Sharov 2021-08-21 18:24:52 +07:00 committed by GitHub
commit 83d0dd38ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 158 additions and 120 deletions

View File

@ -40,13 +40,15 @@ import (
// genesis hash and list of forks, but with zero max block and total difficulty
// Sentry should have a logic not to overwrite statusData with messages from tx pool
type Fetch struct {
ctx context.Context // Context used for cancellation and closing of the fetcher
sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network
pool Pool // Transaction pool implementation
senders *SendersCache
coreDB kv.RoDB
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
stateChangesClient remote.KVClient
ctx context.Context // Context used for cancellation and closing of the fetcher
sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network
pool Pool // Transaction pool implementation
senders *SendersCache
coreDB kv.RoDB
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
stateChangesClient remote.KVClient
stateChangesParseCtx *TxParseContext
pooledTxsParseCtx *TxParseContext
}
type Timings struct {
@ -63,13 +65,17 @@ var DefaultTimings = Timings{
// SentryClient here is an interface, it is suitable for mocking in tests (mock will need
// to implement all the functions of the SentryClient interface).
func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, pool Pool, senders *SendersCache, stateChangesClient remote.KVClient, db kv.RoDB) *Fetch {
pooledTxsParseCtx := NewTxParseContext()
pooledTxsParseCtx.Reject(func(hash []byte) bool { return pool.IdHashKnown(hash) })
return &Fetch{
ctx: ctx,
sentryClients: sentryClients,
pool: pool,
senders: senders,
coreDB: db,
stateChangesClient: stateChangesClient,
ctx: ctx,
sentryClients: sentryClients,
pool: pool,
senders: senders,
coreDB: db,
stateChangesClient: stateChangesClient,
stateChangesParseCtx: NewTxParseContext(),
pooledTxsParseCtx: pooledTxsParseCtx,
}
}
@ -193,6 +199,9 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMessage, sentryClient sentry.SentryClient) error {
switch req.Id {
case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65:
if !f.pool.Started() {
return nil
}
hashCount, pos, err := ParseHashesCount(req.Data, 0)
if err != nil {
return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err)
@ -228,6 +237,9 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
}
}
case sentry.MessageId_GET_POOLED_TRANSACTIONS_66, sentry.MessageId_GET_POOLED_TRANSACTIONS_65:
if !f.pool.Started() {
return nil
}
//TODO: handleInboundMessage is single-threaded - means it can accept as argument couple buffers (or analog of txParseContext). Protobuf encoding will copy data anyway, but DirectClient doesn't
var encodedRequest []byte
messageId := sentry.MessageId_POOLED_TRANSACTIONS_66
@ -275,30 +287,20 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
if !f.pool.Started() {
return nil
}
parseCtx := NewTxParseContext()
parseCtx.Reject(func(hash []byte) bool {
//fmt.Printf("check: %t\n", f.pool.IdHashKnown(hash))
return f.pool.IdHashKnown(hash)
})
txs := TxSlots{}
if req.Id == sentry.MessageId_GET_POOLED_TRANSACTIONS_66 {
if _, err := ParsePooledTransactions65(req.Data, 0, parseCtx, &txs); err != nil {
if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil {
return err
}
} else {
if _, _, err := ParsePooledTransactions66(req.Data, 0, parseCtx, &txs); err != nil {
if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil {
return err
}
}
if len(txs.txs) == 0 {
return nil
}
if err := f.coreDB.View(ctx, func(tx kv.Tx) error {
return f.pool.Add(tx, txs, f.senders)
}); err != nil {
return err
}
return f.pool.Add(f.coreDB, txs, f.senders)
default:
//defer log.Info("dropped", "id", req.Id)
}
@ -397,13 +399,12 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
return nil
}
parseCtx := NewTxParseContext()
var unwindTxs, minedTxs TxSlots
if req.Direction == remote.Direction_FORWARD {
minedTxs.Growth(len(req.Txs))
for i := range req.Txs {
minedTxs.txs[i] = &TxSlot{}
if _, err := parseCtx.ParseTransaction(req.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil {
if _, err := f.stateChangesParseCtx.ParseTransaction(req.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil {
log.Warn("stream.Recv", "err", err)
continue
}
@ -413,7 +414,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
unwindTxs.Growth(len(req.Txs))
for i := range req.Txs {
unwindTxs.txs[i] = &TxSlot{}
if _, err := parseCtx.ParseTransaction(req.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil {
if _, err := f.stateChangesParseCtx.ParseTransaction(req.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil {
log.Warn("stream.Recv", "err", err)
continue
}
@ -429,10 +430,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
addr := gointerfaces.ConvertH160toAddress(change.Address)
diff[string(addr[:])] = senderInfo{nonce: nonce, balance: balance}
}
if err := f.coreDB.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(tx, diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, f.senders)
}); err != nil {
if err := f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, f.senders); err != nil {
log.Warn("onNewBlock", "err", err)
}
if f.wg != nil {

View File

@ -4,9 +4,8 @@
package txpool
import (
"sync"
"github.com/ledgerwatch/erigon-lib/kv"
"sync"
)
// Ensure, that PoolMock does implement Pool.
@ -19,7 +18,7 @@ var _ Pool = &PoolMock{}
//
// // make and configure a mocked Pool
// mockedPool := &PoolMock{
// AddFunc: func(db kv.Tx, newTxs TxSlots, senders *SendersCache) error {
// AddFunc: func(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error {
// panic("mock out the Add method")
// },
// AddNewGoodPeerFunc: func(peerID PeerID) {
@ -31,7 +30,7 @@ var _ Pool = &PoolMock{}
// IdHashKnownFunc: func(hash []byte) bool {
// panic("mock out the IdHashKnown method")
// },
// OnNewBlockFunc: func(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error {
// OnNewBlockFunc: func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error {
// panic("mock out the OnNewBlock method")
// },
// StartedFunc: func() bool {
@ -45,7 +44,7 @@ var _ Pool = &PoolMock{}
// }
type PoolMock struct {
// AddFunc mocks the Add method.
AddFunc func(db kv.Tx, newTxs TxSlots, senders *SendersCache) error
AddFunc func(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error
// AddNewGoodPeerFunc mocks the AddNewGoodPeer method.
AddNewGoodPeerFunc func(peerID PeerID)
@ -57,7 +56,7 @@ type PoolMock struct {
IdHashKnownFunc func(hash []byte) bool
// OnNewBlockFunc mocks the OnNewBlock method.
OnNewBlockFunc func(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error
OnNewBlockFunc func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error
// StartedFunc mocks the Started method.
StartedFunc func() bool
@ -67,7 +66,7 @@ type PoolMock struct {
// Add holds details about calls to the Add method.
Add []struct {
// Db is the db argument value.
Db kv.Tx
Db kv.RoDB
// NewTxs is the newTxs argument value.
NewTxs TxSlots
// Senders is the senders argument value.
@ -90,8 +89,6 @@ type PoolMock struct {
}
// OnNewBlock holds details about calls to the OnNewBlock method.
OnNewBlock []struct {
// Db is the db argument value.
Db kv.Tx
// StateChanges is the stateChanges argument value.
StateChanges map[string]senderInfo
// UnwindTxs is the unwindTxs argument value.
@ -120,9 +117,9 @@ type PoolMock struct {
}
// Add calls AddFunc.
func (mock *PoolMock) Add(db kv.Tx, newTxs TxSlots, senders *SendersCache) error {
func (mock *PoolMock) Add(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error {
callInfo := struct {
Db kv.Tx
Db kv.RoDB
NewTxs TxSlots
Senders *SendersCache
}{
@ -146,12 +143,12 @@ func (mock *PoolMock) Add(db kv.Tx, newTxs TxSlots, senders *SendersCache) error
// Check the length with:
// len(mockedPool.AddCalls())
func (mock *PoolMock) AddCalls() []struct {
Db kv.Tx
Db kv.RoDB
NewTxs TxSlots
Senders *SendersCache
} {
var calls []struct {
Db kv.Tx
Db kv.RoDB
NewTxs TxSlots
Senders *SendersCache
}
@ -261,9 +258,8 @@ func (mock *PoolMock) IdHashKnownCalls() []struct {
}
// OnNewBlock calls OnNewBlockFunc.
func (mock *PoolMock) OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error {
func (mock *PoolMock) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, senders *SendersCache) error {
callInfo := struct {
Db kv.Tx
StateChanges map[string]senderInfo
UnwindTxs TxSlots
MinedTxs TxSlots
@ -272,7 +268,6 @@ func (mock *PoolMock) OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, u
BlockHeight uint64
Senders *SendersCache
}{
Db: db,
StateChanges: stateChanges,
UnwindTxs: unwindTxs,
MinedTxs: minedTxs,
@ -290,14 +285,13 @@ func (mock *PoolMock) OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, u
)
return errOut
}
return mock.OnNewBlockFunc(db, stateChanges, unwindTxs, minedTxs, protocolBaseFee, pendingBaseFee, blockHeight, senders)
return mock.OnNewBlockFunc(stateChanges, unwindTxs, minedTxs, protocolBaseFee, pendingBaseFee, blockHeight, senders)
}
// OnNewBlockCalls gets all the calls that were made to OnNewBlock.
// Check the length with:
// len(mockedPool.OnNewBlockCalls())
func (mock *PoolMock) OnNewBlockCalls() []struct {
Db kv.Tx
StateChanges map[string]senderInfo
UnwindTxs TxSlots
MinedTxs TxSlots
@ -307,7 +301,6 @@ func (mock *PoolMock) OnNewBlockCalls() []struct {
Senders *SendersCache
} {
var calls []struct {
Db kv.Tx
StateChanges map[string]senderInfo
UnwindTxs TxSlots
MinedTxs TxSlots

View File

@ -188,7 +188,6 @@ func ParsePooledTransactions65(payload []byte, pos int, ctx *TxParseContext, txS
pos, err = ctx.ParseTransaction(payload, pos, txSlots.txs[i], txSlots.senders.At(i))
if err != nil {
if errors.Is(err, ErrRejected) {
fmt.Printf("rejected\n")
continue
}
return 0, err
@ -217,7 +216,6 @@ func ParsePooledTransactions66(payload []byte, pos int, ctx *TxParseContext, txS
pos, err = ctx.ParseTransaction(payload, pos, txSlots.txs[i], txSlots.senders.At(i))
if err != nil {
if errors.Is(err, ErrRejected) {
fmt.Printf("rejected\n")
continue
}
return requestID, 0, err

View File

@ -41,8 +41,8 @@ type Pool interface {
IdHashKnown(hash []byte) bool
Started() bool
GetRlp(hash []byte) []byte
Add(db kv.Tx, newTxs TxSlots, senders *SendersCache) error
OnNewBlock(db kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error
Add(db kv.RoDB, newTxs TxSlots, senders *SendersCache) error
OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error
AddNewGoodPeer(peerID PeerID)
}
@ -89,11 +89,11 @@ const PendingSubPool SubPoolType = 1
const BaseFeeSubPool SubPoolType = 2
const QueuedSubPool SubPoolType = 3
const PendingSubPoolLimit = 1024
const BaseFeeSubPoolLimit = 1024
const QueuedSubPoolLimit = 1024
const PendingSubPoolLimit = 10 * 1024
const BaseFeeSubPoolLimit = 10 * 1024
const QueuedSubPoolLimit = 10 * 1024
const MaxSendersInfoCache = 1024
const MaxSendersInfoCache = 2 * (PendingSubPoolLimit + BaseFeeSubPoolLimit + QueuedSubPoolLimit)
type nonce2Tx struct{ *btree.BTree }
@ -146,7 +146,13 @@ func (sc *SendersCache) forEach(f func(info *senderInfo)) {
f(sc.senderInfo[i])
}
}
func (sc *SendersCache) len() int {
sc.lock.RLock()
defer sc.lock.RUnlock()
return len(sc.senderInfo)
}
/*
func (sc *SendersCache) evict() int {
sc.lock.Lock()
defer sc.lock.Unlock()
@ -156,24 +162,24 @@ func (sc *SendersCache) evict() int {
}
count := 0
for i := range sc.senderInfo {
if sc.senderInfo[i].txNonce2Tx.Len() > 0 {
for addr, id := range sc.senderIDs {
if sc.senderInfo[id].txNonce2Tx.Len() > 0 {
continue
}
for addr, id := range sc.senderIDs {
if id == i {
delete(sc.senderIDs, addr)
}
}
delete(sc.senderInfo, i)
delete(sc.senderInfo, id)
delete(sc.senderIDs, addr)
count++
}
return count
}
*/
func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error {
func (sc *SendersCache) onNewTxs(coreDBTx kv.RoDB, newTxs TxSlots) error {
sc.ensureSenderIDOnNewTxs(newTxs)
toLoad := sc.setTxSenderID(newTxs)
if len(toLoad) == 0 {
return nil
}
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
@ -182,39 +188,64 @@ func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error {
return nil
}
func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, blockHeight uint64) error {
func (sc *SendersCache) onNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, blockHeight uint64) error {
//TODO: if see non-continuous block heigh - drop cache and reload from db
sc.blockHeight.Store(blockHeight)
//`loadSenders` goes by network to core - and it must be outside of SendersCache lock. But other methods must be locked
sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs)
toLoad := sc.setTxSenderID(unwindTxs)
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
toLoad = sc.setTxSenderID(minedTxs)
diff, err = loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
_ = sc.setTxSenderID(unwindTxs)
/*
if len(toLoad) > 0 {
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
}
*/
_ = sc.setTxSenderID(minedTxs)
/*
if len(toLoad) == 0 {
return nil
}
diff, err := loadSenders(coreDBTx, toLoad)
if err != nil {
return err
}
sc.set(diff)
*/
return nil
}
func (sc *SendersCache) set(diff map[uint64]*senderInfo) {
func (sc *SendersCache) set(diff map[uint64]senderInfo) {
sc.lock.Lock()
defer sc.lock.Unlock()
for id := range diff { // merge state changes
sc.senderInfo[id] = diff[id]
a := diff[id]
sc.senderInfo[id] = &a
}
}
func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, unwindedTxs, minedTxs TxSlots) {
sc.lock.Lock()
defer sc.lock.Unlock()
for addr, id := range sc.senderIDs { // merge state changes
if v, ok := stateChanges[addr]; ok {
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
for addr, v := range stateChanges { // merge state changes
id, ok := sc.senderIDs[addr]
if !ok {
sc.senderID++
id = sc.senderID
sc.senderIDs[addr] = id
}
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
}
/*
for addr, id := range sc.senderIDs { // merge state changes
if v, ok := stateChanges[addr]; ok {
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
}
}
*/
for i := 0; i < unwindedTxs.senders.Len(); i++ {
id, ok := sc.senderIDs[string(unwindedTxs.senders.At(i))]
if !ok {
@ -222,8 +253,8 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un
id = sc.senderID
sc.senderIDs[string(unwindedTxs.senders.At(i))] = id
}
if v, ok := stateChanges[string(unwindedTxs.senders.At(i))]; ok {
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
if _, ok := stateChanges[string(unwindedTxs.senders.At(i))]; !ok {
sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0))
}
}
@ -234,9 +265,13 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un
id = sc.senderID
sc.senderIDs[string(minedTxs.senders.At(i))] = id
}
if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok {
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
if _, ok := stateChanges[string(minedTxs.senders.At(i))]; !ok {
sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0))
}
//if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok {
// sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
//}
}
}
@ -277,18 +312,27 @@ func (sc *SendersCache) setTxSenderID(txs TxSlots) map[uint64]string {
return toLoad
}
func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]*senderInfo, error) {
diff := make(map[uint64]*senderInfo, len(toLoad))
for id := range toLoad {
encoded, err := coreDB.GetOne(kv.PlainState, []byte(toLoad[id]))
if err != nil {
return nil, err
func loadSenders(coreDB kv.RoDB, toLoad map[uint64]string) (map[uint64]senderInfo, error) {
diff := make(map[uint64]senderInfo, len(toLoad))
if err := coreDB.View(context.Background(), func(tx kv.Tx) error {
for id := range toLoad {
encoded, err := tx.GetOne(kv.PlainState, []byte(toLoad[id]))
if err != nil {
return err
}
if len(encoded) == 0 {
diff[id] = *newSenderInfo(0, *uint256.NewInt(0))
continue
}
nonce, balance, err := DecodeSender(encoded)
if err != nil {
return err
}
diff[id] = *newSenderInfo(nonce, balance)
}
nonce, balance, err := DecodeSender(encoded)
if err != nil {
return nil, err
}
diff[id] = newSenderInfo(nonce, balance)
return nil
}); err != nil {
return nil, err
}
return diff, nil
}
@ -312,7 +356,6 @@ type TxPool struct {
// fields for transaction propagation
recentlyConnectedPeers *recentlyConnectedPeers
newTxs chan Hashes
//lastTxPropagationTimestamp time.Time
}
func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) {
@ -405,7 +448,8 @@ func (p *TxPool) Started() bool {
return p.protocolBaseFee.Load() > 0
}
func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error {
func (p *TxPool) Add(coreDB kv.RoDB, newTxs TxSlots, senders *SendersCache) error {
t := time.Now()
if err := senders.onNewTxs(coreDB, newTxs); err != nil {
return err
}
@ -417,7 +461,6 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error
if protocolBaseFee == 0 || pendingBaseFee == 0 {
return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, pendingBaseFee)
}
p.lock.Lock()
defer p.lock.Unlock()
if err := onNewTxs(senders, newTxs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
@ -438,6 +481,7 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error
}
}
log.Info("on new txs", "in", time.Since(t))
return nil
}
func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error {
@ -501,15 +545,15 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin
if hasNewVal {
p.pendingBaseFee.Store(pendingBaseFee)
}
log.Debug("set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee)
return protocolBaseFee, p.pendingBaseFee.Load()
}
func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error {
if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil {
func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error {
t := time.Now()
if err := senders.onNewBlock(stateChanges, unwindTxs, minedTxs, blockHeight); err != nil {
return err
}
log.Debug("[txpool.onNewBlock]", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)
//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)
protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee)
if err := unwindTxs.Valid(); err != nil {
return err
@ -520,7 +564,6 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
p.lock.Lock()
defer p.lock.Unlock()
//log.Debug("[txpool.onNewBlock]", "senderInfo", len(p.senderInfo))
if err := onNewBlock(senders, unwindTxs, minedTxs.txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
return err
}
@ -539,7 +582,12 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
default:
}
}
//count := senders.evict()
//if count > 0 {
// log.Debug("evicted senders", "amount", count)
//}
log.Info("on new block", "in", time.Since(t))
return nil
}
func (p *TxPool) flushIsLocalHistory(tx kv.RwTx) error {
@ -582,7 +630,9 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr
localsHistory.Add(i.Tx.idHash, struct{}{})
}
})
//log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs))
if j > 0 {
log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs))
}
// This can be thought of a reverse operation from the one described before.
// When a block that was deemed "the best" of its height, is no longer deemed "the best", the
@ -650,13 +700,13 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr
func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, queued *SubPool, discard func(tx *metaTx)) {
for _, tx := range minedTxs {
sender := senders.get(tx.senderID)
if sender.txNonce2Tx.Len() > 0 {
log.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.txNonce2Tx.len()", sender.txNonce2Tx.Len())
}
//if sender.txNonce2Tx.Len() > 0 {
//log.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.txNonce2Tx.len()", sender.txNonce2Tx.Len())
//}
// delete mined transactions from everywhere
sender.txNonce2Tx.Ascend(func(i btree.Item) bool {
it := i.(*nonce2TxItem)
log.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce)
//log.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce)
if it.metaTx.Tx.nonce > sender.nonce {
return false
}
@ -675,6 +725,7 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu
queued.UnsafeRemove(it.metaTx)
discard(it.metaTx)
default:
fmt.Printf("aaaaaaa\n")
//already removed
}
return true
@ -1015,10 +1066,8 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, p *TxPool, senders *SendersC
return
case <-logEvery.C:
p.logStats()
log.Info("cache", "size", senders.len())
case <-evictSendersEvery.C:
// evict sendersInfo without txs
count := senders.evict()
log.Debug("evicted senders", "amount", count)
if db != nil {
if err := db.Update(ctx, func(tx kv.RwTx) error {
return p.flushIsLocalHistory(tx)

View File

@ -434,13 +434,13 @@ func FuzzOnNewBlocks11(f *testing.F) {
// go to first fork
//fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len())
unwindTxs, minedTxs1, p2pReceived, minedTxs2 := splitDataset(txs)
err = pool.OnNewBlock(nil, map[string]senderInfo{}, unwindTxs, minedTxs1, protocolBaseFee, pendingBaseFee, 1, sendersCache)
err = pool.OnNewBlock(map[string]senderInfo{}, unwindTxs, minedTxs1, protocolBaseFee, pendingBaseFee, 1, sendersCache)
assert.NoError(err)
check(unwindTxs, minedTxs1, "fork1")
checkNotify(unwindTxs, minedTxs1, "fork1")
// unwind everything and switch to new fork (need unwind mined now)
err = pool.OnNewBlock(nil, map[string]senderInfo{}, minedTxs1, minedTxs2, protocolBaseFee, pendingBaseFee, 2, sendersCache)
err = pool.OnNewBlock(map[string]senderInfo{}, minedTxs1, minedTxs2, protocolBaseFee, pendingBaseFee, 2, sendersCache)
assert.NoError(err)
check(minedTxs1, minedTxs2, "fork2")
checkNotify(minedTxs1, minedTxs2, "fork2")