diff --git a/shared/featureconfig/config.go b/shared/featureconfig/config.go index 78f70e96e..904876cb0 100644 --- a/shared/featureconfig/config.go +++ b/shared/featureconfig/config.go @@ -43,6 +43,7 @@ type Flags struct { DisableStrictAttestationPubsubVerification bool // DisableStrictAttestationPubsubVerification will disabling strict signature verification in pubsub. DisableUpdateHeadPerAttestation bool // DisableUpdateHeadPerAttestation will disabling update head on per attestation basis. EnableByteMempool bool // EnaableByteMempool memory management. + EnableDomainDataCache bool // EnableDomainDataCache caches validator calls to DomainData per epoch. // DisableForkChoice disables using LMD-GHOST fork choice to update // the head of the chain based on attestations and instead accepts any valid received block @@ -169,6 +170,10 @@ func ConfigureValidator(ctx *cli.Context) { log.Warn("Enabled validator attestation slashing protection.") cfg.ProtectAttester = true } + if ctx.GlobalBool(enableDomainDataCacheFlag.Name) { + log.Warn("Enabled domain data cache.") + cfg.EnableDomainDataCache = true + } Init(cfg) } diff --git a/shared/featureconfig/flags.go b/shared/featureconfig/flags.go index 2513e36bd..d7a26ac52 100644 --- a/shared/featureconfig/flags.go +++ b/shared/featureconfig/flags.go @@ -98,6 +98,11 @@ var ( Name: "enable-byte-mempool", Usage: "Enable use of sync.Pool for certain byte arrays in the beacon state", } + enableDomainDataCacheFlag = cli.BoolFlag{ + Name: "enable-domain-data-cache", + Usage: "Enable caching of domain data requests per epoch. This feature reduces the total " + + "calls to the beacon node for each assignment.", + } ) // Deprecated flags list. @@ -235,12 +240,14 @@ var ValidatorFlags = append(deprecatedFlags, []cli.Flag{ minimalConfigFlag, protectAttesterFlag, protectProposerFlag, + enableDomainDataCacheFlag, }...) // E2EValidatorFlags contains a list of the validator feature flags to be tested in E2E. var E2EValidatorFlags = []string{ "--protect-attester", "--protect-proposer", + "--enable-domain-data-cache", } // BeaconChainFlags contains a list of all the feature flags that apply to the beacon-chain client. diff --git a/validator/client/BUILD.bazel b/validator/client/BUILD.bazel index 7c6530f44..4a252bbef 100644 --- a/validator/client/BUILD.bazel +++ b/validator/client/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "grpc_interceptor.go", "runner.go", "service.go", "validator.go", @@ -26,6 +27,8 @@ go_library( "//shared/slotutil:go_default_library", "//validator/db:go_default_library", "//validator/keymanager:go_default_library", + "@com_github_dgraph_io_ristretto//:go_default_library", + "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", "@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library", "@com_github_grpc_ecosystem_go_grpc_middleware//retry:go_default_library", @@ -43,6 +46,7 @@ go_library( "@org_golang_google_grpc//:go_default_library", "@org_golang_google_grpc//codes:go_default_library", "@org_golang_google_grpc//credentials:go_default_library", + "@org_golang_google_grpc//metadata:go_default_library", "@org_golang_google_grpc//status:go_default_library", ], ) diff --git a/validator/client/fake_validator_test.go b/validator/client/fake_validator_test.go index 26254e8f3..ef7937957 100644 --- a/validator/client/fake_validator_test.go +++ b/validator/client/fake_validator_test.go @@ -98,3 +98,5 @@ func (fv *fakeValidator) ProposeBlock(_ context.Context, slot uint64, pubKey [48 func (fv *fakeValidator) SubmitAggregateAndProof(_ context.Context, slot uint64, pubKey [48]byte) {} func (fv *fakeValidator) LogAttestationsSubmitted() {} + +func (fv *fakeValidator) UpdateDomainDataCaches(context.Context, uint64) {} diff --git a/validator/client/grpc_interceptor.go b/validator/client/grpc_interceptor.go new file mode 100644 index 000000000..a076e3f33 --- /dev/null +++ b/validator/client/grpc_interceptor.go @@ -0,0 +1,31 @@ +package client + +import ( + "context" + "time" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// This method logs the gRPC backend as well as request duration when the log level is set to debug +// or higher. +func logDebugRequestInfoUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + // Shortcut when debug logging is not enabled. + if logrus.GetLevel() < logrus.DebugLevel { + return invoker(ctx, method, req, reply, cc, opts...) + } + + var header metadata.MD + opts = append( + opts, + grpc.Header(&header), + ) + start := time.Now() + err := invoker(ctx, method, req, reply, cc, opts...) + log.WithField("backend", header["x-backend"]). + WithField("method", method).WithField("duration", time.Now().Sub(start)). + Debug("gRPC request finished.") + return err +} diff --git a/validator/client/runner.go b/validator/client/runner.go index 62a673bd9..c11a50f9c 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" "github.com/prysmaticlabs/prysm/shared/params" "go.opencensus.io/trace" @@ -28,6 +29,7 @@ type Validator interface { ProposeBlock(ctx context.Context, slot uint64, pubKey [48]byte) SubmitAggregateAndProof(ctx context.Context, slot uint64, pubKey [48]byte) LogAttestationsSubmitted() + UpdateDomainDataCaches(ctx context.Context, slot uint64) } // Run the main validator routine. This routine exits if the context is @@ -83,6 +85,11 @@ func run(ctx context.Context, v Validator) { continue } + // Start fetching domain data for the next epoch. + if helpers.IsEpochEnd(slot) { + go v.UpdateDomainDataCaches(ctx, slot+1) + } + var wg sync.WaitGroup allRoles, err := v.RolesAt(ctx, slot) diff --git a/validator/client/service.go b/validator/client/service.go index e7240e295..a6e49dc21 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -3,6 +3,7 @@ package client import ( "context" + "github.com/dgraph-io/ristretto" middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" @@ -110,6 +111,7 @@ func (v *ValidatorService) Start() { grpc_opentracing.UnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_retry.UnaryClientInterceptor(), + logDebugRequestInfoUnaryInterceptor, )), } conn, err := grpc.DialContext(v.ctx, v.endpoint, opts...) @@ -132,6 +134,15 @@ func (v *ValidatorService) Start() { } v.conn = conn + cache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1280, // number of keys to track. + MaxCost: 128, // maximum cost of cache, 1 item = 1 cost. + BufferItems: 64, // number of keys per Get buffer. + }) + if err != nil { + panic(err) + } + v.validator = &validator{ db: valDB, validatorClient: ethpb.NewBeaconNodeValidatorClient(v.conn), @@ -144,6 +155,7 @@ func (v *ValidatorService) Start() { emitAccountMetrics: v.emitAccountMetrics, prevBalance: make(map[[48]byte]uint64), attLogs: make(map[[32]byte]*attSubmitted), + domainDataCache: cache, } go run(v.ctx, v.validator) } diff --git a/validator/client/validator.go b/validator/client/validator.go index 58d3bc178..9f3197a0c 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -4,17 +4,23 @@ package client import ( "context" "encoding/binary" + "encoding/hex" "fmt" "io" + "strconv" + "strings" "sync" "time" + "github.com/dgraph-io/ristretto" + "github.com/gogo/protobuf/proto" ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/slotutil" @@ -40,6 +46,8 @@ type validator struct { emitAccountMetrics bool attLogs map[[32]byte]*attSubmitted attLogsLock sync.Mutex + domainDataLock sync.Mutex + domainDataCache *ristretto.Cache } // Done cleans up the validator. @@ -330,3 +338,53 @@ func (v *validator) isAggregator(ctx context.Context, committee []uint64, slot u return binary.LittleEndian.Uint64(b[:8])%modulo == 0, nil } + +// UpdateDomainDataCaches by making calls for all of the possible domain data. These can change when +// the fork version changes which can happen once per epoch. Although changing for the fork version +// is very rare, a validator should check these data every epoch to be sure the validator is +// participating on the correct fork version. +func (v *validator) UpdateDomainDataCaches(ctx context.Context, slot uint64) { + if !featureconfig.Get().EnableDomainDataCache { + return + } + + for _, d := range [][]byte{ + params.BeaconConfig().DomainRandao, + params.BeaconConfig().DomainBeaconAttester, + params.BeaconConfig().DomainBeaconProposer, + } { + _, err := v.domainData(ctx, helpers.SlotToEpoch(slot), d) + if err != nil { + log.WithError(err).Errorf("Failed to update domain data for domain %v", d) + } + } +} + +func (v *validator) domainData(ctx context.Context, epoch uint64, domain []byte) (*ethpb.DomainResponse, error) { + v.domainDataLock.Lock() + defer v.domainDataLock.Unlock() + + req := ðpb.DomainRequest{ + Epoch: epoch, + Domain: domain, + } + + key := strings.Join([]string{strconv.FormatUint(req.Epoch, 10), hex.EncodeToString(req.Domain)}, ",") + + if featureconfig.Get().EnableDomainDataCache { + if val, ok := v.domainDataCache.Get(key); ok { + return proto.Clone(val.(proto.Message)).(*ethpb.DomainResponse), nil + } + } + + res, err := v.validatorClient.DomainData(ctx, req) + if err != nil { + return nil, err + } + + if featureconfig.Get().EnableDomainDataCache { + v.domainDataCache.Set(key, proto.Clone(res), 1) + } + + return res, nil +} diff --git a/validator/client/validator_aggregate.go b/validator/client/validator_aggregate.go index bbeea15c3..d3686c242 100644 --- a/validator/client/validator_aggregate.go +++ b/validator/client/validator_aggregate.go @@ -104,10 +104,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot uint64, pu // This implements selection logic outlined in: // https://github.com/ethereum/eth2.0-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#aggregation-selection func (v *validator) signSlot(ctx context.Context, pubKey [48]byte, slot uint64) ([]byte, error) { - domain, err := v.validatorClient.DomainData(ctx, ðpb.DomainRequest{ - Epoch: helpers.SlotToEpoch(slot), - Domain: params.BeaconConfig().DomainBeaconAttester, - }) + domain, err := v.domainData(ctx, helpers.SlotToEpoch(slot), params.BeaconConfig().DomainBeaconAttester) if err != nil { return nil, err } diff --git a/validator/client/validator_attest.go b/validator/client/validator_attest.go index 41ab2d445..5bc41e6ee 100644 --- a/validator/client/validator_attest.go +++ b/validator/client/validator_attest.go @@ -199,10 +199,7 @@ func (v *validator) duty(pubKey [48]byte) (*ethpb.DutiesResponse_Duty, error) { // Given validator's public key, this returns the signature of an attestation data. func (v *validator) signAtt(ctx context.Context, pubKey [48]byte, data *ethpb.AttestationData) ([]byte, error) { - domain, err := v.validatorClient.DomainData(ctx, ðpb.DomainRequest{ - Epoch: data.Target.Epoch, - Domain: params.BeaconConfig().DomainBeaconAttester, - }) + domain, err := v.domainData(ctx, data.Target.Epoch, params.BeaconConfig().DomainBeaconAttester) if err != nil { return nil, err } diff --git a/validator/client/validator_propose.go b/validator/client/validator_propose.go index 119988d39..c9d5d4280 100644 --- a/validator/client/validator_propose.go +++ b/validator/client/validator_propose.go @@ -172,10 +172,8 @@ func (v *validator) ProposeExit(ctx context.Context, exit *ethpb.VoluntaryExit) // Sign randao reveal with randao domain and private key. func (v *validator) signRandaoReveal(ctx context.Context, pubKey [48]byte, epoch uint64) ([]byte, error) { - domain, err := v.validatorClient.DomainData(ctx, ðpb.DomainRequest{ - Epoch: epoch, - Domain: params.BeaconConfig().DomainRandao, - }) + domain, err := v.domainData(ctx, epoch, params.BeaconConfig().DomainRandao) + if err != nil { return nil, errors.Wrap(err, "could not get domain data") } @@ -190,10 +188,7 @@ func (v *validator) signRandaoReveal(ctx context.Context, pubKey [48]byte, epoch // Sign block with proposer domain and private key. func (v *validator) signBlock(ctx context.Context, pubKey [48]byte, epoch uint64, b *ethpb.BeaconBlock) ([]byte, error) { - domain, err := v.validatorClient.DomainData(ctx, ðpb.DomainRequest{ - Epoch: epoch, - Domain: params.BeaconConfig().DomainBeaconProposer, - }) + domain, err := v.domainData(ctx, epoch, params.BeaconConfig().DomainBeaconProposer) if err != nil { return nil, errors.Wrap(err, "could not get domain data") }