Move blob cache check in txpool (#9250)

This should help with less frequent lock/unlock. Following from an
earlier "TODO"
This commit is contained in:
Somnath 2024-01-17 17:46:02 +04:00 committed by GitHub
parent b38e17e393
commit 5e5d8490b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 57 additions and 108 deletions

View File

@ -476,7 +476,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
}
func (f *Fetch) handleStateChangesRequest(ctx context.Context, req *remote.StateChangeBatch) error {
var unwindTxs, minedTxs types2.TxSlots
var unwindTxs, unwindBlobTxs, minedTxs types2.TxSlots
for _, change := range req.ChangeBatch {
if change.Direction == remote.Direction_FORWARD {
minedTxs.Resize(uint(len(change.Txs)))
@ -500,18 +500,7 @@ func (f *Fetch) handleStateChangesRequest(ctx context.Context, req *remote.State
return err
}
if utx.Type == types2.BlobTxType {
var knownBlobTxn *metaTx
//TODO: don't check `KnownBlobTxn()` here - because each call require `txpool.mutex.lock()`. Better add all hashes here and do check inside `OnNewBlock`
if err := f.db.View(ctx, func(tx kv.Tx) error {
knownBlobTxn, err = f.pool.GetKnownBlobTxn(tx, utx.IDHash[:])
return err
}); err != nil {
return err
}
// Get the blob tx from cache; ignore altogether if it isn't there
if knownBlobTxn != nil {
unwindTxs.Append(knownBlobTxn.Tx, sender, false)
}
unwindBlobTxs.Append(utx, sender, false)
} else {
unwindTxs.Append(utx, sender, false)
}
@ -525,7 +514,7 @@ func (f *Fetch) handleStateChangesRequest(ctx context.Context, req *remote.State
}
if err := f.db.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs, tx)
return f.pool.OnNewBlock(ctx, req, unwindTxs, unwindBlobTxs, minedTxs, tx)
}); err != nil && !errors.Is(err, context.Canceled) {
return err
}

View File

