[caplin] sse handler placeholder (#9082)

This commit is contained in:
a 2023-12-30 08:49:45 -06:00 committed by GitHub
parent b562eff482
commit 510d62ab8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 39 additions and 428 deletions

View File

@ -0,0 +1,23 @@
package validatorapi
import (
"net/http"
"github.com/gfx-labs/sse"
)
func (v *ValidatorApiHandler) GetEthV1Events(w http.ResponseWriter, r *http.Request) {
sink, err := sse.DefaultUpgrader.Upgrade(w, r)
if err != nil {
// OK to ignore this error.
return
}
topics := r.URL.Query()["topics"]
for _, topic := range topics {
sink.Encode(&sse.Event{
Event: []byte(topic),
Data: nil,
})
// OK to ignore this error. maybe should log it later
}
}

View File

@ -48,7 +48,7 @@ func (v *ValidatorApiHandler) Route(r chi.Router) {
r.Get("/node/syncing", beaconhttp.HandleEndpointFunc(v.GetEthV1NodeSyncing))
})
r.Get("/config/spec", beaconhttp.HandleEndpointFunc(v.GetEthV1ConfigSpec))
r.Get("/events", http.NotFound)
r.Get("/events", v.GetEthV1Events)
r.Route("/validator", func(r chi.Router) {
r.Route("/duties", func(r chi.Router) {
r.Post("/attester/{epoch}", http.NotFound)

View File

@ -1,8 +0,0 @@
## sse
sse implement server side events also known as eventsource
see the specification here: https://html.spec.whatwg.org/multipage/server-sent-events.html

View File

@ -1,40 +0,0 @@
package sse
import (
"bufio"
"net/http"
"strings"
)
// EventSink tracks a event source connection between a client and a server
type EventSink struct {
wr http.ResponseWriter
r *http.Request
bw *bufio.Writer
enc *Encoder
LastEventId string
}
func Upgrade(wr http.ResponseWriter, r *http.Request) (*EventSink, error) {
if !strings.EqualFold(r.Header.Get("Content-Type"), "text/event-stream") {
return nil, ErrInvalidContentType
}
o := &EventSink{
wr: wr,
r: r,
bw: bufio.NewWriter(wr),
}
o.LastEventId = r.Header.Get("Last-Event-ID")
wr.Header().Add("Content-Type", "text/event-stream")
o.enc = NewEncoder(o.bw)
return o, nil
}
func (e *EventSink) Encode(p *Packet) error {
err := e.enc.Encode(p)
if err != nil {
return err
}
return e.bw.Flush()
}

View File

@ -1,82 +0,0 @@
package sse
import "io"
// Packet represents an event to send
// the order in this struct is the order that they will be sent.
type Packet struct {
// as a special case, an empty value of event will not write an event header
Event string
// additional headers to be added.
// using the reserved headers event, header, data, id is undefined behavior
// note that this is the canonical way to send the "retry" header
Header map[string]string
// the io.Reader to source the data from
Data io.Reader
// whether or not to send an id, and if so, what id to send
// a nil id means to not send an id.
// empty string means to simply send the string "id\n"
// otherwise, the id is sent as is
// id is always sent at the end of the packet
ID *string
}
func ID(x string) *string {
return &x
}
// Encoder works at a higher level than the encoder.
// it works on the packet level.
type Encoder struct {
wr *Writer
firstWriteDone bool
}
func NewEncoder(w io.Writer) *Encoder {
wr := NewWriter(w)
return &Encoder{
wr: wr,
}
}
func (e *Encoder) Encode(p *Packet) error {
if e.firstWriteDone {
err := e.wr.Next()
if err != nil {
return err
}
}
e.firstWriteDone = true
if len(p.Event) > 0 {
if err := e.wr.Header("event", p.Event); err != nil {
return err
}
}
if p.Header != nil {
for k, v := range p.Header {
if err := e.wr.Header(k, v); err != nil {
return err
}
}
}
if p.Data != nil {
if err := e.wr.WriteData(p.Data); err != nil {
return err
}
}
err := e.wr.Flush()
if err != nil {
return err
}
if p.ID != nil {
if err := e.wr.Header("id", *p.ID); err != nil {
return err
}
}
return nil
}

View File

@ -1,40 +0,0 @@
package sse
import (
"bytes"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEncoderSimple(t *testing.T) {
type testCase struct {
xs []*Packet
w string
}
cases := []testCase{{
[]*Packet{
{Event: "hello", Data: strings.NewReader("some data")},
{Data: strings.NewReader("some other data with no event header")},
},
"event: hello\ndata: some data\n\ndata: some other data with no event header\n",
},
{
[]*Packet{
{Event: "hello", Data: strings.NewReader("some \n funky\r\n data\r")},
{Data: strings.NewReader("some other data with an id"), ID: ID("dogs")},
},
"event: hello\ndata: some \ndata: funky\r\ndata: data\r\ndata: some other data with an id\nid: dogs\n",
},
}
for _, v := range cases {
buf := &bytes.Buffer{}
enc := NewEncoder(buf)
for _, p := range v.xs {
require.NoError(t, enc.Encode(p))
}
assert.EqualValues(t, v.w, buf.String())
}
}

View File

@ -1,8 +0,0 @@
package sse
import "errors"
var (
ErrInvalidUTF8Bytes = errors.New("invalid utf8 bytes")
ErrInvalidContentType = errors.New("invalid content type")
)

View File

@ -1,170 +0,0 @@
package sse
import (
"io"
"unicode/utf8"
//"github.com/segmentio/asm/utf8" -- can switch to this library in the future if needed
)
type Option func(*Options)
func OptionValidateUtf8(enable bool) Option {
return func(o *Options) {
o.validateUTF8 = true
}
}
type Options struct {
validateUTF8 bool
}
func (e *Options) ValidateUTF8() bool {
return e.validateUTF8
}
type writeState struct {
inMessage bool
trailingCarriage bool
}
// writer is not thread safe. it is meant for internal usage
type Writer struct {
raw io.Writer
es writeState
w io.Writer
o Options
}
func NewWriter(w io.Writer, opts ...Option) *Writer {
o := &Options{}
for _, v := range opts {
v(o)
}
return &Writer{
raw: w,
w: w,
o: *o,
}
}
func (e *Writer) writeByte(x byte) error {
_, err := e.w.Write([]byte{x})
return err
}
func (e *Writer) writeString(s string) (int, error) {
return e.w.Write([]byte(s))
}
func (e *Writer) Flush() error {
if e.es.inMessage {
// we are in a message, so write a newline to terminate it, as the user did not
err := e.writeByte('\n')
if err != nil {
return err
}
e.es.inMessage = false
}
// and reset the trailingCarriage state as well
e.es.trailingCarriage = false
return nil
}
// next should be called at the end of an event. it will call Flush and then write a newline
func (e *Writer) Next() error {
if err := e.Flush(); err != nil {
return err
}
// we write a newline, indicating now that this is a new event
if err := e.writeByte('\n'); err != nil {
return err
}
return nil
}
// Event will start writing an event with the name topic to the stream
func (e *Writer) Header(name string, topic string) error {
if topic == "" {
return nil
}
if e.o.ValidateUTF8() {
if !utf8.ValidString(topic) {
return ErrInvalidUTF8Bytes
}
}
if len(topic) > 0 {
if _, err := e.writeString(name + ": "); err != nil {
return err
}
// write the supplied topic
if _, err := e.writeString(topic); err != nil {
return err
}
}
if err := e.writeByte('\n'); err != nil {
return err
}
return nil
}
// a convenient wrapper for writing data from io.Reader so that one can easily replay events.
func (e *Writer) WriteData(r io.Reader) (err error) {
if _, err = io.Copy(e, r); err != nil {
return err
}
return
}
// Write underlying write method for piping data. be careful using this!
func (e *Writer) Write(xs []byte) (n int, err error) {
if e.o.ValidateUTF8() && !utf8.Valid(xs) {
return 0, ErrInvalidUTF8Bytes
}
for _, x := range xs {
// now, see if there was a trailing carriage left over from the last write
// only check and write the data if we are do not have a trailing carriage
if !e.es.trailingCarriage {
e.checkMessage()
}
if e.es.trailingCarriage {
// if there is, see if the character is a newline
if x != '\n' {
// its not a newline, so the trailing carriage was a valid end of message. write a new data field
e.es.inMessage = false
e.checkMessage()
}
// in the case that the character is a newline
// we will just write the newline and inMessage=false will be set in the case below
// in both cases, the trailing carriage is dealt with
e.es.trailingCarriage = false
}
// write the byte no matter what
err = e.writeByte(x)
if err != nil {
return
}
// if success, note that we wrote another byte
n++
if x == '\n' {
// end message if it's a newline always
e.es.inMessage = false
} else if x == '\r' {
// if x is a carriage return, mark it as trailing carriage
e.es.trailingCarriage = true
e.es.inMessage = false
}
}
return
}
func (e *Writer) checkMessage() {
if !e.es.inMessage {
e.es.inMessage = true
e.writeString("data: ")
}
}

View File

@ -1,76 +0,0 @@
package sse
import (
"bytes"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEncoderWrite(t *testing.T) {
type testCase struct {
e string
i string
w string
}
cases := []testCase{{
"",
"foo bar\nbar foo\nwowwwwza\n",
`data: foo bar
data: bar foo
data: wowwwwza
`}, {
"hello",
"there\nfriend",
`event: hello
data: there
data: friend
`},
}
for _, v := range cases {
buf := &bytes.Buffer{}
enc := NewWriter(buf)
err := enc.Header("event", v.e)
require.NoError(t, err)
_, err = enc.Write([]byte(v.i))
require.NoError(t, err)
require.NoError(t, enc.Flush())
assert.EqualValues(t, buf.String(), v.w)
}
}
func TestEncoderWriteData(t *testing.T) {
type testCase struct {
e string
i string
w string
}
cases := []testCase{{
"",
"foo bar\nbar foo\nwowwwwza\n",
`data: foo bar
data: bar foo
data: wowwwwza
`}, {
"hello",
"there\nfriend",
`event: hello
data: there
data: friend
`},
}
for _, v := range cases {
buf := &bytes.Buffer{}
enc := NewWriter(buf)
err := enc.Header("event", v.e)
require.NoError(t, err)
err = enc.WriteData(strings.NewReader(v.i))
require.NoError(t, err)
require.NoError(t, enc.Flush())
assert.EqualValues(t, v.w, buf.String())
}
}

4
go.mod
View File

@ -37,6 +37,7 @@ require (
github.com/emicklei/dot v1.6.0
github.com/fjl/gencodec v0.0.0-20220412091415-8bb9e558978c
github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35
github.com/gfx-labs/sse v0.0.0-20231226060816-f747e26a9baa
github.com/go-chi/chi/v5 v5.0.10
github.com/goccy/go-json v0.9.11
github.com/gofrs/flock v0.8.1
@ -251,7 +252,7 @@ require (
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
@ -269,6 +270,7 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
lukechampine.com/uint128 v1.3.0 // indirect

14
go.sum
View File

@ -309,6 +309,8 @@ github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c h1:uYNKzPntb8c6DKvP9E
github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c/go.mod h1:Q0X6pkwTILDlzrGEckF6HKjXe48EgsY/l7K7vhY4MW8=
github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35 h1:I8QswD9gf3VEpr7bpepKKOm7ChxFITIG+oc1I5/S0no=
github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35/go.mod h1:DMDd04jjQgdynaAwbEgiRERIGpC8fDjx0+y06an7Psg=
github.com/gfx-labs/sse v0.0.0-20231226060816-f747e26a9baa h1:b6fBm4SLM8jywQHNmc3ZCl6zQEhEyZl6bp7is4en72M=
github.com/gfx-labs/sse v0.0.0-20231226060816-f747e26a9baa/go.mod h1:K0FMPjMrIaS1+/SeZeOVkGVjDVERZJW53inQL00FjLE=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
@ -789,8 +791,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo=
github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 h1:Lt9DzQALzHoDwMBGJ6v8ObDPR0dzr2a6sXTB1Fq7IHs=
@ -952,6 +954,7 @@ golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.0.0-20220516162934-403b01795ae8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -993,6 +996,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -1048,6 +1052,7 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -1147,6 +1152,7 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
@ -1154,6 +1160,8 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -1349,6 +1357,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=