mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-21 11:10:36 +00:00
REST VC: Subscribe to Beacon API events (#13453)
* Revert "Revert "REST VC: Subscribe to Beacon API events (#13354)" (#13428)"
This reverts commit 8d092a1113
.
* change logic
* review
* test fix
* fix critical error
* merge flag check
* change error msg
* return on errors
---------
Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
This commit is contained in:
parent
f3ef1b64d6
commit
204de13c86
@ -7,4 +7,6 @@ const (
|
||||
ConsensusBlockValueHeader = "Eth-Consensus-Block-Value"
|
||||
JsonMediaType = "application/json"
|
||||
OctetStreamMediaType = "application/octet-stream"
|
||||
EventStreamMediaType = "text/event-stream"
|
||||
KeepAlive = "keep-alive"
|
||||
)
|
||||
|
@ -6,6 +6,7 @@ go_library(
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/gateway",
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
deps = [
|
||||
"//api:go_default_library",
|
||||
"//api/gateway:go_default_library",
|
||||
"//cmd/beacon-chain/flags:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
|
@ -2,6 +2,7 @@ package gateway
|
||||
|
||||
import (
|
||||
gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
"github.com/prysmaticlabs/prysm/v4/api"
|
||||
"github.com/prysmaticlabs/prysm/v4/api/gateway"
|
||||
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
|
||||
ethpbalpha "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
@ -40,7 +41,7 @@ func DefaultConfig(enableDebugRPCEndpoints bool, httpModules string) MuxConfig {
|
||||
},
|
||||
}),
|
||||
gwruntime.WithMarshalerOption(
|
||||
"text/event-stream", &gwruntime.EventSourceJSONPb{},
|
||||
api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{},
|
||||
),
|
||||
)
|
||||
v1AlphaPbHandler = &gateway.PbMux{
|
||||
|
@ -8,8 +8,9 @@ go_library(
|
||||
"structs.go",
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events",
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//api:go_default_library",
|
||||
"//beacon-chain/blockchain:go_default_library",
|
||||
"//beacon-chain/core/feed:go_default_library",
|
||||
"//beacon-chain/core/feed/operation:go_default_library",
|
||||
@ -18,6 +19,7 @@ go_library(
|
||||
"//beacon-chain/core/time:go_default_library",
|
||||
"//beacon-chain/core/transition:go_default_library",
|
||||
"//beacon-chain/rpc/eth/shared:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//network/httputil:go_default_library",
|
||||
"//proto/eth/v1:go_default_library",
|
||||
"//proto/eth/v2:go_default_library",
|
||||
|
@ -5,11 +5,10 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
time2 "time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/api"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
|
||||
@ -18,11 +17,14 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/network/httputil"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
|
||||
ethpbv2 "github.com/prysmaticlabs/prysm/v4/proto/eth/v2"
|
||||
"github.com/prysmaticlabs/prysm/v4/runtime/version"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -114,16 +116,24 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
defer stateSub.Unsubscribe()
|
||||
|
||||
// Set up SSE response headers
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("Content-Type", api.EventStreamMediaType)
|
||||
w.Header().Set("Connection", api.KeepAlive)
|
||||
|
||||
// Handle each event received and context cancellation.
|
||||
// We send a keepalive dummy message immediately to prevent clients
|
||||
// stalling while waiting for the first response chunk.
|
||||
// After that we send a keepalive dummy message every SECONDS_PER_SLOT
|
||||
// to prevent anyone (e.g. proxy servers) from closing connections.
|
||||
sendKeepalive(w, flusher)
|
||||
keepaliveTicker := time2.NewTicker(time2.Duration(params.BeaconConfig().SecondsPerSlot) * time2.Second)
|
||||
for {
|
||||
select {
|
||||
case event := <-opsChan:
|
||||
handleBlockOperationEvents(w, flusher, topicsMap, event)
|
||||
case event := <-stateChan:
|
||||
s.handleStateEvents(ctx, w, flusher, topicsMap, event)
|
||||
case <-keepaliveTicker.C:
|
||||
sendKeepalive(w, flusher)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
@ -505,6 +515,10 @@ func send(w http.ResponseWriter, flusher http.Flusher, name string, data interfa
|
||||
write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j))
|
||||
}
|
||||
|
||||
func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) {
|
||||
write(w, flusher, ":\n\n")
|
||||
}
|
||||
|
||||
func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) {
|
||||
_, err := fmt.Fprintf(w, format, a...)
|
||||
if err != nil {
|
||||
|
@ -375,7 +375,9 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
const operationsResult = `event: attestation
|
||||
const operationsResult = `:
|
||||
|
||||
event: attestation
|
||||
data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}
|
||||
|
||||
event: attestation
|
||||
@ -401,7 +403,9 @@ data: {"signed_header_1":{"message":{"slot":"0","proposer_index":"0","parent_roo
|
||||
|
||||
`
|
||||
|
||||
const stateResult = `event: head
|
||||
const stateResult = `:
|
||||
|
||||
event: head
|
||||
data: {"slot":"0","block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch_transition":true,"execution_optimistic":false,"previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"}
|
||||
|
||||
event: finalized_checkpoint
|
||||
@ -415,17 +419,23 @@ data: {"slot":"0","block":"0xeade62f0457b2fdf48e7d3fc4b60736688286be7c7a3ac4c9a1
|
||||
|
||||
`
|
||||
|
||||
const payloadAttributesBellatrixResult = `event: payload_attributes
|
||||
const payloadAttributesBellatrixResult = `:
|
||||
|
||||
event: payload_attributes
|
||||
data: {"version":"bellatrix","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000"}}}
|
||||
|
||||
`
|
||||
|
||||
const payloadAttributesCapellaResult = `event: payload_attributes
|
||||
const payloadAttributesCapellaResult = `:
|
||||
|
||||
event: payload_attributes
|
||||
data: {"version":"capella","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[]}}}
|
||||
|
||||
`
|
||||
|
||||
const payloadAttributesDenebResult = `event: payload_attributes
|
||||
const payloadAttributesDenebResult = `:
|
||||
|
||||
event: payload_attributes
|
||||
data: {"version":"deneb","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[],"parent_beacon_block_root":"0xbef96cb938fd48b2403d3e662664325abb0102ed12737cbb80d717520e50cf4a"}}}
|
||||
|
||||
`
|
||||
|
14
testing/validator-mock/node_client_mock.go
generated
14
testing/validator-mock/node_client_mock.go
generated
@ -81,6 +81,20 @@ func (mr *MockNodeClientMockRecorder) GetVersion(arg0, arg1 interface{}) *gomock
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVersion", reflect.TypeOf((*MockNodeClient)(nil).GetVersion), arg0, arg1)
|
||||
}
|
||||
|
||||
// IsHealthy mocks base method.
|
||||
func (m *MockNodeClient) IsHealthy(arg0 context.Context) bool {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "IsHealthy", arg0)
|
||||
ret0, _ := ret[0].(bool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// IsHealthy indicates an expected call of IsHealthy.
|
||||
func (mr *MockNodeClientMockRecorder) IsHealthy(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockNodeClient)(nil).IsHealthy), arg0)
|
||||
}
|
||||
|
||||
// ListPeers mocks base method.
|
||||
func (m *MockNodeClient) ListPeers(arg0 context.Context, arg1 *emptypb.Empty) (*eth.Peers, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
28
testing/validator-mock/validator_client_mock.go
generated
28
testing/validator-mock/validator_client_mock.go
generated
@ -67,6 +67,20 @@ func (mr *MockValidatorClientMockRecorder) DomainData(arg0, arg1 interface{}) *g
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DomainData", reflect.TypeOf((*MockValidatorClient)(nil).DomainData), arg0, arg1)
|
||||
}
|
||||
|
||||
// EventStreamIsRunning mocks base method.
|
||||
func (m *MockValidatorClient) EventStreamIsRunning() bool {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "EventStreamIsRunning")
|
||||
ret0, _ := ret[0].(bool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// EventStreamIsRunning indicates an expected call of EventStreamIsRunning.
|
||||
func (mr *MockValidatorClientMockRecorder) EventStreamIsRunning() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventStreamIsRunning", reflect.TypeOf((*MockValidatorClient)(nil).EventStreamIsRunning))
|
||||
}
|
||||
|
||||
// GetAttestationData mocks base method.
|
||||
func (m *MockValidatorClient) GetAttestationData(arg0 context.Context, arg1 *eth.AttestationDataRequest) (*eth.AttestationData, error) {
|
||||
m.ctrl.T.Helper()
|
||||
@ -247,6 +261,20 @@ func (mr *MockValidatorClientMockRecorder) ProposeExit(arg0, arg1 interface{}) *
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposeExit", reflect.TypeOf((*MockValidatorClient)(nil).ProposeExit), arg0, arg1)
|
||||
}
|
||||
|
||||
// StartEventStream mocks base method.
|
||||
func (m *MockValidatorClient) StartEventStream(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StartEventStream", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// StartEventStream indicates an expected call of StartEventStream.
|
||||
func (mr *MockValidatorClientMockRecorder) StartEventStream(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartEventStream", reflect.TypeOf((*MockValidatorClient)(nil).StartEventStream), arg0)
|
||||
}
|
||||
|
||||
// StreamSlots mocks base method.
|
||||
func (m *MockValidatorClient) StreamSlots(arg0 context.Context, arg1 *eth.StreamSlotsRequest) (eth.BeaconNodeValidator_StreamSlotsClient, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -212,3 +212,15 @@ func (m *Validator) SetProposerSettings(_ context.Context, settings *validatorse
|
||||
m.proposerSettings = settings
|
||||
return nil
|
||||
}
|
||||
|
||||
func (_ *Validator) StartEventStream(_ context.Context) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (_ *Validator) EventStreamIsRunning() bool {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (_ *Validator) NodeIsHealthy(ctx context.Context) bool {
|
||||
panic("implement me")
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ go_library(
|
||||
"domain_data.go",
|
||||
"doppelganger.go",
|
||||
"duties.go",
|
||||
"event_handler.go",
|
||||
"genesis.go",
|
||||
"get_beacon_block.go",
|
||||
"index.go",
|
||||
@ -43,6 +44,7 @@ go_library(
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//beacon-chain/rpc/eth/beacon:go_default_library",
|
||||
"//beacon-chain/rpc/eth/config:go_default_library",
|
||||
"//beacon-chain/rpc/eth/events:go_default_library",
|
||||
"//beacon-chain/rpc/eth/node:go_default_library",
|
||||
"//beacon-chain/rpc/eth/shared:go_default_library",
|
||||
"//beacon-chain/rpc/eth/validator:go_default_library",
|
||||
@ -83,6 +85,7 @@ go_test(
|
||||
"domain_data_test.go",
|
||||
"doppelganger_test.go",
|
||||
"duties_test.go",
|
||||
"event_handler_test.go",
|
||||
"genesis_test.go",
|
||||
"get_beacon_block_test.go",
|
||||
"index_test.go",
|
||||
@ -139,6 +142,7 @@ go_test(
|
||||
"@com_github_golang_mock//gomock:go_default_library",
|
||||
"@com_github_golang_protobuf//ptypes/empty",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
|
||||
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
|
||||
],
|
||||
|
@ -98,6 +98,10 @@ func (c *beaconApiNodeClient) ListPeers(ctx context.Context, in *empty.Empty) (*
|
||||
panic("beaconApiNodeClient.ListPeers is not implemented. To use a fallback client, pass a fallback client as the last argument of NewBeaconApiNodeClientWithFallback.")
|
||||
}
|
||||
|
||||
func (c *beaconApiNodeClient) IsHealthy(ctx context.Context) bool {
|
||||
return c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil) == nil
|
||||
}
|
||||
|
||||
func NewNodeClientWithFallback(jsonRestHandler JsonRestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
|
||||
return &beaconApiNodeClient{
|
||||
jsonRestHandler: jsonRestHandler,
|
||||
|
@ -13,17 +13,26 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
|
||||
)
|
||||
|
||||
type ValidatorClientOpt func(*beaconApiValidatorClient)
|
||||
|
||||
func WithEventHandler(h *EventHandler) ValidatorClientOpt {
|
||||
return func(c *beaconApiValidatorClient) {
|
||||
c.eventHandler = h
|
||||
}
|
||||
}
|
||||
|
||||
type beaconApiValidatorClient struct {
|
||||
genesisProvider GenesisProvider
|
||||
dutiesProvider dutiesProvider
|
||||
stateValidatorsProvider StateValidatorsProvider
|
||||
jsonRestHandler JsonRestHandler
|
||||
eventHandler *EventHandler
|
||||
beaconBlockConverter BeaconBlockConverter
|
||||
prysmBeaconChainCLient iface.PrysmBeaconChainClient
|
||||
}
|
||||
|
||||
func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler) iface.ValidatorClient {
|
||||
return &beaconApiValidatorClient{
|
||||
func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler, opts ...ValidatorClientOpt) iface.ValidatorClient {
|
||||
c := &beaconApiValidatorClient{
|
||||
genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler},
|
||||
dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler},
|
||||
stateValidatorsProvider: beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler},
|
||||
@ -34,6 +43,10 @@ func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler) iface.Validato
|
||||
jsonRestHandler: jsonRestHandler,
|
||||
},
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(c)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *beaconApiValidatorClient) GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) {
|
||||
@ -149,3 +162,16 @@ func (c *beaconApiValidatorClient) WaitForActivation(ctx context.Context, in *et
|
||||
func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *empty.Empty) (*ethpb.ChainStartResponse, error) {
|
||||
return c.waitForChainStart(ctx)
|
||||
}
|
||||
|
||||
func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error {
|
||||
if c.eventHandler != nil {
|
||||
if err := c.eventHandler.get(ctx, []string{"head"}); err != nil {
|
||||
return errors.Wrapf(err, "could not invoke event handler")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *beaconApiValidatorClient) EventStreamIsRunning() bool {
|
||||
return c.eventHandler.running
|
||||
}
|
||||
|
134
validator/client/beacon-api/event_handler.go
Normal file
134
validator/client/beacon-api/event_handler.go
Normal file
@ -0,0 +1,134 @@
|
||||
package beacon_api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/api"
|
||||
)
|
||||
|
||||
// Currently set to the first power of 2 bigger than the size of the `head` event
|
||||
// which is 446 bytes
|
||||
const eventByteLimit = 512
|
||||
|
||||
// EventHandler is responsible for subscribing to the Beacon API events endpoint
|
||||
// and dispatching received events to subscribers.
|
||||
type EventHandler struct {
|
||||
httpClient *http.Client
|
||||
host string
|
||||
running bool
|
||||
subs []eventSub
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
type eventSub struct {
|
||||
name string
|
||||
ch chan<- event
|
||||
}
|
||||
|
||||
type event struct {
|
||||
eventType string
|
||||
data string
|
||||
}
|
||||
|
||||
// NewEventHandler returns a new handler.
|
||||
func NewEventHandler(httpClient *http.Client, host string) *EventHandler {
|
||||
return &EventHandler{
|
||||
httpClient: httpClient,
|
||||
host: host,
|
||||
running: false,
|
||||
subs: make([]eventSub, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *EventHandler) subscribe(sub eventSub) {
|
||||
h.Lock()
|
||||
h.subs = append(h.subs, sub)
|
||||
h.Unlock()
|
||||
}
|
||||
|
||||
func (h *EventHandler) get(ctx context.Context, topics []string) error {
|
||||
if len(topics) == 0 {
|
||||
return errors.New("no topics provided")
|
||||
}
|
||||
if h.running {
|
||||
log.Warn("Event listener is already running, ignoring function call")
|
||||
}
|
||||
|
||||
go func() {
|
||||
h.running = true
|
||||
defer func() { h.running = false }()
|
||||
|
||||
allTopics := strings.Join(topics, ",")
|
||||
log.Info("Starting listening to Beacon API events on topics: " + allTopics)
|
||||
url := h.host + "/eth/v1/events?topics=" + allTopics
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create HTTP request")
|
||||
return
|
||||
}
|
||||
req.Header.Set("Accept", api.EventStreamMediaType)
|
||||
req.Header.Set("Connection", api.KeepAlive)
|
||||
resp, err := h.httpClient.Do(req)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to perform HTTP request")
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if closeErr := resp.Body.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).Error("Failed to close events response body")
|
||||
}
|
||||
}()
|
||||
|
||||
// We signal an EOF error in a special way. When we get this error while reading the response body,
|
||||
// there might still be an event received in the body that we should handle.
|
||||
eof := false
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
log.WithError(ctx.Err()).Error("Stopping listening to Beacon API events")
|
||||
return
|
||||
}
|
||||
|
||||
rawData := make([]byte, eventByteLimit)
|
||||
_, err = resp.Body.Read(rawData)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "EOF") {
|
||||
log.Error("Received EOF while reading events response body. Stopping listening to Beacon API events")
|
||||
eof = true
|
||||
} else {
|
||||
log.WithError(err).Error("Stopping listening to Beacon API events")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
e := strings.Split(string(rawData), "\n")
|
||||
// We expect that the event format will contain event type and data separated with a newline
|
||||
if len(e) < 2 {
|
||||
// We reached EOF and there is no event to send
|
||||
if eof {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
for _, sub := range h.subs {
|
||||
select {
|
||||
case sub.ch <- event{eventType: e[0], data: e[1]}:
|
||||
// Event sent successfully.
|
||||
default:
|
||||
log.Warn("Subscriber '" + sub.name + "' not ready to receive events")
|
||||
}
|
||||
}
|
||||
// We reached EOF and sent the last event
|
||||
if eof {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
55
validator/client/beacon-api/event_handler_test.go
Normal file
55
validator/client/beacon-api/event_handler_test.go
Normal file
@ -0,0 +1,55 @@
|
||||
package beacon_api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/assert"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
||||
logtest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func TestEventHandler(t *testing.T) {
|
||||
logHook := logtest.NewGlobal()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
require.Equal(t, true, ok)
|
||||
_, err := fmt.Fprint(w, "head\ndata\n\n")
|
||||
require.NoError(t, err)
|
||||
flusher.Flush()
|
||||
})
|
||||
server := httptest.NewServer(mux)
|
||||
defer server.Close()
|
||||
|
||||
handler := NewEventHandler(http.DefaultClient, server.URL)
|
||||
ch1 := make(chan event, 1)
|
||||
sub1 := eventSub{ch: ch1}
|
||||
ch2 := make(chan event, 1)
|
||||
sub2 := eventSub{ch: ch2}
|
||||
ch3 := make(chan event, 1)
|
||||
sub3 := eventSub{name: "sub3", ch: ch3}
|
||||
// fill up the channel so that it can't receive more events
|
||||
ch3 <- event{}
|
||||
handler.subscribe(sub1)
|
||||
handler.subscribe(sub2)
|
||||
handler.subscribe(sub3)
|
||||
|
||||
require.NoError(t, handler.get(context.Background(), []string{"head"}))
|
||||
// make sure the goroutine inside handler.get is invoked
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
e := <-ch1
|
||||
assert.Equal(t, "head", e.eventType)
|
||||
assert.Equal(t, "data", e.data)
|
||||
e = <-ch2
|
||||
assert.Equal(t, "head", e.eventType)
|
||||
assert.Equal(t, "data", e.data)
|
||||
|
||||
assert.LogsContain(t, logHook, "Subscriber 'sub3' not ready to receive events")
|
||||
}
|
@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/api"
|
||||
@ -25,10 +26,6 @@ type BeaconApiJsonRestHandler struct {
|
||||
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
|
||||
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
|
||||
func (c BeaconApiJsonRestHandler) Get(ctx context.Context, endpoint string, resp interface{}) error {
|
||||
if resp == nil {
|
||||
return errors.New("resp is nil")
|
||||
}
|
||||
|
||||
url := c.Host + endpoint
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
@ -92,14 +89,16 @@ func decodeResp(httpResp *http.Response, resp interface{}) error {
|
||||
}
|
||||
|
||||
if httpResp.Header.Get("Content-Type") != api.JsonMediaType {
|
||||
if httpResp.StatusCode == http.StatusOK {
|
||||
// 2XX codes are a success
|
||||
if strings.HasPrefix(httpResp.Status, "2") {
|
||||
return nil
|
||||
}
|
||||
return &httputil.DefaultJsonError{Code: httpResp.StatusCode, Message: string(body)}
|
||||
}
|
||||
|
||||
decoder := json.NewDecoder(bytes.NewBuffer(body))
|
||||
if httpResp.StatusCode != http.StatusOK {
|
||||
// non-2XX codes are a failure
|
||||
if !strings.HasPrefix(httpResp.Status, "2") {
|
||||
errorJson := &httputil.DefaultJsonError{}
|
||||
if err = decoder.Decode(errorJson); err != nil {
|
||||
return errors.Wrapf(err, "failed to decode response body into error json for %s", httpResp.Request.URL)
|
||||
|
@ -103,17 +103,29 @@ func Test_decodeResp(t *testing.T) {
|
||||
t.Run("200 non-JSON", func(t *testing.T) {
|
||||
body := bytes.Buffer{}
|
||||
r := &http.Response{
|
||||
Status: "200",
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(&body),
|
||||
Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}},
|
||||
}
|
||||
require.NoError(t, decodeResp(r, nil))
|
||||
})
|
||||
t.Run("non-200 non-JSON", func(t *testing.T) {
|
||||
t.Run("204 non-JSON", func(t *testing.T) {
|
||||
body := bytes.Buffer{}
|
||||
r := &http.Response{
|
||||
Status: "204",
|
||||
StatusCode: http.StatusNoContent,
|
||||
Body: io.NopCloser(&body),
|
||||
Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}},
|
||||
}
|
||||
require.NoError(t, decodeResp(r, nil))
|
||||
})
|
||||
t.Run("500 non-JSON", func(t *testing.T) {
|
||||
body := bytes.Buffer{}
|
||||
_, err := body.WriteString("foo")
|
||||
require.NoError(t, err)
|
||||
r := &http.Response{
|
||||
Status: "500",
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
Body: io.NopCloser(&body),
|
||||
Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}},
|
||||
@ -130,6 +142,7 @@ func Test_decodeResp(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
body.Write(b)
|
||||
r := &http.Response{
|
||||
Status: "200",
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(&body),
|
||||
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
|
||||
@ -141,18 +154,30 @@ func Test_decodeResp(t *testing.T) {
|
||||
t.Run("200 JSON without resp", func(t *testing.T) {
|
||||
body := bytes.Buffer{}
|
||||
r := &http.Response{
|
||||
Status: "200",
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(&body),
|
||||
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
|
||||
}
|
||||
require.NoError(t, decodeResp(r, nil))
|
||||
})
|
||||
t.Run("non-200 JSON", func(t *testing.T) {
|
||||
t.Run("204 JSON", func(t *testing.T) {
|
||||
body := bytes.Buffer{}
|
||||
r := &http.Response{
|
||||
Status: "204",
|
||||
StatusCode: http.StatusNoContent,
|
||||
Body: io.NopCloser(&body),
|
||||
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
|
||||
}
|
||||
require.NoError(t, decodeResp(r, nil))
|
||||
})
|
||||
t.Run("500 JSON", func(t *testing.T) {
|
||||
body := bytes.Buffer{}
|
||||
b, err := json.Marshal(&httputil.DefaultJsonError{Code: http.StatusInternalServerError, Message: "error"})
|
||||
require.NoError(t, err)
|
||||
body.Write(b)
|
||||
r := &http.Response{
|
||||
Status: "500",
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
Body: io.NopCloser(&body),
|
||||
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
|
||||
@ -168,6 +193,7 @@ func Test_decodeResp(t *testing.T) {
|
||||
_, err := body.WriteString("foo")
|
||||
require.NoError(t, err)
|
||||
r := &http.Response{
|
||||
Status: "200",
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(&body),
|
||||
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
|
||||
@ -177,11 +203,12 @@ func Test_decodeResp(t *testing.T) {
|
||||
err = decodeResp(r, resp)
|
||||
assert.ErrorContains(t, "failed to decode response body into json", err)
|
||||
})
|
||||
t.Run("non-200 JSON cannot decode", func(t *testing.T) {
|
||||
t.Run("500 JSON cannot decode", func(t *testing.T) {
|
||||
body := bytes.Buffer{}
|
||||
_, err := body.WriteString("foo")
|
||||
require.NoError(t, err)
|
||||
r := &http.Response{
|
||||
Status: "500",
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
Body: io.NopCloser(&body),
|
||||
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
|
||||
|
@ -4,10 +4,12 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
@ -26,8 +28,8 @@ type streamSlotsClient struct {
|
||||
ctx context.Context
|
||||
beaconApiClient beaconApiValidatorClient
|
||||
streamSlotsRequest *ethpb.StreamSlotsRequest
|
||||
prevBlockSlot primitives.Slot
|
||||
pingDelay time.Duration
|
||||
ch chan event
|
||||
}
|
||||
|
||||
type streamBlocksAltairClient struct {
|
||||
@ -46,11 +48,14 @@ type headSignedBeaconBlockResult struct {
|
||||
}
|
||||
|
||||
func (c beaconApiValidatorClient) streamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest, pingDelay time.Duration) ethpb.BeaconNodeValidator_StreamSlotsClient {
|
||||
ch := make(chan event, 1)
|
||||
c.eventHandler.subscribe(eventSub{name: "stream slots", ch: ch})
|
||||
return &streamSlotsClient{
|
||||
ctx: ctx,
|
||||
beaconApiClient: c,
|
||||
streamSlotsRequest: in,
|
||||
pingDelay: pingDelay,
|
||||
ch: ch,
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,28 +69,27 @@ func (c beaconApiValidatorClient) streamBlocks(ctx context.Context, in *ethpb.St
|
||||
}
|
||||
|
||||
func (c *streamSlotsClient) Recv() (*ethpb.StreamSlotsResponse, error) {
|
||||
result, err := c.beaconApiClient.getHeadSignedBeaconBlock(c.ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get latest signed block")
|
||||
}
|
||||
|
||||
// We keep querying the beacon chain for the latest block until we receive a new slot
|
||||
for (c.streamSlotsRequest.VerifiedOnly && result.executionOptimistic) || c.prevBlockSlot == result.slot {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(c.pingDelay):
|
||||
result, err = c.beaconApiClient.getHeadSignedBeaconBlock(c.ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get latest signed block")
|
||||
case rawEvent := <-c.ch:
|
||||
if rawEvent.eventType != events.HeadTopic {
|
||||
continue
|
||||
}
|
||||
e := &events.HeadEvent{}
|
||||
if err := json.Unmarshal([]byte(rawEvent.data), e); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unmarshal head event into JSON")
|
||||
}
|
||||
uintSlot, err := strconv.ParseUint(e.Slot, 10, 64)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to parse slot")
|
||||
}
|
||||
return ðpb.StreamSlotsResponse{
|
||||
Slot: primitives.Slot(uintSlot),
|
||||
}, nil
|
||||
case <-c.ctx.Done():
|
||||
return nil, errors.New("context canceled")
|
||||
}
|
||||
}
|
||||
|
||||
c.prevBlockSlot = result.slot
|
||||
return ðpb.StreamSlotsResponse{
|
||||
Slot: result.slot,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *streamBlocksAltairClient) Recv() (*ethpb.StreamBlocksResponse, error) {
|
||||
|
@ -29,6 +29,10 @@ func (c *grpcNodeClient) ListPeers(ctx context.Context, in *empty.Empty) (*ethpb
|
||||
return c.nodeClient.ListPeers(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcNodeClient) IsHealthy(context.Context) bool {
|
||||
panic("function not supported for gRPC client")
|
||||
}
|
||||
|
||||
func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient {
|
||||
return &grpcNodeClient{ethpb.NewNodeClient(cc)}
|
||||
}
|
||||
|
@ -141,3 +141,11 @@ func (c *grpcValidatorClient) AggregatedSigAndAggregationBits(
|
||||
func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
|
||||
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc)}
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) StartEventStream(context.Context) error {
|
||||
panic("function not supported for gRPC client")
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) EventStreamIsRunning() bool {
|
||||
panic("function not supported for gRPC client")
|
||||
}
|
||||
|
@ -12,4 +12,5 @@ type NodeClient interface {
|
||||
GetGenesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error)
|
||||
GetVersion(ctx context.Context, in *empty.Empty) (*ethpb.Version, error)
|
||||
ListPeers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error)
|
||||
IsHealthy(ctx context.Context) bool
|
||||
}
|
||||
|
@ -64,6 +64,9 @@ type Validator interface {
|
||||
SignValidatorRegistrationRequest(ctx context.Context, signer SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error)
|
||||
ProposerSettings() *validatorserviceconfig.ProposerSettings
|
||||
SetProposerSettings(context.Context, *validatorserviceconfig.ProposerSettings) error
|
||||
StartEventStream(ctx context.Context) error
|
||||
EventStreamIsRunning() bool
|
||||
NodeIsHealthy(ctx context.Context) bool
|
||||
}
|
||||
|
||||
// SigningFunc interface defines a type for the a function that signs a message
|
||||
|
@ -34,4 +34,6 @@ type ValidatorClient interface {
|
||||
SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error)
|
||||
StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error)
|
||||
SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error)
|
||||
StartEventStream(ctx context.Context) error
|
||||
EventStreamIsRunning() bool
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/features"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
@ -195,6 +196,13 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
|
||||
log.WithError(err).Fatal("Could not wait for validator activation")
|
||||
}
|
||||
|
||||
if features.Get().EnableBeaconRESTApi {
|
||||
if err = v.StartEventStream(ctx); err != nil {
|
||||
log.WithError(err).Fatal("Could not start API event stream")
|
||||
}
|
||||
runHealthCheckRoutine(ctx, v)
|
||||
}
|
||||
|
||||
headSlot, err = v.CanonicalHeadSlot(ctx)
|
||||
if isConnectionError(err) {
|
||||
log.WithError(err).Warn("Could not get current canonical head slot")
|
||||
@ -279,3 +287,25 @@ func handleAssignmentError(err error, slot primitives.Slot) {
|
||||
log.WithField("error", err).Error("Failed to update assignments")
|
||||
}
|
||||
}
|
||||
|
||||
func runHealthCheckRoutine(ctx context.Context, v iface.Validator) {
|
||||
healthCheckTicker := time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-healthCheckTicker.C:
|
||||
if v.NodeIsHealthy(ctx) && !v.EventStreamIsRunning() {
|
||||
if err := v.StartEventStream(ctx); err != nil {
|
||||
log.WithError(err).Error("Could not start API event stream")
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
if ctx.Err() != nil {
|
||||
log.WithError(ctx.Err()).Error("Context cancelled")
|
||||
}
|
||||
log.Error("Context cancelled")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -196,11 +196,16 @@ func (v *ValidatorService) Start() {
|
||||
Host: v.conn.GetBeaconApiUrl(),
|
||||
}
|
||||
|
||||
evHandler := beaconApi.NewEventHandler(http.DefaultClient, v.conn.GetBeaconApiUrl())
|
||||
opts := []beaconApi.ValidatorClientOpt{beaconApi.WithEventHandler(evHandler)}
|
||||
validatorClient := validatorClientFactory.NewValidatorClient(v.conn, restHandler, opts...)
|
||||
|
||||
valStruct := &validator{
|
||||
db: v.db,
|
||||
validatorClient: validatorClientFactory.NewValidatorClient(v.conn, restHandler),
|
||||
validatorClient: validatorClient,
|
||||
beaconClient: beaconChainClientFactory.NewBeaconChainClient(v.conn, restHandler),
|
||||
node: nodeClientFactory.NewNodeClient(v.conn, restHandler),
|
||||
nodeClient: nodeClientFactory.NewNodeClient(v.conn, restHandler),
|
||||
prysmBeaconClient: beaconChainClientFactory.NewPrysmBeaconClient(v.conn, restHandler),
|
||||
db: v.db,
|
||||
graffiti: v.graffiti,
|
||||
logValidatorBalances: v.logValidatorBalances,
|
||||
emitAccountMetrics: v.emitAccountMetrics,
|
||||
@ -224,7 +229,6 @@ func (v *ValidatorService) Start() {
|
||||
Web3SignerConfig: v.Web3SignerConfig,
|
||||
proposerSettings: v.proposerSettings,
|
||||
walletInitializedChannel: make(chan *wallet.Wallet, 1),
|
||||
prysmBeaconClient: beaconChainClientFactory.NewPrysmBeaconClient(v.conn, restHandler),
|
||||
validatorsRegBatchSize: v.validatorsRegBatchSize,
|
||||
}
|
||||
|
||||
|
@ -174,18 +174,18 @@ func (fv *FakeValidator) ProposeBlock(_ context.Context, slot primitives.Slot, _
|
||||
}
|
||||
|
||||
// SubmitAggregateAndProof for mocking.
|
||||
func (_ *FakeValidator) SubmitAggregateAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
|
||||
func (*FakeValidator) SubmitAggregateAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
|
||||
}
|
||||
|
||||
// SubmitSyncCommitteeMessage for mocking.
|
||||
func (_ *FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
|
||||
func (*FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
|
||||
}
|
||||
|
||||
// LogAttestationsSubmitted for mocking.
|
||||
func (_ *FakeValidator) LogAttestationsSubmitted() {}
|
||||
func (*FakeValidator) LogAttestationsSubmitted() {}
|
||||
|
||||
// UpdateDomainDataCaches for mocking.
|
||||
func (_ *FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {}
|
||||
func (*FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {}
|
||||
|
||||
// BalancesByPubkeys for mocking.
|
||||
func (fv *FakeValidator) BalancesByPubkeys(_ context.Context) map[[fieldparams.BLSPubkeyLength]byte]uint64 {
|
||||
@ -213,7 +213,7 @@ func (fv *FakeValidator) Keymanager() (keymanager.IKeymanager, error) {
|
||||
}
|
||||
|
||||
// CheckDoppelGanger for mocking
|
||||
func (_ *FakeValidator) CheckDoppelGanger(_ context.Context) error {
|
||||
func (*FakeValidator) CheckDoppelGanger(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -237,7 +237,7 @@ func (fv *FakeValidator) HandleKeyReload(_ context.Context, newKeys [][fieldpara
|
||||
}
|
||||
|
||||
// SubmitSignedContributionAndProof for mocking
|
||||
func (_ *FakeValidator) SubmitSignedContributionAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
|
||||
func (*FakeValidator) SubmitSignedContributionAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
|
||||
}
|
||||
|
||||
// HasProposerSettings for mocking
|
||||
@ -266,22 +266,34 @@ func (fv *FakeValidator) PushProposerSettings(ctx context.Context, km keymanager
|
||||
}
|
||||
|
||||
// SetPubKeyToValidatorIndexMap for mocking
|
||||
func (_ *FakeValidator) SetPubKeyToValidatorIndexMap(_ context.Context, _ keymanager.IKeymanager) error {
|
||||
func (*FakeValidator) SetPubKeyToValidatorIndexMap(_ context.Context, _ keymanager.IKeymanager) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SignValidatorRegistrationRequest for mocking
|
||||
func (_ *FakeValidator) SignValidatorRegistrationRequest(_ context.Context, _ iface.SigningFunc, _ *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) {
|
||||
func (*FakeValidator) SignValidatorRegistrationRequest(_ context.Context, _ iface.SigningFunc, _ *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// ProposerSettings for mocking
|
||||
func (f *FakeValidator) ProposerSettings() *validatorserviceconfig.ProposerSettings {
|
||||
return f.proposerSettings
|
||||
func (fv *FakeValidator) ProposerSettings() *validatorserviceconfig.ProposerSettings {
|
||||
return fv.proposerSettings
|
||||
}
|
||||
|
||||
// SetProposerSettings for mocking
|
||||
func (f *FakeValidator) SetProposerSettings(_ context.Context, settings *validatorserviceconfig.ProposerSettings) error {
|
||||
f.proposerSettings = settings
|
||||
func (fv *FakeValidator) SetProposerSettings(_ context.Context, settings *validatorserviceconfig.ProposerSettings) error {
|
||||
fv.proposerSettings = settings
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fv *FakeValidator) StartEventStream(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fv *FakeValidator) EventStreamIsRunning() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (fv *FakeValidator) NodeIsHealthy(context.Context) bool {
|
||||
return true
|
||||
}
|
||||
|
@ -11,9 +11,10 @@ import (
|
||||
func NewValidatorClient(
|
||||
validatorConn validatorHelpers.NodeConnection,
|
||||
jsonRestHandler beaconApi.JsonRestHandler,
|
||||
opt ...beaconApi.ValidatorClientOpt,
|
||||
) iface.ValidatorClient {
|
||||
if features.Get().EnableBeaconRESTApi {
|
||||
return beaconApi.NewBeaconApiValidatorClient(jsonRestHandler)
|
||||
return beaconApi.NewBeaconApiValidatorClient(jsonRestHandler, opt...)
|
||||
} else {
|
||||
return grpcApi.NewGrpcValidatorClient(validatorConn.GetGrpcClientConn())
|
||||
}
|
||||
|
@ -91,19 +91,19 @@ type validator struct {
|
||||
interopKeysConfig *local.InteropKeymanagerConfig
|
||||
wallet *wallet.Wallet
|
||||
graffitiStruct *graffiti.Graffiti
|
||||
node iface.NodeClient
|
||||
db vdb.Database
|
||||
beaconClient iface.BeaconChainClient
|
||||
nodeClient iface.NodeClient
|
||||
validatorClient iface.ValidatorClient
|
||||
prysmBeaconClient iface.PrysmBeaconChainClient
|
||||
db vdb.Database
|
||||
keyManager keymanager.IKeymanager
|
||||
ticker slots.Ticker
|
||||
validatorClient iface.ValidatorClient
|
||||
graffiti []byte
|
||||
voteStats voteStats
|
||||
syncCommitteeStats syncCommitteeStats
|
||||
Web3SignerConfig *remoteweb3signer.SetupConfig
|
||||
proposerSettings *validatorserviceconfig.ProposerSettings
|
||||
walletInitializedChannel chan *wallet.Wallet
|
||||
prysmBeaconClient iface.PrysmBeaconChainClient
|
||||
validatorsRegBatchSize int
|
||||
}
|
||||
|
||||
@ -304,7 +304,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "validator.WaitForSync")
|
||||
defer span.End()
|
||||
|
||||
s, err := v.node.GetSyncStatus(ctx, &emptypb.Empty{})
|
||||
s, err := v.nodeClient.GetSyncStatus(ctx, &emptypb.Empty{})
|
||||
if err != nil {
|
||||
return errors.Wrap(iface.ErrConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
|
||||
}
|
||||
@ -316,7 +316,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {
|
||||
select {
|
||||
// Poll every half slot.
|
||||
case <-time.After(slots.DivideSlotBy(2 /* twice per slot */)):
|
||||
s, err := v.node.GetSyncStatus(ctx, &emptypb.Empty{})
|
||||
s, err := v.nodeClient.GetSyncStatus(ctx, &emptypb.Empty{})
|
||||
if err != nil {
|
||||
return errors.Wrap(iface.ErrConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
|
||||
}
|
||||
@ -330,7 +330,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
// ReceiveSlots starts a gRPC client stream listener to obtain
|
||||
// ReceiveSlots starts a stream listener to obtain
|
||||
// slots from the beacon node when it imports a block. Upon receiving a slot, the service
|
||||
// broadcasts it to a feed for other usages to subscribe to.
|
||||
func (v *validator) ReceiveSlots(ctx context.Context, connectionErrorChannel chan<- error) {
|
||||
@ -348,13 +348,14 @@ func (v *validator) ReceiveSlots(ctx context.Context, connectionErrorChannel cha
|
||||
}
|
||||
res, err := stream.Recv()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not receive slots from beacon node, " + iface.ErrConnectionIssue.Error())
|
||||
log.WithError(err).Error("Could not receive slots from beacon node: " + iface.ErrConnectionIssue.Error())
|
||||
connectionErrorChannel <- errors.Wrap(iface.ErrConnectionIssue, err.Error())
|
||||
return
|
||||
}
|
||||
if res == nil {
|
||||
continue
|
||||
}
|
||||
log.Error("Setting highest slot")
|
||||
v.setHighestSlot(res.Slot)
|
||||
}
|
||||
}
|
||||
@ -1042,6 +1043,18 @@ func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKey
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *validator) StartEventStream(ctx context.Context) error {
|
||||
return v.validatorClient.StartEventStream(ctx)
|
||||
}
|
||||
|
||||
func (v *validator) EventStreamIsRunning() bool {
|
||||
return v.validatorClient.EventStreamIsRunning()
|
||||
}
|
||||
|
||||
func (v *validator) NodeIsHealthy(ctx context.Context) bool {
|
||||
return v.nodeClient.IsHealthy(ctx)
|
||||
}
|
||||
|
||||
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {
|
||||
filteredKeys := make([][fieldparams.BLSPubkeyLength]byte, 0)
|
||||
statusRequestKeys := make([][]byte, 0)
|
||||
|
@ -394,7 +394,7 @@ func TestWaitSync_ContextCanceled(t *testing.T) {
|
||||
n := validatormock.NewMockNodeClient(ctrl)
|
||||
|
||||
v := validator{
|
||||
node: n,
|
||||
nodeClient: n,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -414,7 +414,7 @@ func TestWaitSync_NotSyncing(t *testing.T) {
|
||||
n := validatormock.NewMockNodeClient(ctrl)
|
||||
|
||||
v := validator{
|
||||
node: n,
|
||||
nodeClient: n,
|
||||
}
|
||||
|
||||
n.EXPECT().GetSyncStatus(
|
||||
@ -431,7 +431,7 @@ func TestWaitSync_Syncing(t *testing.T) {
|
||||
n := validatormock.NewMockNodeClient(ctrl)
|
||||
|
||||
v := validator{
|
||||
node: n,
|
||||
nodeClient: n,
|
||||
}
|
||||
|
||||
n.EXPECT().GetSyncStatus(
|
||||
@ -1304,7 +1304,7 @@ func TestValidator_PushProposerSettings(t *testing.T) {
|
||||
|
||||
v := validator{
|
||||
validatorClient: client,
|
||||
node: nodeClient,
|
||||
nodeClient: nodeClient,
|
||||
db: db,
|
||||
pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex),
|
||||
signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1),
|
||||
@ -1386,7 +1386,7 @@ func TestValidator_PushProposerSettings(t *testing.T) {
|
||||
|
||||
v := validator{
|
||||
validatorClient: client,
|
||||
node: nodeClient,
|
||||
nodeClient: nodeClient,
|
||||
db: db,
|
||||
pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex),
|
||||
signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1),
|
||||
@ -1464,7 +1464,7 @@ func TestValidator_PushProposerSettings(t *testing.T) {
|
||||
|
||||
v := validator{
|
||||
validatorClient: client,
|
||||
node: nodeClient,
|
||||
nodeClient: nodeClient,
|
||||
db: db,
|
||||
pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex),
|
||||
signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1),
|
||||
@ -1525,7 +1525,7 @@ func TestValidator_PushProposerSettings(t *testing.T) {
|
||||
|
||||
v := validator{
|
||||
validatorClient: client,
|
||||
node: nodeClient,
|
||||
nodeClient: nodeClient,
|
||||
db: db,
|
||||
pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex),
|
||||
signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1),
|
||||
@ -1593,7 +1593,7 @@ func TestValidator_PushProposerSettings(t *testing.T) {
|
||||
|
||||
v := validator{
|
||||
validatorClient: client,
|
||||
node: nodeClient,
|
||||
nodeClient: nodeClient,
|
||||
db: db,
|
||||
pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex),
|
||||
signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1),
|
||||
@ -1657,7 +1657,7 @@ func TestValidator_PushProposerSettings(t *testing.T) {
|
||||
|
||||
v := validator{
|
||||
validatorClient: client,
|
||||
node: nodeClient,
|
||||
nodeClient: nodeClient,
|
||||
db: db,
|
||||
pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex),
|
||||
signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1),
|
||||
@ -1752,7 +1752,7 @@ func TestValidator_PushProposerSettings(t *testing.T) {
|
||||
validatorSetter: func(t *testing.T) *validator {
|
||||
v := validator{
|
||||
validatorClient: client,
|
||||
node: nodeClient,
|
||||
nodeClient: nodeClient,
|
||||
db: db,
|
||||
pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex),
|
||||
signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1),
|
||||
|
@ -43,6 +43,7 @@ go_library(
|
||||
"//validator:__subpackages__",
|
||||
],
|
||||
deps = [
|
||||
"//api:go_default_library",
|
||||
"//api/gateway:go_default_library",
|
||||
"//api/server:go_default_library",
|
||||
"//async/event:go_default_library",
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
"github.com/pkg/errors"
|
||||
fastssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/prysmaticlabs/prysm/v4/api"
|
||||
"github.com/prysmaticlabs/prysm/v4/api/gateway"
|
||||
"github.com/prysmaticlabs/prysm/v4/api/server"
|
||||
"github.com/prysmaticlabs/prysm/v4/async/event"
|
||||
@ -865,7 +866,7 @@ func (c *ValidatorClient) registerRPCGatewayService(router *mux.Router) error {
|
||||
},
|
||||
}),
|
||||
gwruntime.WithMarshalerOption(
|
||||
"text/event-stream", &gwruntime.EventSourceJSONPb{}, // TODO: remove this
|
||||
api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{}, // TODO: remove this
|
||||
),
|
||||
gwruntime.WithForwardResponseOption(gateway.HttpResponseModifier),
|
||||
)
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/api"
|
||||
"github.com/prysmaticlabs/prysm/v4/network/httputil"
|
||||
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/runtime/version"
|
||||
@ -39,9 +40,9 @@ func (s *Server) StreamBeaconLogs(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamBeaconLogs")
|
||||
defer span.End()
|
||||
// Set up SSE response headers
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Content-Type", api.EventStreamMediaType)
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("Connection", api.KeepAlive)
|
||||
|
||||
// Flush helper function to ensure data is sent to client
|
||||
flusher, ok := w.(http.Flusher)
|
||||
@ -108,9 +109,9 @@ func (s *Server) StreamValidatorLogs(w http.ResponseWriter, r *http.Request) {
|
||||
close(ch)
|
||||
}()
|
||||
// Set up SSE response headers
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Content-Type", api.EventStreamMediaType)
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("Connection", api.KeepAlive)
|
||||
|
||||
recentLogs := s.logsStreamer.GetLastFewLogs()
|
||||
logStrings := make([]string, len(recentLogs))
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/prysmaticlabs/prysm/v4/api"
|
||||
"github.com/prysmaticlabs/prysm/v4/io/logs/mock"
|
||||
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
@ -92,10 +93,10 @@ func TestStreamBeaconLogs(t *testing.T) {
|
||||
}
|
||||
ct, ok := resp.Header["Content-Type"]
|
||||
require.Equal(t, ok, true)
|
||||
require.Equal(t, ct[0], "text/event-stream")
|
||||
require.Equal(t, ct[0], api.EventStreamMediaType)
|
||||
cn, ok := resp.Header["Connection"]
|
||||
require.Equal(t, ok, true)
|
||||
require.Equal(t, cn[0], "keep-alive")
|
||||
require.Equal(t, cn[0], api.KeepAlive)
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, body)
|
||||
@ -143,10 +144,10 @@ func TestStreamValidatorLogs(t *testing.T) {
|
||||
}
|
||||
ct, ok := resp.Header["Content-Type"]
|
||||
require.Equal(t, ok, true)
|
||||
require.Equal(t, ct[0], "text/event-stream")
|
||||
require.Equal(t, ct[0], api.EventStreamMediaType)
|
||||
cn, ok := resp.Header["Connection"]
|
||||
require.Equal(t, ok, true)
|
||||
require.Equal(t, cn[0], "keep-alive")
|
||||
require.Equal(t, cn[0], api.KeepAlive)
|
||||
// Check if data was written
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
|
Loading…
Reference in New Issue
Block a user