diff --git a/common/chan.go b/common/chan.go index ec7ec7a3f..ac9fdbf6f 100644 --- a/common/chan.go +++ b/common/chan.go @@ -44,3 +44,18 @@ func SafeClose(ch chan struct{}) { close(ch) } } + +// PrioritizedSend message to channel, but if channel is full (slow consumer) - drop half of old messages (not new) +func PrioritizedSend[t any](ch chan t, msg t) { + select { + case ch <- msg: + default: //if channel is full (slow consumer), drop old messages (not new) + for i := 0; i < cap(ch)/2; i++ { + select { + case <-ch: + default: + } + } + ch <- msg + } +} diff --git a/kv/remotedbserver/server.go b/kv/remotedbserver/server.go index 03334e969..73f7f6ad4 100644 --- a/kv/remotedbserver/server.go +++ b/kv/remotedbserver/server.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon-lib/kv" @@ -320,17 +321,7 @@ func (s *StateChangePubSub) Pub(reply *remote.StateChangeBatch) { s.mu.RLock() defer s.mu.RUnlock() for _, ch := range s.chans { - select { - case ch <- reply: - default: //if channel is full (slow consumer), drop old messages - for i := 0; i < cap(ch)/2; i++ { - select { - case <-ch: - default: - } - } - ch <- reply - } + common.PrioritizedSend(ch, reply) } }