Merge pull request #24 from ledgerwatch/pool10

Pool: handle pooled transactions package
This commit is contained in:
Alex Sharov 2021-08-08 19:21:30 +07:00 committed by GitHub
commit 4bed7dc7d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 114 additions and 52 deletions

View File

@ -262,6 +262,21 @@ func (f *Fetch) handleInboundMessage(req *sentry.InboundMessage, sentryClient se
}, &grpc.EmptyCallOption{}); err != nil {
return err
}
case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66:
parseCtx := NewTxParseContext()
txs := TxSlots{}
if req.Id == sentry.MessageId_GET_POOLED_TRANSACTIONS_66 {
if _, err := ParsePooledTransactions65(req.Data, 0, parseCtx, &txs); err != nil {
return err
}
} else {
if _, _, err := ParsePooledTransactions66(req.Data, 0, parseCtx, &txs); err != nil {
return err
}
}
if err := f.pool.Add(txs); err != nil {
return err
}
}
return nil
@ -341,7 +356,7 @@ func (f *Fetch) handleNewPeer(req *sentry.PeersReply) error {
}
switch req.Event {
case sentry.PeersReply_Connect:
f.pool.NotifyNewPeer(req.PeerId)
f.pool.AddNewGoodPeer(req.PeerId)
}
return nil

View File

