prysm-pulse/validator/client/attest.go
Manu NALEPA ef21d3adf8
Implement EIP-3076 minimal slashing protection, using a filesystem database (#13360)
* `EpochFromString`: Use already defined `Uint64FromString` function.

* `Test_uint64FromString` => `Test_FromString`

This test function tests more functions than `Uint64FromString`.

* Slashing protection history: Remove unreachable code.

The function `NewKVStore` creates, via `kv.UpdatePublicKeysBuckets`,
a new item in the `proposal-history-bucket-interchange`.

IMO there is no real reason to prefer `proposal` than `attestation`
as a prefix for this bucket, but this is the way it is done right now
and renaming the bucket will probably be backward incompatible.

An `attestedPublicKey` cannot exist without
the corresponding `proposedPublicKey`.

Thus, the `else` portion of code removed in this commit is not reachable.
We raise an error if we get there.

This is also probably the reason why the removed `else` portion was not
tested.

* `NewKVStore`: Switch items in `createBuckets`.

So the order corresponds to `schema.go`

* `slashableAttestationCheck`: Fix comments and logs.

* `ValidatorClient.db`: Use `iface.ValidatorDB`.

* BoltDB database: Implement `GraffitiFileHash`.

* Filesystem database: Creates `db.go`.

This file defines the following structs:
- `Store`
- `Graffiti`
- `Configuration`
- `ValidatorSlashingProtection`

This files implements the following public functions:
- `NewStore`
- `Close`
- `Backup`
- `DatabasePath`
- `ClearDB`
- `UpdatePublicKeysBuckets`

This files implements the following private functions:
- `slashingProtectionDirPath`
- `configurationFilePath`
- `configuration`
- `saveConfiguration`
- `validatorSlashingProtection`
- `saveValidatorSlashingProtection`
- `publicKeys`

* Filesystem database: Creates `genesis.go`.

This file defines the following public functions:
- `GenesisValidatorsRoot`
- `SaveGenesisValidatorsRoot`

* Filesystem database: Creates `graffiti.go`.

This file defines the following public functions:
- `SaveGraffitiOrderedIndex`
- `GraffitiOrderedIndex`

* Filesystem database: Creates `migration.go`.

This file defines the following public functions:
- `RunUpMigrations`
- `RunDownMigrations`

* Filesystem database: Creates proposer_settings.go.

This file defines the following public functions:
- `ProposerSettings`
- `ProposerSettingsExists`
- `SaveProposerSettings`

* Filesystem database: Creates `attester_protection.go`.

This file defines the following public functions:
- `EIPImportBlacklistedPublicKeys`
- `SaveEIPImportBlacklistedPublicKeys`
- `SigningRootAtTargetEpoch`
- `LowestSignedTargetEpoch`
- `LowestSignedSourceEpoch`
- `AttestedPublicKeys`
- `CheckSlashableAttestation`
- `SaveAttestationForPubKey`
- `SaveAttestationsForPubKey`
- `AttestationHistoryForPubKey`

* Filesystem database: Creates `proposer_protection.go`.

This file defines the following public functions:
- `HighestSignedProposal`
- `LowestSignedProposal`
- `ProposalHistoryForPubKey`
- `ProposalHistoryForSlot`
- `ProposedPublicKeys`

* Ensure that the filesystem store implements the `ValidatorDB` interface.

* `slashableAttestationCheck`: Check the database type.

* `slashableProposalCheck`: Check the database type.

* `slashableAttestationCheck`: Allow usage of minimal slashing protection.

* `slashableProposalCheck`: Allow usage of minimal slashing protection.

* `ImportStandardProtectionJSON`: Check the database type.

* `ImportStandardProtectionJSON`: Allow usage of min slashing protection.

* Implement `RecursiveDirFind`.

* Implement minimal<->complete DB conversion.

3 public functions are implemented:
- `IsCompleteDatabaseExisting`
- `IsMinimalDatabaseExisting`
- `ConvertDatabase`

* `setupDB`: Add `isSlashingProtectionMinimal` argument.

The feature addition is located in `validator/node/node_test.go`.
The rest of this commit consists in minimal slashing protection testing.

* `setupWithKey`: Add `isSlashingProtectionMinimal` argument.

The feature addition is located in `validator/client/propose_test.go`.

The rest of this commit consists in tests wrapping.

* `setup`: Add `isSlashingProtectionMinimal` argument.

The added feature is located in the `validator/client/propose_test.go`
file.

The rest of this commit consists in tests wrapping.

* `initializeFromCLI` and `initializeForWeb`: Factorize db init.

* Add `convert-complete-to-minimal` command.

* Creates `--enable-minimal-slashing-protection` flag.

* `importSlashingProtectionJSON`: Check database type.

* `exportSlashingProtectionJSON`: Check database type.

* `TestClearDB`: Test with minimal slashing protection.

* KeyManager: Test with minimal slashing protection.

* RPC: KeyManager: Test with minimal slashing protection.

* `convert-complete-to-minimal`: Change option names.

Options were:
- `--source` (for source data directory), and
- `--target` (for target data directory)

However, since this command deals with slashing protection, which has
source (epochs) and target (epochs), the initial option names may confuse
the user.

In this commit:
`--source` ==> `--source-data-dir`
`--target` ==> `--target-data-dir`

* Set `SlashableAttestationCheck` as an iface method.

And delete `CheckSlashableAttestation` from iface.

* Move helpers functions in a more general directory.

No functional change.

* Extract common structs out of `kv`.

==> `filesystem` does not depend anymore on `kv`.
==> `iface` does not depend anymore on `kv`.
==> `slashing-protection` does not depend anymore on `kv`.

* Move `ValidateMetadata` in `validator/helpers`.

* `ValidateMetadata`: Test with mock.

This way, we can:
- Avoid any circular import for tests.
- Implement once for all `iface.ValidatorDB` implementations
  the `ValidateMetadata`function.
- Have tests (and coverage) of `ValidateMetadata`in
  its own package.

The ideal solution would have been to implement `ValidateMetadata` as
a method with the `iface.ValidatorDB`receiver.
Unfortunately, golang does not allow that.

* `iface.ValidatorDB`: Implement ImportStandardProtectionJSON.

The whole purpose of this commit is to avoid the `switch validatorDB.(type)`
in `ImportStandardProtectionJSON`.

* `iface.ValidatorDB`: Implement `SlashableProposalCheck`.

* Remove now useless `slashableProposalCheck`.

* Delete useless `ImportStandardProtectionJSON`.

* `file.Exists`: Detect directories and return an error.

Before, `Exists` was only able to detect if a file exists.
Now, this function takes an extra `File` or `Directory` argument.
It detects either if a file or a directory exists.

Before, if an error was returned by `os.Stat`, the the file was
considered as non existing.
Now, it is treated as a real error.

* Replace `os.Stat` by `file.Exists`.

* Remove `Is{Complete,Minimal}DatabaseExisting`.

* `publicKeys`: Add log if unexpected file found.

* Move `{Source,Target}DataDirFlag`in `db.go`.

* `failedAttLocalProtectionErr`: `var`==> `const`

* `signingRoot`: `32`==> `fieldparams.RootLength`.

* `validatorClientData`==> `validator-client-data`.

To be consistent with `slashing-protection`.

* Add progress bars for `import` and `convert`.

* `parseBlocksForUniquePublicKeys`: Move in `db/kv`.

* helpers: Remove unused `initializeProgressBar` function.
2024-03-05 15:27:15 +00:00

309 lines
10 KiB
Go

package client
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/async"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v5/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
validatorpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/validator-client"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
var failedAttLocalProtectionErr = "attempted to make slashable attestation, rejected by local slashing protection"
// SubmitAttestation completes the validator client's attester responsibility at a given slot.
// It fetches the latest beacon block head along with the latest canonical beacon state
// information in order to sign the block and include information about the validator's
// participation in voting on the block.
func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) {
ctx, span := trace.StartSpan(ctx, "validator.SubmitAttestation")
defer span.End()
span.AddAttributes(trace.StringAttribute("validator", fmt.Sprintf("%#x", pubKey)))
v.waitOneThirdOrValidBlock(ctx, slot)
var b strings.Builder
if err := b.WriteByte(byte(iface.RoleAttester)); err != nil {
log.WithError(err).Error("Could not write role byte for lock key")
tracing.AnnotateError(span, err)
return
}
_, err := b.Write(pubKey[:])
if err != nil {
log.WithError(err).Error("Could not write pubkey bytes for lock key")
tracing.AnnotateError(span, err)
return
}
lock := async.NewMultilock(b.String())
lock.Lock()
defer lock.Unlock()
fmtKey := fmt.Sprintf("%#x", pubKey[:])
log := log.WithField("pubkey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).WithField("slot", slot)
duty, err := v.duty(pubKey)
if err != nil {
log.WithError(err).Error("Could not fetch validator assignment")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
}
tracing.AnnotateError(span, err)
return
}
if len(duty.Committee) == 0 {
log.Debug("Empty committee for validator duty, not attesting")
return
}
req := &ethpb.AttestationDataRequest{
Slot: slot,
CommitteeIndex: duty.CommitteeIndex,
}
data, err := v.validatorClient.GetAttestationData(ctx, req)
if err != nil {
log.WithError(err).Error("Could not request attestation to sign at slot")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
}
tracing.AnnotateError(span, err)
return
}
indexedAtt := &ethpb.IndexedAttestation{
AttestingIndices: []uint64{uint64(duty.ValidatorIndex)},
Data: data,
}
_, signingRoot, err := v.getDomainAndSigningRoot(ctx, indexedAtt.Data)
if err != nil {
log.WithError(err).Error("Could not get domain and signing root from attestation")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
}
tracing.AnnotateError(span, err)
return
}
sig, _, err := v.signAtt(ctx, pubKey, data, slot)
if err != nil {
log.WithError(err).Error("Could not sign attestation")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
}
tracing.AnnotateError(span, err)
return
}
var indexInCommittee uint64
var found bool
for i, vID := range duty.Committee {
if vID == duty.ValidatorIndex {
indexInCommittee = uint64(i)
found = true
break
}
}
if !found {
log.Errorf("Validator ID %d not found in committee of %v", duty.ValidatorIndex, duty.Committee)
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
}
return
}
aggregationBitfield := bitfield.NewBitlist(uint64(len(duty.Committee)))
aggregationBitfield.SetBitAt(indexInCommittee, true)
attestation := &ethpb.Attestation{
Data: data,
AggregationBits: aggregationBitfield,
Signature: sig,
}
// Set the signature of the attestation and send it out to the beacon node.
indexedAtt.Signature = sig
if err := v.db.SlashableAttestationCheck(ctx, indexedAtt, pubKey, signingRoot, v.emitAccountMetrics, ValidatorAttestFailVec); err != nil {
log.WithError(err).Error("Failed attestation slashing protection check")
log.WithFields(
attestationLogFields(pubKey, indexedAtt),
).Debug("Attempted slashable attestation details")
tracing.AnnotateError(span, err)
return
}
attResp, err := v.validatorClient.ProposeAttestation(ctx, attestation)
if err != nil {
log.WithError(err).Error("Could not submit attestation to beacon node")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
}
tracing.AnnotateError(span, err)
return
}
if err := v.saveSubmittedAtt(data, pubKey[:], false); err != nil {
log.WithError(err).Error("Could not save validator index for logging")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
}
tracing.AnnotateError(span, err)
return
}
span.AddAttributes(
trace.Int64Attribute("slot", int64(slot)), // lint:ignore uintcast -- This conversion is OK for tracing.
trace.StringAttribute("attestationHash", fmt.Sprintf("%#x", attResp.AttestationDataRoot)),
trace.Int64Attribute("committeeIndex", int64(data.CommitteeIndex)),
trace.StringAttribute("blockRoot", fmt.Sprintf("%#x", data.BeaconBlockRoot)),
trace.Int64Attribute("justifiedEpoch", int64(data.Source.Epoch)),
trace.Int64Attribute("targetEpoch", int64(data.Target.Epoch)),
trace.StringAttribute("bitfield", fmt.Sprintf("%#x", aggregationBitfield)),
)
if v.emitAccountMetrics {
ValidatorAttestSuccessVec.WithLabelValues(fmtKey).Inc()
ValidatorAttestedSlotsGaugeVec.WithLabelValues(fmtKey).Set(float64(slot))
}
}
// Given the validator public key, this gets the validator assignment.
func (v *validator) duty(pubKey [fieldparams.BLSPubkeyLength]byte) (*ethpb.DutiesResponse_Duty, error) {
v.dutiesLock.RLock()
defer v.dutiesLock.RUnlock()
if v.duties == nil {
return nil, errors.New("no duties for validators")
}
for _, duty := range v.duties.CurrentEpochDuties {
if bytes.Equal(pubKey[:], duty.PublicKey) {
return duty, nil
}
}
return nil, fmt.Errorf("pubkey %#x not in duties", bytesutil.Trunc(pubKey[:]))
}
// Given validator's public key, this function returns the signature of an attestation data and its signing root.
func (v *validator) signAtt(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte, data *ethpb.AttestationData, slot primitives.Slot) ([]byte, [32]byte, error) {
domain, root, err := v.getDomainAndSigningRoot(ctx, data)
if err != nil {
return nil, [32]byte{}, err
}
sig, err := v.keyManager.Sign(ctx, &validatorpb.SignRequest{
PublicKey: pubKey[:],
SigningRoot: root[:],
SignatureDomain: domain.SignatureDomain,
Object: &validatorpb.SignRequest_AttestationData{AttestationData: data},
SigningSlot: slot,
})
if err != nil {
return nil, [32]byte{}, err
}
return sig.Marshal(), root, nil
}
func (v *validator) getDomainAndSigningRoot(ctx context.Context, data *ethpb.AttestationData) (*ethpb.DomainResponse, [32]byte, error) {
domain, err := v.domainData(ctx, data.Target.Epoch, params.BeaconConfig().DomainBeaconAttester[:])
if err != nil {
return nil, [32]byte{}, err
}
root, err := signing.ComputeSigningRoot(data, domain.SignatureDomain)
if err != nil {
return nil, [32]byte{}, err
}
return domain, root, nil
}
// highestSlot returns the highest slot with a valid block seen by the validator
func (v *validator) highestSlot() primitives.Slot {
v.highestValidSlotLock.Lock()
defer v.highestValidSlotLock.Unlock()
return v.highestValidSlot
}
// setHighestSlot sets the highest slot with a valid block seen by the validator
func (v *validator) setHighestSlot(slot primitives.Slot) {
v.highestValidSlotLock.Lock()
defer v.highestValidSlotLock.Unlock()
if slot > v.highestValidSlot {
v.highestValidSlot = slot
v.slotFeed.Send(slot)
}
}
// waitOneThirdOrValidBlock waits until (a) or (b) whichever comes first:
//
// (a) the validator has received a valid block that is the same slot as input slot
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot)
func (v *validator) waitOneThirdOrValidBlock(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitOneThirdOrValidBlock")
defer span.End()
// Don't need to wait if requested slot is the same as highest valid slot.
if slot <= v.highestSlot() {
return
}
delay := slots.DivideSlotBy(3 /* a third of the slot duration */)
startTime := slots.StartTime(v.genesisTime, slot)
finalTime := startTime.Add(delay)
wait := prysmTime.Until(finalTime)
if wait <= 0 {
return
}
t := time.NewTimer(wait)
defer t.Stop()
ch := make(chan primitives.Slot, 1)
sub := v.slotFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case s := <-ch:
if features.Get().AttestTimely {
if slot <= s {
return
}
}
case <-ctx.Done():
tracing.AnnotateError(span, ctx.Err())
return
case <-sub.Err():
log.Error("Subscriber closed, exiting goroutine")
return
case <-t.C:
return
}
}
}
func attestationLogFields(pubKey [fieldparams.BLSPubkeyLength]byte, indexedAtt *ethpb.IndexedAttestation) logrus.Fields {
return logrus.Fields{
"pubkey": fmt.Sprintf("%#x", pubKey),
"slot": indexedAtt.Data.Slot,
"committeeIndex": indexedAtt.Data.CommitteeIndex,
"blockRoot": fmt.Sprintf("%#x", indexedAtt.Data.BeaconBlockRoot),
"sourceEpoch": indexedAtt.Data.Source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", indexedAtt.Data.Source.Root),
"targetEpoch": indexedAtt.Data.Target.Epoch,
"targetRoot": fmt.Sprintf("%#x", indexedAtt.Data.Target.Root),
"signature": fmt.Sprintf("%#x", indexedAtt.Signature),
}
}