Kafka exporter (#3840)

* abstract db interface, kafka build, work in progress
* checkpoint
* Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter
* feature flag
* move passthrough
* flag change
* gofmt
* Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter
* missing db methods
* Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter
* fix interface
* Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter
* Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter
* Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter
* try using cmake built from source
* lint godocs
* lint godocs
* lint godocs
* Update BUILD.bazel
* Merge branch 'master' into es-exporter
* Merge branch 'master' into es-exporter
* Merge branch 'master' into es-exporter
* Merge branch 'master' of github.com:prysmaticlabs/prysm into es-exporter
* gaz
This commit is contained in:
Preston Van Loon 2019-12-05 18:05:58 -08:00 committed by prylabs-bulldozer[bot]
parent 9d4c7cb4f7
commit d3375d98a8
15 changed files with 631 additions and 91 deletions

View File

@ -137,3 +137,9 @@ common_files = {
),
tags = ["manual"],
) for pair in binary_targets]
toolchain(
name = "built_cmake_toolchain",
toolchain = "@rules_foreign_cc//tools/build_defs/native_tools:built_cmake",
toolchain_type = "@rules_foreign_cc//tools/build_defs:cmake_toolchain",
)

View File

@ -217,6 +217,28 @@ load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps")
protobuf_deps()
# Group the sources of the library so that CMake rule have access to it
all_content = """filegroup(name = "all", srcs = glob(["**"]), visibility = ["//visibility:public"])"""
http_archive(
name = "rules_foreign_cc",
strip_prefix = "rules_foreign_cc-master",
url = "https://github.com/bazelbuild/rules_foreign_cc/archive/master.zip",
)
load("@rules_foreign_cc//:workspace_definitions.bzl", "rules_foreign_cc_dependencies")
rules_foreign_cc_dependencies([
"@prysm//:built_cmake_toolchain",
])
http_archive(
name = "librdkafka",
build_file_content = all_content,
strip_prefix = "librdkafka-1.2.1",
urls = ["https://github.com/edenhill/librdkafka/archive/v1.2.1.tar.gz"],
)
# External dependencies
go_repository(
@ -1288,6 +1310,15 @@ go_repository(
version = "v0.0.0-20191002040644-a1355ae1e2c3",
)
go_repository(
name = "in_gopkg_confluentinc_confluent_kafka_go_v1",
importpath = "gopkg.in/confluentinc/confluent-kafka-go.v1",
patch_args = ["-p1"],
patches = ["//third_party:in_gopkg_confluentinc_confluent_kafka_go_v1.patch"],
sum = "h1:roy97m/3wj9/o8OuU3sZ5wildk30ep38k2x8nhNbKrI=",
version = "v1.1.0",
)
go_repository(
name = "com_github_naoina_toml",
importpath = "github.com/naoina/toml",

View File

@ -12,11 +12,9 @@ go_library(
"//tools:__subpackages__",
],
deps = [
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/db/kafka:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@ -1,91 +1,21 @@
package db
import (
"context"
"io"
"github.com/ethereum/go-ethereum/common"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/db/iface"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kafka"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
// Database defines the necessary methods for Prysm's eth2 backend which may
// be implemented by any key-value or relational database in practice.
type Database interface {
io.Closer
DatabasePath() string
ClearDB() error
// Backup and restore methods
Backup(ctx context.Context) error
// Attestation related methods.
AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte) ([]*ethpb.Attestation, error)
Attestations(ctx context.Context, f *filters.QueryFilter) ([]*ethpb.Attestation, error)
HasAttestation(ctx context.Context, attDataRoot [32]byte) bool
DeleteAttestation(ctx context.Context, attDataRoot [32]byte) error
DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error
SaveAttestation(ctx context.Context, att *ethpb.Attestation) error
SaveAttestations(ctx context.Context, atts []*ethpb.Attestation) error
// Block related methods.
Block(ctx context.Context, blockRoot [32]byte) (*ethpb.BeaconBlock, error)
HeadBlock(ctx context.Context) (*ethpb.BeaconBlock, error)
Blocks(ctx context.Context, f *filters.QueryFilter) ([]*ethpb.BeaconBlock, error)
BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]byte, error)
HasBlock(ctx context.Context, blockRoot [32]byte) bool
DeleteBlock(ctx context.Context, blockRoot [32]byte) error
DeleteBlocks(ctx context.Context, blockRoots [][32]byte) error
SaveBlock(ctx context.Context, block *ethpb.BeaconBlock) error
SaveBlocks(ctx context.Context, blocks []*ethpb.BeaconBlock) error
SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error
SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) error
IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool
// Validator related methods.
ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, bool, error)
HasValidatorIndex(ctx context.Context, publicKey [48]byte) bool
DeleteValidatorIndex(ctx context.Context, publicKey [48]byte) error
SaveValidatorIndex(ctx context.Context, publicKey [48]byte, validatorIdx uint64) error
// State related methods.
State(ctx context.Context, blockRoot [32]byte) (*pb.BeaconState, error)
HeadState(ctx context.Context) (*pb.BeaconState, error)
GenesisState(ctx context.Context) (*pb.BeaconState, error)
SaveState(ctx context.Context, state *pb.BeaconState, blockRoot [32]byte) error
DeleteState(ctx context.Context, blockRoot [32]byte) error
DeleteStates(ctx context.Context, blockRoots [][32]byte) error
// Slashing operations.
ProposerSlashing(ctx context.Context, slashingRoot [32]byte) (*ethpb.ProposerSlashing, error)
AttesterSlashing(ctx context.Context, slashingRoot [32]byte) (*ethpb.AttesterSlashing, error)
SaveProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) error
SaveAttesterSlashing(ctx context.Context, slashing *ethpb.AttesterSlashing) error
HasProposerSlashing(ctx context.Context, slashingRoot [32]byte) bool
HasAttesterSlashing(ctx context.Context, slashingRoot [32]byte) bool
DeleteProposerSlashing(ctx context.Context, slashingRoot [32]byte) error
DeleteAttesterSlashing(ctx context.Context, slashingRoot [32]byte) error
// Block operations.
VoluntaryExit(ctx context.Context, exitRoot [32]byte) (*ethpb.VoluntaryExit, error)
SaveVoluntaryExit(ctx context.Context, exit *ethpb.VoluntaryExit) error
HasVoluntaryExit(ctx context.Context, exitRoot [32]byte) bool
DeleteVoluntaryExit(ctx context.Context, exitRoot [32]byte) error
// Checkpoint operations.
JustifiedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error)
FinalizedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error)
SaveJustifiedCheckpoint(ctx context.Context, checkpoint *ethpb.Checkpoint) error
SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.Checkpoint) error
// Archival data handlers for storing/retrieving historical beacon node information.
ArchivedActiveValidatorChanges(ctx context.Context, epoch uint64) (*pb.ArchivedActiveSetChanges, error)
SaveArchivedActiveValidatorChanges(ctx context.Context, epoch uint64, changes *pb.ArchivedActiveSetChanges) error
ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*pb.ArchivedCommitteeInfo, error)
SaveArchivedCommitteeInfo(ctx context.Context, epoch uint64, info *pb.ArchivedCommitteeInfo) error
ArchivedBalances(ctx context.Context, epoch uint64) ([]uint64, error)
SaveArchivedBalances(ctx context.Context, epoch uint64, balances []uint64) error
ArchivedValidatorParticipation(ctx context.Context, epoch uint64) (*ethpb.ValidatorParticipation, error)
SaveArchivedValidatorParticipation(ctx context.Context, epoch uint64, part *ethpb.ValidatorParticipation) error
// Deposit contract related handlers.
DepositContractAddress(ctx context.Context) ([]byte, error)
SaveDepositContractAddress(ctx context.Context, addr common.Address) error
}
type Database = iface.Database
// NewDB initializes a new DB.
func NewDB(dirPath string) (Database, error) {
return kv.NewKVStore(dirPath)
db, err := kv.NewKVStore(dirPath)
if err != nil {
return nil, err
}
return kafka.Wrap(db)
}

