mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-08 20:11:21 +00:00
98c57e75c0
eventsource is required for the validator api. this implements the eventsource sink/server handler. the implementation is based off of this document: https://html.spec.whatwg.org/multipage/server-sent-events.html note that this is a building block for the full eventsource server. there still needs to be work done. prysm has their own custom solution based off of protobuf/grpc: https://hackmd.io/@prysmaticlabs/eventstream-api using that would be not good. existing eventsource implementations for golang are not good for our situation. options are: 1. https://github.com/r3labs/sse - has most stars - this is the best contender, since it uses []byte and not string, but it allocates and copies extra times in the server (because of use of fprintf) and makes an incorrect assumption about Last-Event-ID needing to be a number (i can't find this in the specification). 2. https://github.com/antage/eventsource - requires full buffers, copies many times, does not provide abstraction for headers. relatively unmaintained. 3. https://github.com/donovanhide/eventsource - missing functionality around sending ids, requires full buffers, etc. 4. https://github.com/bernerdschaefer/eventsource - 10 years old, unmaintained. additionally, implementations other than r3labs/sse are very incorrect because they do not split up the data field correctly when newlines are sent. (parsers by specification will fail to decode messages sent by most of these implementations that have newlines, as i understand it). the implementation by r3labs/sse is also incorrect because it does not respect \r. finally, all these implementations have very heavy implementation of the server, which we do not need since we will use fixed sequence ids. 
r3labs/sse for instance hijacks the entire handler and ties that to the server, losing a lot of flexibility in how we implement our server. for the beacon api, we need to stream: ```head, block, attestation, voluntary_exit, bls_to_execution_change, finalized_checkpoint, chain_reorg, contribution_and_proof, light_client_finality_update, light_client_optimistic_update, payload_attributes``` some of these are rather big json payloads, and the ability to simultaneously stream them from io.Readers instead of making a full copy of the payload every time we wish to rebroadcast it will save a lot of heap size for both resource constrained environments and serving at scale. the protocol itself is relatively simple, there are just a few gotchas.
171 lines
3.7 KiB
Go
171 lines
3.7 KiB
Go
package sse
|
|
|
|
import (
|
|
"io"
|
|
"unicode/utf8"
|
|
//"github.com/segmentio/asm/utf8" -- can switch to this library in the future if needed
|
|
)
|
|
|
|
// Option is a functional option that adjusts the Options a Writer is built with.
type Option func(*Options)

// OptionValidateUtf8 returns an Option that switches UTF-8 validation on when
// enable is true and off when enable is false.
func OptionValidateUtf8(enable bool) Option {
	return func(o *Options) {
		// honour the caller's choice instead of unconditionally enabling validation
		o.validateUTF8 = enable
	}
}

// Options holds the configuration of a Writer.
type Options struct {
	// validateUTF8 records whether written data should be checked for valid UTF-8.
	validateUTF8 bool
}

