mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-19 00:04:12 +00:00
96401e734e
* Implement `bls_to_execution_change` API event (cherry picked from commit 1c8a2c580b3664e1238380138dc65c30e238cb11) (cherry picked from commit aabcaac6191cb25048be301ab62e8b391f561d6f) * missing event code (cherry picked from commit daa4fd2b72aeaff16196faa46dca6efd289d9934) * Update beacon-chain/core/feed/operation/events.go Co-authored-by: Nishant Das <nishdas93@gmail.com>
483 lines
15 KiB
Go
483 lines
15 KiB
Go
package apimiddleware
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/prysmaticlabs/prysm/v3/api/gateway/apimiddleware"
|
|
"github.com/prysmaticlabs/prysm/v3/api/grpc"
|
|
"github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/eth/events"
|
|
"github.com/r3labs/sse"
|
|
)
|
|
|
|
// Header names used to carry the consensus version of an SSZ payload.
const (
	// versionHeader is the header set on SSZ responses and read from SSZ POST requests.
	versionHeader = "Eth-Consensus-Version"
	// grpcVersionHeader is the header the version is moved to before a POST request is proxied.
	grpcVersionHeader = "Grpc-metadata-Eth-Consensus-Version"
)

// Media types recognized during SSZ content negotiation.
const (
	jsonMediaType        = "application/json"
	octetStreamMediaType = "application/octet-stream"
)
|
|
|
|
// priorityRegex finds a `q=` parameter and captures its numeric value:
// one or more digits, optionally followed by a decimal part.
var priorityRegex = regexp.MustCompile(`q=(\d+(?:\.\d+)?)`)
|
|
|
|
type sszConfig struct {
|
|
fileName string
|
|
responseJson SszResponse
|
|
}
|
|
|
|
func handleGetBeaconStateSSZ(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
|
|
config := sszConfig{
|
|
fileName: "beacon_state.ssz",
|
|
responseJson: &SszResponseJson{},
|
|
}
|
|
return handleGetSSZ(m, endpoint, w, req, config)
|
|
}
|
|
|
|
func handleGetBeaconBlockSSZ(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
|
|
config := sszConfig{
|
|
fileName: "beacon_block.ssz",
|
|
responseJson: &SszResponseJson{},
|
|
}
|
|
return handleGetSSZ(m, endpoint, w, req, config)
|
|
}
|
|
|
|
func handleGetBeaconStateSSZV2(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
|
|
config := sszConfig{
|
|
fileName: "beacon_state.ssz",
|
|
responseJson: &VersionedSSZResponseJson{},
|
|
}
|
|
return handleGetSSZ(m, endpoint, w, req, config)
|
|
}
|
|
|
|
func handleGetBeaconBlockSSZV2(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
|
|
config := sszConfig{
|
|
fileName: "beacon_block.ssz",
|
|
responseJson: &VersionedSSZResponseJson{},
|
|
}
|
|
return handleGetSSZ(m, endpoint, w, req, config)
|
|
}
|
|
|
|
func handleGetBlindedBeaconBlockSSZ(
|
|
m *apimiddleware.ApiProxyMiddleware,
|
|
endpoint apimiddleware.Endpoint,
|
|
w http.ResponseWriter,
|
|
req *http.Request,
|
|
) (handled bool) {
|
|
config := sszConfig{
|
|
fileName: "beacon_block.ssz",
|
|
responseJson: &VersionedSSZResponseJson{},
|
|
}
|
|
return handleGetSSZ(m, endpoint, w, req, config)
|
|
}
|
|
|
|
func handleSubmitBlockSSZ(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
|
|
return handlePostSSZ(m, endpoint, w, req)
|
|
}
|
|
|
|
func handleSubmitBlindedBlockSSZ(
|
|
m *apimiddleware.ApiProxyMiddleware,
|
|
endpoint apimiddleware.Endpoint,
|
|
w http.ResponseWriter,
|
|
req *http.Request,
|
|
) (handled bool) {
|
|
return handlePostSSZ(m, endpoint, w, req)
|
|
}
|
|
|
|
func handleProduceBlockSSZ(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
|
|
config := sszConfig{
|
|
fileName: "produce_beacon_block.ssz",
|
|
responseJson: &VersionedSSZResponseJson{},
|
|
}
|
|
return handleGetSSZ(m, endpoint, w, req, config)
|
|
}
|
|
|
|
func handleProduceBlindedBlockSSZ(
|
|
m *apimiddleware.ApiProxyMiddleware,
|
|
endpoint apimiddleware.Endpoint,
|
|
w http.ResponseWriter,
|
|
req *http.Request,
|
|
) (handled bool) {
|
|
config := sszConfig{
|
|
fileName: "produce_blinded_beacon_block.ssz",
|
|
responseJson: &VersionedSSZResponseJson{},
|
|
}
|
|
return handleGetSSZ(m, endpoint, w, req, config)
|
|
}
|
|
|
|
func handleGetSSZ(
|
|
m *apimiddleware.ApiProxyMiddleware,
|
|
endpoint apimiddleware.Endpoint,
|
|
w http.ResponseWriter,
|
|
req *http.Request,
|
|
config sszConfig,
|
|
) (handled bool) {
|
|
ssz, err := sszRequested(req)
|
|
if err != nil {
|
|
apimiddleware.WriteError(w, apimiddleware.InternalServerError(err), nil)
|
|
return true
|
|
}
|
|
if !ssz {
|
|
return false
|
|
}
|
|
|
|
if errJson := prepareSSZRequestForProxying(m, endpoint, req); errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
grpcResponse, errJson := m.ProxyRequest(req)
|
|
if errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
grpcResponseBody, errJson := apimiddleware.ReadGrpcResponseBody(grpcResponse.Body)
|
|
if errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
respHasError, errJson := apimiddleware.HandleGrpcResponseError(endpoint.Err, grpcResponse, grpcResponseBody, w)
|
|
if errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
if respHasError {
|
|
return true
|
|
}
|
|
if errJson := apimiddleware.DeserializeGrpcResponseBodyIntoContainer(grpcResponseBody, config.responseJson); errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
respVersion, responseSsz, errJson := serializeMiddlewareResponseIntoSSZ(config.responseJson)
|
|
if errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
if errJson := writeSSZResponseHeaderAndBody(grpcResponse, w, responseSsz, respVersion, config.fileName); errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
if errJson := apimiddleware.Cleanup(grpcResponse.Body); errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func handlePostSSZ(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
|
|
if !sszPosted(req) {
|
|
return false
|
|
}
|
|
|
|
if errJson := prepareSSZRequestForProxying(m, endpoint, req); errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
prepareCustomHeaders(req)
|
|
if errJson := preparePostedSSZData(req); errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
|
|
grpcResponse, errJson := m.ProxyRequest(req)
|
|
if errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
grpcResponseBody, errJson := apimiddleware.ReadGrpcResponseBody(grpcResponse.Body)
|
|
if errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
respHasError, errJson := apimiddleware.HandleGrpcResponseError(endpoint.Err, grpcResponse, grpcResponseBody, w)
|
|
if errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
if respHasError {
|
|
return true
|
|
}
|
|
if errJson := apimiddleware.Cleanup(grpcResponse.Body); errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
return true
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func sszRequested(req *http.Request) (bool, error) {
|
|
accept := req.Header.Values("Accept")
|
|
if len(accept) == 0 {
|
|
return false, nil
|
|
}
|
|
types := strings.Split(accept[0], ",")
|
|
currentType, currentPriority := "", 0.0
|
|
for _, t := range types {
|
|
values := strings.Split(t, ";")
|
|
name := values[0]
|
|
if name != jsonMediaType && name != octetStreamMediaType {
|
|
continue
|
|
}
|
|
// no params specified
|
|
if len(values) == 1 {
|
|
priority := 1.0
|
|
if priority > currentPriority {
|
|
currentType, currentPriority = name, priority
|
|
}
|
|
continue
|
|
}
|
|
params := values[1]
|
|
match := priorityRegex.FindAllStringSubmatch(params, 1)
|
|
if len(match) != 1 {
|
|
continue
|
|
}
|
|
priority, err := strconv.ParseFloat(match[0][1], 32)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if priority > currentPriority {
|
|
currentType, currentPriority = name, priority
|
|
}
|
|
}
|
|
|
|
return currentType == octetStreamMediaType, nil
|
|
}
|
|
|
|
func sszPosted(req *http.Request) bool {
|
|
ct, ok := req.Header["Content-Type"]
|
|
if !ok {
|
|
return false
|
|
}
|
|
if len(ct) != 1 {
|
|
return false
|
|
}
|
|
return ct[0] == octetStreamMediaType
|
|
}
|
|
|
|
func prepareSSZRequestForProxying(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, req *http.Request) apimiddleware.ErrorJson {
|
|
req.URL.Scheme = "http"
|
|
req.URL.Host = m.GatewayAddress
|
|
req.RequestURI = ""
|
|
if errJson := apimiddleware.HandleURLParameters(endpoint.Path, req, endpoint.RequestURLLiterals); errJson != nil {
|
|
return errJson
|
|
}
|
|
if errJson := apimiddleware.HandleQueryParameters(req, endpoint.RequestQueryParams); errJson != nil {
|
|
return errJson
|
|
}
|
|
// We have to add new segments after handling parameters because it changes URL segment indexing.
|
|
req.URL.Path = "/internal" + req.URL.Path + "/ssz"
|
|
return nil
|
|
}
|
|
|
|
func prepareCustomHeaders(req *http.Request) {
|
|
ver := req.Header.Get(versionHeader)
|
|
if ver != "" {
|
|
req.Header.Del(versionHeader)
|
|
req.Header.Add(grpcVersionHeader, ver)
|
|
}
|
|
}
|
|
|
|
func preparePostedSSZData(req *http.Request) apimiddleware.ErrorJson {
|
|
buf, err := io.ReadAll(req.Body)
|
|
if err != nil {
|
|
return apimiddleware.InternalServerErrorWithMessage(err, "could not read body")
|
|
}
|
|
j := SszRequestJson{Data: base64.StdEncoding.EncodeToString(buf)}
|
|
data, err := json.Marshal(j)
|
|
if err != nil {
|
|
return apimiddleware.InternalServerErrorWithMessage(err, "could not prepare POST data")
|
|
}
|
|
req.Body = io.NopCloser(bytes.NewBuffer(data))
|
|
req.ContentLength = int64(len(data))
|
|
req.Header.Set("Content-Type", jsonMediaType)
|
|
return nil
|
|
}
|
|
|
|
func serializeMiddlewareResponseIntoSSZ(respJson SszResponse) (version string, ssz []byte, errJson apimiddleware.ErrorJson) {
|
|
// Serialize the SSZ part of the deserialized value.
|
|
data, err := base64.StdEncoding.DecodeString(respJson.SSZData())
|
|
if err != nil {
|
|
return "", nil, apimiddleware.InternalServerErrorWithMessage(err, "could not decode response body into base64")
|
|
}
|
|
return strings.ToLower(respJson.SSZVersion()), data, nil
|
|
}
|
|
|
|
func writeSSZResponseHeaderAndBody(grpcResp *http.Response, w http.ResponseWriter, respSsz []byte, respVersion, fileName string) apimiddleware.ErrorJson {
|
|
var statusCodeHeader string
|
|
for h, vs := range grpcResp.Header {
|
|
// We don't want to expose any gRPC metadata in the HTTP response, so we skip forwarding metadata headers.
|
|
if strings.HasPrefix(h, "Grpc-Metadata") {
|
|
if h == "Grpc-Metadata-"+grpc.HttpCodeMetadataKey {
|
|
statusCodeHeader = vs[0]
|
|
}
|
|
} else {
|
|
for _, v := range vs {
|
|
w.Header().Set(h, v)
|
|
}
|
|
}
|
|
}
|
|
w.Header().Set("Content-Length", strconv.Itoa(len(respSsz)))
|
|
w.Header().Set("Content-Type", octetStreamMediaType)
|
|
w.Header().Set("Content-Disposition", "attachment; filename="+fileName)
|
|
w.Header().Set(versionHeader, respVersion)
|
|
if statusCodeHeader != "" {
|
|
code, err := strconv.Atoi(statusCodeHeader)
|
|
if err != nil {
|
|
return apimiddleware.InternalServerErrorWithMessage(err, "could not parse status code")
|
|
}
|
|
w.WriteHeader(code)
|
|
} else {
|
|
w.WriteHeader(grpcResp.StatusCode)
|
|
}
|
|
if _, err := io.Copy(w, io.NopCloser(bytes.NewReader(respSsz))); err != nil {
|
|
return apimiddleware.InternalServerErrorWithMessage(err, "could not write response message")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleEvents(m *apimiddleware.ApiProxyMiddleware, _ apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
|
|
sseClient := sse.NewClient("http://" + m.GatewayAddress + "/internal" + req.URL.RequestURI())
|
|
sseClient.Headers["Grpc-Timeout"] = "0S"
|
|
eventChan := make(chan *sse.Event)
|
|
|
|
// We use grpc-gateway as the server side of events, not the sse library.
|
|
// Because of this subscribing to streams doesn't work as intended, resulting in each event being handled by all subscriptions.
|
|
// To handle events properly, we subscribe just once using a placeholder value ('events') and handle all topics inside this subscription.
|
|
if err := sseClient.SubscribeChan("events", eventChan); err != nil {
|
|
apimiddleware.WriteError(w, apimiddleware.InternalServerError(err), nil)
|
|
sseClient.Unsubscribe(eventChan)
|
|
return
|
|
}
|
|
|
|
errJson := receiveEvents(eventChan, w, req)
|
|
if errJson != nil {
|
|
apimiddleware.WriteError(w, errJson, nil)
|
|
}
|
|
|
|
sseClient.Unsubscribe(eventChan)
|
|
return true
|
|
}
|
|
|
|
func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http.Request) apimiddleware.ErrorJson {
|
|
for {
|
|
select {
|
|
case msg := <-eventChan:
|
|
var data interface{}
|
|
|
|
// The message's event comes to us with trailing whitespace. Remove it here for
|
|
// ease of future processing.
|
|
msg.Event = bytes.TrimSpace(msg.Event)
|
|
|
|
switch string(msg.Event) {
|
|
case events.HeadTopic:
|
|
data = &EventHeadJson{}
|
|
case events.BlockTopic:
|
|
data = &ReceivedBlockDataJson{}
|
|
case events.AttestationTopic:
|
|
data = &AttestationJson{}
|
|
|
|
// Data received in the aggregated att event does not fit the expected event stream output.
|
|
// We extract the underlying attestation from event data
|
|
// and assign the attestation back to event data for further processing.
|
|
aggEventData := &AggregatedAttReceivedDataJson{}
|
|
if err := json.Unmarshal(msg.Data, aggEventData); err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
var attData []byte
|
|
var err error
|
|
// If true, then we have an unaggregated attestation
|
|
if aggEventData.Aggregate == nil {
|
|
unaggEventData := &UnaggregatedAttReceivedDataJson{}
|
|
if err := json.Unmarshal(msg.Data, unaggEventData); err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
attData, err = json.Marshal(unaggEventData)
|
|
if err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
} else {
|
|
attData, err = json.Marshal(aggEventData.Aggregate)
|
|
if err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
}
|
|
msg.Data = attData
|
|
case events.VoluntaryExitTopic:
|
|
data = &SignedVoluntaryExitJson{}
|
|
case events.FinalizedCheckpointTopic:
|
|
data = &EventFinalizedCheckpointJson{}
|
|
case events.ChainReorgTopic:
|
|
data = &EventChainReorgJson{}
|
|
case events.SyncCommitteeContributionTopic:
|
|
data = &SignedContributionAndProofJson{}
|
|
case events.BLSToExecutionChangeTopic:
|
|
data = &SignedBLSToExecutionChangeJson{}
|
|
case "error":
|
|
data = &EventErrorJson{}
|
|
default:
|
|
return &apimiddleware.DefaultErrorJson{
|
|
Message: fmt.Sprintf("Event type '%s' not supported", string(msg.Event)),
|
|
Code: http.StatusInternalServerError,
|
|
}
|
|
}
|
|
|
|
if errJson := writeEvent(msg, w, data); errJson != nil {
|
|
return errJson
|
|
}
|
|
if errJson := flushEvent(w); errJson != nil {
|
|
return errJson
|
|
}
|
|
case <-req.Context().Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func writeEvent(msg *sse.Event, w http.ResponseWriter, data interface{}) apimiddleware.ErrorJson {
|
|
if err := json.Unmarshal(msg.Data, data); err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
if errJson := apimiddleware.ProcessMiddlewareResponseFields(data); errJson != nil {
|
|
return errJson
|
|
}
|
|
dataJson, errJson := apimiddleware.SerializeMiddlewareResponseIntoJson(data)
|
|
if errJson != nil {
|
|
return errJson
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
|
|
if _, err := w.Write([]byte("event: ")); err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
if _, err := w.Write(msg.Event); err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
if _, err := w.Write([]byte("\ndata: ")); err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
if _, err := w.Write(dataJson); err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
if _, err := w.Write([]byte("\n\n")); err != nil {
|
|
return apimiddleware.InternalServerError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func flushEvent(w http.ResponseWriter) apimiddleware.ErrorJson {
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
return &apimiddleware.DefaultErrorJson{Message: fmt.Sprintf("Flush not supported in %T", w), Code: http.StatusInternalServerError}
|
|
}
|
|
flusher.Flush()
|
|
return nil
|
|
}
|