View File

@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["interface.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/iface",
# Other packages must use github.com/prysmaticlabs/prysm/beacon-chain/db.Database alias.
visibility = ["//beacon-chain/db:__subpackages__"],
deps = [
"//beacon-chain/db/filters:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
],
)

View File

@ -0,0 +1,86 @@
// Package iface exists to prevent circular dependencies when implementing the database interface.
package iface
import (
"context"
"io"
"github.com/ethereum/go-ethereum/common"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)
// Database defines the necessary methods for Prysm's eth2 backend which may
// be implemented by any key-value or relational database in practice.
type Database interface {
io.Closer
DatabasePath() string
ClearDB() error
// Backup and restore methods
Backup(ctx context.Context) error
// Attestation related methods.
AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte) ([]*eth.Attestation, error)
Attestations(ctx context.Context, f *filters.QueryFilter) ([]*eth.Attestation, error)
HasAttestation(ctx context.Context, attDataRoot [32]byte) bool
DeleteAttestation(ctx context.Context, attDataRoot [32]byte) error
DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error
SaveAttestation(ctx context.Context, att *eth.Attestation) error
SaveAttestations(ctx context.Context, atts []*eth.Attestation) error
// Block related methods.
Block(ctx context.Context, blockRoot [32]byte) (*eth.BeaconBlock, error)
HeadBlock(ctx context.Context) (*eth.BeaconBlock, error)
Blocks(ctx context.Context, f *filters.QueryFilter) ([]*eth.BeaconBlock, error)
BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]byte, error)
HasBlock(ctx context.Context, blockRoot [32]byte) bool
DeleteBlock(ctx context.Context, blockRoot [32]byte) error
DeleteBlocks(ctx context.Context, blockRoots [][32]byte) error
SaveBlock(ctx context.Context, block *eth.BeaconBlock) error
SaveBlocks(ctx context.Context, blocks []*eth.BeaconBlock) error
SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error
SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) error
IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool
// Validator related methods.
ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, bool, error)
HasValidatorIndex(ctx context.Context, publicKey [48]byte) bool
DeleteValidatorIndex(ctx context.Context, publicKey [48]byte) error
SaveValidatorIndex(ctx context.Context, publicKey [48]byte, validatorIdx uint64) error
// State related methods.
State(ctx context.Context, blockRoot [32]byte) (*ethereum_beacon_p2p_v1.BeaconState, error)
HeadState(ctx context.Context) (*ethereum_beacon_p2p_v1.BeaconState, error)
GenesisState(ctx context.Context) (*ethereum_beacon_p2p_v1.BeaconState, error)
SaveState(ctx context.Context, state *ethereum_beacon_p2p_v1.BeaconState, blockRoot [32]byte) error
DeleteState(ctx context.Context, blockRoot [32]byte) error
DeleteStates(ctx context.Context, blockRoots [][32]byte) error
// Slashing operations.
ProposerSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.ProposerSlashing, error)
AttesterSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.AttesterSlashing, error)
SaveProposerSlashing(ctx context.Context, slashing *eth.ProposerSlashing) error
SaveAttesterSlashing(ctx context.Context, slashing *eth.AttesterSlashing) error
HasProposerSlashing(ctx context.Context, slashingRoot [32]byte) bool
HasAttesterSlashing(ctx context.Context, slashingRoot [32]byte) bool
DeleteProposerSlashing(ctx context.Context, slashingRoot [32]byte) error
DeleteAttesterSlashing(ctx context.Context, slashingRoot [32]byte) error
// Block operations.
VoluntaryExit(ctx context.Context, exitRoot [32]byte) (*eth.VoluntaryExit, error)
SaveVoluntaryExit(ctx context.Context, exit *eth.VoluntaryExit) error
HasVoluntaryExit(ctx context.Context, exitRoot [32]byte) bool
DeleteVoluntaryExit(ctx context.Context, exitRoot [32]byte) error
// Checkpoint operations.
JustifiedCheckpoint(ctx context.Context) (*eth.Checkpoint, error)
FinalizedCheckpoint(ctx context.Context) (*eth.Checkpoint, error)
SaveJustifiedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error
SaveFinalizedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error
// Archival data handlers for storing/retrieving historical beacon node information.
ArchivedActiveValidatorChanges(ctx context.Context, epoch uint64) (*ethereum_beacon_p2p_v1.ArchivedActiveSetChanges, error)
SaveArchivedActiveValidatorChanges(ctx context.Context, epoch uint64, changes *ethereum_beacon_p2p_v1.ArchivedActiveSetChanges) error
ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*ethereum_beacon_p2p_v1.ArchivedCommitteeInfo, error)
SaveArchivedCommitteeInfo(ctx context.Context, epoch uint64, info *ethereum_beacon_p2p_v1.ArchivedCommitteeInfo) error
ArchivedBalances(ctx context.Context, epoch uint64) ([]uint64, error)
SaveArchivedBalances(ctx context.Context, epoch uint64, balances []uint64) error
ArchivedValidatorParticipation(ctx context.Context, epoch uint64) (*eth.ValidatorParticipation, error)
SaveArchivedValidatorParticipation(ctx context.Context, epoch uint64, part *eth.ValidatorParticipation) error
// Deposit contract related handlers.
DepositContractAddress(ctx context.Context) ([]byte, error)
SaveDepositContractAddress(ctx context.Context, addr common.Address) error
}