// ValidateUTF8 reports whether UTF-8 validation has been enabled.
func (e *Options) ValidateUTF8() bool {
	return e.validateUTF8
}
|
|
|
|
// writeState tracks where the Writer currently is within the data line it is
// emitting, so that framing survives across separate Write calls.
type writeState struct {
	// inMessage is true after a "data: " prefix has been written and before the
	// line it belongs to has been terminated.
	inMessage bool
	// trailingCarriage is true when the last byte written was '\r'; the next
	// byte decides whether that was a lone CR or the first half of a CRLF.
	trailingCarriage bool
}
|
|
|
|
// Writer is not thread safe. It is meant for internal usage.
type Writer struct {
	// raw is the writer that was handed to NewWriter.
	raw io.Writer

	// es is the line-framing state carried between writes.
	es writeState

	// w is the writer the methods of Writer write to; NewWriter sets it to the
	// same value as raw.
	w io.Writer

	// o holds the options the Writer was constructed with.
	o Options
}
|
|
|
|
func NewWriter(w io.Writer, opts ...Option) *Writer {
|
|
o := &Options{}
|
|
for _, v := range opts {
|
|
v(o)
|
|
}
|
|
return &Writer{
|
|
raw: w,
|
|
w: w,
|
|
o: *o,
|
|
}
|
|
}
|
|
|
|
func (e *Writer) writeByte(x byte) error {
|
|
_, err := e.w.Write([]byte{x})
|
|
return err
|
|
}
|
|
func (e *Writer) writeString(s string) (int, error) {
|
|
return e.w.Write([]byte(s))
|
|
}
|
|
|
|
func (e *Writer) Flush() error {
|
|
if e.es.inMessage {
|
|
// we are in a message, so write a newline to terminate it, as the user did not
|
|
err := e.writeByte('\n')
|
|
if err != nil {
|
|
return err
|
|
}
|
|
e.es.inMessage = false
|
|
}
|
|
// and reset the trailingCarriage state as well
|
|
e.es.trailingCarriage = false
|
|
return nil
|
|
}
|
|
|
|
// next should be called at the end of an event. it will call Flush and then write a newline
|
|
func (e *Writer) Next() error {
|
|
|
|
if err := e.Flush(); err != nil {
|
|
return err
|
|
}
|
|
// we write a newline, indicating now that this is a new event
|
|
if err := e.writeByte('\n'); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Event will start writing an event with the name topic to the stream
|
|
func (e *Writer) Header(name string, topic string) error {
|
|
if topic == "" {
|
|
return nil
|
|
}
|
|
if e.o.ValidateUTF8() {
|
|
if !utf8.ValidString(topic) {
|
|
return ErrInvalidUTF8Bytes
|
|
}
|
|
}
|
|
if len(topic) > 0 {
|
|
if _, err := e.writeString(name + ": "); err != nil {
|
|
return err
|
|
}
|
|
// write the supplied topic
|
|
if _, err := e.writeString(topic); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err := e.writeByte('\n'); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// a convenient wrapper for writing data from io.Reader so that one can easily replay events.
|
|
func (e *Writer) WriteData(r io.Reader) (err error) {
|
|
if _, err = io.Copy(e, r); err != nil {
|
|
return err
|
|
}
|
|
return
|
|
}
|
|
|
|
// Write underlying write method for piping data. be careful using this!
|
|
func (e *Writer) Write(xs []byte) (n int, err error) {
|
|
if e.o.ValidateUTF8() && !utf8.Valid(xs) {
|
|
return 0, ErrInvalidUTF8Bytes
|
|
}
|
|
for _, x := range xs {
|
|
// now, see if there was a trailing carriage left over from the last write
|
|
// only check and write the data if we are do not have a trailing carriage
|
|
if !e.es.trailingCarriage {
|
|
e.checkMessage()
|
|
}
|
|
if e.es.trailingCarriage {
|
|
// if there is, see if the character is a newline
|
|
if x != '\n' {
|
|
// its not a newline, so the trailing carriage was a valid end of message. write a new data field
|
|
e.es.inMessage = false
|
|
e.checkMessage()
|
|
}
|
|
// in the case that the character is a newline
|
|
// we will just write the newline and inMessage=false will be set in the case below
|
|
|
|
// in both cases, the trailing carriage is dealt with
|
|
e.es.trailingCarriage = false
|
|
}
|
|
// write the byte no matter what
|
|
err = e.writeByte(x)
|
|
if err != nil {
|
|
return
|
|
}
|
|
// if success, note that we wrote another byte
|
|
n++
|
|
if x == '\n' {
|
|
// end message if it's a newline always
|
|
e.es.inMessage = false
|
|
} else if x == '\r' {
|
|
// if x is a carriage return, mark it as trailing carriage
|
|
e.es.trailingCarriage = true
|
|
e.es.inMessage = false
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (e *Writer) checkMessage() {
|
|
if !e.es.inMessage {
|
|
e.es.inMessage = true
|
|
e.writeString("data: ")
|
|
}
|
|
}
|