@ -34,16 +34,13 @@ var _ Pool = &PoolMock{}
// FilterKnownIdHashesFunc: func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) {
// panic("mock out the FilterKnownIdHashes method")
// },
// GetKnownBlobTxnFunc: func(tx kv.Tx, hash []byte) (*metaTx, error) {
// panic("mock out the GetKnownBlobTxn method")
// },
// GetRlpFunc: func(tx kv.Tx, hash []byte) ([]byte, error) {
// panic("mock out the GetRlp method")
// },
// IdHashKnownFunc: func(tx kv.Tx, hash []byte) (bool, error) {
// panic("mock out the IdHashKnown method")
// },
// OnNewBlockFunc: func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error {
// OnNewBlockFunc: func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, unwindBlobTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error {
// panic("mock out the OnNewBlock method")
// },
// StartedFunc: func() bool {
@ -71,9 +68,6 @@ type PoolMock struct {
// FilterKnownIdHashesFunc mocks the FilterKnownIdHashes method.
FilterKnownIdHashesFunc func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error)
// GetKnownBlobTxnFunc mocks the GetKnownBlobTxn method.
GetKnownBlobTxnFunc func(tx kv.Tx, hash []byte) (*metaTx, error)
// GetRlpFunc mocks the GetRlp method.
GetRlpFunc func(tx kv.Tx, hash []byte) ([]byte, error)
@ -81,7 +75,7 @@ type PoolMock struct {
IdHashKnownFunc func(tx kv.Tx, hash []byte) (bool, error)
// OnNewBlockFunc mocks the OnNewBlock method.
OnNewBlockFunc func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error
OnNewBlockFunc func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, unwindBlobTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error
// StartedFunc mocks the Started method.
StartedFunc func() bool
@ -119,13 +113,6 @@ type PoolMock struct {
// Hashes is the hashes argument value.
Hashes types2.Hashes
}
// GetKnownBlobTxn holds details about calls to the GetKnownBlobTxn method.
GetKnownBlobTxn []struct {
// Tx is the tx argument value.
Tx kv.Tx
// Hash is the hash argument value.
Hash []byte
}
// GetRlp holds details about calls to the GetRlp method.
GetRlp []struct {
// Tx is the tx argument value.
@ -148,6 +135,8 @@ type PoolMock struct {
StateChanges *remote.StateChangeBatch
// UnwindTxs is the unwindTxs argument value.
UnwindTxs types2.TxSlots
// UnwindBlobTxs is the unwindBlobTxs argument value.
UnwindBlobTxs types2.TxSlots
// MinedTxs is the minedTxs argument value.
MinedTxs types2.TxSlots
// Tx is the tx argument value.
@ -166,7 +155,6 @@ type PoolMock struct {
lockAddNewGoodPeer sync.RWMutex
lockAddRemoteTxs sync.RWMutex
lockFilterKnownIdHashes sync.RWMutex
lockGetKnownBlobTxn sync.RWMutex
lockGetRlp sync.RWMutex
lockIdHashKnown sync.RWMutex
lockOnNewBlock sync.RWMutex
@ -326,46 +314,6 @@ func (mock *PoolMock) FilterKnownIdHashesCalls() []struct {
return calls
}
// GetKnownBlobTxn calls GetKnownBlobTxnFunc.
func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) {
callInfo := struct {
Tx kv.Tx
Hash []byte
}{
Tx: tx,
Hash: hash,
}
mock.lockGetKnownBlobTxn.Lock()
mock.calls.GetKnownBlobTxn = append(mock.calls.GetKnownBlobTxn, callInfo)
mock.lockGetKnownBlobTxn.Unlock()
if mock.GetKnownBlobTxnFunc == nil {
var (
metaTxMoqParamOut *metaTx
errOut error
)
return metaTxMoqParamOut, errOut
}
return mock.GetKnownBlobTxnFunc(tx, hash)
}
// GetKnownBlobTxnCalls gets all the calls that were made to GetKnownBlobTxn.
// Check the length with:
//
// len(mockedPool.GetKnownBlobTxnCalls())
func (mock *PoolMock) GetKnownBlobTxnCalls() []struct {
Tx kv.Tx
Hash []byte
} {
var calls []struct {
Tx kv.Tx
Hash []byte
}
mock.lockGetKnownBlobTxn.RLock()
calls = mock.calls.GetKnownBlobTxn
mock.lockGetKnownBlobTxn.RUnlock()
return calls
}
// GetRlp calls GetRlpFunc.
func (mock *PoolMock) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) {
callInfo := struct {
@ -447,19 +395,21 @@ func (mock *PoolMock) IdHashKnownCalls() []struct {
}
// OnNewBlock calls OnNewBlockFunc.
func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error {
func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, unwindBlobTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error {
callInfo := struct {
Ctx context.Context
StateChanges *remote.StateChangeBatch
UnwindTxs types2.TxSlots
MinedTxs types2.TxSlots
Tx kv.Tx
Ctx context.Context
StateChanges *remote.StateChangeBatch
UnwindTxs types2.TxSlots
UnwindBlobTxs types2.TxSlots
MinedTxs types2.TxSlots
Tx kv.Tx
}{
Ctx: ctx,
StateChanges: stateChanges,
UnwindTxs: unwindTxs,
MinedTxs: minedTxs,
Tx: tx,
Ctx: ctx,
StateChanges: stateChanges,
UnwindTxs: unwindTxs,
UnwindBlobTxs: unwindBlobTxs,
MinedTxs: minedTxs,
Tx: tx,
}
mock.lockOnNewBlock.Lock()
mock.calls.OnNewBlock = append(mock.calls.OnNewBlock, callInfo)
@ -470,7 +420,7 @@ func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.State
)
return errOut
}
return mock.OnNewBlockFunc(ctx, stateChanges, unwindTxs, minedTxs, tx)
return mock.OnNewBlockFunc(ctx, stateChanges, unwindTxs, unwindBlobTxs, minedTxs, tx)
}
// OnNewBlockCalls gets all the calls that were made to OnNewBlock.
@ -478,18 +428,20 @@ func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.State
//
// len(mockedPool.OnNewBlockCalls())
func (mock *PoolMock) OnNewBlockCalls() []struct {
Ctx context.Context
StateChanges *remote.StateChangeBatch
UnwindTxs types2.TxSlots
MinedTxs types2.TxSlots
Tx kv.Tx
Ctx context.Context
StateChanges *remote.StateChangeBatch
UnwindTxs types2.TxSlots
UnwindBlobTxs types2.TxSlots
MinedTxs types2.TxSlots
Tx kv.Tx
} {
var calls []struct {
Ctx context.Context
StateChanges *remote.StateChangeBatch
UnwindTxs types2.TxSlots
MinedTxs types2.TxSlots
Tx kv.Tx
Ctx context.Context
StateChanges *remote.StateChangeBatch
UnwindTxs types2.TxSlots
UnwindBlobTxs types2.TxSlots
MinedTxs types2.TxSlots
Tx kv.Tx
}
mock.lockOnNewBlock.RLock()
calls = mock.calls.OnNewBlock

View File

@ -85,13 +85,12 @@ type Pool interface {
// Handle 3 main events - new remote txs from p2p, new local txs from RPC, new blocks from execution layer
AddRemoteTxs(ctx context.Context, newTxs types.TxSlots)
AddLocalTxs(ctx context.Context, newTxs types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error)
OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error
OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, unwindBlobTxs, minedTxs types.TxSlots, tx kv.Tx) error
// IdHashKnown check whether transaction with given Id hash is known to the pool
IdHashKnown(tx kv.Tx, hash []byte) (bool, error)
FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error)
Started() bool
GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error)
AddNewGoodPeer(peerID types.PeerID)
}
@ -337,7 +336,7 @@ func (p *TxPool) Start(ctx context.Context, db kv.RwDB) error {
})
}
func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error {
func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, unwindBlobTxs, minedTxs types.TxSlots, tx kv.Tx) error {
defer newBlockTimer.ObserveDuration(time.Now())
//t := time.Now()
@ -401,6 +400,17 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.blockGasLimit.Store(stateChanges.BlockGasLimit)
for i, txn := range unwindBlobTxs.Txs {
if txn.Type == types.BlobTxType {
knownBlobTxn, err := p.getCachedBlobTxnLocked(coreTx, txn.IDHash[:])
if err != nil {
return err
}
if knownBlobTxn != nil {
unwindTxs.Append(knownBlobTxn.Tx, unwindBlobTxs.Senders.At(i), false)
}
}
}
if err = p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil {
return err
}
@ -619,10 +629,8 @@ func (p *TxPool) getUnprocessedTxn(hashS string) (*types.TxSlot, bool) {
return nil, false
}
func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) {
func (p *TxPool) getCachedBlobTxnLocked(tx kv.Tx, hash []byte) (*metaTx, error) {
hashS := string(hash)
p.lock.Lock()
defer p.lock.Unlock()
if mt, ok := p.minedBlobTxsByHash[hashS]; ok {
return mt, nil
}

View File

@ -485,7 +485,7 @@ func FuzzOnNewBlocks(f *testing.F) {
}
// go to first fork
txs1, txs2, p2pReceived, txs3 := splitDataset(txs)
err = pool.OnNewBlock(ctx, change, txs1, types.TxSlots{}, tx)
err = pool.OnNewBlock(ctx, change, txs1, types.TxSlots{}, types.TxSlots{}, tx)
assert.NoError(err)
check(txs1, types.TxSlots{}, "fork1")
checkNotify(txs1, types.TxSlots{}, "fork1")
@ -498,7 +498,7 @@ func FuzzOnNewBlocks(f *testing.F) {
{BlockHeight: 1, BlockHash: h0},
},
}
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, txs2, tx)
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, txs2, tx)
assert.NoError(err)
check(types.TxSlots{}, txs2, "fork1 mined")
checkNotify(types.TxSlots{}, txs2, "fork1 mined")
@ -511,7 +511,7 @@ func FuzzOnNewBlocks(f *testing.F) {
{BlockHeight: 0, BlockHash: h0, Direction: remote.Direction_UNWIND},
},
}
err = pool.OnNewBlock(ctx, change, txs2, types.TxSlots{}, tx)
err = pool.OnNewBlock(ctx, change, txs2, types.TxSlots{}, types.TxSlots{}, tx)
assert.NoError(err)
check(txs2, types.TxSlots{}, "fork2")
checkNotify(txs2, types.TxSlots{}, "fork2")
@ -523,7 +523,7 @@ func FuzzOnNewBlocks(f *testing.F) {
{BlockHeight: 1, BlockHash: h22},
},
}
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, txs3, tx)
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, txs3, tx)
assert.NoError(err)
check(types.TxSlots{}, txs3, "fork2 mined")
checkNotify(types.TxSlots{}, txs3, "fork2 mined")

