Validator: cache domain data calls (#4914)

* Use a domain data cache to reduce the number of calls per epoch

* fix fakevalidator

* Refactor to use a feature flag, use proto.clone, move interceptor to its own file

* gofmt

* fix comment

* tune cache slightly

* log if error on domain data

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Preston Van Loon 2020-02-24 13:02:45 -08:00 committed by GitHub
parent 855f5d2986
commit c0f1a1d674
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 131 additions and 16 deletions

View File

@ -43,6 +43,7 @@ type Flags struct {
DisableStrictAttestationPubsubVerification bool // DisableStrictAttestationPubsubVerification disables strict signature verification in pubsub.
DisableUpdateHeadPerAttestation bool // DisableUpdateHeadPerAttestation disables updating the head on a per-attestation basis.
EnableByteMempool bool // EnableByteMempool enables byte mempool memory management.
EnableDomainDataCache bool // EnableDomainDataCache caches validator calls to DomainData per epoch.
// DisableForkChoice disables using LMD-GHOST fork choice to update
// the head of the chain based on attestations and instead accepts any valid received block
@ -169,6 +170,10 @@ func ConfigureValidator(ctx *cli.Context) {
log.Warn("Enabled validator attestation slashing protection.")
cfg.ProtectAttester = true
}
if ctx.GlobalBool(enableDomainDataCacheFlag.Name) {
log.Warn("Enabled domain data cache.")
cfg.EnableDomainDataCache = true
}
Init(cfg)
}

View File

@ -98,6 +98,11 @@ var (
Name: "enable-byte-mempool",
Usage: "Enable use of sync.Pool for certain byte arrays in the beacon state",
}
enableDomainDataCacheFlag = cli.BoolFlag{
Name: "enable-domain-data-cache",
Usage: "Enable caching of domain data requests per epoch. This feature reduces the total " +
"calls to the beacon node for each assignment.",
}
)
// Deprecated flags list.
@ -235,12 +240,14 @@ var ValidatorFlags = append(deprecatedFlags, []cli.Flag{
minimalConfigFlag,
protectAttesterFlag,
protectProposerFlag,
enableDomainDataCacheFlag,
}...)
// E2EValidatorFlags contains a list of the validator feature flags to be tested in E2E.
// Each entry is written in its full command-line form, including the leading "--".
var E2EValidatorFlags = []string{
"--protect-attester",
"--protect-proposer",
"--enable-domain-data-cache",
}
// BeaconChainFlags contains a list of all the feature flags that apply to the beacon-chain client.

View File

@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"grpc_interceptor.go",
"runner.go",
"service.go",
"validator.go",
@ -26,6 +27,8 @@ go_library(
"//shared/slotutil:go_default_library",
"//validator/db:go_default_library",
"//validator/keymanager:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//retry:go_default_library",
@ -43,6 +46,7 @@ go_library(
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//credentials:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
],
)

View File

