mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-03 09:37:38 +00:00
fix deadlock in filters.go (#5734)
When the filter is in the onNewTxs function, the subscription function exits and no longer receives information from the msg channel. In this case, the unsubscribe_xxx function is triggered, which will cause filter.mu read and write locks to enter a deadlock state Co-authored-by: dc <dctrlbox@gmail.com> Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
parent
5785f4ecec
commit
8981f22943
@ -312,17 +312,34 @@ func (ff *Filters) SubscribeNewHeads(out chan *types.Header) HeadsSubID {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ff *Filters) UnsubscribeHeads(id HeadsSubID) bool {
|
func (ff *Filters) UnsubscribeHeads(id HeadsSubID) bool {
|
||||||
ff.mu.Lock()
|
ff.mu.RLock()
|
||||||
defer ff.mu.Unlock()
|
ch, ok := ff.headsSubs[id]
|
||||||
if ch, ok := ff.headsSubs[id]; ok {
|
ff.mu.RUnlock()
|
||||||
close(ch)
|
if !ok {
|
||||||
delete(ff.headsSubs, id)
|
return false
|
||||||
ff.storeMu.Lock()
|
}
|
||||||
defer ff.storeMu.Unlock()
|
// Drain the channel to avoid the deadlock in the OnNewEvent function
|
||||||
delete(ff.pendingHeadsStores, id)
|
// Draining of the channel is safe without a lock because it does not panic
|
||||||
return true
|
// when the channel is closed
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
default:
|
||||||
|
ff.mu.Lock()
|
||||||
|
defer ff.mu.Unlock()
|
||||||
|
// Need to re-check the channel because it might have been closed and removed from the map
|
||||||
|
// which we were not holding the lock
|
||||||
|
if ch, ok = ff.headsSubs[id]; !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
delete(ff.headsSubs, id)
|
||||||
|
ff.storeMu.Lock()
|
||||||
|
delete(ff.pendingHeadsStores, id)
|
||||||
|
ff.storeMu.Unlock()
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ff *Filters) SubscribePendingLogs(c chan types.Logs) PendingLogsSubID {
|
func (ff *Filters) SubscribePendingLogs(c chan types.Logs) PendingLogsSubID {
|
||||||
@ -362,17 +379,34 @@ func (ff *Filters) SubscribePendingTxs(out chan []types.Transaction) PendingTxsS
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) bool {
|
func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) bool {
|
||||||
ff.mu.Lock()
|
ff.mu.RLock()
|
||||||
defer ff.mu.Unlock()
|
ch, ok := ff.pendingTxsSubs[id]
|
||||||
if ch, ok := ff.pendingTxsSubs[id]; ok {
|
ff.mu.RUnlock()
|
||||||
close(ch)
|
if !ok {
|
||||||
delete(ff.pendingTxsSubs, id)
|
return false
|
||||||
ff.storeMu.Lock()
|
}
|
||||||
defer ff.storeMu.Unlock()
|
// Drain the channel to avoid the deadlock in the OnNewTxs function
|
||||||
delete(ff.pendingTxsStores, id)
|
// Draining of the channel is safe without a lock because it does not panic
|
||||||
return true
|
// when the channel is closed
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
default:
|
||||||
|
ff.mu.Lock()
|
||||||
|
defer ff.mu.Unlock()
|
||||||
|
// Need to re-check the channel because it might have been closed and removed from the map
|
||||||
|
// which we were not holding the lock
|
||||||
|
if ch, ok = ff.pendingTxsSubs[id]; !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
delete(ff.pendingTxsSubs, id)
|
||||||
|
ff.storeMu.Lock()
|
||||||
|
delete(ff.pendingTxsStores, id)
|
||||||
|
ff.storeMu.Unlock()
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ff *Filters) SubscribeLogs(out chan *types.Log, crit filters.FilterCriteria) LogsSubID {
|
func (ff *Filters) SubscribeLogs(out chan *types.Log, crit filters.FilterCriteria) LogsSubID {
|
||||||
|
Loading…
Reference in New Issue
Block a user