@ -17,15 +17,18 @@ var _ Pool = &PoolMock{}
//
// // make and configure a mocked Pool
// mockedPool := &PoolMock{
// AddFunc: func(newTxs TxSlots) error {
// panic("mock out the Add method")
// },
// AddNewGoodPeerFunc: func(peerID PeerID) {
// panic("mock out the AddNewGoodPeer method")
// },
// GetRlpFunc: func(hash []byte) []byte {
// panic("mock out the GetRlp method")
// },
// IdHashKnownFunc: func(hash []byte) bool {
// panic("mock out the IdHashKnown method")
// },
// NotifyNewPeerFunc: func(peerID PeerID) {
// panic("mock out the NotifyNewPeer method")
// },
// }
//
// // use mockedPool in code that requires Pool
@ -33,17 +36,30 @@ var _ Pool = &PoolMock{}
//
// }
type PoolMock struct {
// AddFunc mocks the Add method.
AddFunc func(newTxs TxSlots) error
// AddNewGoodPeerFunc mocks the AddNewGoodPeer method.
AddNewGoodPeerFunc func(peerID PeerID)
// GetRlpFunc mocks the GetRlp method.
GetRlpFunc func(hash []byte) []byte
// IdHashKnownFunc mocks the IdHashKnown method.
IdHashKnownFunc func(hash []byte) bool
// NotifyNewPeerFunc mocks the NotifyNewPeer method.
NotifyNewPeerFunc func(peerID PeerID)
// calls tracks calls to the methods.
calls struct {
// Add holds details about calls to the Add method.
Add []struct {
// NewTxs is the newTxs argument value.
NewTxs TxSlots
}
// AddNewGoodPeer holds details about calls to the AddNewGoodPeer method.
AddNewGoodPeer []struct {
// PeerID is the peerID argument value.
PeerID PeerID
}
// GetRlp holds details about calls to the GetRlp method.
GetRlp []struct {
// Hash is the hash argument value.
@ -54,15 +70,76 @@ type PoolMock struct {
// Hash is the hash argument value.
Hash []byte
}
// NotifyNewPeer holds details about calls to the NotifyNewPeer method.
NotifyNewPeer []struct {
// PeerID is the peerID argument value.
PeerID PeerID
}
}
lockAdd sync.RWMutex
lockAddNewGoodPeer sync.RWMutex
lockGetRlp sync.RWMutex
lockIdHashKnown sync.RWMutex
lockNotifyNewPeer sync.RWMutex
}
// Add calls AddFunc.
func (mock *PoolMock) Add(newTxs TxSlots) error {
callInfo := struct {
NewTxs TxSlots
}{
NewTxs: newTxs,
}
mock.lockAdd.Lock()
mock.calls.Add = append(mock.calls.Add, callInfo)
mock.lockAdd.Unlock()
if mock.AddFunc == nil {
var (
errOut error
)
return errOut
}
return mock.AddFunc(newTxs)
}
// AddCalls gets all the calls that were made to Add.
// Check the length with:
// len(mockedPool.AddCalls())
func (mock *PoolMock) AddCalls() []struct {
NewTxs TxSlots
} {
var calls []struct {
NewTxs TxSlots
}
mock.lockAdd.RLock()
calls = mock.calls.Add
mock.lockAdd.RUnlock()
return calls
}
// AddNewGoodPeer calls AddNewGoodPeerFunc.
func (mock *PoolMock) AddNewGoodPeer(peerID PeerID) {
callInfo := struct {
PeerID PeerID
}{
PeerID: peerID,
}
mock.lockAddNewGoodPeer.Lock()
mock.calls.AddNewGoodPeer = append(mock.calls.AddNewGoodPeer, callInfo)
mock.lockAddNewGoodPeer.Unlock()
if mock.AddNewGoodPeerFunc == nil {
return
}
mock.AddNewGoodPeerFunc(peerID)
}
// AddNewGoodPeerCalls gets all the calls that were made to AddNewGoodPeer.
// Check the length with:
// len(mockedPool.AddNewGoodPeerCalls())
func (mock *PoolMock) AddNewGoodPeerCalls() []struct {
PeerID PeerID
} {
var calls []struct {
PeerID PeerID
}
mock.lockAddNewGoodPeer.RLock()
calls = mock.calls.AddNewGoodPeer
mock.lockAddNewGoodPeer.RUnlock()
return calls
}
// GetRlp calls GetRlpFunc.
@ -132,34 +209,3 @@ func (mock *PoolMock) IdHashKnownCalls() []struct {
mock.lockIdHashKnown.RUnlock()
return calls
}
// NotifyNewPeer calls NotifyNewPeerFunc.
func (mock *PoolMock) NotifyNewPeer(peerID PeerID) {
callInfo := struct {
PeerID PeerID
}{
PeerID: peerID,
}
mock.lockNotifyNewPeer.Lock()
mock.calls.NotifyNewPeer = append(mock.calls.NotifyNewPeer, callInfo)
mock.lockNotifyNewPeer.Unlock()
if mock.NotifyNewPeerFunc == nil {
return
}
mock.NotifyNewPeerFunc(peerID)
}
// NotifyNewPeerCalls gets all the calls that were made to NotifyNewPeer.
// Check the length with:
// len(mockedPool.NotifyNewPeerCalls())
func (mock *PoolMock) NotifyNewPeerCalls() []struct {
PeerID PeerID
} {
var calls []struct {
PeerID PeerID
}
mock.lockNotifyNewPeer.RLock()
calls = mock.calls.NotifyNewPeer
mock.lockNotifyNewPeer.RUnlock()
return calls
}

View File

@ -37,8 +37,9 @@ type Pool interface {
// IdHashKnown check whether transaction with given Id hash is known to the pool
IdHashKnown(hash []byte) bool
GetRlp(hash []byte) []byte
Add(newTxs TxSlots) error
NotifyNewPeer(peerID PeerID)
AddNewGoodPeer(peerID PeerID)
}
// SubPoolMarker ordered bitset responsible to sort transactions by sub-pools. Bits meaning:
@ -199,7 +200,7 @@ func (p *TxPool) IdHashIsLocal(hash []byte) bool {
}
func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
func (p *TxPool) OnNewTxs(newTxs TxSlots) error {
func (p *TxPool) Add(newTxs TxSlots) error {
p.lock.Lock()
defer p.lock.Unlock()
if err := newTxs.Valid(); err != nil {
@ -710,9 +711,6 @@ func (mt *metaTx) Less(than *metaTx) bool {
if mt.Tx.nonce != than.Tx.nonce {
return mt.Tx.nonce < than.Tx.nonce
}
//if mt.Tx.senderID != than.Tx.senderID {
// return mt.Tx.senderID < than.Tx.senderID
//}
return false
}

View File

@ -438,7 +438,7 @@ func FuzzOnNewBlocks11(f *testing.F) {
checkNotify(minedTxs1, minedTxs2, "fork2")
// add some remote txs from p2p
err = pool.OnNewTxs(p2pReceived)
err = pool.Add(p2pReceived)
assert.NoError(err)
check(p2pReceived, TxSlots{}, "p2pmsg1")
checkNotify(p2pReceived, TxSlots{}, "p2pmsg1")

View File

@ -420,6 +420,9 @@ func (s *TxSlots) Growth(targetSize int) {
for s.senders.Len() < targetSize {
s.senders = append(s.senders, addressesGrowth...)
}
for len(s.isLocal) < targetSize {
s.isLocal = append(s.isLocal, false)
}
}
var addressesGrowth = make([]byte, 20)