prysm-pulse/shared/logutil/stream.go
Raul Jordan 72dc43989f
Stream Validator and Beacon Logs via gRPC Streams (#8150)
* implement validator logs stream

* fix test

* tidy

* proto regen

* add logs stream to the beacon node

* beacon logs working

* impl

* pass test

* gaz

* rem lock

* fix space
2020-12-18 18:03:24 +00:00

73 lines
1.7 KiB
Go

package logutil
import (
"io"
lru "github.com/hashicorp/golang-lru"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/rand"
)
// logCacheSize is the capacity of the in-memory cache of recent log entries.
const logCacheSize = 20
// Compile-time assertions that *StreamServer satisfies the interfaces it is
// expected to implement. Typed nil pointers are used so nothing is allocated.
var (
	_ io.Writer = (*StreamServer)(nil)
	_ Streamer  = (*StreamServer)(nil)
)
// Streamer is the interface implemented by types that can both return
// recently recorded process logs and expose a feed of new log entries.
type Streamer interface {
// GetLastFewLogs returns the recently recorded log entries, one []byte per entry.
GetLastFewLogs() [][]byte
// LogsFeed returns the event feed on which new log entries are published.
LogsFeed() *event.Feed
}
// StreamServer receives log entries through its Write method, publishes each
// one on an event feed, and keeps recent entries in an LRU cache so they can
// be replayed later. It is meant to back log streaming to connected clients
// (the transport itself is not part of this file).
type StreamServer struct {
// feed broadcasts every written log entry to its subscribers.
feed *event.Feed
// cache holds recently written log entries; NewStreamServer sizes it with logCacheSize.
cache *lru.Cache
}
// NewStreamServer builds a StreamServer with a fresh event feed and an LRU
// cache holding up to logCacheSize entries, registers it as a log writer via
// addLogWriter, and returns it.
//
// It panics if the cache cannot be created.
func NewStreamServer() *StreamServer {
	recent, err := lru.New(logCacheSize)
	if err != nil {
		// lru.New only rejects an invalid size; logCacheSize is a positive
		// constant, so reaching this branch indicates a programming error.
		panic(err)
	}

	server := &StreamServer{feed: &event.Feed{}, cache: recent}
	addLogWriter(server)
	return server
}
// GetLastFewLogs returns the log entries currently held in the LRU cache, in
// the order reported by the cache's Keys method (oldest to newest according
// to the golang-lru documentation). The result is never nil; it is empty when
// no logs have been cached yet.
//
// The returned byte slices are the cached values themselves, not copies, so
// callers must not modify them.
func (ss *StreamServer) GetLastFewLogs() [][]byte {
	keys := ss.cache.Keys()
	messages := make([][]byte, 0, len(keys))
	for _, k := range keys {
		// Peek, unlike Get, does not mark the entry as recently used. Reading
		// the cache must not reorder it: otherwise a Write landing in the
		// middle of this loop could evict a newer entry instead of the oldest
		// one and scramble the chronological order of the stored logs.
		d, ok := ss.cache.Peek(k)
		if !ok {
			// The entry was evicted between Keys and Peek.
			continue
		}
		// Only []byte values are ever stored by Write; skip anything else
		// rather than panicking on a failed type assertion.
		if msg, isBytes := d.([]byte); isBytes {
			messages = append(messages, msg)
		}
	}
	return messages
}
// LogsFeed returns the event feed on which Write publishes each log entry.
// Callers can subscribe to it to receive new log entries over a channel.
func (ss *StreamServer) LogsFeed() *event.Feed {
return ss.feed
}
// Write implements io.Writer. It publishes the log entry p on the event feed
// and stores it in the cache of recent logs under a freshly generated uint64
// key. It always reports len(p) bytes written and a nil error.
func (ss *StreamServer) Write(p []byte) (n int, err error) {
	// io.Writer implementations must not retain p: the caller (typically a
	// logger) is free to reuse the buffer once Write returns. Both the cache
	// and feed subscribers keep the entry beyond this call, so give them a
	// private copy instead of the caller's slice.
	entry := make([]byte, len(p))
	copy(entry, p)

	ss.feed.Send(entry)
	ss.cache.Add(rand.NewGenerator().Uint64(), entry)
	return len(p), nil
}