From 0df12261a1c25c5df1bf1aa008e271d6059cfdd3 Mon Sep 17 00:00:00 2001 From: shayzluf Date: Thu, 2 Apr 2020 08:38:23 +0530 Subject: [PATCH] slasher retrieve and cache validator public key (#5220) * cache and retrieval of validator public keys * fix comments * fix comment * fix variables * gaz * ivan feedback fixes * goimports * fix test * comments on in line slice update Co-authored-by: terence tsao --- slasher/beaconclient/BUILD.bazel | 5 ++ slasher/beaconclient/service.go | 14 +++- slasher/beaconclient/validator_retrieval.go | 60 ++++++++++++++ .../beaconclient/validator_retrieval_test.go | 83 +++++++++++++++++++ slasher/cache/BUILD.bazel | 5 +- slasher/cache/validators_cache.go | 71 ++++++++++++++++ slasher/node/BUILD.bazel | 1 + slasher/node/node.go | 6 +- 8 files changed, 240 insertions(+), 5 deletions(-) create mode 100644 slasher/beaconclient/validator_retrieval.go create mode 100644 slasher/beaconclient/validator_retrieval_test.go create mode 100644 slasher/cache/validators_cache.go diff --git a/slasher/beaconclient/BUILD.bazel b/slasher/beaconclient/BUILD.bazel index 408e3849c..499ea9adf 100644 --- a/slasher/beaconclient/BUILD.bazel +++ b/slasher/beaconclient/BUILD.bazel @@ -9,12 +9,14 @@ go_library( "receivers.go", "service.go", "submit.go", + "validator_retrieval.go", ], importpath = "github.com/prysmaticlabs/prysm/slasher/beaconclient", visibility = ["//slasher:__subpackages__"], deps = [ "//shared/event:go_default_library", "//shared/params:go_default_library", + "//slasher/cache:go_default_library", "//slasher/db:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", "@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library", @@ -40,6 +42,7 @@ go_test( "receivers_test.go", "service_test.go", "submit_test.go", + "validator_retrieval_test.go", ], embed = [":go_default_library"], deps = [ @@ -47,11 +50,13 @@ go_test( "//shared/mock:go_default_library", "//shared/params:go_default_library", "//shared/testutil:go_default_library", + "//slasher/cache:go_default_library", "//slasher/db/testing:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", "@com_github_golang_mock//gomock:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", ], ) diff --git a/slasher/beaconclient/service.go b/slasher/beaconclient/service.go index c38f81bbf..243563bc9 100644 --- a/slasher/beaconclient/service.go +++ b/slasher/beaconclient/service.go @@ -7,13 +7,14 @@ package beaconclient import ( "context" - "errors" middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/shared/event" + "github.com/prysmaticlabs/prysm/slasher/cache" "github.com/prysmaticlabs/prysm/slasher/db" "github.com/sirupsen/logrus" "go.opencensus.io/plugin/ocgrpc" @@ -56,6 +57,7 @@ type Service struct { proposerSlashingsFeed *event.Feed receivedAttestationsBuffer chan *ethpb.IndexedAttestation collectedAttestationsBuffer chan []*ethpb.IndexedAttestation + publicKeyCache *cache.PublicKeyCache } // Config options for the beaconclient service. @@ -68,8 +70,13 @@ type Config struct { } // NewBeaconClientService instantiation. -func NewBeaconClientService(ctx context.Context, cfg *Config) *Service { +func NewBeaconClientService(ctx context.Context, cfg *Config) (*Service, error) { ctx, cancel := context.WithCancel(ctx) + publicKeyCache, err := cache.NewPublicKeyCache(0, nil) + if err != nil { + return nil, errors.Wrap(err, "could not create new cache") + } + return &Service{ cert: cfg.BeaconCert, ctx: ctx, @@ -85,7 +92,8 @@ func NewBeaconClientService(ctx context.Context, cfg *Config) *Service { proposerSlashingsFeed: cfg.ProposerSlashingsFeed, receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1), collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1), - } + publicKeyCache: publicKeyCache, + }, nil } // BlockFeed returns a feed other services in slasher can subscribe to diff --git a/slasher/beaconclient/validator_retrieval.go b/slasher/beaconclient/validator_retrieval.go new file mode 100644 index 000000000..fb75b1a34 --- /dev/null +++ b/slasher/beaconclient/validator_retrieval.go @@ -0,0 +1,60 @@ +package beaconclient + +import ( + "context" + + "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "go.opencensus.io/trace" +) + +// FindOrGetPublicKeys gets public keys from cache or request validators public +// keys from a beacon node via gRPC. +func (bs *Service) FindOrGetPublicKeys( + ctx context.Context, + validatorIndices []uint64, +) (map[uint64][]byte, error) { + ctx, span := trace.StartSpan(ctx, "beaconclient.FindOrGetPublicKeys") + defer span.End() + + validators := make(map[uint64][]byte, len(validatorIndices)) + notFound := 0 + for _, validatorIdx := range validatorIndices { + pub, exists := bs.publicKeyCache.Get(validatorIdx) + if exists { + validators[validatorIdx] = pub + continue + } + // inline removal of cached elements from slice + validatorIndices[notFound] = validatorIdx + notFound++ + } + // trim the slice to its new size + validatorIndices = validatorIndices[:notFound] + + if len(validators) > 0 { + log.Tracef( + "Retrieved validators public keys from cache: %v", + validators, + ) + } + + if notFound == 0 { + return validators, nil + } + vc, err := bs.beaconClient.ListValidators(ctx, ðpb.ListValidatorsRequest{ + Indices: validatorIndices, + }) + if err != nil { + return nil, errors.Wrapf(err, "could not request validators public key: %d", validatorIndices) + } + for _, v := range vc.ValidatorList { + validators[v.Index] = v.Validator.PublicKey + bs.publicKeyCache.Set(v.Index, v.Validator.PublicKey) + } + log.Tracef( + "Retrieved validators id public key map: %v", + validators, + ) + return validators, nil +} diff --git a/slasher/beaconclient/validator_retrieval_test.go b/slasher/beaconclient/validator_retrieval_test.go new file mode 100644 index 000000000..f1cdaafe0 --- /dev/null +++ b/slasher/beaconclient/validator_retrieval_test.go @@ -0,0 +1,83 @@ +package beaconclient + +import ( + "bytes" + "context" + "testing" + + "github.com/golang/mock/gomock" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/shared/mock" + "github.com/prysmaticlabs/prysm/shared/testutil" + "github.com/prysmaticlabs/prysm/slasher/cache" + "github.com/sirupsen/logrus" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func TestService_RequestValidator(t *testing.T) { + hook := logTest.NewGlobal() + logrus.SetLevel(logrus.TraceLevel) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client := mock.NewMockBeaconChainClient(ctrl) + validatorCache, err := cache.NewPublicKeyCache(0, nil) + if err != nil { + t.Fatalf("could not create new cache: %v", err) + } + bs := Service{ + beaconClient: client, + publicKeyCache: validatorCache, + } + wanted := ðpb.Validators{ + ValidatorList: []*ethpb.Validators_ValidatorContainer{ + { + Index: 0, Validator: ðpb.Validator{PublicKey: []byte{1, 2, 3}}, + }, + { + Index: 1, Validator: ðpb.Validator{PublicKey: []byte{2, 4, 5}}, + }, + }, + } + wanted2 := ðpb.Validators{ + ValidatorList: []*ethpb.Validators_ValidatorContainer{ + { + Index: 3, Validator: ðpb.Validator{PublicKey: []byte{3, 4, 5}}, + }, + }, + } + client.EXPECT().ListValidators( + gomock.Any(), + gomock.Any(), + ).Return(wanted, nil) + + client.EXPECT().ListValidators( + gomock.Any(), + gomock.Any(), + ).Return(wanted2, nil) + + // We request public key of validator id 0,1. + res, err := bs.FindOrGetPublicKeys(context.Background(), []uint64{0, 1}) + if err != nil { + t.Fatal(err) + } + for i, v := range wanted.ValidatorList { + if !bytes.Equal(res[v.Index], wanted.ValidatorList[i].Validator.PublicKey) { + t.Errorf("Wanted %v, received %v", wanted, res) + } + } + + testutil.AssertLogsContain(t, hook, "Retrieved validators id public key map:") + testutil.AssertLogsDoNotContain(t, hook, "Retrieved validators public keys from cache:") + // We expect public key of validator id 0 to be in cache. + res, err = bs.FindOrGetPublicKeys(context.Background(), []uint64{0, 3}) + if err != nil { + t.Fatal(err) + } + + for i, v := range wanted2.ValidatorList { + if !bytes.Equal(res[v.Index], wanted2.ValidatorList[i].Validator.PublicKey) { + t.Errorf("Wanted %v, received %v", wanted2, res) + } + } + testutil.AssertLogsContain(t, hook, "Retrieved validators public keys from cache: map[0:[1 2 3]]") +} diff --git a/slasher/cache/BUILD.bazel b/slasher/cache/BUILD.bazel index 120f50e7f..690579281 100644 --- a/slasher/cache/BUILD.bazel +++ b/slasher/cache/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["span_cache.go"], + srcs = [ + "span_cache.go", + "validators_cache.go", + ], importpath = "github.com/prysmaticlabs/prysm/slasher/cache", visibility = ["//slasher:__subpackages__"], deps = [ diff --git a/slasher/cache/validators_cache.go b/slasher/cache/validators_cache.go new file mode 100644 index 000000000..9868e98a5 --- /dev/null +++ b/slasher/cache/validators_cache.go @@ -0,0 +1,71 @@ +package cache + +import ( + lru "github.com/hashicorp/golang-lru" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + // validatorsCacheSize defines the max number of validators public keys the cache can hold. + validatorsCacheSize = 300000 + // Metrics for the validator cache. + validatorsCacheHit = promauto.NewCounter(prometheus.CounterOpts{ + Name: "validators_cache_hit", + Help: "The total number of cache hits on the validators cache.", + }) + validatorsCacheMiss = promauto.NewCounter(prometheus.CounterOpts{ + Name: "validators_cache_miss", + Help: "The total number of cache misses on the validators cache.", + }) +) + +// PublicKeyCache is used to store the public keys needed for signature verification. +type PublicKeyCache struct { + cache *lru.Cache +} + +// NewPublicKeyCache initializes the cache. +func NewPublicKeyCache(size int, onEvicted func(key interface{}, value interface{})) (*PublicKeyCache, error) { + if size != 0 { + validatorsCacheSize = size + } + cache, err := lru.NewWithEvict(validatorsCacheSize, onEvicted) + if err != nil { + return nil, err + } + return &PublicKeyCache{cache: cache}, nil +} + +// Get returns an ok bool and the cached value for the requested validator id key, if any. +func (c *PublicKeyCache) Get(validatorIdx uint64) ([]byte, bool) { + item, exists := c.cache.Get(validatorIdx) + if exists && item != nil { + validatorsCacheHit.Inc() + return item.([]byte), true + } + + validatorsCacheMiss.Inc() + return nil, false +} + +// Set the response in the cache. +func (c *PublicKeyCache) Set(validatorIdx uint64, publicKey []byte) { + _ = c.cache.Add(validatorIdx, publicKey) +} + +// Delete removes a validator id from the cache and returns if it existed or not. +// Performs the onEviction function before removal. +func (c *PublicKeyCache) Delete(validatorIdx uint64) bool { + return c.cache.Remove(validatorIdx) +} + +// Has returns true if the key exists in the cache. +func (c *PublicKeyCache) Has(validatorIdx uint64) bool { + return c.cache.Contains(validatorIdx) +} + +// Clear removes all keys from the ValidatorCache. +func (c *PublicKeyCache) Clear() { + c.cache.Purge() +} diff --git a/slasher/node/BUILD.bazel b/slasher/node/BUILD.bazel index df116580d..0e38f4fba 100644 --- a/slasher/node/BUILD.bazel +++ b/slasher/node/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//slasher/detection:go_default_library", "//slasher/flags:go_default_library", "//slasher/rpc:go_default_library", + "@com_github_pkg_errors//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@in_gopkg_urfave_cli_v2//:go_default_library", ], diff --git a/slasher/node/node.go b/slasher/node/node.go index 02fc9ee59..93eb2efd5 100644 --- a/slasher/node/node.go +++ b/slasher/node/node.go @@ -9,6 +9,7 @@ import ( "sync" "syscall" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/shared" "github.com/prysmaticlabs/prysm/shared/cmd" "github.com/prysmaticlabs/prysm/shared/debug" @@ -178,13 +179,16 @@ func (s *SlasherNode) registerBeaconClientService(ctx *cli.Context) error { beaconProvider = flags.BeaconRPCProviderFlag.Value } - bs := beaconclient.NewBeaconClientService(context.Background(), &beaconclient.Config{ + bs, err := beaconclient.NewBeaconClientService(context.Background(), &beaconclient.Config{ BeaconCert: beaconCert, SlasherDB: s.db, BeaconProvider: beaconProvider, AttesterSlashingsFeed: s.attesterSlashingsFeed, ProposerSlashingsFeed: s.proposerSlashingsFeed, }) + if err != nil { + return errors.Wrap(err, "failed to initialize beacon client") + } return s.services.RegisterService(bs) }