@ -98,3 +98,5 @@ func (fv *fakeValidator) ProposeBlock(_ context.Context, slot uint64, pubKey [48
// SubmitAggregateAndProof is a no-op on the fake validator; all arguments are ignored.
func (fv *fakeValidator) SubmitAggregateAndProof(_ context.Context, slot uint64, pubKey [48]byte) {}
// LogAttestationsSubmitted is a no-op on the fake validator.
func (fv *fakeValidator) LogAttestationsSubmitted() {}
// UpdateDomainDataCaches is a no-op on the fake validator; both arguments are ignored.
func (fv *fakeValidator) UpdateDomainDataCaches(context.Context, uint64) {}

View File

@ -0,0 +1,31 @@
package client
import (
"context"
"time"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// logDebugRequestInfoUnaryInterceptor is a gRPC unary client interceptor that logs the
// "x-backend" response header, the method name and the request duration of every call
// when the logrus level is set to debug or more verbose. At any less verbose level the
// call is forwarded to invoker untouched.
//
// The error returned by invoker is always passed back to the caller unchanged; the log
// line is emitted whether or not the call succeeded.
func logDebugRequestInfoUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// Shortcut when debug logging is not enabled.
	if logrus.GetLevel() < logrus.DebugLevel {
		return invoker(ctx, method, req, reply, cc, opts...)
	}

	// Ask gRPC to capture the response headers so the backend can be reported.
	var header metadata.MD
	opts = append(
		opts,
		grpc.Header(&header),
	)

	start := time.Now()
	err := invoker(ctx, method, req, reply, cc, opts...)
	log.WithField("backend", header["x-backend"]).
		WithField("method", method).
		WithField("duration", time.Since(start)).
		Debug("gRPC request finished.")
	return err
}

View File

@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
@ -28,6 +29,7 @@ type Validator interface {
ProposeBlock(ctx context.Context, slot uint64, pubKey [48]byte)
SubmitAggregateAndProof(ctx context.Context, slot uint64, pubKey [48]byte)
LogAttestationsSubmitted()
UpdateDomainDataCaches(ctx context.Context, slot uint64)
}
// Run the main validator routine. This routine exits if the context is
@ -83,6 +85,11 @@ func run(ctx context.Context, v Validator) {
continue
}
// Start fetching domain data for the next epoch.
if helpers.IsEpochEnd(slot) {
go v.UpdateDomainDataCaches(ctx, slot+1)
}
var wg sync.WaitGroup
allRoles, err := v.RolesAt(ctx, slot)

View File

@ -3,6 +3,7 @@ package client
import (
"context"
"github.com/dgraph-io/ristretto"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
@ -110,6 +111,7 @@ func (v *ValidatorService) Start() {
grpc_opentracing.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_retry.UnaryClientInterceptor(),
logDebugRequestInfoUnaryInterceptor,
)),
}
conn, err := grpc.DialContext(v.ctx, v.endpoint, opts...)
@ -132,6 +134,15 @@ func (v *ValidatorService) Start() {
}
v.conn = conn
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1280, // number of keys to track.
MaxCost: 128, // maximum cost of cache, 1 item = 1 cost.
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
panic(err)
}
v.validator = &validator{
db: valDB,
validatorClient: ethpb.NewBeaconNodeValidatorClient(v.conn),
@ -144,6 +155,7 @@ func (v *ValidatorService) Start() {
emitAccountMetrics: v.emitAccountMetrics,
prevBalance: make(map[[48]byte]uint64),
attLogs: make(map[[32]byte]*attSubmitted),
domainDataCache: cache,
}
go run(v.ctx, v.validator)
}

View File

@ -4,17 +4,23 @@ package client
import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
"github.com/dgraph-io/ristretto"
"github.com/gogo/protobuf/proto"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotutil"
@ -40,6 +46,8 @@ type validator struct {
emitAccountMetrics bool
attLogs map[[32]byte]*attSubmitted
attLogsLock sync.Mutex
domainDataLock sync.Mutex
domainDataCache *ristretto.Cache
}
// Done cleans up the validator.
@ -330,3 +338,53 @@ func (v *validator) isAggregator(ctx context.Context, committee []uint64, slot u
return binary.LittleEndian.Uint64(b[:8])%modulo == 0, nil
}
// UpdateDomainDataCaches requests the randao, attester and proposer domain data for the
// epoch containing the given slot, so that the results pass through domainData and its
// cache. Domain data can change when the fork version changes, which can happen once per
// epoch. Fork version changes are very rare, but a validator should still refresh these
// values every epoch to be sure it is participating on the correct fork version.
//
// The method does nothing when the EnableDomainDataCache feature flag is off. A failure
// for one domain is logged and does not stop the remaining domains from being fetched.
func (v *validator) UpdateDomainDataCaches(ctx context.Context, slot uint64) {
	if !featureconfig.Get().EnableDomainDataCache {
		return
	}

	domains := [][]byte{
		params.BeaconConfig().DomainRandao,
		params.BeaconConfig().DomainBeaconAttester,
		params.BeaconConfig().DomainBeaconProposer,
	}
	for i := range domains {
		if _, err := v.domainData(ctx, helpers.SlotToEpoch(slot), domains[i]); err != nil {
			log.WithError(err).Errorf("Failed to update domain data for domain %v", domains[i])
		}
	}
}
// domainData returns the domain response for the given epoch and domain, asking the
// beacon node via validatorClient.DomainData when no cached value is used.
//
// When the EnableDomainDataCache feature flag is on, responses are looked up in and
// stored to domainDataCache under the key "<epoch in decimal>,<domain in hex>". A clone
// is returned on a cache hit and a clone is stored on a miss, so the cached message and
// the message handed to the caller are never the same object.
//
// domainDataLock is held for the whole call, including the request to the beacon node,
// so calls to this method run one at a time.
func (v *validator) domainData(ctx context.Context, epoch uint64, domain []byte) (*ethpb.DomainResponse, error) {
	v.domainDataLock.Lock()
	defer v.domainDataLock.Unlock()

	keyParts := []string{
		strconv.FormatUint(epoch, 10),
		hex.EncodeToString(domain),
	}
	cacheKey := strings.Join(keyParts, ",")

	if featureconfig.Get().EnableDomainDataCache {
		cached, found := v.domainDataCache.Get(cacheKey)
		if found {
			return proto.Clone(cached.(proto.Message)).(*ethpb.DomainResponse), nil
		}
	}

	resp, err := v.validatorClient.DomainData(ctx, &ethpb.DomainRequest{
		Epoch:  epoch,
		Domain: domain,
	})
	if err != nil {
		return nil, err
	}

	if featureconfig.Get().EnableDomainDataCache {
		// Each entry is stored with a cost of 1.
		v.domainDataCache.Set(cacheKey, proto.Clone(resp), 1)
	}
	return resp, nil
}

