mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-05 10:32:19 +00:00
event: make Unsubscribe idempotent
This commit is contained in:
parent
dac4a8f113
commit
10bbf265b2
@ -124,14 +124,16 @@ func posdelete(slice []*muxsub, pos int) []*muxsub {
|
|||||||
|
|
||||||
type muxsub struct {
|
type muxsub struct {
|
||||||
mux *TypeMux
|
mux *TypeMux
|
||||||
mutex sync.RWMutex
|
closeMu sync.Mutex
|
||||||
closing chan struct{}
|
closing chan struct{}
|
||||||
|
closed bool
|
||||||
|
|
||||||
// these two are the same channel. they are stored separately so
|
// these two are the same channel. they are stored separately so
|
||||||
// postC can be set to nil without affecting the return value of
|
// postC can be set to nil without affecting the return value of
|
||||||
// Chan.
|
// Chan.
|
||||||
readC <-chan interface{}
|
postMu sync.RWMutex
|
||||||
postC chan<- interface{}
|
readC <-chan interface{}
|
||||||
|
postC chan<- interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newsub(mux *TypeMux) *muxsub {
|
func newsub(mux *TypeMux) *muxsub {
|
||||||
@ -154,18 +156,25 @@ func (s *muxsub) Unsubscribe() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *muxsub) closewait() {
|
func (s *muxsub) closewait() {
|
||||||
|
s.closeMu.Lock()
|
||||||
|
defer s.closeMu.Unlock()
|
||||||
|
if s.closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
close(s.closing)
|
close(s.closing)
|
||||||
s.mutex.Lock()
|
s.closed = true
|
||||||
|
|
||||||
|
s.postMu.Lock()
|
||||||
close(s.postC)
|
close(s.postC)
|
||||||
s.postC = nil
|
s.postC = nil
|
||||||
s.mutex.Unlock()
|
s.postMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *muxsub) deliver(ev interface{}) {
|
func (s *muxsub) deliver(ev interface{}) {
|
||||||
s.mutex.RLock()
|
s.postMu.RLock()
|
||||||
select {
|
select {
|
||||||
case s.postC <- ev:
|
case s.postC <- ev:
|
||||||
case <-s.closing:
|
case <-s.closing:
|
||||||
}
|
}
|
||||||
s.mutex.RUnlock()
|
s.postMu.RUnlock()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user