From 5e5d8490b1b70613c76c642a61d102597f1ebfc3 Mon Sep 17 00:00:00 2001 From: Somnath Date: Wed, 17 Jan 2024 17:46:02 +0400 Subject: [PATCH] Move blob cache check in txpool (#9250) This should help with less frequent lock/unlock. Following from an earlier "TODO" --- erigon-lib/txpool/fetch.go | 17 +---- erigon-lib/txpool/mocks_test.go | 108 ++++++++-------------------- erigon-lib/txpool/pool.go | 20 ++++-- erigon-lib/txpool/pool_fuzz_test.go | 8 +-- erigon-lib/txpool/pool_test.go | 12 ++-- 5 files changed, 57 insertions(+), 108 deletions(-) diff --git a/erigon-lib/txpool/fetch.go b/erigon-lib/txpool/fetch.go index e5097a7f7..a4ca77bff 100644 --- a/erigon-lib/txpool/fetch.go +++ b/erigon-lib/txpool/fetch.go @@ -476,7 +476,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien } func (f *Fetch) handleStateChangesRequest(ctx context.Context, req *remote.StateChangeBatch) error { - var unwindTxs, minedTxs types2.TxSlots + var unwindTxs, unwindBlobTxs, minedTxs types2.TxSlots for _, change := range req.ChangeBatch { if change.Direction == remote.Direction_FORWARD { minedTxs.Resize(uint(len(change.Txs))) @@ -500,18 +500,7 @@ func (f *Fetch) handleStateChangesRequest(ctx context.Context, req *remote.State return err } if utx.Type == types2.BlobTxType { - var knownBlobTxn *metaTx - //TODO: don't check `KnownBlobTxn()` here - because each call require `txpool.mutex.lock()`. Better add all hashes here and do check inside `OnNewBlock` - if err := f.db.View(ctx, func(tx kv.Tx) error { - knownBlobTxn, err = f.pool.GetKnownBlobTxn(tx, utx.IDHash[:]) - return err - }); err != nil { - return err - } - // Get the blob tx from cache; ignore altogether if it isn't there - if knownBlobTxn != nil { - unwindTxs.Append(knownBlobTxn.Tx, sender, false) - } + unwindBlobTxs.Append(utx, sender, false) } else { unwindTxs.Append(utx, sender, false) } @@ -525,7 +514,7 @@ func (f *Fetch) handleStateChangesRequest(ctx context.Context, req *remote.State } if err := f.db.View(ctx, func(tx kv.Tx) error { - return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs, tx) + return f.pool.OnNewBlock(ctx, req, unwindTxs, unwindBlobTxs, minedTxs, tx) }); err != nil && !errors.Is(err, context.Canceled) { return err } diff --git a/erigon-lib/txpool/mocks_test.go b/erigon-lib/txpool/mocks_test.go index 22e8e8121..502b4a690 100644 --- a/erigon-lib/txpool/mocks_test.go +++ b/erigon-lib/txpool/mocks_test.go @@ -34,16 +34,13 @@ var _ Pool = &PoolMock{} // FilterKnownIdHashesFunc: func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) { // panic("mock out the FilterKnownIdHashes method") // }, -// GetKnownBlobTxnFunc: func(tx kv.Tx, hash []byte) (*metaTx, error) { -// panic("mock out the GetKnownBlobTxn method") -// }, // GetRlpFunc: func(tx kv.Tx, hash []byte) ([]byte, error) { // panic("mock out the GetRlp method") // }, // IdHashKnownFunc: func(tx kv.Tx, hash []byte) (bool, error) { // panic("mock out the IdHashKnown method") // }, -// OnNewBlockFunc: func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error { +// OnNewBlockFunc: func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, unwindBlobTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error { // panic("mock out the OnNewBlock method") // }, // StartedFunc: func() bool { @@ -71,9 +68,6 @@ type PoolMock struct { // FilterKnownIdHashesFunc mocks the FilterKnownIdHashes method. FilterKnownIdHashesFunc func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) - // GetKnownBlobTxnFunc mocks the GetKnownBlobTxn method. - GetKnownBlobTxnFunc func(tx kv.Tx, hash []byte) (*metaTx, error) - // GetRlpFunc mocks the GetRlp method. GetRlpFunc func(tx kv.Tx, hash []byte) ([]byte, error) @@ -81,7 +75,7 @@ type PoolMock struct { IdHashKnownFunc func(tx kv.Tx, hash []byte) (bool, error) // OnNewBlockFunc mocks the OnNewBlock method. - OnNewBlockFunc func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error + OnNewBlockFunc func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, unwindBlobTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error // StartedFunc mocks the Started method. StartedFunc func() bool @@ -119,13 +113,6 @@ type PoolMock struct { // Hashes is the hashes argument value. Hashes types2.Hashes } - // GetKnownBlobTxn holds details about calls to the GetKnownBlobTxn method. - GetKnownBlobTxn []struct { - // Tx is the tx argument value. - Tx kv.Tx - // Hash is the hash argument value. - Hash []byte - } // GetRlp holds details about calls to the GetRlp method. GetRlp []struct { // Tx is the tx argument value. @@ -148,6 +135,8 @@ type PoolMock struct { StateChanges *remote.StateChangeBatch // UnwindTxs is the unwindTxs argument value. UnwindTxs types2.TxSlots + // UnwindBlobTxs is the unwindBlobTxs argument value. + UnwindBlobTxs types2.TxSlots // MinedTxs is the minedTxs argument value. MinedTxs types2.TxSlots // Tx is the tx argument value. @@ -166,7 +155,6 @@ type PoolMock struct { lockAddNewGoodPeer sync.RWMutex lockAddRemoteTxs sync.RWMutex lockFilterKnownIdHashes sync.RWMutex - lockGetKnownBlobTxn sync.RWMutex lockGetRlp sync.RWMutex lockIdHashKnown sync.RWMutex lockOnNewBlock sync.RWMutex @@ -326,46 +314,6 @@ func (mock *PoolMock) FilterKnownIdHashesCalls() []struct { return calls } -// GetKnownBlobTxn calls GetKnownBlobTxnFunc. -func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) { - callInfo := struct { - Tx kv.Tx - Hash []byte - }{ - Tx: tx, - Hash: hash, - } - mock.lockGetKnownBlobTxn.Lock() - mock.calls.GetKnownBlobTxn = append(mock.calls.GetKnownBlobTxn, callInfo) - mock.lockGetKnownBlobTxn.Unlock() - if mock.GetKnownBlobTxnFunc == nil { - var ( - metaTxMoqParamOut *metaTx - errOut error - ) - return metaTxMoqParamOut, errOut - } - return mock.GetKnownBlobTxnFunc(tx, hash) -} - -// GetKnownBlobTxnCalls gets all the calls that were made to GetKnownBlobTxn. -// Check the length with: -// -// len(mockedPool.GetKnownBlobTxnCalls()) -func (mock *PoolMock) GetKnownBlobTxnCalls() []struct { - Tx kv.Tx - Hash []byte -} { - var calls []struct { - Tx kv.Tx - Hash []byte - } - mock.lockGetKnownBlobTxn.RLock() - calls = mock.calls.GetKnownBlobTxn - mock.lockGetKnownBlobTxn.RUnlock() - return calls -} - // GetRlp calls GetRlpFunc. func (mock *PoolMock) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) { callInfo := struct { @@ -447,19 +395,21 @@ func (mock *PoolMock) IdHashKnownCalls() []struct { } // OnNewBlock calls OnNewBlockFunc. -func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error { +func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, unwindBlobTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error { callInfo := struct { - Ctx context.Context - StateChanges *remote.StateChangeBatch - UnwindTxs types2.TxSlots - MinedTxs types2.TxSlots - Tx kv.Tx + Ctx context.Context + StateChanges *remote.StateChangeBatch + UnwindTxs types2.TxSlots + UnwindBlobTxs types2.TxSlots + MinedTxs types2.TxSlots + Tx kv.Tx }{ - Ctx: ctx, - StateChanges: stateChanges, - UnwindTxs: unwindTxs, - MinedTxs: minedTxs, - Tx: tx, + Ctx: ctx, + StateChanges: stateChanges, + UnwindTxs: unwindTxs, + UnwindBlobTxs: unwindBlobTxs, + MinedTxs: minedTxs, + Tx: tx, } mock.lockOnNewBlock.Lock() mock.calls.OnNewBlock = append(mock.calls.OnNewBlock, callInfo) @@ -470,7 +420,7 @@ func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.State ) return errOut } - return mock.OnNewBlockFunc(ctx, stateChanges, unwindTxs, minedTxs, tx) + return mock.OnNewBlockFunc(ctx, stateChanges, unwindTxs, unwindBlobTxs, minedTxs, tx) } // OnNewBlockCalls gets all the calls that were made to OnNewBlock. @@ -478,18 +428,20 @@ func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.State // // len(mockedPool.OnNewBlockCalls()) func (mock *PoolMock) OnNewBlockCalls() []struct { - Ctx context.Context - StateChanges *remote.StateChangeBatch - UnwindTxs types2.TxSlots - MinedTxs types2.TxSlots - Tx kv.Tx + Ctx context.Context + StateChanges *remote.StateChangeBatch + UnwindTxs types2.TxSlots + UnwindBlobTxs types2.TxSlots + MinedTxs types2.TxSlots + Tx kv.Tx } { var calls []struct { - Ctx context.Context - StateChanges *remote.StateChangeBatch - UnwindTxs types2.TxSlots - MinedTxs types2.TxSlots - Tx kv.Tx + Ctx context.Context + StateChanges *remote.StateChangeBatch + UnwindTxs types2.TxSlots + UnwindBlobTxs types2.TxSlots + MinedTxs types2.TxSlots + Tx kv.Tx } mock.lockOnNewBlock.RLock() calls = mock.calls.OnNewBlock diff --git a/erigon-lib/txpool/pool.go b/erigon-lib/txpool/pool.go index a3eb55f25..d56b4d7cc 100644 --- a/erigon-lib/txpool/pool.go +++ b/erigon-lib/txpool/pool.go @@ -85,13 +85,12 @@ type Pool interface { // Handle 3 main events - new remote txs from p2p, new local txs from RPC, new blocks from execution layer AddRemoteTxs(ctx context.Context, newTxs types.TxSlots) AddLocalTxs(ctx context.Context, newTxs types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) - OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error + OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, unwindBlobTxs, minedTxs types.TxSlots, tx kv.Tx) error // IdHashKnown check whether transaction with given Id hash is known to the pool IdHashKnown(tx kv.Tx, hash []byte) (bool, error) FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error) Started() bool GetRlp(tx kv.Tx, hash []byte) ([]byte, error) - GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) AddNewGoodPeer(peerID types.PeerID) } @@ -337,7 +336,7 @@ func (p *TxPool) Start(ctx context.Context, db kv.RwDB) error { }) } -func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error { +func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, unwindBlobTxs, minedTxs types.TxSlots, tx kv.Tx) error { defer newBlockTimer.ObserveDuration(time.Now()) //t := time.Now() @@ -401,6 +400,17 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang p.blockGasLimit.Store(stateChanges.BlockGasLimit) + for i, txn := range unwindBlobTxs.Txs { + if txn.Type == types.BlobTxType { + knownBlobTxn, err := p.getCachedBlobTxnLocked(coreTx, txn.IDHash[:]) + if err != nil { + return err + } + if knownBlobTxn != nil { + unwindTxs.Append(knownBlobTxn.Tx, unwindBlobTxs.Senders.At(i), false) + } + } + } if err = p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil { return err } @@ -619,10 +629,8 @@ func (p *TxPool) getUnprocessedTxn(hashS string) (*types.TxSlot, bool) { return nil, false } -func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) { +func (p *TxPool) getCachedBlobTxnLocked(tx kv.Tx, hash []byte) (*metaTx, error) { hashS := string(hash) - p.lock.Lock() - defer p.lock.Unlock() if mt, ok := p.minedBlobTxsByHash[hashS]; ok { return mt, nil } diff --git a/erigon-lib/txpool/pool_fuzz_test.go b/erigon-lib/txpool/pool_fuzz_test.go index 08106374c..54b1beb02 100644 --- a/erigon-lib/txpool/pool_fuzz_test.go +++ b/erigon-lib/txpool/pool_fuzz_test.go @@ -485,7 +485,7 @@ func FuzzOnNewBlocks(f *testing.F) { } // go to first fork txs1, txs2, p2pReceived, txs3 := splitDataset(txs) - err = pool.OnNewBlock(ctx, change, txs1, types.TxSlots{}, tx) + err = pool.OnNewBlock(ctx, change, txs1, types.TxSlots{}, types.TxSlots{}, tx) assert.NoError(err) check(txs1, types.TxSlots{}, "fork1") checkNotify(txs1, types.TxSlots{}, "fork1") @@ -498,7 +498,7 @@ func FuzzOnNewBlocks(f *testing.F) { {BlockHeight: 1, BlockHash: h0}, }, } - err = pool.OnNewBlock(ctx, change, types.TxSlots{}, txs2, tx) + err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, txs2, tx) assert.NoError(err) check(types.TxSlots{}, txs2, "fork1 mined") checkNotify(types.TxSlots{}, txs2, "fork1 mined") @@ -511,7 +511,7 @@ func FuzzOnNewBlocks(f *testing.F) { {BlockHeight: 0, BlockHash: h0, Direction: remote.Direction_UNWIND}, }, } - err = pool.OnNewBlock(ctx, change, txs2, types.TxSlots{}, tx) + err = pool.OnNewBlock(ctx, change, txs2, types.TxSlots{}, types.TxSlots{}, tx) assert.NoError(err) check(txs2, types.TxSlots{}, "fork2") checkNotify(txs2, types.TxSlots{}, "fork2") @@ -523,7 +523,7 @@ func FuzzOnNewBlocks(f *testing.F) { {BlockHeight: 1, BlockHash: h22}, }, } - err = pool.OnNewBlock(ctx, change, types.TxSlots{}, txs3, tx) + err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, txs3, tx) assert.NoError(err) check(types.TxSlots{}, txs3, "fork2 mined") checkNotify(types.TxSlots{}, txs3, "fork2 mined") diff --git a/erigon-lib/txpool/pool_test.go b/erigon-lib/txpool/pool_test.go index 80de08a51..9dcbf1cc9 100644 --- a/erigon-lib/txpool/pool_test.go +++ b/erigon-lib/txpool/pool_test.go @@ -81,7 +81,7 @@ func TestNonceFromAddress(t *testing.T) { tx, err := db.BeginRw(ctx) require.NoError(err) defer tx.Rollback() - err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx) + err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx) assert.NoError(err) { @@ -201,7 +201,7 @@ func TestReplaceWithHigherFee(t *testing.T) { tx, err := db.BeginRw(ctx) require.NoError(err) defer tx.Rollback() - err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx) + err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx) assert.NoError(err) { @@ -318,7 +318,7 @@ func TestReverseNonces(t *testing.T) { tx, err := db.BeginRw(ctx) require.NoError(err) defer tx.Rollback() - err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx) + err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx) assert.NoError(err) // 1. Send high fee transaction with nonce gap { @@ -445,7 +445,7 @@ func TestTxPoke(t *testing.T) { tx, err := db.BeginRw(ctx) require.NoError(err) defer tx.Rollback() - err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx) + err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx) assert.NoError(err) var idHash types.Hashes @@ -759,7 +759,7 @@ func TestBlobTxReplacement(t *testing.T) { tx, err := db.BeginRw(ctx) require.NoError(err) defer tx.Rollback() - err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx) + err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx) assert.NoError(err) tip, feeCap, blobFeeCap := uint256.NewInt(100_000), uint256.NewInt(200_000), uint256.NewInt(200_000) @@ -986,7 +986,7 @@ func TestDropRemoteAtNoGossip(t *testing.T) { tx, err := db.BeginRw(ctx) require.NoError(err) defer tx.Rollback() - err = txPool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx) + err = txPool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx) assert.NoError(err) // 1. Try Local Tx {