From 1b14785d7946335d8bc7f50d02a7620c84809e97 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 14 Jun 2023 16:25:05 +0700 Subject: [PATCH] ws unfreeze (#7724) attempt to address next issue: > when I'm having a lot of websocket connections the node is freezing and then it needs like 10 mins to sync. Then if I keep pushing requests it falls out of sync all the time --- cmd/rpcdaemon/commands/eth_filters.go | 20 +++++-------- turbo/rpchelper/subscription.go | 42 +++++++-------------------- 2 files changed, 19 insertions(+), 43 deletions(-) diff --git a/cmd/rpcdaemon/commands/eth_filters.go b/cmd/rpcdaemon/commands/eth_filters.go index 3945b8268..49b6a4ddb 100644 --- a/cmd/rpcdaemon/commands/eth_filters.go +++ b/cmd/rpcdaemon/commands/eth_filters.go @@ -145,12 +145,11 @@ func (api *APIImpl) NewHeads(ctx context.Context) (*rpc.Subscription, error) { if h != nil { err := notifier.Notify(rpcSub.ID, h) if err != nil { - log.Warn("error while notifying subscription", "err", err) - return + log.Warn("[rpc] error while notifying subscription", "err", err) } } if !ok { - log.Warn("new heads channel was closed") + log.Warn("[rpc] new heads channel was closed") return } case <-rpcSub.Err(): @@ -186,13 +185,12 @@ func (api *APIImpl) NewPendingTransactions(ctx context.Context) (*rpc.Subscripti if t != nil { err := notifier.Notify(rpcSub.ID, t.Hash()) if err != nil { - log.Warn("error while notifying subscription", "err", err) - return + log.Warn("[rpc] error while notifying subscription", "err", err) } } } if !ok { - log.Warn("new pending transactions channel was closed") + log.Warn("[rpc] new pending transactions channel was closed") return } case <-rpcSub.Err(): @@ -228,13 +226,12 @@ func (api *APIImpl) NewPendingTransactionsWithBody(ctx context.Context) (*rpc.Su if t != nil { err := notifier.Notify(rpcSub.ID, t) if err != nil { - log.Warn("error while notifying subscription", "err", err) - return + log.Warn("[rpc] error while notifying subscription", "err", err) } } } if !ok { - log.Warn("new pending transactions channel was closed") + log.Warn("[rpc] new pending transactions channel was closed") return } case <-rpcSub.Err(): @@ -269,12 +266,11 @@ func (api *APIImpl) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc if h != nil { err := notifier.Notify(rpcSub.ID, h) if err != nil { - log.Warn("error while notifying subscription", "err", err) - return + log.Warn("[rpc] error while notifying subscription", "err", err) } } if !ok { - log.Warn("log channel was closed") + log.Warn("[rpc] log channel was closed") return } case <-rpcSub.Err(): diff --git a/turbo/rpchelper/subscription.go b/turbo/rpchelper/subscription.go index 720787257..e7eb12cbd 100644 --- a/turbo/rpchelper/subscription.go +++ b/turbo/rpchelper/subscription.go @@ -1,8 +1,8 @@ package rpchelper import ( - "context" "sync" + "sync/atomic" ) // a simple interface for subscriptions for rpc helper @@ -12,12 +12,8 @@ type Sub[T any] interface { } type chan_sub[T any] struct { - ch chan T - - closed chan struct{} - - ctx context.Context - cn context.CancelFunc + ch chan T + closed atomic.Bool } // buffered channel @@ -28,38 +24,23 @@ func newChanSub[T any](size int) *chan_sub[T] { } o := &chan_sub[T]{} o.ch = make(chan T, size) - o.closed = make(chan struct{}) - o.ctx, o.cn = context.WithCancel(context.Background()) return o } func (s *chan_sub[T]) Send(x T) { + if s.closed.Load() { + return + } + select { - // if the output buffer is empty, send case s.ch <- x: - // if sub is canceled, dispose message - case <-s.ctx.Done(): - // the sub is overloaded, dispose message - default: + default: // the sub is overloaded, dispose message } } func (s *chan_sub[T]) Close() { - select { - case <-s.ctx.Done(): + if swapped := s.closed.CompareAndSwap(false, true); !swapped { return - default: - } - // its possible for multiple goroutines to get to this point, if Close is called twice at the same time - // close the context - allows any sends to exit - s.cn() - select { - case s.closed <- struct{}{}: - // but it is not possible for multiple goroutines to get to this point - // drain the channel - for range s.ch { - } - close(s.ch) - default: } + close(s.ch) } func NewSyncMap[K comparable, T any]() *SyncMap[K, T] { @@ -117,8 +98,7 @@ func (m *SyncMap[K, T]) Range(fn func(k K, v T) error) error { return nil } -func (m *SyncMap[K, T]) Delete(k K) (T, bool) { - var t T +func (m *SyncMap[K, T]) Delete(k K) (t T, deleted bool) { m.mu.Lock() defer m.mu.Unlock() val, ok := m.m[k]