erigon-pulse/turbo/rpchelper/subscription.go
06de4aeb91
filterlogs deadlock fix (#6429)
Refactors filters.go so that the map+locks now live in their own type.

Moves the logic for safely canceling and draining a channel into its own type.

Changes subscriptions to take a buffer size and construct their own channel.

Marked as draft because live testing is still needed.
2023-01-02 11:42:40 +07:00
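
The subscription type below backs this change: a send never blocks, and Close drains and closes the channel it owns. A minimal usage sketch, written as if it were a test inside the rpchelper package (the file name, test name, and consumer loop are illustrative, not part of this change):

// subscription_sketch_test.go: hypothetical example, not part of the PR
package rpchelper

import "testing"

func TestChanSubSketch(t *testing.T) {
	sub := newChanSub[int](16) // buffer of 16; requested sizes below 8 are bumped to 8

	done := make(chan struct{})
	go func() {
		// consumer drains values until Close closes the channel
		defer close(done)
		for range sub.ch {
		}
	}()

	for i := 0; i < 100; i++ {
		sub.Send(i) // non-blocking: the value is dropped if the buffer is full
	}
	sub.Close() // cancels in-flight Sends, then drains and closes sub.ch
	<-done
}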

package rpchelper

import (
	"context"
	"sync"
)
// Sub is a simple subscription interface for the rpc helper.
type Sub[T any] interface {
	Send(T)
	Close()
}
// chan_sub implements Sub on top of a buffered channel.
type chan_sub[T any] struct {
	ch     chan T
	closed chan struct{}
	ctx    context.Context
	cn     context.CancelFunc
	c      sync.Cond
}
// newChanSub returns a Sub backed by a buffered channel of at least 8 entries.
func newChanSub[T any](size int) *chan_sub[T] {
	// set the minimum buffer size to 8
	if size < 8 {
		size = 8
	}
	o := &chan_sub[T]{}
	o.ch = make(chan T, size)
	// buffer of 1 so that exactly one caller of Close wins the right to drain the channel
	o.closed = make(chan struct{}, 1)
	o.ctx, o.cn = context.WithCancel(context.Background())
	return o
}
// Send delivers x without blocking; the message is dropped if the buffer is
// full or the subscription has been canceled.
func (s *chan_sub[T]) Send(x T) {
	select {
	// if the output buffer has room, send
	case s.ch <- x:
	// if the sub is canceled, dispose of the message
	case <-s.ctx.Done():
	// the sub is overloaded, dispose of the message
	default:
	}
}
// Close cancels the subscription, then drains and closes the channel exactly once.
func (s *chan_sub[T]) Close() {
	select {
	case <-s.ctx.Done():
		return
	default:
	}
	// it is possible for multiple goroutines to get here if Close is called twice at the same time.
	// cancel the context so that any blocked Send can exit
	s.cn()
	select {
	case s.closed <- struct{}{}:
		// but only the goroutine that wins the send on the size-1 closed channel gets here.
		// drain the channel without blocking, then close it
		for {
			select {
			case <-s.ch:
			default:
				close(s.ch)
				return
			}
		}
	default:
	}
}
// NewSyncMap returns an empty SyncMap.
func NewSyncMap[K comparable, T any]() *SyncMap[K, T] {
	return &SyncMap[K, T]{
		m: make(map[K]T),
	}
}

// SyncMap is a generic map guarded by a read-write mutex.
type SyncMap[K comparable, T any] struct {
	m  map[K]T
	mu sync.RWMutex
}
// Get returns the value stored under k and whether it was present.
func (m *SyncMap[K, T]) Get(k K) (res T, ok bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	res, ok = m.m[k]
	return res, ok
}

// Put stores v under k and returns the previous value and whether it existed.
func (m *SyncMap[K, T]) Put(k K, v T) (T, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	old, ok := m.m[k]
	m.m[k] = v
	return old, ok
}
// Do runs fn on the value stored under k while holding the write lock.
// fn receives the current value and whether it exists; the value it returns
// is stored only if its second result is true. Do returns the value produced
// by fn and whether the key existed before the call.
func (m *SyncMap[K, T]) Do(k K, fn func(T, bool) (T, bool)) (after T, ok bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	val, ok := m.m[k]
	nv, save := fn(val, ok)
	if save {
		m.m[k] = nv
	}
	return nv, ok
}

// DoAndStore is like Do, but always stores the value returned by fn.
func (m *SyncMap[K, T]) DoAndStore(k K, fn func(t T, ok bool) T) (after T, ok bool) {
	return m.Do(k, func(t T, b bool) (T, bool) {
		res := fn(t, b)
		return res, true
	})
}
// Range calls fn for every key/value pair while holding the read lock and
// stops at the first error, which it returns.
func (m *SyncMap[K, T]) Range(fn func(k K, v T) error) error {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for k, v := range m.m {
		if err := fn(k, v); err != nil {
			return err
		}
	}
	return nil
}

// Delete removes k and returns the removed value and whether it was present.
func (m *SyncMap[K, T]) Delete(k K) (T, bool) {
	var t T
	m.mu.Lock()
	defer m.mu.Unlock()
	val, ok := m.m[k]
	if !ok {
		return t, false
	}
	delete(m.m, k)
	return val, true
}
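
The SyncMap above is the "map+locks" piece mentioned in the commit message. A short sketch of how it might be exercised, again as a hypothetical in-package test (names and values are illustrative):

// syncmap_sketch_test.go: hypothetical example, not part of the PR
package rpchelper

import "testing"

func TestSyncMapSketch(t *testing.T) {
	counts := NewSyncMap[string, int]()
	counts.Put("logs", 1)

	// read-modify-write under a single lock acquisition
	counts.DoAndStore("logs", func(v int, ok bool) int {
		if !ok {
			return 1
		}
		return v + 1
	})

	if v, ok := counts.Get("logs"); !ok || v != 2 {
		t.Fatalf("want 2, got %d", v)
	}

	// iterate under the read lock; returning an error stops the walk early
	_ = counts.Range(func(k string, v int) error { return nil })

	counts.Delete("logs")
}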