state and newHead: 1 goroutine per subscriber #309

This commit is contained in:
Alex Sharov 2022-02-10 08:26:08 +07:00 committed by GitHub
parent 2e162c70ba
commit 8e4618c57d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -26,7 +26,6 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"google.golang.org/protobuf/types/known/emptypb"
)
@ -46,7 +45,7 @@ type KvServer struct {
remote.UnimplementedKVServer // must be embedded to have forward compatible implementations.
kv kv.RwDB
stateChangeStreams *StateChangeStreams
stateChangeStreams *StateChangePubSub
ctx context.Context
}
@ -260,94 +259,80 @@ func bytesCopy(b []byte) []byte {
}
func (s *KvServer) StateChanges(req *remote.StateChangeRequest, server remote.KV_StateChangesServer) error {
clean := s.stateChangeStreams.Add(server)
defer clean()
select {
case <-s.ctx.Done():
return nil
case <-server.Context().Done():
return nil
ch, remove := s.stateChangeStreams.Sub()
defer remove()
for {
select {
case reply := <-ch:
if err := server.Send(reply); err != nil {
return err
}
case <-s.ctx.Done():
return nil
case <-server.Context().Done():
return nil
}
}
}
func (s *KvServer) SendStateChanges(ctx context.Context, sc *remote.StateChangeBatch) {
if err := s.stateChangeStreams.Broadcast(ctx, sc); err != nil {
log.Warn("Sending new peer notice to core P2P failed", "error", err)
}
s.stateChangeStreams.Pub(sc)
}
type StateChangeStreams struct {
mu sync.RWMutex
id uint
streams map[uint]remote.KV_StateChangesServer
type StateChangePubSub struct {
mu sync.RWMutex
id uint
chans map[uint]chan *remote.StateChangeBatch
}
func newStateChangeStreams() *StateChangeStreams {
return &StateChangeStreams{}
func newStateChangeStreams() *StateChangePubSub {
return &StateChangePubSub{}
}
func (s *StateChangeStreams) Add(stream remote.KV_StateChangesServer) (remove func()) {
func (s *StateChangePubSub) Sub() (ch chan *remote.StateChangeBatch, remove func()) {
s.mu.Lock()
defer s.mu.Unlock()
if s.streams == nil {
s.streams = make(map[uint]remote.KV_StateChangesServer)
if s.chans == nil {
s.chans = make(map[uint]chan *remote.StateChangeBatch)
}
s.id++
id := s.id
s.streams[id] = stream
return func() { s.remove(id) }
ch = make(chan *remote.StateChangeBatch, 8)
s.chans[id] = ch
return ch, func() { s.remove(id) }
}
func (s *StateChangeStreams) doBroadcast(ctx context.Context, reply *remote.StateChangeBatch) (ids []uint, errs []error) {
func (s *StateChangePubSub) Pub(reply *remote.StateChangeBatch) {
s.mu.RLock()
defer s.mu.RUnlock()
Loop:
for id, stream := range s.streams {
for _, ch := range s.chans {
select {
case <-ctx.Done():
errs = append(errs, ctx.Err())
break Loop
default:
}
err := stream.Send(reply)
if err != nil {
select {
case <-stream.Context().Done():
ids = append(ids, id)
default:
case ch <- reply:
default: //if channel is full (slow consumer), drop old messages
for i := 0; i < cap(ch)/2; i++ {
select {
case <-ch:
default:
}
}
errs = append(errs, err)
ch <- reply
}
}
return
}
func (s *StateChangeStreams) Broadcast(ctx context.Context, reply *remote.StateChangeBatch) (errs []error) {
var ids []uint
ids, errs = s.doBroadcast(ctx, reply)
if len(ids) == 0 {
return errs
}
s.mu.Lock()
defer s.mu.Unlock()
for _, id := range ids {
delete(s.streams, id)
}
return errs
}
func (s *StateChangeStreams) Len() int {
func (s *StateChangePubSub) Len() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.streams)
return len(s.chans)
}
func (s *StateChangeStreams) remove(id uint) {
func (s *StateChangePubSub) remove(id uint) {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.streams[id]
ch, ok := s.chans[id]
if !ok { // double-unsubscribe support
return
}
delete(s.streams, id)
close(ch)
delete(s.chans, id)
}