diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go index f95d0ea9e..2e15e7aac 100644 --- a/core/bloombits/matcher_test.go +++ b/core/bloombits/matcher_test.go @@ -85,7 +85,7 @@ func TestWildcardMatcher(t *testing.T) { } // makeRandomIndexes generates a random filter system, composed on multiple filter -// criteria, each having one bloom list component for the address and arbitrarilly +// criteria, each having one bloom list component for the address and arbitrarily // many topic bloom list components. func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes { res := make([][]bloomIndexes, len(lengths)) diff --git a/core/tx_journal.go b/core/tx_journal.go index 94a9ff9b8..3fd8ece49 100644 --- a/core/tx_journal.go +++ b/core/tx_journal.go @@ -31,6 +31,15 @@ import ( // into the journal, but no such file is currently open. var errNoActiveJournal = errors.New("no active journal") +// devNull is a WriteCloser that just discards anything written into it. Its +// goal is to allow the transaction journal to write into a fake journal when +// loading transactions on startup without printing warnings due to no file +// being readt for write. +type devNull struct{} + +func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil } +func (*devNull) Close() error { return nil } + // txJournal is a rotating log of transactions with the aim of storing locally // created transactions to allow non-executed ones to survive node restarts. type txJournal struct { @@ -59,6 +68,10 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error { } defer input.Close() + // Temporarilly discard any journal additions (don't double add on load) + journal.writer = new(devNull) + defer func() { journal.writer = nil }() + // Inject all transactions from the journal into the pool stream := rlp.NewStream(input, 0) total, dropped := 0, 0 diff --git a/core/tx_pool.go b/core/tx_pool.go index 0ad765179..a705e36d6 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -640,6 +640,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.journalTx(from, tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) + + // We've directly injected a replacement transaction, notify subsystems + go pool.txFeed.Send(TxPreEvent{tx}) + return old != nil, nil } // New transaction isn't replacing a pending one, push into queue @@ -729,6 +733,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) + go pool.txFeed.Send(TxPreEvent{tx}) } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 17d736877..eec128cba 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -117,6 +117,28 @@ func validateTxPoolInternals(pool *TxPool) error { return nil } +// validateEvents checks that the correct number of transaction addition events +// were fired on the pool's event feed. +func validateEvents(events chan TxPreEvent, count int) error { + for i := 0; i < count; i++ { + select { + case <-events: + case <-time.After(time.Second): + return fmt.Errorf("event #%d not fired", i) + } + } + select { + case tx := <-events: + return fmt.Errorf("more than %d events fired: %v", count, tx.Tx) + + case <-time.After(50 * time.Millisecond): + // This branch should be "default", but it's a data race between goroutines, + // reading the event channel and pushng into it, so better wait a bit ensuring + // really nothing gets injected. + } + return nil +} + func deriveSender(tx *types.Transaction) (common.Address, error) { return types.Sender(types.HomesteadSigner{}, tx) } @@ -149,7 +171,9 @@ func (c *testChain) State() (*state.StateDB, error) { // This test simulates a scenario where a new block is imported during a // state reset and tests whether the pending state is in sync with the // block head event that initiated the resetState(). -func TestStateChangeDuringPoolReset(t *testing.T) { +func TestStateChangeDuringTransactionPoolReset(t *testing.T) { + t.Parallel() + var ( db, _ = ethdb.NewMemDatabase() key, _ = crypto.GenerateKey() @@ -201,6 +225,8 @@ func TestStateChangeDuringPoolReset(t *testing.T) { } func TestInvalidTransactions(t *testing.T) { + t.Parallel() + pool, key := setupTxPool() defer pool.Stop() @@ -236,6 +262,8 @@ func TestInvalidTransactions(t *testing.T) { } func TestTransactionQueue(t *testing.T) { + t.Parallel() + pool, key := setupTxPool() defer pool.Stop() @@ -287,7 +315,9 @@ func TestTransactionQueue(t *testing.T) { } } -func TestNegativeValue(t *testing.T) { +func TestTransactionNegativeValue(t *testing.T) { + t.Parallel() + pool, key := setupTxPool() defer pool.Stop() @@ -300,6 +330,8 @@ func TestNegativeValue(t *testing.T) { } func TestTransactionChainFork(t *testing.T) { + t.Parallel() + pool, key := setupTxPool() defer pool.Stop() @@ -328,6 +360,8 @@ func TestTransactionChainFork(t *testing.T) { } func TestTransactionDoubleNonce(t *testing.T) { + t.Parallel() + pool, key := setupTxPool() defer pool.Stop() @@ -376,7 +410,9 @@ func TestTransactionDoubleNonce(t *testing.T) { } } -func TestMissingNonce(t *testing.T) { +func TestTransactionMissingNonce(t *testing.T) { + t.Parallel() + pool, key := setupTxPool() defer pool.Stop() @@ -398,6 +434,8 @@ func TestMissingNonce(t *testing.T) { } func TestTransactionNonceRecovery(t *testing.T) { + t.Parallel() + const n = 10 pool, key := setupTxPool() defer pool.Stop() @@ -422,6 +460,8 @@ func TestTransactionNonceRecovery(t *testing.T) { // Tests that if an account runs out of funds, any pending and queued transactions // are dropped. func TestTransactionDropping(t *testing.T) { + t.Parallel() + // Create a test account and fund it pool, key := setupTxPool() defer pool.Stop() @@ -515,6 +555,8 @@ func TestTransactionDropping(t *testing.T) { // of fund), all consecutive (still valid, but not executable) transactions are // postponed back into the future queue to prevent broadcasting them. func TestTransactionPostponing(t *testing.T) { + t.Parallel() + // Create a test account and fund it pool, key := setupTxPool() defer pool.Stop() @@ -586,9 +628,68 @@ func TestTransactionPostponing(t *testing.T) { } } +// Tests that if the transaction pool has both executable and non-executable +// transactions from an origin account, filling the nonce gap moves all queued +// ones into the pending pool. +func TestTransactionGapFilling(t *testing.T) { + t.Parallel() + + // Create a test account and fund it + pool, key := setupTxPool() + defer pool.Stop() + + account, _ := deriveSender(transaction(0, big.NewInt(0), key)) + pool.currentState.AddBalance(account, big.NewInt(1000000)) + + // Keep track of transaction events to ensure all executables get announced + events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + + // Create a pending and a queued transaction with a nonce-gap in between + if err := pool.AddRemote(transaction(0, big.NewInt(100000), key)); err != nil { + t.Fatalf("failed to add pending transaction: %v", err) + } + if err := pool.AddRemote(transaction(2, big.NewInt(100000), key)); err != nil { + t.Fatalf("failed to add queued transaction: %v", err) + } + pending, queued := pool.Stats() + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) + } + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + if err := validateEvents(events, 1); err != nil { + t.Fatalf("original event firing failed: %v", err) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Fill the nonce gap and ensure all transactions become pending + if err := pool.AddRemote(transaction(1, big.NewInt(100000), key)); err != nil { + t.Fatalf("failed to add gapped transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("gap-filling event firing failed: %v", err) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Tests that if the transaction count belonging to a single account goes above // some threshold, the higher transactions are dropped to prevent DOS attacks. func TestTransactionQueueAccountLimiting(t *testing.T) { + t.Parallel() + // Create a test account and fund it pool, key := setupTxPool() defer pool.Stop() @@ -632,6 +733,8 @@ func TestTransactionQueueGlobalLimitingNoLocals(t *testing.T) { } func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { + t.Parallel() + // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) @@ -782,6 +885,8 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { // above some threshold, as long as the transactions are executable, they are // accepted. func TestTransactionPendingLimiting(t *testing.T) { + t.Parallel() + // Create a test account and fund it pool, key := setupTxPool() defer pool.Stop() @@ -789,6 +894,11 @@ func TestTransactionPendingLimiting(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) pool.currentState.AddBalance(account, big.NewInt(1000000)) + // Keep track of transaction events to ensure all executables get announced + events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil { @@ -804,6 +914,12 @@ func TestTransactionPendingLimiting(t *testing.T) { if len(pool.all) != int(testTxPoolConfig.AccountQueue+5) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue+5) } + if err := validateEvents(events, int(testTxPoolConfig.AccountQueue+5)); err != nil { + t.Fatalf("event firing failed: %v", err) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Tests that the transaction limits are enforced the same way irrelevant whether @@ -812,6 +928,8 @@ func TestTransactionQueueLimitingEquivalency(t *testing.T) { testTransactionLi func TestTransactionPendingLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 0) } func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { + t.Parallel() + // Add a batch of transactions to a pool one by one pool1, key1 := setupTxPool() defer pool1.Stop() @@ -859,6 +977,8 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { // some hard threshold, the higher transactions are dropped to prevent DOS // attacks. func TestTransactionPendingGlobalLimiting(t *testing.T) { + t.Parallel() + // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) @@ -904,6 +1024,8 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { // Tests that if transactions start being capped, transactions are also removed from 'all' func TestTransactionCapClearsFromAll(t *testing.T) { + t.Parallel() + // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) @@ -937,6 +1059,8 @@ func TestTransactionCapClearsFromAll(t *testing.T) { // some hard threshold, if they are under the minimum guaranteed slot count then // the transactions are still kept. func TestTransactionPendingMinimumAllowance(t *testing.T) { + t.Parallel() + // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) @@ -984,6 +1108,8 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { // // Note, local transactions are never allowed to be dropped. func TestTransactionPoolRepricing(t *testing.T) { + t.Parallel() + // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) @@ -992,6 +1118,11 @@ func TestTransactionPoolRepricing(t *testing.T) { pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() + // Keep track of transaction events to ensure all executables get announced + events := make(chan TxPreEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + // Create a number of test accounts and fund them keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { @@ -1022,6 +1153,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } + if err := validateEvents(events, 4); err != nil { + t.Fatalf("original event firing failed: %v", err) + } if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -1035,6 +1169,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } + if err := validateEvents(events, 0); err != nil { + t.Fatalf("reprice event firing failed: %v", err) + } if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -1045,6 +1182,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced { t.Fatalf("adding underpriced queued transaction error mismatch: have %v, want %v", err, ErrUnderpriced) } + if err := validateEvents(events, 0); err != nil { + t.Fatalf("post-reprice event firing failed: %v", err) + } if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -1056,6 +1196,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if pending, _ = pool.Stats(); pending != 3 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) } + if err := validateEvents(events, 1); err != nil { + t.Fatalf("post-reprice local event firing failed: %v", err) + } if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -1064,6 +1207,8 @@ func TestTransactionPoolRepricing(t *testing.T) { // Tests that setting the transaction pool gas price to a higher value does not // remove local transactions. func TestTransactionPoolRepricingKeepsLocals(t *testing.T) { + t.Parallel() + // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) @@ -1125,6 +1270,8 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) { // // Note, local transactions are never allowed to be dropped. func TestTransactionPoolUnderpricing(t *testing.T) { + t.Parallel() + // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) @@ -1137,6 +1284,11 @@ func TestTransactionPoolUnderpricing(t *testing.T) { pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() + // Keep track of transaction events to ensure all executables get announced + events := make(chan TxPreEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + // Create a number of test accounts and fund them keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { @@ -1164,6 +1316,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 1 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } + if err := validateEvents(events, 3); err != nil { + t.Fatalf("original event firing failed: %v", err) + } if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -1188,6 +1343,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("additional event firing failed: %v", err) + } if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -1203,6 +1361,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateEvents(events, 1); err != nil { + t.Fatalf("local event firing failed: %v", err) + } if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -1211,6 +1372,8 @@ func TestTransactionPoolUnderpricing(t *testing.T) { // Tests that the pool rejects replacement transactions that don't meet the minimum // price bump required. func TestTransactionReplacement(t *testing.T) { + t.Parallel() + // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) @@ -1219,6 +1382,11 @@ func TestTransactionReplacement(t *testing.T) { pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() + // Keep track of transaction events to ensure all executables get announced + events := make(chan TxPreEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + // Create a test account to add transactions with key, _ := crypto.GenerateKey() pool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000)) @@ -1236,6 +1404,9 @@ func TestTransactionReplacement(t *testing.T) { if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(2), key)); err != nil { t.Fatalf("failed to replace original cheap pending transaction: %v", err) } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("cheap replacement event firing failed: %v", err) + } if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(price), key)); err != nil { t.Fatalf("failed to add original proper pending transaction: %v", err) @@ -1246,6 +1417,9 @@ func TestTransactionReplacement(t *testing.T) { if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil { t.Fatalf("failed to replace original proper pending transaction: %v", err) } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("proper replacement event firing failed: %v", err) + } // Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), key)); err != nil { t.Fatalf("failed to add original queued transaction: %v", err) @@ -1266,6 +1440,10 @@ func TestTransactionReplacement(t *testing.T) { if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil { t.Fatalf("failed to replace original queued transaction: %v", err) } + + if err := validateEvents(events, 0); err != nil { + t.Fatalf("queued replacement event firing failed: %v", err) + } if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -1277,6 +1455,8 @@ func TestTransactionJournaling(t *testing.T) { testTransactionJournaling func TestTransactionJournalingNoLocals(t *testing.T) { testTransactionJournaling(t, true) } func testTransactionJournaling(t *testing.T, nolocals bool) { + t.Parallel() + // Create a temporary file for the journal file, err := ioutil.TempFile("", "") if err != nil { @@ -1335,6 +1515,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} + pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() @@ -1358,6 +1539,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { pool.lockedReset(nil, nil) time.Sleep(2 * config.Rejournal) pool.Stop() + statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} pool = NewTxPool(config, params.TestChainConfig, blockchain) diff --git a/event/feed.go b/event/feed.go index b1b597f17..78fa3d98d 100644 --- a/event/feed.go +++ b/event/feed.go @@ -127,6 +127,8 @@ func (f *Feed) remove(sub *feedSub) { // Send delivers to all subscribed channels simultaneously. // It returns the number of subscribers that the value was sent to. func (f *Feed) Send(value interface{}) (nsent int) { + rvalue := reflect.ValueOf(value) + f.once.Do(f.init) <-f.sendLock @@ -134,14 +136,14 @@ func (f *Feed) Send(value interface{}) (nsent int) { f.mu.Lock() f.sendCases = append(f.sendCases, f.inbox...) f.inbox = nil - f.mu.Unlock() - // Set the sent value on all channels. - rvalue := reflect.ValueOf(value) if !f.typecheck(rvalue.Type()) { f.sendLock <- struct{}{} panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) } + f.mu.Unlock() + + // Set the sent value on all channels. for i := firstSubSendCase; i < len(f.sendCases); i++ { f.sendCases[i].Send = rvalue }