mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-26 05:17:22 +00:00
aed6e13498
* initial commit for cli integration of web3signer * resolving conflicts and execution * remove aggregation slot from proto * rem aggregation slot * define a sync message block root struct * fix sync message name * sync message block root struct * amend where sync committee block root is used * altered switch statement to return correct json request by type * fixing fork data import, types, and unit tests * reverting unwanted changes * reverting more unwanted changes * fixing deepsource issues * fixing formatting * more fixes for deepsource and code clean up * only want to fetch once for fetch validating public keys * adding more comments * new unit tests for requests and fixing a mapper issue * Update validator/client/validator.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update validator/accounts/wallet/wallet.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * adjusting comment * adjusting comment * fixing import organization * including more unit tests * adding new cli edit * adding in checks for wallet initialize * adding web3signer flags to main.go * some how resolved files did not save correctly * adding in check to make sure web flag only works with types imported and derived * Update validator/client/sync_committee.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update validator/client/aggregate.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update validator/accounts/wallet/wallet.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update cmd/validator/wallet/wallet.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update cmd/validator/wallet/wallet.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update cmd/validator/main.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update cmd/validator/flags/flags.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update cmd/validator/flags/flags.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update cmd/validator/wallet/wallet.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update cmd/validator/wallet/wallet.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * reverting changes that accidently got checked in * reverting * reverting * continuing to revert unintenteded changes * reverting * removing more unneeded changes * addressing review comment * initial refactor * adding in more clarifying comments * fixing mock * resolving desource issues * addressing gosec scan for helper go file * addressing gosec * trying to fix bazel build * removal of interface to fix build * fixing maligned struct * addressing deepsource * fixing deepsource * addressing efficiency of type checking * fixing bazel test failure * fixing go linter errors * gaz * web changes * add w3signer * new kind * proper use * align * adding prysm validator flags to help flags list * addressing root comment * ci lint * fixing standardapi tests * fixing accounts_test after removal of keymanager from rpc server * fixing more unit tests * Update cmd/validator/flags/flags.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update cmd/validator/flags/flags.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update validator/client/service.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update validator/client/service.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * addressing missed err checks * fixing mock tests * fixing gofmt * unskipping minimal e2e test and removing related TODOs * Update testing/endtoend/components/validator.go Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com> * Update testing/endtoend/components/validator.go Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com> * adding some error wrapers to clarify failure point * fixing bazel build with new error checks * taking preston's advice to make test fail faster to understand what's going on with the test * checking if genesis validators root is not zero hash * adding check for genesis validators root giving zero hash * fixing missing dependency * adding check for wallet * log all * fixing errors for http responses * switching marshal to pretty print * adding pretty sign request test * fixing base url setting * adding in check for web3signer and temporary wallet instead of having to open the wallet * refactoring web3signer to not require wallet * bazel build fix * fixing gazelle build * adding content type of request * fixing more bazel * removing unused code * removing unused comments * adding skip test back in * addressing a validation and error message * fix parse * body * fixing logic for datadir * improving error handling * show resp * fix * sign resp as str * point of pointer remove * sign resp * unmarshal sig resp * read body as str * adding more verbose logging * removing unused result * fixing unit test * reconfiguring files to properly nest code and mocks * fix build issue * using context when using client function calls * fixing based on suggestion * addressing comments * gaz * removing defined max timeout * reverting json print pretty * Update validator/accounts/wallet_edit.go Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com> * removing unneeded code restrictions * should not introduce new code that may impact existing key manager types * adjusting comments * adding in json validation * running go mod tidy * some logging * more logs * fixing typo * remove logs * testing without byte trim * fixing order or properties * gaz * tidy * reverting some logs * removing the confusing comments * Update validator/client/aggregate.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * Update validator/client/aggregate.go Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> * addressing pr comments * editing bytes test * Run gazelle update-repos * run gazelle * improving unit test coverage * fixing text * fixing a potential escaped error Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
208 lines
6.6 KiB
Go
208 lines
6.6 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
fieldparams "github.com/prysmaticlabs/prysm/config/fieldparams"
|
|
"github.com/prysmaticlabs/prysm/config/params"
|
|
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
|
|
"github.com/prysmaticlabs/prysm/math"
|
|
"github.com/prysmaticlabs/prysm/monitoring/tracing"
|
|
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/time/slots"
|
|
"github.com/prysmaticlabs/prysm/validator/keymanager/remote"
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
// WaitForActivation checks whether the validator pubkey is in the active
|
|
// validator set. If not, this operation will block until an activation message is
|
|
// received. This method also monitors the keymanager for updates while waiting for an activation
|
|
// from the gRPC server.
|
|
//
|
|
// If the channel parameter is nil, WaitForActivation creates and manages its own channel.
|
|
func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan chan [][fieldparams.BLSPubkeyLength]byte) error {
|
|
// Monitor the key manager for updates.
|
|
if accountsChangedChan == nil {
|
|
accountsChangedChan = make(chan [][fieldparams.BLSPubkeyLength]byte, 1)
|
|
km, err := v.Keymanager()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sub := km.SubscribeAccountChanges(accountsChangedChan)
|
|
defer func() {
|
|
sub.Unsubscribe()
|
|
close(accountsChangedChan)
|
|
}()
|
|
}
|
|
|
|
return v.waitForActivation(ctx, accountsChangedChan)
|
|
}
|
|
|
|
// waitForActivation performs the following:
|
|
// 1) While the key manager is empty, poll the key manager until some validator keys exist.
|
|
// 2) Open a server side stream for activation events against the given keys.
|
|
// 3) In another go routine, the key manager is monitored for updates and emits an update event on
|
|
// the accountsChangedChan. When an event signal is received, restart the waitForActivation routine.
|
|
// 4) If the stream is reset in error, restart the routine.
|
|
// 5) If the stream returns a response indicating one or more validators are active, exit the routine.
|
|
func (v *validator) waitForActivation(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error {
|
|
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
|
|
defer span.End()
|
|
|
|
validatingKeys, err := v.keyManager.FetchValidatingPublicKeys(ctx)
|
|
if err != nil {
|
|
return errors.Wrap(err, "could not fetch validating keys")
|
|
}
|
|
if len(validatingKeys) == 0 {
|
|
log.Warn(msgNoKeysFetched)
|
|
|
|
ticker := time.NewTicker(keyRefetchPeriod)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
validatingKeys, err = v.keyManager.FetchValidatingPublicKeys(ctx)
|
|
if err != nil {
|
|
return errors.Wrap(err, msgCouldNotFetchKeys)
|
|
}
|
|
if len(validatingKeys) == 0 {
|
|
log.Warn(msgNoKeysFetched)
|
|
continue
|
|
}
|
|
case <-ctx.Done():
|
|
log.Debug("Context closed, exiting fetching validating keys")
|
|
return ctx.Err()
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
req := ðpb.ValidatorActivationRequest{
|
|
PublicKeys: bytesutil.FromBytes48Array(validatingKeys),
|
|
}
|
|
stream, err := v.validatorClient.WaitForActivation(ctx, req)
|
|
if err != nil {
|
|
tracing.AnnotateError(span, err)
|
|
attempts := streamAttempts(ctx)
|
|
log.WithError(err).WithField("attempts", attempts).
|
|
Error("Stream broken while waiting for activation. Reconnecting...")
|
|
// Reconnection attempt backoff, up to 60s.
|
|
time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60)))
|
|
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan)
|
|
}
|
|
|
|
remoteKm, ok := v.keyManager.(remote.RemoteKeymanager)
|
|
if ok {
|
|
for {
|
|
select {
|
|
case <-accountsChangedChan:
|
|
// Accounts (keys) changed, restart the process.
|
|
return v.waitForActivation(ctx, accountsChangedChan)
|
|
case <-v.NextSlot():
|
|
if ctx.Err() == context.Canceled {
|
|
return errors.Wrap(ctx.Err(), "context canceled, not waiting for activation anymore")
|
|
}
|
|
|
|
validatingKeys, err = remoteKm.ReloadPublicKeys(ctx)
|
|
if err != nil {
|
|
return errors.Wrap(err, msgCouldNotFetchKeys)
|
|
}
|
|
statusRequestKeys := make([][]byte, len(validatingKeys))
|
|
for i := range validatingKeys {
|
|
statusRequestKeys[i] = validatingKeys[i][:]
|
|
}
|
|
resp, err := v.validatorClient.MultipleValidatorStatus(ctx, ðpb.MultipleValidatorStatusRequest{
|
|
PublicKeys: statusRequestKeys,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
statuses := make([]*validatorStatus, len(resp.Statuses))
|
|
for i, s := range resp.Statuses {
|
|
statuses[i] = &validatorStatus{
|
|
publicKey: resp.PublicKeys[i],
|
|
status: s,
|
|
index: resp.Indices[i],
|
|
}
|
|
}
|
|
|
|
valActivated := v.checkAndLogValidatorStatus(statuses)
|
|
if valActivated {
|
|
logActiveValidatorStatus(statuses)
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
break
|
|
}
|
|
} else {
|
|
for {
|
|
select {
|
|
case <-accountsChangedChan:
|
|
// Accounts (keys) changed, restart the process.
|
|
return v.waitForActivation(ctx, accountsChangedChan)
|
|
default:
|
|
res, err := stream.Recv()
|
|
// If the stream is closed, we stop the loop.
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
// If context is canceled we return from the function.
|
|
if ctx.Err() == context.Canceled {
|
|
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
|
|
}
|
|
if err != nil {
|
|
tracing.AnnotateError(span, err)
|
|
attempts := streamAttempts(ctx)
|
|
log.WithError(err).WithField("attempts", attempts).
|
|
Error("Stream broken while waiting for activation. Reconnecting...")
|
|
// Reconnection attempt backoff, up to 60s.
|
|
time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60)))
|
|
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan)
|
|
}
|
|
|
|
statuses := make([]*validatorStatus, len(res.Statuses))
|
|
for i, s := range res.Statuses {
|
|
statuses[i] = &validatorStatus{
|
|
publicKey: s.PublicKey,
|
|
status: s.Status,
|
|
index: s.Index,
|
|
}
|
|
}
|
|
|
|
valActivated := v.checkAndLogValidatorStatus(statuses)
|
|
if valActivated {
|
|
logActiveValidatorStatus(statuses)
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)
|
|
return nil
|
|
}
|
|
|
|
// Preferred way to use context keys is with a non built-in type. See: RVV-B0003
|
|
type waitForActivationContextKey string
|
|
|
|
const waitForActivationAttemptsContextKey = waitForActivationContextKey("WaitForActivation-attempts")
|
|
|
|
func streamAttempts(ctx context.Context) int {
|
|
attempts, ok := ctx.Value(waitForActivationAttemptsContextKey).(int)
|
|
if !ok {
|
|
return 1
|
|
}
|
|
return attempts
|
|
}
|
|
|
|
func incrementRetries(ctx context.Context) context.Context {
|
|
attempts := streamAttempts(ctx)
|
|
return context.WithValue(ctx, waitForActivationAttemptsContextKey, attempts+1)
|
|
}
|