Fix subscription closing race (#7788)

`go test -p 1 -race -count=100 -run='TestFiltersDeadlock_Test'
./turbo/rpchelper/...`
This commit is contained in:
Alex Sharov 2023-06-23 10:07:51 +07:00 committed by GitHub
parent ded166d73d
commit f8cb4d6dbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,7 +2,6 @@ package rpchelper
import ( import (
"sync" "sync"
"sync/atomic"
) )
// a simple interface for subscriptions for rpc helper // a simple interface for subscriptions for rpc helper
@ -12,14 +11,14 @@ type Sub[T any] interface {
} }
type chan_sub[T any] struct { type chan_sub[T any] struct {
lock sync.Mutex // protects all fileds of this struct
ch chan T ch chan T
closed atomic.Bool closed bool
} }
// buffered channel // newChanSub - buffered channel
func newChanSub[T any](size int) *chan_sub[T] { func newChanSub[T any](size int) *chan_sub[T] {
// set min size to 8. if size < 8 { // set min size to 8
if size < 8 {
size = 8 size = 8
} }
o := &chan_sub[T]{} o := &chan_sub[T]{}
@ -27,19 +26,23 @@ func newChanSub[T any](size int) *chan_sub[T] {
return o return o
} }
func (s *chan_sub[T]) Send(x T) { func (s *chan_sub[T]) Send(x T) {
if s.closed.Load() { s.lock.Lock()
defer s.lock.Unlock()
if s.closed {
return return
} }
select { select {
case s.ch <- x: case s.ch <- x:
default: // the sub is overloaded, dispose message default: // the sub is overloaded, dispose message
} }
} }
func (s *chan_sub[T]) Close() { func (s *chan_sub[T]) Close() {
if swapped := s.closed.CompareAndSwap(false, true); !swapped { s.lock.Lock()
defer s.lock.Unlock()
if s.closed {
return return
} }
s.closed = true
close(s.ch) close(s.ch)
} }