use PrioritizedSend method (#553)

This commit is contained in:
Alex Sharov 2022-07-26 10:37:29 +07:00 committed by GitHub
parent bc7fda6c72
commit 7a1c2f700b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 11 deletions

View File

@ -44,3 +44,18 @@ func SafeClose(ch chan struct{}) {
close(ch) close(ch)
} }
} }
// PrioritizedSend message to channel, but if channel is full (slow consumer) - drop half of old messages (not new)
func PrioritizedSend[t any](ch chan t, msg t) {
select {
case ch <- msg:
default: //if channel is full (slow consumer), drop old messages (not new)
for i := 0; i < cap(ch)/2; i++ {
select {
case <-ch:
default:
}
}
ch <- msg
}
}

View File

@ -24,6 +24,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
@ -320,17 +321,7 @@ func (s *StateChangePubSub) Pub(reply *remote.StateChangeBatch) {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
for _, ch := range s.chans { for _, ch := range s.chans {
select { common.PrioritizedSend(ch, reply)
case ch <- reply:
default: //if channel is full (slow consumer), drop old messages
for i := 0; i < cap(ch)/2; i++ {
select {
case <-ch:
default:
}
}
ch <- reply
}
} }
} }