From 92c1b86119f3a7b897cdbb3f3c1ebda70eb52db9 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi <1591639+s1na@users.noreply.github.com> Date: Thu, 21 Jan 2021 12:17:10 +0100 Subject: [PATCH] eth/filters: fix potential deadlock in filter timeout loop (#22178) This fixes #22131 and adds a test reproducing the issue. # Conflicts: # eth/filters/filter_system_test.go # les/client.go --- eth/backend.go | 2 +- eth/filters/api.go | 33 +++++---- eth/filters/filter_system_test.go | 107 +++++++++++++++++++++++++----- 3 files changed, 110 insertions(+), 32 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index a9aaea74e..5da24cbfb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -603,7 +603,7 @@ func (s *Ethereum) APIs() []rpc.API { { Namespace: "eth", Version: "1.0", - Service: filters.NewPublicFilterAPI(s.APIBackend, false), + Service: filters.NewPublicFilterAPI(s.APIBackend, false, 5*time.Minute), Public: true, }, //{ diff --git a/eth/filters/api.go b/eth/filters/api.go index 29d9206da..02e4ff86b 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -34,10 +34,6 @@ import ( "github.com/ledgerwatch/turbo-geth/rpc" ) -var ( - deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline -) - // filter is a helper struct that holds meta information over the filter type // and associated subscription in the event system. type filter struct { @@ -59,26 +55,29 @@ type PublicFilterAPI struct { events *EventSystem filtersMu sync.Mutex filters map[rpc.ID]*filter + timeout time.Duration } // NewPublicFilterAPI returns a new PublicFilterAPI instance. -func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { +func NewPublicFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *PublicFilterAPI { api := &PublicFilterAPI{ backend: backend, quit: make(chan struct{}, 1), chainDb: backend.ChainDb(), events: NewEventSystem(backend, lightMode), filters: make(map[rpc.ID]*filter), + timeout: timeout, } - go api.timeoutLoop() + go api.timeoutLoop(timeout) return api } // timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. // Tt is started when the api is created. -func (api *PublicFilterAPI) timeoutLoop() { - ticker := time.NewTicker(5 * time.Minute) +func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) { + var toUninstall []*Subscription + ticker := time.NewTicker(timeout) defer ticker.Stop() for { select { @@ -92,13 +91,21 @@ func (api *PublicFilterAPI) timeoutLoop() { for id, f := range api.filters { select { case <-f.deadline.C: - f.s.Unsubscribe() + toUninstall = append(toUninstall, f.s) delete(api.filters, id) default: continue } } api.filtersMu.Unlock() + + // Unsubscribes are processed outside the lock to avoid the following scenario: + // event loop attempts broadcasting events to still active filters while + // Unsubscribe is waiting for it to process the uninstall request. + for _, s := range toUninstall { + s.Unsubscribe() + } + toUninstall = nil } } @@ -120,7 +127,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { ) api.filtersMu.Lock() - api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filtersMu.Unlock() go func() { @@ -194,7 +201,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { ) api.filtersMu.Lock() - api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub} + api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub} api.filtersMu.Unlock() go func() { @@ -317,7 +324,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { } api.filtersMu.Lock() - api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub} + api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub} api.filtersMu.Unlock() go func() { @@ -444,7 +451,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { // receive timer value and reset timer <-f.deadline.C } - f.deadline.Reset(deadline) + f.deadline.Reset(api.timeout) switch f.typ { case PendingTransactionsSubscription, BlocksSubscription: diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 3a09336f9..57ef69c17 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -22,22 +22,26 @@ import ( "math/big" "math/rand" "reflect" + "runtime" "testing" "time" "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" +) + +var ( + deadline = 5 * time.Minute - ethereum "github.com/ledgerwatch/turbo-geth" - "github.com/ledgerwatch/turbo-geth/common" - "github.com/ledgerwatch/turbo-geth/consensus/ethash" - "github.com/ledgerwatch/turbo-geth/core" - "github.com/ledgerwatch/turbo-geth/core/bloombits" - "github.com/ledgerwatch/turbo-geth/core/rawdb" - "github.com/ledgerwatch/turbo-geth/core/types" - "github.com/ledgerwatch/turbo-geth/ethdb" - "github.com/ledgerwatch/turbo-geth/event" - "github.com/ledgerwatch/turbo-geth/params" - "github.com/ledgerwatch/turbo-geth/rpc" ) type testBackend struct { @@ -173,7 +177,7 @@ func TestBlockSubscription(t *testing.T) { defer db.Close() var ( backend = &testBackend{db: db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false, deadline) genesis = (&core.Genesis{Config: params.TestChainConfig}).MustCommit(db) chain, _, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}, false /* intermediateHashes */) chainEvents = []core.ChainEvent{} @@ -227,7 +231,7 @@ func TestPendingTxFilter(t *testing.T) { var ( backend = &testBackend{db: db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false, deadline) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(uint256.Int), 0, new(uint256.Int), nil), @@ -283,7 +287,7 @@ func TestLogFilterCreation(t *testing.T) { defer db.Close() var ( backend = &testBackend{db: db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false, deadline) testCases = []struct { crit FilterCriteria @@ -327,7 +331,7 @@ func TestInvalidLogFilterCreation(t *testing.T) { defer db.Close() var ( backend = &testBackend{db: db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false, deadline) ) // different situations where log filter creation should fail. @@ -350,7 +354,7 @@ func TestInvalidGetLogsRequest(t *testing.T) { defer db.Close() var ( backend = &testBackend{db: db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false, deadline) blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) @@ -376,7 +380,7 @@ func TestLogFilter(t *testing.T) { defer db.Close() var ( backend = &testBackend{db: db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false, deadline) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -491,7 +495,7 @@ func TestPendingLogsSubscription(t *testing.T) { defer db.Close() var ( backend = &testBackend{db: db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false, deadline) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -617,6 +621,73 @@ func TestPendingLogsSubscription(t *testing.T) { } } +// TestPendingTxFilterDeadlock tests if the event loop hangs when pending +// txes arrive at the same time that one of multiple filters is timing out. +// Please refer to #22131 for more details. +func TestPendingTxFilterDeadlock(t *testing.T) { + t.Parallel() + timeout := 100 * time.Millisecond + + var ( + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false, timeout) + done = make(chan struct{}) + ) + + go func() { + // Bombard feed with txes until signal was received to stop + i := uint64(0) + for { + select { + case <-done: + return + default: + } + + tx := types.NewTransaction(i, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil) + backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}}) + i++ + } + }() + + // Create a bunch of filters that will + // timeout either in 100ms or 200ms + fids := make([]rpc.ID, 20) + for i := 0; i < len(fids); i++ { + fid := api.NewPendingTransactionFilter() + fids[i] = fid + // Wait for at least one tx to arrive in filter + for { + hashes, err := api.GetFilterChanges(fid) + if err != nil { + t.Fatalf("Filter should exist: %v\n", err) + } + if len(hashes.([]common.Hash)) > 0 { + break + } + runtime.Gosched() + } + } + + // Wait until filters have timed out + time.Sleep(3 * timeout) + + // If tx loop doesn't consume `done` after a second + // it's hanging. + select { + case done <- struct{}{}: + // Check that all filters have been uninstalled + for _, fid := range fids { + if _, err := api.GetFilterChanges(fid); err == nil { + t.Errorf("Filter %s should have been uninstalled\n", fid) + } + } + case <-time.After(1 * time.Second): + t.Error("Tx sending loop hangs") + } +} + func flattenLogs(pl [][]*types.Log) []*types.Log { //nolint: prealloc var logs []*types.Log