View File

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"export_wrapper.go",
"passthrough.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kafka",
visibility = ["//beacon-chain/db:__pkg__"],
deps = [
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/traceutil:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_golang_protobuf//jsonpb:go_default_library_gen",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@in_gopkg_confluentinc_confluent_kafka_go_v1//kafka:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

View File

@ -0,0 +1,124 @@
package kafka
import (
"bytes"
"context"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/db/iface"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
var _ = iface.Database(&Exporter{})
var log = logrus.WithField("prefix", "exporter")
var marshaler = &jsonpb.Marshaler{}
// Exporter wraps a database interface and exports certain objects to kafka topics.
type Exporter struct {
db iface.Database
p *kafka.Producer
}
// Wrap the db with kafka exporter. If the feature flag is not enabled, this service does not wrap
// the database, but returns the underlying database pointer itself.
func Wrap(db iface.Database) (iface.Database, error) {
if featureconfig.Get().KafkaBootstrapServers == "" {
return db, nil
}
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": featureconfig.Get().KafkaBootstrapServers})
if err != nil {
return nil, err
}
return &Exporter{db: db, p: p}, nil
}
func (e Exporter) publish(ctx context.Context, topic string, msg proto.Message) error {
ctx, span := trace.StartSpan(ctx, "kafka.publish")
defer span.End()
buf := bytes.NewBuffer(nil)
if err := marshaler.Marshal(buf, msg); err != nil {
traceutil.AnnotateError(span, err)
return err
}
key, err := ssz.HashTreeRoot(msg)
if err != nil {
traceutil.AnnotateError(span, err)
return err
}
if err := e.p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
},
Value: buf.Bytes(),
Key: key[:],
}, nil); err != nil {
traceutil.AnnotateError(span, err)
return err
}
return nil
}
// Close closes kafka producer and underlying db.
func (e Exporter) Close() error {
e.p.Close()
return e.db.Close()
}
// SaveAttestation publishes to the kafka topic for attestations.
func (e Exporter) SaveAttestation(ctx context.Context, att *eth.Attestation) error {
go func() {
if err := e.publish(ctx, "beacon_attestation", att); err != nil {
log.WithError(err).Error("Failed to publish attestation")
}
}()
return e.db.SaveAttestation(ctx, att)
}
// SaveAttestations publishes to the kafka topic for beacon attestations.
func (e Exporter) SaveAttestations(ctx context.Context, atts []*eth.Attestation) error {
go func() {
for _, att := range atts {
if err := e.publish(ctx, "beacon_attestation", att); err != nil {
log.WithError(err).Error("Failed to publish attestation")
}
}
}()
return e.db.SaveAttestations(ctx, atts)
}
// SaveBlock publishes to the kafka topic for beacon blocks.
func (e Exporter) SaveBlock(ctx context.Context, block *eth.BeaconBlock) error {
go func() {
if err := e.publish(ctx, "beacon_block", block); err != nil {
log.WithError(err).Error("Failed to publish block")
}
}()
return e.db.SaveBlock(ctx, block)
}
// SaveBlocks publishes to the kafka topic for beacon blocks.
func (e Exporter) SaveBlocks(ctx context.Context, blocks []*eth.BeaconBlock) error {
go func() {
for _, block := range blocks {
if err := e.publish(ctx, "beacon_block", block); err != nil {
log.WithError(err).Error("Failed to publish block")
}
}
}()
return e.db.SaveBlocks(ctx, blocks)
}

