diff --git a/txpool/fetch.go b/txpool/fetch.go index 626b515ff..d8edaeede 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -262,6 +262,21 @@ func (f *Fetch) handleInboundMessage(req *sentry.InboundMessage, sentryClient se }, &grpc.EmptyCallOption{}); err != nil { return err } + case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66: + parseCtx := NewTxParseContext() + txs := TxSlots{} + if req.Id == sentry.MessageId_GET_POOLED_TRANSACTIONS_66 { + if _, err := ParsePooledTransactions65(req.Data, 0, parseCtx, &txs); err != nil { + return err + } + } else { + if _, _, err := ParsePooledTransactions66(req.Data, 0, parseCtx, &txs); err != nil { + return err + } + } + if err := f.pool.Add(txs); err != nil { + return err + } } return nil @@ -341,7 +356,7 @@ func (f *Fetch) handleNewPeer(req *sentry.PeersReply) error { } switch req.Event { case sentry.PeersReply_Connect: - f.pool.NotifyNewPeer(req.PeerId) + f.pool.AddNewGoodPeer(req.PeerId) } return nil diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go index 1571ff15c..d3178604d 100644 --- a/txpool/mocks_test.go +++ b/txpool/mocks_test.go @@ -17,15 +17,18 @@ var _ Pool = &PoolMock{} // // // make and configure a mocked Pool // mockedPool := &PoolMock{ +// AddFunc: func(newTxs TxSlots) error { +// panic("mock out the Add method") +// }, +// AddNewGoodPeerFunc: func(peerID PeerID) { +// panic("mock out the AddNewGoodPeer method") +// }, // GetRlpFunc: func(hash []byte) []byte { // panic("mock out the GetRlp method") // }, // IdHashKnownFunc: func(hash []byte) bool { // panic("mock out the IdHashKnown method") // }, -// NotifyNewPeerFunc: func(peerID PeerID) { -// panic("mock out the NotifyNewPeer method") -// }, // } // // // use mockedPool in code that requires Pool @@ -33,17 +36,30 @@ var _ Pool = &PoolMock{} // // } type PoolMock struct { + // AddFunc mocks the Add method. + AddFunc func(newTxs TxSlots) error + + // AddNewGoodPeerFunc mocks the AddNewGoodPeer method. + AddNewGoodPeerFunc func(peerID PeerID) + // GetRlpFunc mocks the GetRlp method. GetRlpFunc func(hash []byte) []byte // IdHashKnownFunc mocks the IdHashKnown method. IdHashKnownFunc func(hash []byte) bool - // NotifyNewPeerFunc mocks the NotifyNewPeer method. - NotifyNewPeerFunc func(peerID PeerID) - // calls tracks calls to the methods. calls struct { + // Add holds details about calls to the Add method. + Add []struct { + // NewTxs is the newTxs argument value. + NewTxs TxSlots + } + // AddNewGoodPeer holds details about calls to the AddNewGoodPeer method. + AddNewGoodPeer []struct { + // PeerID is the peerID argument value. + PeerID PeerID + } // GetRlp holds details about calls to the GetRlp method. GetRlp []struct { // Hash is the hash argument value. @@ -54,15 +70,76 @@ type PoolMock struct { // Hash is the hash argument value. Hash []byte } - // NotifyNewPeer holds details about calls to the NotifyNewPeer method. - NotifyNewPeer []struct { - // PeerID is the peerID argument value. - PeerID PeerID - } } - lockGetRlp sync.RWMutex - lockIdHashKnown sync.RWMutex - lockNotifyNewPeer sync.RWMutex + lockAdd sync.RWMutex + lockAddNewGoodPeer sync.RWMutex + lockGetRlp sync.RWMutex + lockIdHashKnown sync.RWMutex +} + +// Add calls AddFunc. +func (mock *PoolMock) Add(newTxs TxSlots) error { + callInfo := struct { + NewTxs TxSlots + }{ + NewTxs: newTxs, + } + mock.lockAdd.Lock() + mock.calls.Add = append(mock.calls.Add, callInfo) + mock.lockAdd.Unlock() + if mock.AddFunc == nil { + var ( + errOut error + ) + return errOut + } + return mock.AddFunc(newTxs) +} + +// AddCalls gets all the calls that were made to Add. +// Check the length with: +// len(mockedPool.AddCalls()) +func (mock *PoolMock) AddCalls() []struct { + NewTxs TxSlots +} { + var calls []struct { + NewTxs TxSlots + } + mock.lockAdd.RLock() + calls = mock.calls.Add + mock.lockAdd.RUnlock() + return calls +} + +// AddNewGoodPeer calls AddNewGoodPeerFunc. +func (mock *PoolMock) AddNewGoodPeer(peerID PeerID) { + callInfo := struct { + PeerID PeerID + }{ + PeerID: peerID, + } + mock.lockAddNewGoodPeer.Lock() + mock.calls.AddNewGoodPeer = append(mock.calls.AddNewGoodPeer, callInfo) + mock.lockAddNewGoodPeer.Unlock() + if mock.AddNewGoodPeerFunc == nil { + return + } + mock.AddNewGoodPeerFunc(peerID) +} + +// AddNewGoodPeerCalls gets all the calls that were made to AddNewGoodPeer. +// Check the length with: +// len(mockedPool.AddNewGoodPeerCalls()) +func (mock *PoolMock) AddNewGoodPeerCalls() []struct { + PeerID PeerID +} { + var calls []struct { + PeerID PeerID + } + mock.lockAddNewGoodPeer.RLock() + calls = mock.calls.AddNewGoodPeer + mock.lockAddNewGoodPeer.RUnlock() + return calls } // GetRlp calls GetRlpFunc. @@ -132,34 +209,3 @@ func (mock *PoolMock) IdHashKnownCalls() []struct { mock.lockIdHashKnown.RUnlock() return calls } - -// NotifyNewPeer calls NotifyNewPeerFunc. -func (mock *PoolMock) NotifyNewPeer(peerID PeerID) { - callInfo := struct { - PeerID PeerID - }{ - PeerID: peerID, - } - mock.lockNotifyNewPeer.Lock() - mock.calls.NotifyNewPeer = append(mock.calls.NotifyNewPeer, callInfo) - mock.lockNotifyNewPeer.Unlock() - if mock.NotifyNewPeerFunc == nil { - return - } - mock.NotifyNewPeerFunc(peerID) -} - -// NotifyNewPeerCalls gets all the calls that were made to NotifyNewPeer. -// Check the length with: -// len(mockedPool.NotifyNewPeerCalls()) -func (mock *PoolMock) NotifyNewPeerCalls() []struct { - PeerID PeerID -} { - var calls []struct { - PeerID PeerID - } - mock.lockNotifyNewPeer.RLock() - calls = mock.calls.NotifyNewPeer - mock.lockNotifyNewPeer.RUnlock() - return calls -} diff --git a/txpool/pool.go b/txpool/pool.go index cef426649..e1b4adb6c 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -37,8 +37,9 @@ type Pool interface { // IdHashKnown check whether transaction with given Id hash is known to the pool IdHashKnown(hash []byte) bool GetRlp(hash []byte) []byte + Add(newTxs TxSlots) error - NotifyNewPeer(peerID PeerID) + AddNewGoodPeer(peerID PeerID) } // SubPoolMarker ordered bitset responsible to sort transactions by sub-pools. Bits meaning: @@ -199,7 +200,7 @@ func (p *TxPool) IdHashIsLocal(hash []byte) bool { } func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } -func (p *TxPool) OnNewTxs(newTxs TxSlots) error { +func (p *TxPool) Add(newTxs TxSlots) error { p.lock.Lock() defer p.lock.Unlock() if err := newTxs.Valid(); err != nil { @@ -710,9 +711,6 @@ func (mt *metaTx) Less(than *metaTx) bool { if mt.Tx.nonce != than.Tx.nonce { return mt.Tx.nonce < than.Tx.nonce } - //if mt.Tx.senderID != than.Tx.senderID { - // return mt.Tx.senderID < than.Tx.senderID - //} return false } diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index f7e889789..f0f3bfd53 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -438,7 +438,7 @@ func FuzzOnNewBlocks11(f *testing.F) { checkNotify(minedTxs1, minedTxs2, "fork2") // add some remote txs from p2p - err = pool.OnNewTxs(p2pReceived) + err = pool.Add(p2pReceived) assert.NoError(err) check(p2pReceived, TxSlots{}, "p2pmsg1") checkNotify(p2pReceived, TxSlots{}, "p2pmsg1") diff --git a/txpool/types.go b/txpool/types.go index ff3b3126a..aabceb080 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -420,6 +420,9 @@ func (s *TxSlots) Growth(targetSize int) { for s.senders.Len() < targetSize { s.senders = append(s.senders, addressesGrowth...) } + for len(s.isLocal) < targetSize { + s.isLocal = append(s.isLocal, false) + } } var addressesGrowth = make([]byte, 20)