diff --git a/event/feed.go b/event/feed.go index bd8e26321..4568304df 100644 --- a/event/feed.go +++ b/event/feed.go @@ -33,7 +33,9 @@ var errBadChannel = errors.New("event: Subscribe argument does not have sendable // // The zero value is ready to use. type Feed struct { - sendLock chan struct{} // one-element buffer, empty when held + // sendLock has a one-element buffer and is empty when held. + // It protects sendCases. + sendLock chan struct{} removeSub chan interface{} // interrupts Send sendCases caseList // the active set of select cases used by Send @@ -44,6 +46,10 @@ type Feed struct { closed bool } +// This is the index of the first actual subscription channel in sendCases. +// sendCases[0] is a SelectRecv case for the removeSub channel. +const firstSubSendCase = 1 + type feedTypeError struct { got, want reflect.Type op string @@ -67,6 +73,7 @@ func (f *Feed) init() { // until the subscription is canceled. All channels added must have the same element type. // // The channel should have ample buffer space to avoid blocking other subscribers. +// Slow subscribers are not dropped. func (f *Feed) Subscribe(channel interface{}) Subscription { chanval := reflect.ValueOf(channel) chantyp := chanval.Type() @@ -125,13 +132,14 @@ func (f *Feed) remove(sub *feedSub) { func (f *Feed) Send(value interface{}) (nsent int) { f.mu.Lock() f.init() + f.mu.Unlock() + <-f.sendLock - // Add new subscriptions from the inbox, then clear it. + + // Add new cases from the inbox after taking the send lock. + f.mu.Lock() f.sendCases = append(f.sendCases, f.inbox...) - for i := range f.inbox { - f.inbox[i] = reflect.SelectCase{} - } - f.inbox = f.inbox[:0] + f.inbox = nil f.mu.Unlock() // Set the sent value on all channels. @@ -140,7 +148,7 @@ func (f *Feed) Send(value interface{}) (nsent int) { f.sendLock <- struct{}{} panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) } - for i := 1; i < len(f.sendCases); i++ { + for i := firstSubSendCase; i < len(f.sendCases); i++ { f.sendCases[i].Send = rvalue } @@ -150,13 +158,14 @@ func (f *Feed) Send(value interface{}) (nsent int) { // Fast path: try sending without blocking before adding to the select set. // This should usually succeed if subscribers are fast enough and have free // buffer space. - for i := 1; i < len(cases); i++ { + for i := firstSubSendCase; i < len(cases); i++ { if cases[i].Chan.TrySend(rvalue) { - cases = cases.deactivate(i) nsent++ + cases = cases.deactivate(i) + i-- } } - if len(cases) == 1 { + if len(cases) == firstSubSendCase { break } // Select on all the receivers, waiting for them to unblock. @@ -174,7 +183,7 @@ func (f *Feed) Send(value interface{}) (nsent int) { } // Forget about the sent value and hand off the send lock. - for i := 1; i < len(f.sendCases); i++ { + for i := firstSubSendCase; i < len(f.sendCases); i++ { f.sendCases[i].Send = reflect.Value{} } f.sendLock <- struct{}{} diff --git a/event/feed_test.go b/event/feed_test.go index 4f897c162..a82c10303 100644 --- a/event/feed_test.go +++ b/event/feed_test.go @@ -167,6 +167,74 @@ func TestFeedSubscribeSameChannel(t *testing.T) { done.Wait() } +func TestFeedSubscribeBlockedPost(t *testing.T) { + var ( + feed Feed + nsends = 2000 + ch1 = make(chan int) + ch2 = make(chan int) + wg sync.WaitGroup + ) + defer wg.Wait() + + feed.Subscribe(ch1) + wg.Add(nsends) + for i := 0; i < nsends; i++ { + go func() { + feed.Send(99) + wg.Done() + }() + } + + sub2 := feed.Subscribe(ch2) + defer sub2.Unsubscribe() + + // We're done when ch1 has received N times. + // The number of receives on ch2 depends on scheduling. + for i := 0; i < nsends; { + select { + case <-ch1: + i++ + case <-ch2: + } + } +} + +func TestFeedUnsubscribeBlockedPost(t *testing.T) { + var ( + feed Feed + nsends = 200 + chans = make([]chan int, 2000) + subs = make([]Subscription, len(chans)) + bchan = make(chan int) + bsub = feed.Subscribe(bchan) + wg sync.WaitGroup + ) + for i := range chans { + chans[i] = make(chan int, nsends) + } + + // Queue up some Sends. None of these can make progress while bchan isn't read. + wg.Add(nsends) + for i := 0; i < nsends; i++ { + go func() { + feed.Send(99) + wg.Done() + }() + } + // Subscribe the other channels. + for i, ch := range chans { + subs[i] = feed.Subscribe(ch) + } + // Unsubscribe them again. + for _, sub := range subs { + sub.Unsubscribe() + } + // Unblock the Sends. + bsub.Unsubscribe() + wg.Wait() +} + func TestFeedUnsubscribeFromInbox(t *testing.T) { var ( feed Feed diff --git a/event/subscription.go b/event/subscription.go index 7f2619b2d..83bd21213 100644 --- a/event/subscription.go +++ b/event/subscription.go @@ -43,14 +43,14 @@ type Subscription interface { Unsubscribe() // cancels sending of events, closing the error channel } -// NewSubscription runs fn as a subscription in a new goroutine. The channel given to fn -// is closed when Unsubscribe is called. If fn returns an error, it is sent on the -// subscription's error channel. -func NewSubscription(fn func(<-chan struct{}) error) Subscription { +// NewSubscription runs a producer function as a subscription in a new goroutine. The +// channel given to the producer is closed when Unsubscribe is called. If fn returns an +// error, it is sent on the subscription's error channel. +func NewSubscription(producer func(<-chan struct{}) error) Subscription { s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)} go func() { defer close(s.err) - err := fn(s.unsub) + err := producer(s.unsub) s.mu.Lock() defer s.mu.Unlock() if !s.unsubscribed {