erigon-pulse/cl/sentinel/httpreqresp/server.go
a 436493350e
Sentinel refactor (#8296)
1. changes sentinel to use an http-like interface

2. moves hexutil, crypto/blake2b, metrics packages to erigon-lib
2023-10-22 01:17:18 +02:00

123 lines
4.2 KiB
Go

// package httpreqresp encapsulates eth2 beacon chain resp-resp into http
package httpreqresp
import (
"io"
"net/http"
"net/http/httptest"
"strconv"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
const (
ResponseCodeHeader = "Reqresp-Response-Code"
PeerIdHeader = "Reqresp-Peer-Id"
TopicHeader = "Reqresp-Topic"
)
// Do performs an http request against the http handler.
// NOTE: this is actually very similar to the http.RoundTripper interface... maybe we should investigate using that.
/*
the following headers have meaning when passed in to the request:
REQRESP-PEER-ID - the peer id to target for the request
REQRESP-TOPIC - the topic to request with
REQRESP-EXPECTED-CHUNKS - this is an integer, which will be multiplied by 10 to calculate the amount of seconds the peer has to respond with all the data
*/
func Do(handler http.Handler, r *http.Request) (*http.Response, error) {
// TODO: there potentially extra alloc here (responses are bufferd)
// is that a big deal? not sure. maybe can reuse these buffers since they are read once (and known when close) if so
ans := make(chan *http.Response)
go func() {
res := httptest.NewRecorder()
handler.ServeHTTP(res, r)
// linter does not know we are passing the resposne through channel.
// nolint: bodyclose
resp := res.Result()
ans <- resp
}()
select {
case res := <-ans:
return res, nil
case <-r.Context().Done():
return nil, r.Context().Err()
}
}
// Handles a request
func NewRequestHandler(host host.Host) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// get the peer parameters
peerIdBase58 := r.Header.Get("REQRESP-PEER-ID")
topic := r.Header.Get("REQRESP-TOPIC")
chunkCount := r.Header.Get("REQRESP-EXPECTED-CHUNKS")
chunks, _ := strconv.Atoi(chunkCount)
// some sanity checking on chunks
if chunks < 1 {
chunks = 1
}
// idk why this would happen, so lets make sure it doesnt. future-proofing from bad input
if chunks > 512 {
chunks = 512
}
// read the base58 encoded peer id to know which we are trying to dial
peerId, err := peer.Decode(peerIdBase58)
if err != nil {
http.Error(w, "Invalid Peer Id", http.StatusBadRequest)
return
}
// we can't connect to the peer - so we should disconnect them. send a code 4xx
stream, err := host.NewStream(r.Context(), peerId, protocol.ID(topic))
if err != nil {
http.Error(w, "Can't Connect to Peer: "+err.Error(), http.StatusBadRequest)
return
}
defer stream.Close()
// this write deadline is not part of the eth p2p spec, but we are implying it.
stream.SetWriteDeadline(time.Now().Add(5 * time.Second))
if r.Body != nil && r.ContentLength > 0 {
_, err := io.Copy(stream, r.Body)
if err != nil {
http.Error(w, "Processing Stream: "+err.Error(), http.StatusBadRequest)
return
}
}
err = stream.CloseWrite()
if err != nil {
http.Error(w, "Close Write Side: "+err.Error(), http.StatusBadRequest)
return
}
code := make([]byte, 1)
// we have 5 seconds to read the next byte. this is the 5 TTFB_TIMEOUT in the spec
stream.SetReadDeadline(time.Now().Add(5 * time.Second))
_, err = io.ReadFull(stream, code)
if err != nil {
http.Error(w, "Read Code: "+err.Error(), http.StatusBadRequest)
return
}
// this is not neccesary, but seems like the right thing to do
w.Header().Set("CONTENT-TYPE", "application/octet-stream")
w.Header().Set("CONTENT-ENCODING", "snappy/stream")
// add the response code & headers
w.Header().Set("REQRESP-RESPONSE-CODE", strconv.Itoa(int(code[0])))
w.Header().Set("REQRESP-PEER-ID", peerIdBase58)
w.Header().Set("REQRESP-TOPIC", topic)
// the deadline is 10 * expected chunk count, which the user can send. otherwise we will only wait 10 seconds
// this is technically incorrect, and more aggressive than the network might like.
stream.SetReadDeadline(time.Now().Add(10 * time.Second * time.Duration(chunks)))
// copy the data now to the stream
// the first write to w will call code 200, so we do not need to
_, err = io.Copy(w, stream)
if err != nil {
http.Error(w, "Reading Stream Response: "+err.Error(), http.StatusBadRequest)
return
}
return
}
}