View File

@ -81,7 +81,7 @@ func TestNonceFromAddress(t *testing.T) {
tx, err := db.BeginRw(ctx)
require.NoError(err)
defer tx.Rollback()
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx)
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx)
assert.NoError(err)
{
@ -201,7 +201,7 @@ func TestReplaceWithHigherFee(t *testing.T) {
tx, err := db.BeginRw(ctx)
require.NoError(err)
defer tx.Rollback()
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx)
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx)
assert.NoError(err)
{
@ -318,7 +318,7 @@ func TestReverseNonces(t *testing.T) {
tx, err := db.BeginRw(ctx)
require.NoError(err)
defer tx.Rollback()
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx)
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx)
assert.NoError(err)
// 1. Send high fee transaction with nonce gap
{
@ -445,7 +445,7 @@ func TestTxPoke(t *testing.T) {
tx, err := db.BeginRw(ctx)
require.NoError(err)
defer tx.Rollback()
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx)
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx)
assert.NoError(err)
var idHash types.Hashes
@ -759,7 +759,7 @@ func TestBlobTxReplacement(t *testing.T) {
tx, err := db.BeginRw(ctx)
require.NoError(err)
defer tx.Rollback()
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx)
err = pool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx)
assert.NoError(err)
tip, feeCap, blobFeeCap := uint256.NewInt(100_000), uint256.NewInt(200_000), uint256.NewInt(200_000)
@ -986,7 +986,7 @@ func TestDropRemoteAtNoGossip(t *testing.T) {
tx, err := db.BeginRw(ctx)
require.NoError(err)
defer tx.Rollback()
err = txPool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx)
err = txPool.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, types.TxSlots{}, tx)
assert.NoError(err)
// 1. Try Local Tx
{