View File

@ -0,0 +1,280 @@
package kafka
import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
ethereum_beacon_p2p_v1 "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)
// DatabasePath -- passthrough.
func (e Exporter) DatabasePath() string {
return e.db.DatabasePath()
}
// ClearDB -- passthrough.
func (e Exporter) ClearDB() error {
return e.db.ClearDB()
}
// Backup -- passthrough.
func (e Exporter) Backup(ctx context.Context) error {
return e.db.Backup(ctx)
}
// AttestationsByDataRoot -- passthrough.
func (e Exporter) AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte) ([]*eth.Attestation, error) {
return e.db.AttestationsByDataRoot(ctx, attDataRoot)
}
// Attestations -- passthrough.
func (e Exporter) Attestations(ctx context.Context, f *filters.QueryFilter) ([]*eth.Attestation, error) {
return e.db.Attestations(ctx, f)
}
// HasAttestation -- passthrough.
func (e Exporter) HasAttestation(ctx context.Context, attDataRoot [32]byte) bool {
return e.db.HasAttestation(ctx, attDataRoot)
}
// DeleteAttestation -- passthrough.
func (e Exporter) DeleteAttestation(ctx context.Context, attDataRoot [32]byte) error {
return e.db.DeleteAttestation(ctx, attDataRoot)
}
// DeleteAttestations -- passthrough.
func (e Exporter) DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error {
return e.db.DeleteAttestations(ctx, attDataRoots)
}
// Block -- passthrough.
func (e Exporter) Block(ctx context.Context, blockRoot [32]byte) (*eth.BeaconBlock, error) {
return e.db.Block(ctx, blockRoot)
}
// HeadBlock -- passthrough.
func (e Exporter) HeadBlock(ctx context.Context) (*eth.BeaconBlock, error) {
return e.db.HeadBlock(ctx)
}
// Blocks -- passthrough.
func (e Exporter) Blocks(ctx context.Context, f *filters.QueryFilter) ([]*eth.BeaconBlock, error) {
return e.db.Blocks(ctx, f)
}
// BlockRoots -- passthrough.
func (e Exporter) BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]byte, error) {
return e.db.BlockRoots(ctx, f)
}
// HasBlock -- passthrough.
func (e Exporter) HasBlock(ctx context.Context, blockRoot [32]byte) bool {
return e.db.HasBlock(ctx, blockRoot)
}
// DeleteBlock -- passthrough.
func (e Exporter) DeleteBlock(ctx context.Context, blockRoot [32]byte) error {
return e.db.DeleteBlock(ctx, blockRoot)
}
// DeleteBlocks -- passthrough.
func (e Exporter) DeleteBlocks(ctx context.Context, blockRoots [][32]byte) error {
return e.db.DeleteBlocks(ctx, blockRoots)
}
// ValidatorIndex -- passthrough.
func (e Exporter) ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, bool, error) {
return e.db.ValidatorIndex(ctx, publicKey)
}
// HasValidatorIndex -- passthrough.
func (e Exporter) HasValidatorIndex(ctx context.Context, publicKey [48]byte) bool {
return e.db.HasValidatorIndex(ctx, publicKey)
}
// DeleteValidatorIndex -- passthrough.
func (e Exporter) DeleteValidatorIndex(ctx context.Context, publicKey [48]byte) error {
return e.db.DeleteValidatorIndex(ctx, publicKey)
}
// State -- passthrough.
func (e Exporter) State(ctx context.Context, blockRoot [32]byte) (*ethereum_beacon_p2p_v1.BeaconState, error) {
return e.db.State(ctx, blockRoot)
}
// HeadState -- passthrough.
func (e Exporter) HeadState(ctx context.Context) (*ethereum_beacon_p2p_v1.BeaconState, error) {
return e.db.HeadState(ctx)
}
// GenesisState -- passthrough.
func (e Exporter) GenesisState(ctx context.Context) (*ethereum_beacon_p2p_v1.BeaconState, error) {
return e.db.GenesisState(ctx)
}
// ProposerSlashing -- passthrough.
func (e Exporter) ProposerSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.ProposerSlashing, error) {
return e.db.ProposerSlashing(ctx, slashingRoot)
}
// AttesterSlashing -- passthrough.
func (e Exporter) AttesterSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.AttesterSlashing, error) {
return e.db.AttesterSlashing(ctx, slashingRoot)
}
// HasProposerSlashing -- passthrough.
func (e Exporter) HasProposerSlashing(ctx context.Context, slashingRoot [32]byte) bool {
return e.db.HasProposerSlashing(ctx, slashingRoot)
}
// HasAttesterSlashing -- passthrough.
func (e Exporter) HasAttesterSlashing(ctx context.Context, slashingRoot [32]byte) bool {
return e.db.HasAttesterSlashing(ctx, slashingRoot)
}
// DeleteProposerSlashing -- passthrough.
func (e Exporter) DeleteProposerSlashing(ctx context.Context, slashingRoot [32]byte) error {
return e.db.DeleteProposerSlashing(ctx, slashingRoot)
}
// DeleteAttesterSlashing -- passthrough.
func (e Exporter) DeleteAttesterSlashing(ctx context.Context, slashingRoot [32]byte) error {
return e.db.DeleteAttesterSlashing(ctx, slashingRoot)
}
// VoluntaryExit -- passthrough.
func (e Exporter) VoluntaryExit(ctx context.Context, exitRoot [32]byte) (*eth.VoluntaryExit, error) {
return e.db.VoluntaryExit(ctx, exitRoot)
}
// HasVoluntaryExit -- passthrough.
func (e Exporter) HasVoluntaryExit(ctx context.Context, exitRoot [32]byte) bool {
return e.db.HasVoluntaryExit(ctx, exitRoot)
}
// DeleteVoluntaryExit -- passthrough.
func (e Exporter) DeleteVoluntaryExit(ctx context.Context, exitRoot [32]byte) error {
return e.db.DeleteVoluntaryExit(ctx, exitRoot)
}
// JustifiedCheckpoint -- passthrough.
func (e Exporter) JustifiedCheckpoint(ctx context.Context) (*eth.Checkpoint, error) {
return e.db.JustifiedCheckpoint(ctx)
}
// FinalizedCheckpoint -- passthrough.
func (e Exporter) FinalizedCheckpoint(ctx context.Context) (*eth.Checkpoint, error) {
return e.db.FinalizedCheckpoint(ctx)
}
// ArchivedActiveValidatorChanges -- passthrough.
func (e Exporter) ArchivedActiveValidatorChanges(ctx context.Context, epoch uint64) (*ethereum_beacon_p2p_v1.ArchivedActiveSetChanges, error) {
return e.db.ArchivedActiveValidatorChanges(ctx, epoch)
}
// ArchivedCommitteeInfo -- passthrough.
func (e Exporter) ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*ethereum_beacon_p2p_v1.ArchivedCommitteeInfo, error) {
return e.db.ArchivedCommitteeInfo(ctx, epoch)
}
// ArchivedBalances -- passthrough.
func (e Exporter) ArchivedBalances(ctx context.Context, epoch uint64) ([]uint64, error) {
return e.db.ArchivedBalances(ctx, epoch)
}
// ArchivedValidatorParticipation -- passthrough.
func (e Exporter) ArchivedValidatorParticipation(ctx context.Context, epoch uint64) (*eth.ValidatorParticipation, error) {
return e.db.ArchivedValidatorParticipation(ctx, epoch)
}
// DepositContractAddress -- passthrough.
func (e Exporter) DepositContractAddress(ctx context.Context) ([]byte, error) {
return e.db.DepositContractAddress(ctx)
}
// SaveHeadBlockRoot -- passthrough.
func (e Exporter) SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error {
return e.db.SaveHeadBlockRoot(ctx, blockRoot)
}
// SaveGenesisBlockRoot -- passthrough.
func (e Exporter) SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) error {
return e.db.SaveGenesisBlockRoot(ctx, blockRoot)
}
// SaveValidatorIndex -- passthrough.
func (e Exporter) SaveValidatorIndex(ctx context.Context, publicKey [48]byte, validatorIdx uint64) error {
return e.db.SaveValidatorIndex(ctx, publicKey, validatorIdx)
}
// SaveState -- passthrough.
func (e Exporter) SaveState(ctx context.Context, state *ethereum_beacon_p2p_v1.BeaconState, blockRoot [32]byte) error {
return e.db.SaveState(ctx, state, blockRoot)
}
// SaveProposerSlashing -- passthrough.
func (e Exporter) SaveProposerSlashing(ctx context.Context, slashing *eth.ProposerSlashing) error {
return e.db.SaveProposerSlashing(ctx, slashing)
}
// SaveAttesterSlashing -- passthrough.
func (e Exporter) SaveAttesterSlashing(ctx context.Context, slashing *eth.AttesterSlashing) error {
return e.db.SaveAttesterSlashing(ctx, slashing)
}
// SaveVoluntaryExit -- passthrough.
func (e Exporter) SaveVoluntaryExit(ctx context.Context, exit *eth.VoluntaryExit) error {
return e.db.SaveVoluntaryExit(ctx, exit)
}
// SaveJustifiedCheckpoint -- passthrough.
func (e Exporter) SaveJustifiedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error {
return e.db.SaveJustifiedCheckpoint(ctx, checkpoint)
}
// SaveFinalizedCheckpoint -- passthrough.
func (e Exporter) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error {
return e.db.SaveFinalizedCheckpoint(ctx, checkpoint)
}
// SaveArchivedActiveValidatorChanges -- passthrough.
func (e Exporter) SaveArchivedActiveValidatorChanges(ctx context.Context, epoch uint64, changes *ethereum_beacon_p2p_v1.ArchivedActiveSetChanges) error {
return e.db.SaveArchivedActiveValidatorChanges(ctx, epoch, changes)
}
// SaveArchivedCommitteeInfo -- passthrough.
func (e Exporter) SaveArchivedCommitteeInfo(ctx context.Context, epoch uint64, info *ethereum_beacon_p2p_v1.ArchivedCommitteeInfo) error {
return e.db.SaveArchivedCommitteeInfo(ctx, epoch, info)
}
// SaveArchivedBalances -- passthrough.
func (e Exporter) SaveArchivedBalances(ctx context.Context, epoch uint64, balances []uint64) error {
return e.db.SaveArchivedBalances(ctx, epoch, balances)
}
// SaveArchivedValidatorParticipation -- passthrough.
func (e Exporter) SaveArchivedValidatorParticipation(ctx context.Context, epoch uint64, part *eth.ValidatorParticipation) error {
return e.db.SaveArchivedValidatorParticipation(ctx, epoch, part)
}
// SaveDepositContractAddress -- passthrough.
func (e Exporter) SaveDepositContractAddress(ctx context.Context, addr common.Address) error {
return e.db.SaveDepositContractAddress(ctx, addr)
}
// DeleteState -- passthrough.
func (e Exporter) DeleteState(ctx context.Context, blockRoot [32]byte) error {
return e.db.DeleteState(ctx, blockRoot)
}
// DeleteStates -- passthrough.
func (e Exporter) DeleteStates(ctx context.Context, blockRoots [][32]byte) error {
return e.db.DeleteStates(ctx, blockRoots)
}
// IsFinalizedBlock -- passthrough.
func (e Exporter) IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool {
return e.db.IsFinalizedBlock(ctx, blockRoot)
}

