From d3375d98a8849cb37032af0c36b1046cc70da01e Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Thu, 5 Dec 2019 18:05:58 -0800 Subject: [PATCH] Kafka exporter (#3840) * abstract db interface, kafka build, work in progress * checkpoint * Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter * feature flag * move passthrough * flag change * gofmt * Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter * missing db methods * Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter * fix interface * Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter * Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter * Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter * try using cmake built from source * lint godocs * lint godocs * lint godocs * Update BUILD.bazel * Merge branch 'master' into es-exporter * Merge branch 'master' into es-exporter * Merge branch 'master' into es-exporter * Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter * gaz --- BUILD.bazel | 6 + WORKSPACE | 31 ++ beacon-chain/db/BUILD.bazel | 6 +- beacon-chain/db/db.go | 88 +----- beacon-chain/db/iface/BUILD.bazel | 15 + beacon-chain/db/iface/interface.go | 86 ++++++ beacon-chain/db/kafka/BUILD.bazel | 26 ++ beacon-chain/db/kafka/export_wrapper.go | 124 ++++++++ beacon-chain/db/kafka/passthrough.go | 280 ++++++++++++++++++ beacon-chain/db/kv/BUILD.bazel | 1 + beacon-chain/db/kv/kv.go | 3 + shared/featureconfig/config.go | 21 +- shared/featureconfig/flags.go | 5 + ...g_confluentinc_confluent_kafka_go_v1.patch | 10 + third_party/kafka/BUILD.bazel | 20 ++ 15 files changed, 631 insertions(+), 91 deletions(-) create mode 100644 beacon-chain/db/iface/BUILD.bazel create mode 100644 beacon-chain/db/iface/interface.go create mode 100644 beacon-chain/db/kafka/BUILD.bazel create mode 100644 beacon-chain/db/kafka/export_wrapper.go create mode 100644 beacon-chain/db/kafka/passthrough.go create mode 100644 third_party/in_gopkg_confluentinc_confluent_kafka_go_v1.patch create mode 100644 third_party/kafka/BUILD.bazel diff --git a/BUILD.bazel b/BUILD.bazel index f78e7b902..42ba48c44 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -137,3 +137,9 @@ common_files = { ), tags = ["manual"], ) for pair in binary_targets] + +toolchain( + name = "built_cmake_toolchain", + toolchain = "@rules_foreign_cc//tools/build_defs/native_tools:built_cmake", + toolchain_type = "@rules_foreign_cc//tools/build_defs:cmake_toolchain", +) diff --git a/WORKSPACE b/WORKSPACE index 622d2605b..c36fbe36b 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -217,6 +217,28 @@ load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps") protobuf_deps() +# Group the sources of the library so that CMake rule have access to it +all_content = """filegroup(name = "all", srcs = glob(["**"]), visibility = ["//visibility:public"])""" + +http_archive( + name = "rules_foreign_cc", + strip_prefix = "rules_foreign_cc-master", + url = "https://github.com/bazelbuild/rules_foreign_cc/archive/master.zip", +) + +load("@rules_foreign_cc//:workspace_definitions.bzl", "rules_foreign_cc_dependencies") + +rules_foreign_cc_dependencies([ + "@prysm//:built_cmake_toolchain", +]) + +http_archive( + name = "librdkafka", + build_file_content = all_content, + strip_prefix = "librdkafka-1.2.1", + urls = ["https://github.com/edenhill/librdkafka/archive/v1.2.1.tar.gz"], +) + # External dependencies go_repository( @@ -1288,6 +1310,15 @@ go_repository( version = "v0.0.0-20191002040644-a1355ae1e2c3", ) +go_repository( + name = "in_gopkg_confluentinc_confluent_kafka_go_v1", + importpath = "gopkg.in/confluentinc/confluent-kafka-go.v1", + patch_args = ["-p1"], + patches = ["//third_party:in_gopkg_confluentinc_confluent_kafka_go_v1.patch"], + sum = "h1:roy97m/3wj9/o8OuU3sZ5wildk30ep38k2x8nhNbKrI=", + version = "v1.1.0", +) + go_repository( name = "com_github_naoina_toml", importpath = "github.com/naoina/toml", diff --git a/beacon-chain/db/BUILD.bazel b/beacon-chain/db/BUILD.bazel index 51c136f9f..c75c71b9a 100644 --- a/beacon-chain/db/BUILD.bazel +++ b/beacon-chain/db/BUILD.bazel @@ -12,11 +12,9 @@ go_library( "//tools:__subpackages__", ], deps = [ - "//beacon-chain/db/filters:go_default_library", + "//beacon-chain/db/iface:go_default_library", + "//beacon-chain/db/kafka:go_default_library", "//beacon-chain/db/kv:go_default_library", - "//proto/beacon/p2p/v1:go_default_library", - "@com_github_ethereum_go_ethereum//common:go_default_library", - "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], ) diff --git a/beacon-chain/db/db.go b/beacon-chain/db/db.go index 0f0c6b353..ce8faebf9 100644 --- a/beacon-chain/db/db.go +++ b/beacon-chain/db/db.go @@ -1,91 +1,21 @@ package db import ( - "context" - "io" - - "github.com/ethereum/go-ethereum/common" - ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" + "github.com/prysmaticlabs/prysm/beacon-chain/db/iface" + "github.com/prysmaticlabs/prysm/beacon-chain/db/kafka" "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" - pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" ) // Database defines the necessary methods for Prysm's eth2 backend which may // be implemented by any key-value or relational database in practice. -type Database interface { - io.Closer - DatabasePath() string - ClearDB() error - // Backup and restore methods - Backup(ctx context.Context) error - // Attestation related methods. - AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte) ([]*ethpb.Attestation, error) - Attestations(ctx context.Context, f *filters.QueryFilter) ([]*ethpb.Attestation, error) - HasAttestation(ctx context.Context, attDataRoot [32]byte) bool - DeleteAttestation(ctx context.Context, attDataRoot [32]byte) error - DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error - SaveAttestation(ctx context.Context, att *ethpb.Attestation) error - SaveAttestations(ctx context.Context, atts []*ethpb.Attestation) error - // Block related methods. - Block(ctx context.Context, blockRoot [32]byte) (*ethpb.BeaconBlock, error) - HeadBlock(ctx context.Context) (*ethpb.BeaconBlock, error) - Blocks(ctx context.Context, f *filters.QueryFilter) ([]*ethpb.BeaconBlock, error) - BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]byte, error) - HasBlock(ctx context.Context, blockRoot [32]byte) bool - DeleteBlock(ctx context.Context, blockRoot [32]byte) error - DeleteBlocks(ctx context.Context, blockRoots [][32]byte) error - SaveBlock(ctx context.Context, block *ethpb.BeaconBlock) error - SaveBlocks(ctx context.Context, blocks []*ethpb.BeaconBlock) error - SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error - SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) error - IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool - // Validator related methods. - ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, bool, error) - HasValidatorIndex(ctx context.Context, publicKey [48]byte) bool - DeleteValidatorIndex(ctx context.Context, publicKey [48]byte) error - SaveValidatorIndex(ctx context.Context, publicKey [48]byte, validatorIdx uint64) error - // State related methods. - State(ctx context.Context, blockRoot [32]byte) (*pb.BeaconState, error) - HeadState(ctx context.Context) (*pb.BeaconState, error) - GenesisState(ctx context.Context) (*pb.BeaconState, error) - SaveState(ctx context.Context, state *pb.BeaconState, blockRoot [32]byte) error - DeleteState(ctx context.Context, blockRoot [32]byte) error - DeleteStates(ctx context.Context, blockRoots [][32]byte) error - // Slashing operations. - ProposerSlashing(ctx context.Context, slashingRoot [32]byte) (*ethpb.ProposerSlashing, error) - AttesterSlashing(ctx context.Context, slashingRoot [32]byte) (*ethpb.AttesterSlashing, error) - SaveProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) error - SaveAttesterSlashing(ctx context.Context, slashing *ethpb.AttesterSlashing) error - HasProposerSlashing(ctx context.Context, slashingRoot [32]byte) bool - HasAttesterSlashing(ctx context.Context, slashingRoot [32]byte) bool - DeleteProposerSlashing(ctx context.Context, slashingRoot [32]byte) error - DeleteAttesterSlashing(ctx context.Context, slashingRoot [32]byte) error - // Block operations. - VoluntaryExit(ctx context.Context, exitRoot [32]byte) (*ethpb.VoluntaryExit, error) - SaveVoluntaryExit(ctx context.Context, exit *ethpb.VoluntaryExit) error - HasVoluntaryExit(ctx context.Context, exitRoot [32]byte) bool - DeleteVoluntaryExit(ctx context.Context, exitRoot [32]byte) error - // Checkpoint operations. - JustifiedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error) - FinalizedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error) - SaveJustifiedCheckpoint(ctx context.Context, checkpoint *ethpb.Checkpoint) error - SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.Checkpoint) error - // Archival data handlers for storing/retrieving historical beacon node information. - ArchivedActiveValidatorChanges(ctx context.Context, epoch uint64) (*pb.ArchivedActiveSetChanges, error) - SaveArchivedActiveValidatorChanges(ctx context.Context, epoch uint64, changes *pb.ArchivedActiveSetChanges) error - ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*pb.ArchivedCommitteeInfo, error) - SaveArchivedCommitteeInfo(ctx context.Context, epoch uint64, info *pb.ArchivedCommitteeInfo) error - ArchivedBalances(ctx context.Context, epoch uint64) ([]uint64, error) - SaveArchivedBalances(ctx context.Context, epoch uint64, balances []uint64) error - ArchivedValidatorParticipation(ctx context.Context, epoch uint64) (*ethpb.ValidatorParticipation, error) - SaveArchivedValidatorParticipation(ctx context.Context, epoch uint64, part *ethpb.ValidatorParticipation) error - // Deposit contract related handlers. - DepositContractAddress(ctx context.Context) ([]byte, error) - SaveDepositContractAddress(ctx context.Context, addr common.Address) error -} +type Database = iface.Database // NewDB initializes a new DB. func NewDB(dirPath string) (Database, error) { - return kv.NewKVStore(dirPath) + db, err := kv.NewKVStore(dirPath) + if err != nil { + return nil, err + } + + return kafka.Wrap(db) } diff --git a/beacon-chain/db/iface/BUILD.bazel b/beacon-chain/db/iface/BUILD.bazel new file mode 100644 index 000000000..2caad3d3c --- /dev/null +++ b/beacon-chain/db/iface/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["interface.go"], + importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/iface", + # Other packages must use github.com/prysmaticlabs/prysm/beacon-chain/db.Database alias. + visibility = ["//beacon-chain/db:__subpackages__"], + deps = [ + "//beacon-chain/db/filters:go_default_library", + "//proto/beacon/p2p/v1:go_default_library", + "@com_github_ethereum_go_ethereum//common:go_default_library", + "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + ], +) diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go new file mode 100644 index 000000000..ba30be6b5 --- /dev/null +++ b/beacon-chain/db/iface/interface.go @@ -0,0 +1,86 @@ +// Package iface exists to prevent circular dependencies when implementing the database interface. +package iface + +import ( + "context" + "io" + + "github.com/ethereum/go-ethereum/common" + "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" + "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" +) + +// Database defines the necessary methods for Prysm's eth2 backend which may +// be implemented by any key-value or relational database in practice. +type Database interface { + io.Closer + DatabasePath() string + ClearDB() error + // Backup and restore methods + Backup(ctx context.Context) error + // Attestation related methods. + AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte) ([]*eth.Attestation, error) + Attestations(ctx context.Context, f *filters.QueryFilter) ([]*eth.Attestation, error) + HasAttestation(ctx context.Context, attDataRoot [32]byte) bool + DeleteAttestation(ctx context.Context, attDataRoot [32]byte) error + DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error + SaveAttestation(ctx context.Context, att *eth.Attestation) error + SaveAttestations(ctx context.Context, atts []*eth.Attestation) error + // Block related methods. + Block(ctx context.Context, blockRoot [32]byte) (*eth.BeaconBlock, error) + HeadBlock(ctx context.Context) (*eth.BeaconBlock, error) + Blocks(ctx context.Context, f *filters.QueryFilter) ([]*eth.BeaconBlock, error) + BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]byte, error) + HasBlock(ctx context.Context, blockRoot [32]byte) bool + DeleteBlock(ctx context.Context, blockRoot [32]byte) error + DeleteBlocks(ctx context.Context, blockRoots [][32]byte) error + SaveBlock(ctx context.Context, block *eth.BeaconBlock) error + SaveBlocks(ctx context.Context, blocks []*eth.BeaconBlock) error + SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error + SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) error + IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool + // Validator related methods. + ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, bool, error) + HasValidatorIndex(ctx context.Context, publicKey [48]byte) bool + DeleteValidatorIndex(ctx context.Context, publicKey [48]byte) error + SaveValidatorIndex(ctx context.Context, publicKey [48]byte, validatorIdx uint64) error + // State related methods. + State(ctx context.Context, blockRoot [32]byte) (*ethereum_beacon_p2p_v1.BeaconState, error) + HeadState(ctx context.Context) (*ethereum_beacon_p2p_v1.BeaconState, error) + GenesisState(ctx context.Context) (*ethereum_beacon_p2p_v1.BeaconState, error) + SaveState(ctx context.Context, state *ethereum_beacon_p2p_v1.BeaconState, blockRoot [32]byte) error + DeleteState(ctx context.Context, blockRoot [32]byte) error + DeleteStates(ctx context.Context, blockRoots [][32]byte) error + // Slashing operations. + ProposerSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.ProposerSlashing, error) + AttesterSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.AttesterSlashing, error) + SaveProposerSlashing(ctx context.Context, slashing *eth.ProposerSlashing) error + SaveAttesterSlashing(ctx context.Context, slashing *eth.AttesterSlashing) error + HasProposerSlashing(ctx context.Context, slashingRoot [32]byte) bool + HasAttesterSlashing(ctx context.Context, slashingRoot [32]byte) bool + DeleteProposerSlashing(ctx context.Context, slashingRoot [32]byte) error + DeleteAttesterSlashing(ctx context.Context, slashingRoot [32]byte) error + // Block operations. + VoluntaryExit(ctx context.Context, exitRoot [32]byte) (*eth.VoluntaryExit, error) + SaveVoluntaryExit(ctx context.Context, exit *eth.VoluntaryExit) error + HasVoluntaryExit(ctx context.Context, exitRoot [32]byte) bool + DeleteVoluntaryExit(ctx context.Context, exitRoot [32]byte) error + // Checkpoint operations. + JustifiedCheckpoint(ctx context.Context) (*eth.Checkpoint, error) + FinalizedCheckpoint(ctx context.Context) (*eth.Checkpoint, error) + SaveJustifiedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error + SaveFinalizedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error + // Archival data handlers for storing/retrieving historical beacon node information. + ArchivedActiveValidatorChanges(ctx context.Context, epoch uint64) (*ethereum_beacon_p2p_v1.ArchivedActiveSetChanges, error) + SaveArchivedActiveValidatorChanges(ctx context.Context, epoch uint64, changes *ethereum_beacon_p2p_v1.ArchivedActiveSetChanges) error + ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*ethereum_beacon_p2p_v1.ArchivedCommitteeInfo, error) + SaveArchivedCommitteeInfo(ctx context.Context, epoch uint64, info *ethereum_beacon_p2p_v1.ArchivedCommitteeInfo) error + ArchivedBalances(ctx context.Context, epoch uint64) ([]uint64, error) + SaveArchivedBalances(ctx context.Context, epoch uint64, balances []uint64) error + ArchivedValidatorParticipation(ctx context.Context, epoch uint64) (*eth.ValidatorParticipation, error) + SaveArchivedValidatorParticipation(ctx context.Context, epoch uint64, part *eth.ValidatorParticipation) error + // Deposit contract related handlers. + DepositContractAddress(ctx context.Context) ([]byte, error) + SaveDepositContractAddress(ctx context.Context, addr common.Address) error +} diff --git a/beacon-chain/db/kafka/BUILD.bazel b/beacon-chain/db/kafka/BUILD.bazel new file mode 100644 index 000000000..215b66d96 --- /dev/null +++ b/beacon-chain/db/kafka/BUILD.bazel @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "export_wrapper.go", + "passthrough.go", + ], + importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kafka", + visibility = ["//beacon-chain/db:__pkg__"], + deps = [ + "//beacon-chain/db/filters:go_default_library", + "//beacon-chain/db/iface:go_default_library", + "//proto/beacon/p2p/v1:go_default_library", + "//shared/featureconfig:go_default_library", + "//shared/traceutil:go_default_library", + "@com_github_ethereum_go_ethereum//common:go_default_library", + "@com_github_golang_protobuf//jsonpb:go_default_library_gen", + "@com_github_golang_protobuf//proto:go_default_library", + "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_prysmaticlabs_go_ssz//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@in_gopkg_confluentinc_confluent_kafka_go_v1//kafka:go_default_library", + "@io_opencensus_go//trace:go_default_library", + ], +) diff --git a/beacon-chain/db/kafka/export_wrapper.go b/beacon-chain/db/kafka/export_wrapper.go new file mode 100644 index 000000000..c92d5b651 --- /dev/null +++ b/beacon-chain/db/kafka/export_wrapper.go @@ -0,0 +1,124 @@ +package kafka + +import ( + "bytes" + "context" + + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/db/iface" + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + "github.com/prysmaticlabs/prysm/shared/traceutil" + "github.com/sirupsen/logrus" + "go.opencensus.io/trace" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" +) + +var _ = iface.Database(&Exporter{}) +var log = logrus.WithField("prefix", "exporter") +var marshaler = &jsonpb.Marshaler{} + +// Exporter wraps a database interface and exports certain objects to kafka topics. +type Exporter struct { + db iface.Database + p *kafka.Producer +} + +// Wrap the db with kafka exporter. If the feature flag is not enabled, this service does not wrap +// the database, but returns the underlying database pointer itself. +func Wrap(db iface.Database) (iface.Database, error) { + if featureconfig.Get().KafkaBootstrapServers == "" { + return db, nil + } + + p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": featureconfig.Get().KafkaBootstrapServers}) + if err != nil { + return nil, err + } + + return &Exporter{db: db, p: p}, nil +} + +func (e Exporter) publish(ctx context.Context, topic string, msg proto.Message) error { + ctx, span := trace.StartSpan(ctx, "kafka.publish") + defer span.End() + + buf := bytes.NewBuffer(nil) + if err := marshaler.Marshal(buf, msg); err != nil { + traceutil.AnnotateError(span, err) + return err + } + + key, err := ssz.HashTreeRoot(msg) + if err != nil { + traceutil.AnnotateError(span, err) + return err + } + + if err := e.p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &topic, + }, + Value: buf.Bytes(), + Key: key[:], + }, nil); err != nil { + traceutil.AnnotateError(span, err) + return err + } + return nil +} + +// Close closes kafka producer and underlying db. +func (e Exporter) Close() error { + e.p.Close() + return e.db.Close() +} + +// SaveAttestation publishes to the kafka topic for attestations. +func (e Exporter) SaveAttestation(ctx context.Context, att *eth.Attestation) error { + go func() { + if err := e.publish(ctx, "beacon_attestation", att); err != nil { + log.WithError(err).Error("Failed to publish attestation") + } + }() + + return e.db.SaveAttestation(ctx, att) +} + +// SaveAttestations publishes to the kafka topic for beacon attestations. +func (e Exporter) SaveAttestations(ctx context.Context, atts []*eth.Attestation) error { + go func() { + for _, att := range atts { + if err := e.publish(ctx, "beacon_attestation", att); err != nil { + log.WithError(err).Error("Failed to publish attestation") + } + } + }() + return e.db.SaveAttestations(ctx, atts) +} + +// SaveBlock publishes to the kafka topic for beacon blocks. +func (e Exporter) SaveBlock(ctx context.Context, block *eth.BeaconBlock) error { + go func() { + if err := e.publish(ctx, "beacon_block", block); err != nil { + log.WithError(err).Error("Failed to publish block") + } + }() + + return e.db.SaveBlock(ctx, block) +} + +// SaveBlocks publishes to the kafka topic for beacon blocks. +func (e Exporter) SaveBlocks(ctx context.Context, blocks []*eth.BeaconBlock) error { + go func() { + for _, block := range blocks { + if err := e.publish(ctx, "beacon_block", block); err != nil { + log.WithError(err).Error("Failed to publish block") + } + } + }() + + return e.db.SaveBlocks(ctx, blocks) +} diff --git a/beacon-chain/db/kafka/passthrough.go b/beacon-chain/db/kafka/passthrough.go new file mode 100644 index 000000000..3ca1abc29 --- /dev/null +++ b/beacon-chain/db/kafka/passthrough.go @@ -0,0 +1,280 @@ +package kafka + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" + ethereum_beacon_p2p_v1 "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" +) + +// DatabasePath -- passthrough. +func (e Exporter) DatabasePath() string { + return e.db.DatabasePath() +} + +// ClearDB -- passthrough. +func (e Exporter) ClearDB() error { + return e.db.ClearDB() +} + +// Backup -- passthrough. +func (e Exporter) Backup(ctx context.Context) error { + return e.db.Backup(ctx) +} + +// AttestationsByDataRoot -- passthrough. +func (e Exporter) AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte) ([]*eth.Attestation, error) { + return e.db.AttestationsByDataRoot(ctx, attDataRoot) +} + +// Attestations -- passthrough. +func (e Exporter) Attestations(ctx context.Context, f *filters.QueryFilter) ([]*eth.Attestation, error) { + return e.db.Attestations(ctx, f) +} + +// HasAttestation -- passthrough. +func (e Exporter) HasAttestation(ctx context.Context, attDataRoot [32]byte) bool { + return e.db.HasAttestation(ctx, attDataRoot) +} + +// DeleteAttestation -- passthrough. +func (e Exporter) DeleteAttestation(ctx context.Context, attDataRoot [32]byte) error { + return e.db.DeleteAttestation(ctx, attDataRoot) +} + +// DeleteAttestations -- passthrough. +func (e Exporter) DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error { + return e.db.DeleteAttestations(ctx, attDataRoots) +} + +// Block -- passthrough. +func (e Exporter) Block(ctx context.Context, blockRoot [32]byte) (*eth.BeaconBlock, error) { + return e.db.Block(ctx, blockRoot) +} + +// HeadBlock -- passthrough. +func (e Exporter) HeadBlock(ctx context.Context) (*eth.BeaconBlock, error) { + return e.db.HeadBlock(ctx) +} + +// Blocks -- passthrough. +func (e Exporter) Blocks(ctx context.Context, f *filters.QueryFilter) ([]*eth.BeaconBlock, error) { + return e.db.Blocks(ctx, f) +} + +// BlockRoots -- passthrough. +func (e Exporter) BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]byte, error) { + return e.db.BlockRoots(ctx, f) +} + +// HasBlock -- passthrough. +func (e Exporter) HasBlock(ctx context.Context, blockRoot [32]byte) bool { + return e.db.HasBlock(ctx, blockRoot) +} + +// DeleteBlock -- passthrough. +func (e Exporter) DeleteBlock(ctx context.Context, blockRoot [32]byte) error { + return e.db.DeleteBlock(ctx, blockRoot) +} + +// DeleteBlocks -- passthrough. +func (e Exporter) DeleteBlocks(ctx context.Context, blockRoots [][32]byte) error { + return e.db.DeleteBlocks(ctx, blockRoots) +} + +// ValidatorIndex -- passthrough. +func (e Exporter) ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, bool, error) { + return e.db.ValidatorIndex(ctx, publicKey) +} + +// HasValidatorIndex -- passthrough. +func (e Exporter) HasValidatorIndex(ctx context.Context, publicKey [48]byte) bool { + return e.db.HasValidatorIndex(ctx, publicKey) +} + +// DeleteValidatorIndex -- passthrough. +func (e Exporter) DeleteValidatorIndex(ctx context.Context, publicKey [48]byte) error { + return e.db.DeleteValidatorIndex(ctx, publicKey) +} + +// State -- passthrough. +func (e Exporter) State(ctx context.Context, blockRoot [32]byte) (*ethereum_beacon_p2p_v1.BeaconState, error) { + return e.db.State(ctx, blockRoot) +} + +// HeadState -- passthrough. +func (e Exporter) HeadState(ctx context.Context) (*ethereum_beacon_p2p_v1.BeaconState, error) { + return e.db.HeadState(ctx) +} + +// GenesisState -- passthrough. +func (e Exporter) GenesisState(ctx context.Context) (*ethereum_beacon_p2p_v1.BeaconState, error) { + return e.db.GenesisState(ctx) +} + +// ProposerSlashing -- passthrough. +func (e Exporter) ProposerSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.ProposerSlashing, error) { + return e.db.ProposerSlashing(ctx, slashingRoot) +} + +// AttesterSlashing -- passthrough. +func (e Exporter) AttesterSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.AttesterSlashing, error) { + return e.db.AttesterSlashing(ctx, slashingRoot) +} + +// HasProposerSlashing -- passthrough. +func (e Exporter) HasProposerSlashing(ctx context.Context, slashingRoot [32]byte) bool { + return e.db.HasProposerSlashing(ctx, slashingRoot) +} + +// HasAttesterSlashing -- passthrough. +func (e Exporter) HasAttesterSlashing(ctx context.Context, slashingRoot [32]byte) bool { + return e.db.HasAttesterSlashing(ctx, slashingRoot) +} + +// DeleteProposerSlashing -- passthrough. +func (e Exporter) DeleteProposerSlashing(ctx context.Context, slashingRoot [32]byte) error { + return e.db.DeleteProposerSlashing(ctx, slashingRoot) +} + +// DeleteAttesterSlashing -- passthrough. +func (e Exporter) DeleteAttesterSlashing(ctx context.Context, slashingRoot [32]byte) error { + return e.db.DeleteAttesterSlashing(ctx, slashingRoot) +} + +// VoluntaryExit -- passthrough. +func (e Exporter) VoluntaryExit(ctx context.Context, exitRoot [32]byte) (*eth.VoluntaryExit, error) { + return e.db.VoluntaryExit(ctx, exitRoot) +} + +// HasVoluntaryExit -- passthrough. +func (e Exporter) HasVoluntaryExit(ctx context.Context, exitRoot [32]byte) bool { + return e.db.HasVoluntaryExit(ctx, exitRoot) +} + +// DeleteVoluntaryExit -- passthrough. +func (e Exporter) DeleteVoluntaryExit(ctx context.Context, exitRoot [32]byte) error { + return e.db.DeleteVoluntaryExit(ctx, exitRoot) +} + +// JustifiedCheckpoint -- passthrough. +func (e Exporter) JustifiedCheckpoint(ctx context.Context) (*eth.Checkpoint, error) { + return e.db.JustifiedCheckpoint(ctx) +} + +// FinalizedCheckpoint -- passthrough. +func (e Exporter) FinalizedCheckpoint(ctx context.Context) (*eth.Checkpoint, error) { + return e.db.FinalizedCheckpoint(ctx) +} + +// ArchivedActiveValidatorChanges -- passthrough. +func (e Exporter) ArchivedActiveValidatorChanges(ctx context.Context, epoch uint64) (*ethereum_beacon_p2p_v1.ArchivedActiveSetChanges, error) { + return e.db.ArchivedActiveValidatorChanges(ctx, epoch) +} + +// ArchivedCommitteeInfo -- passthrough. +func (e Exporter) ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*ethereum_beacon_p2p_v1.ArchivedCommitteeInfo, error) { + return e.db.ArchivedCommitteeInfo(ctx, epoch) +} + +// ArchivedBalances -- passthrough. +func (e Exporter) ArchivedBalances(ctx context.Context, epoch uint64) ([]uint64, error) { + return e.db.ArchivedBalances(ctx, epoch) +} + +// ArchivedValidatorParticipation -- passthrough. +func (e Exporter) ArchivedValidatorParticipation(ctx context.Context, epoch uint64) (*eth.ValidatorParticipation, error) { + return e.db.ArchivedValidatorParticipation(ctx, epoch) +} + +// DepositContractAddress -- passthrough. +func (e Exporter) DepositContractAddress(ctx context.Context) ([]byte, error) { + return e.db.DepositContractAddress(ctx) +} + +// SaveHeadBlockRoot -- passthrough. +func (e Exporter) SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error { + return e.db.SaveHeadBlockRoot(ctx, blockRoot) +} + +// SaveGenesisBlockRoot -- passthrough. +func (e Exporter) SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) error { + return e.db.SaveGenesisBlockRoot(ctx, blockRoot) +} + +// SaveValidatorIndex -- passthrough. +func (e Exporter) SaveValidatorIndex(ctx context.Context, publicKey [48]byte, validatorIdx uint64) error { + return e.db.SaveValidatorIndex(ctx, publicKey, validatorIdx) +} + +// SaveState -- passthrough. +func (e Exporter) SaveState(ctx context.Context, state *ethereum_beacon_p2p_v1.BeaconState, blockRoot [32]byte) error { + return e.db.SaveState(ctx, state, blockRoot) +} + +// SaveProposerSlashing -- passthrough. +func (e Exporter) SaveProposerSlashing(ctx context.Context, slashing *eth.ProposerSlashing) error { + return e.db.SaveProposerSlashing(ctx, slashing) +} + +// SaveAttesterSlashing -- passthrough. +func (e Exporter) SaveAttesterSlashing(ctx context.Context, slashing *eth.AttesterSlashing) error { + return e.db.SaveAttesterSlashing(ctx, slashing) +} + +// SaveVoluntaryExit -- passthrough. +func (e Exporter) SaveVoluntaryExit(ctx context.Context, exit *eth.VoluntaryExit) error { + return e.db.SaveVoluntaryExit(ctx, exit) +} + +// SaveJustifiedCheckpoint -- passthrough. +func (e Exporter) SaveJustifiedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error { + return e.db.SaveJustifiedCheckpoint(ctx, checkpoint) +} + +// SaveFinalizedCheckpoint -- passthrough. +func (e Exporter) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error { + return e.db.SaveFinalizedCheckpoint(ctx, checkpoint) +} + +// SaveArchivedActiveValidatorChanges -- passthrough. +func (e Exporter) SaveArchivedActiveValidatorChanges(ctx context.Context, epoch uint64, changes *ethereum_beacon_p2p_v1.ArchivedActiveSetChanges) error { + return e.db.SaveArchivedActiveValidatorChanges(ctx, epoch, changes) +} + +// SaveArchivedCommitteeInfo -- passthrough. +func (e Exporter) SaveArchivedCommitteeInfo(ctx context.Context, epoch uint64, info *ethereum_beacon_p2p_v1.ArchivedCommitteeInfo) error { + return e.db.SaveArchivedCommitteeInfo(ctx, epoch, info) +} + +// SaveArchivedBalances -- passthrough. +func (e Exporter) SaveArchivedBalances(ctx context.Context, epoch uint64, balances []uint64) error { + return e.db.SaveArchivedBalances(ctx, epoch, balances) +} + +// SaveArchivedValidatorParticipation -- passthrough. +func (e Exporter) SaveArchivedValidatorParticipation(ctx context.Context, epoch uint64, part *eth.ValidatorParticipation) error { + return e.db.SaveArchivedValidatorParticipation(ctx, epoch, part) +} + +// SaveDepositContractAddress -- passthrough. +func (e Exporter) SaveDepositContractAddress(ctx context.Context, addr common.Address) error { + return e.db.SaveDepositContractAddress(ctx, addr) +} + +// DeleteState -- passthrough. +func (e Exporter) DeleteState(ctx context.Context, blockRoot [32]byte) error { + return e.db.DeleteState(ctx, blockRoot) +} + +// DeleteStates -- passthrough. +func (e Exporter) DeleteStates(ctx context.Context, blockRoots [][32]byte) error { + return e.db.DeleteStates(ctx, blockRoots) +} + +// IsFinalizedBlock -- passthrough. +func (e Exporter) IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool { + return e.db.IsFinalizedBlock(ctx, blockRoot) +} diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index 079651496..6180693d8 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -26,6 +26,7 @@ go_library( deps = [ "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db/filters:go_default_library", + "//beacon-chain/db/iface:go_default_library", "//proto/beacon/db:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/bytesutil:go_default_library", diff --git a/beacon-chain/db/kv/kv.go b/beacon-chain/db/kv/kv.go index 64e74ffc8..f5f274590 100644 --- a/beacon-chain/db/kv/kv.go +++ b/beacon-chain/db/kv/kv.go @@ -11,8 +11,11 @@ import ( "github.com/mdlayher/prombolt" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prysmaticlabs/prysm/beacon-chain/db/iface" ) +var _ = iface.Database(&Store{}) + const ( // VotesCacheSize with 1M validators will be 8MB. VotesCacheSize = 1 << 23 diff --git a/shared/featureconfig/config.go b/shared/featureconfig/config.go index 32ba2f80e..72de377a5 100644 --- a/shared/featureconfig/config.go +++ b/shared/featureconfig/config.go @@ -25,16 +25,17 @@ var log = logrus.WithField("prefix", "flags") // Flags is a struct to represent which features the client will perform on runtime. type Flags struct { - GenesisDelay bool // GenesisDelay when processing a chain start genesis event. - MinimalConfig bool // MinimalConfig as defined in the spec. - WriteSSZStateTransitions bool // WriteSSZStateTransitions to tmp directory. - InitSyncNoVerify bool // InitSyncNoVerify when initial syncing w/o verifying block's contents. - SkipBLSVerify bool // Skips BLS verification across the runtime. - EnableBackupWebhook bool // EnableBackupWebhook to allow database backups to trigger from monitoring port /db/backup. - PruneEpochBoundaryStates bool // PruneEpochBoundaryStates prunes the epoch boundary state before last finalized check point. + GenesisDelay bool // GenesisDelay when processing a chain start genesis event. + MinimalConfig bool // MinimalConfig as defined in the spec. + WriteSSZStateTransitions bool // WriteSSZStateTransitions to tmp directory. + InitSyncNoVerify bool // InitSyncNoVerify when initial syncing w/o verifying block's contents. + SkipBLSVerify bool // Skips BLS verification across the runtime. + EnableBackupWebhook bool // EnableBackupWebhook to allow database backups to trigger from monitoring port /db/backup. + PruneEpochBoundaryStates bool // PruneEpochBoundaryStates prunes the epoch boundary state before last finalized check point. EnableSnappyDBCompression bool // EnableSnappyDBCompression in the database. - EnableCustomStateSSZ bool // EnableCustomStateSSZ in the the state transition function. + EnableCustomStateSSZ bool // EnableCustomStateSSZ in the the state transition function. InitSyncCacheState bool // InitSyncCacheState caches state during initial sync. + KafkaBootstrapServers string // KafkaBootstrapServers to find kafka servers to stream blocks, attestations, etc. // Cache toggles. EnableAttestationCache bool // EnableAttestationCache; see https://github.com/prysmaticlabs/prysm/issues/3106. @@ -122,6 +123,10 @@ func ConfigureBeaconChain(ctx *cli.Context) { log.Warn("Enabled skip slots cache.") cfg.EnableSkipSlotsCache = true } + if ctx.GlobalString(kafkaBootstrapServersFlag.Name) != "" { + log.Warn("Enabling experimental kafka streaming.") + cfg.KafkaBootstrapServers = ctx.GlobalString(kafkaBootstrapServersFlag.Name) + } if ctx.GlobalBool(enableCommitteeCacheFlag.Name) { log.Warn("Enabled committee cache.") cfg.EnableCommitteeCache = true diff --git a/shared/featureconfig/flags.go b/shared/featureconfig/flags.go index 30f4517d5..a2e863fde 100644 --- a/shared/featureconfig/flags.go +++ b/shared/featureconfig/flags.go @@ -73,6 +73,10 @@ var ( Name: "enable-skip-slots-cache", Usage: "Enables the skip slot cache to be used in the event of skipped slots.", } + kafkaBootstrapServersFlag = cli.StringFlag{ + Name: "kafka-url", + Usage: "Stream attestations and blocks to specified kafka servers. This field is used for bootstrap.servers kafka config field.", + } enableSnappyDBCompressionFlag = cli.BoolFlag{ Name: "snappy", Usage: "Enables snappy compression in the database.", @@ -157,6 +161,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{ initSyncCacheState, NewCacheFlag, SkipBLSVerifyFlag, + kafkaBootstrapServersFlag, enableBackupWebhookFlag, enableBLSPubkeyCacheFlag, enableShuffledIndexCache, diff --git a/third_party/in_gopkg_confluentinc_confluent_kafka_go_v1.patch b/third_party/in_gopkg_confluentinc_confluent_kafka_go_v1.patch new file mode 100644 index 000000000..d7c8ac1c1 --- /dev/null +++ b/third_party/in_gopkg_confluentinc_confluent_kafka_go_v1.patch @@ -0,0 +1,10 @@ +--- a/kafka/BUILD.bazel ++++ b/kafka/BUILD.bazel +@@ -23,6 +23,7 @@ + "producer.go", + "testhelpers.go", + ], ++ cdeps = ["@prysm//third_party/kafka:librdkafka"], + cgo = True, + importpath = "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka", + visibility = ["//visibility:public"], diff --git a/third_party/kafka/BUILD.bazel b/third_party/kafka/BUILD.bazel new file mode 100644 index 000000000..f55aa70e4 --- /dev/null +++ b/third_party/kafka/BUILD.bazel @@ -0,0 +1,20 @@ +load("@rules_foreign_cc//tools/build_defs:cmake.bzl", "cmake_external") + +cmake_external( + name = "librdkafka", + cache_entries = { + "RDKAFKA_BUILD_STATIC": "ON", + "WITH_ZSTD": "OFF", + "WITH_SSL": "OFF", + "WITH_SASL": "OFF", + "ENABLE_LZ4_EXT": "OFF", + "WITH_LIBDL": "OFF", + "WITH_ZLIB": "OFF", + }, + lib_source = "@librdkafka//:all", + static_libraries = [ + "librdkafka++.a", + "librdkafka.a", + ], + visibility = ["//visibility:public"], +)