prysm-pulse/validator/rpc/handlers_health.go
james-prysm c0fb16a96f
HTTP validator API: health endpoints (#13149)
* updating health endpoints

* updating tests

* updating tests

* moving where the header is written and adding allow origin header

* removing header

* Update validator/rpc/handlers_health.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/rpc/handlers_health.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/rpc/handlers_health.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* radek's comments

* Update handlers_health.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* adding the correct errors to handle error

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2023-11-02 15:51:21 +00:00

164 lines
4.6 KiB
Go

package rpc
import (
"encoding/json"
"fmt"
"net/http"
http2 "github.com/prysmaticlabs/prysm/v4/network/http"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"go.opencensus.io/trace"
"google.golang.org/protobuf/types/known/emptypb"
)
// GetVersion returns the beacon node and validator client versions
func (s *Server) GetVersion(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.GetVersion")
defer span.End()
beacon, err := s.beaconNodeClient.GetVersion(ctx, &emptypb.Empty{})
if err != nil {
http2.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
http2.WriteJson(w, struct {
Beacon string `json:"beacon"`
Validator string `json:"validator"`
}{
Beacon: beacon.Version,
Validator: version.Version(),
})
}
// StreamBeaconLogs from the beacon node via server-side events.
func (s *Server) StreamBeaconLogs(w http.ResponseWriter, r *http.Request) {
// Wrap service context with a cancel in order to propagate the exiting of
// this method properly to the beacon node server.
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamBeaconLogs")
defer span.End()
// Set up SSE response headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// Flush helper function to ensure data is sent to client
flusher, ok := w.(http.Flusher)
if !ok {
http2.HandleError(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// TODO: StreamBeaconLogs grpc will need to be replaced in the future
client, err := s.beaconNodeHealthClient.StreamBeaconLogs(ctx, &emptypb.Empty{})
if err != nil {
http2.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
for {
select {
case <-s.ctx.Done():
return
case <-ctx.Done():
return
case <-client.Context().Done():
return
default:
logResp, err := client.Recv()
if err != nil {
http2.HandleError(w, "could not receive beacon logs from stream: "+err.Error(), http.StatusInternalServerError)
return
}
jsonResp, err := json.Marshal(logResp)
if err != nil {
http2.HandleError(w, "could not encode log response into JSON: "+err.Error(), http.StatusInternalServerError)
return
}
// Send the response as an SSE event
// Assuming resp has a String() method for simplicity
_, err = fmt.Fprintf(w, "%s\n", jsonResp)
if err != nil {
http2.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
// Flush the data to the client immediately
flusher.Flush()
}
}
}
// StreamValidatorLogs from the validator client via server-side events.
func (s *Server) StreamValidatorLogs(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamValidatorLogs")
defer span.End()
// Ensure that the writer supports flushing.
flusher, ok := w.(http.Flusher)
if !ok {
http2.HandleError(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
ch := make(chan []byte, s.streamLogsBufferSize)
sub := s.logsStreamer.LogsFeed().Subscribe(ch)
defer func() {
sub.Unsubscribe()
close(ch)
}()
// Set up SSE response headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
recentLogs := s.logsStreamer.GetLastFewLogs()
logStrings := make([]string, len(recentLogs))
for i, l := range recentLogs {
logStrings[i] = string(l)
}
ls := &pb.LogsResponse{
Logs: logStrings,
}
jsonLogs, err := json.Marshal(ls)
if err != nil {
http2.HandleError(w, "Failed to marshal logs: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = fmt.Fprintf(w, "%s\n", jsonLogs)
if err != nil {
http2.HandleError(w, "Error sending data: "+err.Error(), http.StatusInternalServerError)
return
}
flusher.Flush()
for {
select {
case log := <-ch:
// Set up SSE response headers
ls = &pb.LogsResponse{
Logs: []string{string(log)},
}
jsonLogs, err = json.Marshal(ls)
if err != nil {
http2.HandleError(w, "Failed to marshal logs: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = fmt.Fprintf(w, "%s\n", jsonLogs)
if err != nil {
http2.HandleError(w, "Error sending data: "+err.Error(), http.StatusInternalServerError)
return
}
flusher.Flush()
case <-s.ctx.Done():
return
case err := <-sub.Err():
http2.HandleError(w, "Subscriber error: "+err.Error(), http.StatusInternalServerError)
return
case <-ctx.Done():
return
}
}
}