View File

@ -104,10 +104,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot uint64, pu
// This implements selection logic outlined in:
// https://github.com/ethereum/eth2.0-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#aggregation-selection
func (v *validator) signSlot(ctx context.Context, pubKey [48]byte, slot uint64) ([]byte, error) {
domain, err := v.validatorClient.DomainData(ctx, &ethpb.DomainRequest{
Epoch: helpers.SlotToEpoch(slot),
Domain: params.BeaconConfig().DomainBeaconAttester,
})
domain, err := v.domainData(ctx, helpers.SlotToEpoch(slot), params.BeaconConfig().DomainBeaconAttester)
if err != nil {
return nil, err
}

View File

@ -199,10 +199,7 @@ func (v *validator) duty(pubKey [48]byte) (*ethpb.DutiesResponse_Duty, error) {
// Given validator's public key, this returns the signature of an attestation data.
func (v *validator) signAtt(ctx context.Context, pubKey [48]byte, data *ethpb.AttestationData) ([]byte, error) {
domain, err := v.validatorClient.DomainData(ctx, &ethpb.DomainRequest{
Epoch: data.Target.Epoch,
Domain: params.BeaconConfig().DomainBeaconAttester,
})
domain, err := v.domainData(ctx, data.Target.Epoch, params.BeaconConfig().DomainBeaconAttester)
if err != nil {
return nil, err
}

View File

@ -172,10 +172,8 @@ func (v *validator) ProposeExit(ctx context.Context, exit *ethpb.VoluntaryExit)
// Sign randao reveal with randao domain and private key.
func (v *validator) signRandaoReveal(ctx context.Context, pubKey [48]byte, epoch uint64) ([]byte, error) {
domain, err := v.validatorClient.DomainData(ctx, &ethpb.DomainRequest{
Epoch: epoch,
Domain: params.BeaconConfig().DomainRandao,
})
domain, err := v.domainData(ctx, epoch, params.BeaconConfig().DomainRandao)
if err != nil {
return nil, errors.Wrap(err, "could not get domain data")
}
@ -190,10 +188,7 @@ func (v *validator) signRandaoReveal(ctx context.Context, pubKey [48]byte, epoch
// Sign block with proposer domain and private key.
func (v *validator) signBlock(ctx context.Context, pubKey [48]byte, epoch uint64, b *ethpb.BeaconBlock) ([]byte, error) {
domain, err := v.validatorClient.DomainData(ctx, &ethpb.DomainRequest{
Epoch: epoch,
Domain: params.BeaconConfig().DomainBeaconProposer,
})
domain, err := v.domainData(ctx, epoch, params.BeaconConfig().DomainBeaconProposer)
if err != nil {
return nil, errors.Wrap(err, "could not get domain data")
}