diff --git a/kv/remotedbserver/server.go b/kv/remotedbserver/server.go index f9e6484aa..257fee868 100644 --- a/kv/remotedbserver/server.go +++ b/kv/remotedbserver/server.go @@ -26,7 +26,6 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/log/v3" "google.golang.org/protobuf/types/known/emptypb" ) @@ -46,7 +45,7 @@ type KvServer struct { remote.UnimplementedKVServer // must be embedded to have forward compatible implementations. kv kv.RwDB - stateChangeStreams *StateChangeStreams + stateChangeStreams *StateChangePubSub ctx context.Context } @@ -260,94 +259,80 @@ func bytesCopy(b []byte) []byte { } func (s *KvServer) StateChanges(req *remote.StateChangeRequest, server remote.KV_StateChangesServer) error { - clean := s.stateChangeStreams.Add(server) - defer clean() - select { - case <-s.ctx.Done(): - return nil - case <-server.Context().Done(): - return nil + ch, remove := s.stateChangeStreams.Sub() + defer remove() + for { + select { + case reply := <-ch: + if err := server.Send(reply); err != nil { + return err + } + case <-s.ctx.Done(): + return nil + case <-server.Context().Done(): + return nil + } } } func (s *KvServer) SendStateChanges(ctx context.Context, sc *remote.StateChangeBatch) { - if err := s.stateChangeStreams.Broadcast(ctx, sc); err != nil { - log.Warn("Sending new peer notice to core P2P failed", "error", err) - } + s.stateChangeStreams.Pub(sc) } -type StateChangeStreams struct { - mu sync.RWMutex - id uint - streams map[uint]remote.KV_StateChangesServer +type StateChangePubSub struct { + mu sync.RWMutex + id uint + chans map[uint]chan *remote.StateChangeBatch } -func newStateChangeStreams() *StateChangeStreams { - return &StateChangeStreams{} +func newStateChangeStreams() *StateChangePubSub { + return &StateChangePubSub{} } -func (s *StateChangeStreams) Add(stream remote.KV_StateChangesServer) (remove func()) { +func (s *StateChangePubSub) Sub() (ch chan *remote.StateChangeBatch, remove func()) { s.mu.Lock() defer s.mu.Unlock() - if s.streams == nil { - s.streams = make(map[uint]remote.KV_StateChangesServer) + if s.chans == nil { + s.chans = make(map[uint]chan *remote.StateChangeBatch) } s.id++ id := s.id - s.streams[id] = stream - return func() { s.remove(id) } + ch = make(chan *remote.StateChangeBatch, 8) + s.chans[id] = ch + return ch, func() { s.remove(id) } } -func (s *StateChangeStreams) doBroadcast(ctx context.Context, reply *remote.StateChangeBatch) (ids []uint, errs []error) { +func (s *StateChangePubSub) Pub(reply *remote.StateChangeBatch) { s.mu.RLock() defer s.mu.RUnlock() -Loop: - for id, stream := range s.streams { + for _, ch := range s.chans { select { - case <-ctx.Done(): - errs = append(errs, ctx.Err()) - break Loop - default: - } - err := stream.Send(reply) - if err != nil { - select { - case <-stream.Context().Done(): - ids = append(ids, id) - default: + case ch <- reply: + default: //if channel is full (slow consumer), drop old messages + for i := 0; i < cap(ch)/2; i++ { + select { + case <-ch: + default: + } } - errs = append(errs, err) + ch <- reply } } - return } -func (s *StateChangeStreams) Broadcast(ctx context.Context, reply *remote.StateChangeBatch) (errs []error) { - var ids []uint - ids, errs = s.doBroadcast(ctx, reply) - if len(ids) == 0 { - return errs - } - s.mu.Lock() - defer s.mu.Unlock() - for _, id := range ids { - delete(s.streams, id) - } - return errs -} - -func (s *StateChangeStreams) Len() int { +func (s *StateChangePubSub) Len() int { s.mu.RLock() defer s.mu.RUnlock() - return len(s.streams) + return len(s.chans) } -func (s *StateChangeStreams) remove(id uint) { +func (s *StateChangePubSub) remove(id uint) { s.mu.Lock() defer s.mu.Unlock() - _, ok := s.streams[id] + ch, ok := s.chans[id] if !ok { // double-unsubscribe support return } - delete(s.streams, id) + close(ch) + delete(s.chans, id) }