View File

@ -26,6 +26,7 @@ go_library(
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//proto/beacon/db:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",

View File

@ -11,8 +11,11 @@ import (
"github.com/mdlayher/prombolt"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prysmaticlabs/prysm/beacon-chain/db/iface"
)
var _ = iface.Database(&Store{})
const (
// VotesCacheSize with 1M validators will be 8MB.
VotesCacheSize = 1 << 23

View File

@ -35,6 +35,7 @@ type Flags struct {
EnableSnappyDBCompression bool // EnableSnappyDBCompression in the database.
EnableCustomStateSSZ bool // EnableCustomStateSSZ in the the state transition function.
InitSyncCacheState bool // InitSyncCacheState caches state during initial sync.
KafkaBootstrapServers string // KafkaBootstrapServers to find kafka servers to stream blocks, attestations, etc.
// Cache toggles.
EnableAttestationCache bool // EnableAttestationCache; see https://github.com/prysmaticlabs/prysm/issues/3106.
@ -122,6 +123,10 @@ func ConfigureBeaconChain(ctx *cli.Context) {
log.Warn("Enabled skip slots cache.")
cfg.EnableSkipSlotsCache = true
}
if ctx.GlobalString(kafkaBootstrapServersFlag.Name) != "" {
log.Warn("Enabling experimental kafka streaming.")
cfg.KafkaBootstrapServers = ctx.GlobalString(kafkaBootstrapServersFlag.Name)
}
if ctx.GlobalBool(enableCommitteeCacheFlag.Name) {
log.Warn("Enabled committee cache.")
cfg.EnableCommitteeCache = true

View File

@ -73,6 +73,10 @@ var (
Name: "enable-skip-slots-cache",
Usage: "Enables the skip slot cache to be used in the event of skipped slots.",
}
kafkaBootstrapServersFlag = cli.StringFlag{
Name: "kafka-url",
Usage: "Stream attestations and blocks to specified kafka servers. This field is used for bootstrap.servers kafka config field.",
}
enableSnappyDBCompressionFlag = cli.BoolFlag{
Name: "snappy",
Usage: "Enables snappy compression in the database.",
@ -157,6 +161,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
initSyncCacheState,
NewCacheFlag,
SkipBLSVerifyFlag,
kafkaBootstrapServersFlag,
enableBackupWebhookFlag,
enableBLSPubkeyCacheFlag,
enableShuffledIndexCache,

View File

@ -0,0 +1,10 @@
--- a/kafka/BUILD.bazel
+++ b/kafka/BUILD.bazel
@@ -23,6 +23,7 @@
"producer.go",
"testhelpers.go",
],
+ cdeps = ["@prysm//third_party/kafka:librdkafka"],
cgo = True,
importpath = "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka",
visibility = ["//visibility:public"],

20
third_party/kafka/BUILD.bazel vendored Normal file
View File

@ -0,0 +1,20 @@
load("@rules_foreign_cc//tools/build_defs:cmake.bzl", "cmake_external")
cmake_external(
name = "librdkafka",
cache_entries = {
"RDKAFKA_BUILD_STATIC": "ON",
"WITH_ZSTD": "OFF",
"WITH_SSL": "OFF",
"WITH_SASL": "OFF",
"ENABLE_LZ4_EXT": "OFF",
"WITH_LIBDL": "OFF",
"WITH_ZLIB": "OFF",
},
lib_source = "@librdkafka//:all",
static_libraries = [
"librdkafka++.a",
"librdkafka.a",
],
visibility = ["//visibility:public"],
)