From 8981f22943cddadd31370c064159d6ea2797d952 Mon Sep 17 00:00:00 2001 From: "net.wyman" <106940772+m-wyman@users.noreply.github.com> Date: Fri, 14 Oct 2022 05:14:09 +0800 Subject: [PATCH] fix deadlock in filters.go (#5734) When the filter is in the onNewTxs function, the subscription function exits and no longer receives information from the msg channel. In this case, the unsubscribe_xxx function is triggered, which will cause filter.mu read and write locks to enter a deadlock state Co-authored-by: dc Co-authored-by: Alexey Sharp --- turbo/rpchelper/filters.go | 74 +++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 20 deletions(-) diff --git a/turbo/rpchelper/filters.go b/turbo/rpchelper/filters.go index c9222b506..8ae7b7002 100644 --- a/turbo/rpchelper/filters.go +++ b/turbo/rpchelper/filters.go @@ -312,17 +312,34 @@ func (ff *Filters) SubscribeNewHeads(out chan *types.Header) HeadsSubID { } func (ff *Filters) UnsubscribeHeads(id HeadsSubID) bool { - ff.mu.Lock() - defer ff.mu.Unlock() - if ch, ok := ff.headsSubs[id]; ok { - close(ch) - delete(ff.headsSubs, id) - ff.storeMu.Lock() - defer ff.storeMu.Unlock() - delete(ff.pendingHeadsStores, id) - return true + ff.mu.RLock() + ch, ok := ff.headsSubs[id] + ff.mu.RUnlock() + if !ok { + return false + } + // Drain the channel to avoid the deadlock in the OnNewEvent function + // Draining of the channel is safe without a lock because it does not panic + // when the channel is closed + for { + select { + case <-ch: + default: + ff.mu.Lock() + defer ff.mu.Unlock() + // Need to re-check the channel because it might have been closed and removed from the map + // which we were not holding the lock + if ch, ok = ff.headsSubs[id]; !ok { + return false + } + close(ch) + delete(ff.headsSubs, id) + ff.storeMu.Lock() + delete(ff.pendingHeadsStores, id) + ff.storeMu.Unlock() + return true + } } - return false } func (ff *Filters) SubscribePendingLogs(c chan types.Logs) PendingLogsSubID { @@ -362,17 +379,34 @@ func (ff *Filters) SubscribePendingTxs(out chan []types.Transaction) PendingTxsS } func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) bool { - ff.mu.Lock() - defer ff.mu.Unlock() - if ch, ok := ff.pendingTxsSubs[id]; ok { - close(ch) - delete(ff.pendingTxsSubs, id) - ff.storeMu.Lock() - defer ff.storeMu.Unlock() - delete(ff.pendingTxsStores, id) - return true + ff.mu.RLock() + ch, ok := ff.pendingTxsSubs[id] + ff.mu.RUnlock() + if !ok { + return false + } + // Drain the channel to avoid the deadlock in the OnNewTxs function + // Draining of the channel is safe without a lock because it does not panic + // when the channel is closed + for { + select { + case <-ch: + default: + ff.mu.Lock() + defer ff.mu.Unlock() + // Need to re-check the channel because it might have been closed and removed from the map + // which we were not holding the lock + if ch, ok = ff.pendingTxsSubs[id]; !ok { + return false + } + close(ch) + delete(ff.pendingTxsSubs, id) + ff.storeMu.Lock() + delete(ff.pendingTxsStores, id) + ff.storeMu.Unlock() + return true + } } - return false } func (ff *Filters) SubscribeLogs(out chan *types.Log, crit filters.FilterCriteria) LogsSubID {