[Service Revamp] - Powchain Service With Functional Options (#9856)

* begin powchain service refactor

* begin refactor

* powchain passes

* options pkg

* gaz

* rev

* rev

* comments

* move to right place

* bazel powchain

* fix test

* log

* contract addr

* happy path and comments

* gaz

* new service
This commit is contained in:
Raul Jordan 2021-11-04 14:19:44 -04:00 committed by GitHub
parent ae56f643eb
commit 7974fe01cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 615 additions and 494 deletions

View File

@ -94,11 +94,12 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
DepositContainers: []*ethpb.DepositContainer{},
})
require.NoError(t, err)
web3Service, err = powchain.NewService(ctx, &powchain.Web3ServiceConfig{
BeaconDB: beaconDB,
HttpEndpoints: []string{endpoint},
DepositContract: common.Address{},
})
web3Service, err = powchain.NewService(
ctx,
powchain.WithDatabase(beaconDB),
powchain.WithHttpEndpoints([]string{endpoint}),
powchain.WithDepositContractAddress(common.Address{}),
)
require.NoError(t, err, "Unable to set up web3 service")
attService, err := attestations.NewService(ctx, &attestations.Config{Pool: attestations.NewPool()})

View File

@ -61,6 +61,14 @@ const testSkipPowFlag = "test-skip-pow"
// 128MB max message size when enabling debug endpoints.
const debugGrpcMaxMsgSize = 1 << 27
// Used as a struct to keep cli flag options for configuring services
// for the beacon node. We keep this as a separate struct to not pollute the actual BeaconNode
// struct, as it is merely used to pass down configuration options into the appropriate services.
type serviceFlagOpts struct {
blockchainFlagOpts []blockchain.Option
powchainFlagOpts []powchain.Option
}
// BeaconNode defines a struct that handles the services running a random beacon chain
// full PoS node. It handles the lifecycle of the entire system and registers
// services to a service registry.
@ -86,7 +94,7 @@ type BeaconNode struct {
collector *bcnodeCollector
slasherBlockHeadersFeed *event.Feed
slasherAttestationsFeed *event.Feed
blockchainFlagOpts []blockchain.Option
serviceFlagOpts *serviceFlagOpts
}
// New creates a new node instance, sets up configuration options, and registers
@ -127,6 +135,7 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
syncCommitteePool: synccommittee.NewPool(),
slasherBlockHeadersFeed: new(event.Feed),
slasherAttestationsFeed: new(event.Feed),
serviceFlagOpts: &serviceFlagOpts{},
}
for _, opt := range opts {
@ -135,7 +144,7 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
}
}
depositAddress, err := registration.DepositContractAddress()
depositAddress, err := powchain.DepositContractAddress()
if err != nil {
return nil, err
}
@ -488,7 +497,7 @@ func (b *BeaconNode) registerBlockchainService() error {
// skipcq: CRT-D0001
opts := append(
b.blockchainFlagOpts,
b.serviceFlagOpts.blockchainFlagOpts,
blockchain.WithDatabase(b.db),
blockchain.WithDepositCache(b.depositCache),
blockchain.WithChainStartFetcher(web3Service),
@ -513,29 +522,25 @@ func (b *BeaconNode) registerPOWChainService() error {
if b.cliCtx.Bool(testSkipPowFlag) {
return b.services.RegisterService(&powchain.Service{})
}
depAddress, endpoints, err := registration.PowchainPreregistration(b.cliCtx)
if err != nil {
return err
}
bs, err := powchain.NewPowchainCollector(b.ctx)
if err != nil {
return err
}
cfg := &powchain.Web3ServiceConfig{
HttpEndpoints: endpoints,
DepositContract: common.HexToAddress(depAddress),
BeaconDB: b.db,
DepositCache: b.depositCache,
StateNotifier: b,
StateGen: b.stateGen,
Eth1HeaderReqLimit: b.cliCtx.Uint64(flags.Eth1HeaderReqLimit.Name),
BeaconNodeStatsUpdater: bs,
depositContractAddr, err := powchain.DepositContractAddress()
if err != nil {
return err
}
web3Service, err := powchain.NewService(b.ctx, cfg)
// skipcq: CRT-D0001
opts := append(
b.serviceFlagOpts.powchainFlagOpts,
powchain.WithDepositContractAddress(common.HexToAddress(depositContractAddr)),
powchain.WithDatabase(b.db),
powchain.WithDepositCache(b.depositCache),
powchain.WithStateNotifier(b),
powchain.WithStateGen(b.stateGen),
powchain.WithBeaconNodeStatsUpdater(bs),
)
web3Service, err := powchain.NewService(b.ctx, opts...)
if err != nil {
return errors.Wrap(err, "could not register proof-of-work chain web3Service")
}

View File

@ -1,6 +1,9 @@
package node
import "github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
)
// Option for beacon node configuration.
type Option func(bn *BeaconNode) error
@ -8,7 +11,15 @@ type Option func(bn *BeaconNode) error
// WithBlockchainFlagOptions includes functional options for the blockchain service related to CLI flags.
func WithBlockchainFlagOptions(opts []blockchain.Option) Option {
return func(bn *BeaconNode) error {
bn.blockchainFlagOpts = opts
bn.serviceFlagOpts.blockchainFlagOpts = opts
return nil
}
}
// WithPowchainFlagOptions includes functional options for the powchain service related to CLI flags.
func WithPowchainFlagOptions(opts []powchain.Option) Option {
return func(bn *BeaconNode) error {
bn.serviceFlagOpts.powchainFlagOpts = opts
return nil
}
}

View File

@ -5,15 +5,12 @@ go_library(
srcs = [
"log.go",
"p2p.go",
"powchain.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/node/registration",
visibility = ["//beacon-chain/node:__subpackages__"],
deps = [
"//cmd:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/params:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
"@in_gopkg_yaml_v2//:go_default_library",
@ -22,18 +19,13 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"p2p_test.go",
"powchain_test.go",
],
srcs = ["p2p_test.go"],
embed = [":go_default_library"],
deps = [
"//cmd:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/params:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)

View File

@ -1,45 +0,0 @@
package registration
import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/urfave/cli/v2"
)
// PowchainPreregistration prepares data for powchain.Service's registration.
func PowchainPreregistration(cliCtx *cli.Context) (depositContractAddress string, endpoints []string, err error) {
depositContractAddress, err = DepositContractAddress()
if err != nil {
return "", nil, err
}
if cliCtx.String(flags.HTTPWeb3ProviderFlag.Name) == "" && len(cliCtx.StringSlice(flags.FallbackWeb3ProviderFlag.Name)) == 0 {
log.Error(
"No ETH1 node specified to run with the beacon node. Please consider running your own Ethereum proof-of-work node for better uptime, security, and decentralization of Ethereum. Visit https://docs.prylabs.network/docs/prysm-usage/setup-eth1 for more information.",
)
log.Error(
"You will need to specify --http-web3provider and/or --fallback-web3provider to attach an eth1 node to the prysm node. Without an eth1 node block proposals for your validator will be affected and the beacon node will not be able to initialize the genesis state.",
)
}
endpoints = []string{cliCtx.String(flags.HTTPWeb3ProviderFlag.Name)}
endpoints = append(endpoints, cliCtx.StringSlice(flags.FallbackWeb3ProviderFlag.Name)...)
return
}
// DepositContractAddress returns the address of the deposit contract.
func DepositContractAddress() (string, error) {
address := params.BeaconConfig().DepositContractAddress
if address == "" {
return "", errors.New("valid deposit contract is required")
}
if !common.IsHexAddress(address) {
return "", errors.New("invalid deposit contract address given: " + address)
}
return address, nil
}

View File

@ -1,71 +0,0 @@
package registration
import (
"flag"
"testing"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/testing/assert"
"github.com/prysmaticlabs/prysm/testing/require"
logTest "github.com/sirupsen/logrus/hooks/test"
"github.com/urfave/cli/v2"
)
func TestPowchainPreregistration(t *testing.T) {
app := cli.App{}
set := flag.NewFlagSet("test", 0)
set.String(flags.HTTPWeb3ProviderFlag.Name, "primary", "")
fallback := cli.StringSlice{}
err := fallback.Set("fallback1")
require.NoError(t, err)
err = fallback.Set("fallback2")
require.NoError(t, err)
set.Var(&fallback, flags.FallbackWeb3ProviderFlag.Name, "")
ctx := cli.NewContext(&app, set, nil)
address, endpoints, err := PowchainPreregistration(ctx)
require.NoError(t, err)
assert.Equal(t, params.BeaconConfig().DepositContractAddress, address)
assert.DeepEqual(t, []string{"primary", "fallback1", "fallback2"}, endpoints)
}
func TestPowchainPreregistration_EmptyWeb3Provider(t *testing.T) {
hook := logTest.NewGlobal()
app := cli.App{}
set := flag.NewFlagSet("test", 0)
set.String(flags.HTTPWeb3ProviderFlag.Name, "", "")
fallback := cli.StringSlice{}
set.Var(&fallback, flags.FallbackWeb3ProviderFlag.Name, "")
ctx := cli.NewContext(&app, set, nil)
_, _, err := PowchainPreregistration(ctx)
require.NoError(t, err)
assert.LogsContain(t, hook, "No ETH1 node specified to run with the beacon node")
}
func TestDepositContractAddress_Ok(t *testing.T) {
address, err := DepositContractAddress()
require.NoError(t, err)
assert.Equal(t, params.BeaconConfig().DepositContractAddress, address)
}
func TestDepositContractAddress_EmptyAddress(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
config.DepositContractAddress = ""
params.OverrideBeaconConfig(config)
_, err := DepositContractAddress()
assert.ErrorContains(t, "valid deposit contract is required", err)
}
func TestDepositContractAddress_NotHexAddress(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
config.DepositContractAddress = "abc?!"
params.OverrideBeaconConfig(config)
_, err := DepositContractAddress()
assert.ErrorContains(t, "invalid deposit contract address given", err)
}

View File

@ -8,6 +8,7 @@ go_library(
"deposit.go",
"log.go",
"log_processing.go",
"options.go",
"prometheus.go",
"provider.go",
"service.go",
@ -15,6 +16,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/powchain",
visibility = [
"//beacon-chain:__subpackages__",
"//cmd/beacon-chain:__subpackages__",
"//contracts:__subpackages__",
],
deps = [

View File

@ -22,7 +22,7 @@ var endpoint = "http://127.0.0.1"
func setDefaultMocks(service *Service) *Service {
service.eth1DataFetcher = &goodFetcher{}
service.httpLogger = &goodLogger{}
service.cfg.StateNotifier = &goodNotifier{}
service.cfg.stateNotifier = &goodNotifier{}
return service
}
@ -31,11 +31,11 @@ func TestLatestMainchainInfo_OK(t *testing.T) {
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "Unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -69,10 +69,10 @@ func TestLatestMainchainInfo_OK(t *testing.T) {
func TestBlockHashByHeight_ReturnsHash(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -96,10 +96,10 @@ func TestBlockHashByHeight_ReturnsHash(t *testing.T) {
func TestBlockHashByHeight_ReturnsError_WhenNoEth1Client(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -112,10 +112,10 @@ func TestBlockHashByHeight_ReturnsError_WhenNoEth1Client(t *testing.T) {
func TestBlockExists_ValidHash(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -143,10 +143,10 @@ func TestBlockExists_ValidHash(t *testing.T) {
func TestBlockExists_InvalidHash(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -157,10 +157,10 @@ func TestBlockExists_InvalidHash(t *testing.T) {
func TestBlockExists_UsesCachedBlockInfo(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
// nil eth1DataFetcher would panic if cached value not used
web3Service.eth1DataFetcher = nil
@ -180,10 +180,10 @@ func TestBlockExists_UsesCachedBlockInfo(t *testing.T) {
func TestBlockExistsWithCache_UsesCachedHeaderInfo(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
header := &gethTypes.Header{
@ -201,10 +201,10 @@ func TestBlockExistsWithCache_UsesCachedHeaderInfo(t *testing.T) {
func TestBlockExistsWithCache_HeaderNotCached(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
exists, height, err := web3Service.BlockExistsWithCache(context.Background(), common.BytesToHash([]byte("hash")))
@ -217,10 +217,10 @@ func TestService_BlockNumberByTimestamp(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err)
web3Service = setDefaultMocks(web3Service)
web3Service.eth1DataFetcher = &goodFetcher{backend: testAcc.Backend}
@ -244,10 +244,10 @@ func TestService_BlockNumberByTimestampLessTargetTime(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err)
web3Service = setDefaultMocks(web3Service)
web3Service.eth1DataFetcher = &goodFetcher{backend: testAcc.Backend}
@ -277,10 +277,10 @@ func TestService_BlockNumberByTimestampMoreTargetTime(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err)
web3Service = setDefaultMocks(web3Service)
web3Service.eth1DataFetcher = &goodFetcher{backend: testAcc.Backend}
@ -308,10 +308,10 @@ func TestService_BlockNumberByTimestampMoreTargetTime(t *testing.T) {
func TestService_BlockTimeByHeight_ReturnsError_WhenNoEth1Client(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)

View File

@ -3,11 +3,26 @@ package powchain
import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/config/params"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
)
// DepositContractAddress returns the deposit contract address for the given chain.
func DepositContractAddress() (string, error) {
address := params.BeaconConfig().DepositContractAddress
if address == "" {
return "", errors.New("valid deposit contract is required")
}
if !common.IsHexAddress(address) {
return "", errors.New("invalid deposit contract address given: " + address)
}
return address, nil
}
func (s *Service) processDeposit(ctx context.Context, eth1Data *ethpb.Eth1Data, deposit *ethpb.Deposit) error {
var err error
if err := s.preGenesisState.SetEth1Data(eth1Data); err != nil {

View File

@ -22,12 +22,39 @@ import (
const pubKeyErr = "could not convert bytes to public key"
func TestDepositContractAddress_EmptyAddress(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
config.DepositContractAddress = ""
params.OverrideBeaconConfig(config)
_, err := DepositContractAddress()
assert.ErrorContains(t, "valid deposit contract is required", err)
}
func TestDepositContractAddress_NotHexAddress(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
config.DepositContractAddress = "abc?!"
params.OverrideBeaconConfig(config)
_, err := DepositContractAddress()
assert.ErrorContains(t, "invalid deposit contract address given", err)
}
func TestDepositContractAddress_OK(t *testing.T) {
params.SetupTestConfigCleanup(t)
addr, err := DepositContractAddress()
require.NoError(t, err)
assert.Equal(t, params.BeaconConfig().DepositContractAddress, addr)
}
func TestProcessDeposit_OK(t *testing.T) {
beaconDB := testDB.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "Unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -48,10 +75,10 @@ func TestProcessDeposit_OK(t *testing.T) {
func TestProcessDeposit_InvalidMerkleBranch(t *testing.T) {
beaconDB := testDB.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -74,10 +101,10 @@ func TestProcessDeposit_InvalidMerkleBranch(t *testing.T) {
func TestProcessDeposit_InvalidPublicKey(t *testing.T) {
hook := logTest.NewGlobal()
beaconDB := testDB.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -110,10 +137,10 @@ func TestProcessDeposit_InvalidPublicKey(t *testing.T) {
func TestProcessDeposit_InvalidSignature(t *testing.T) {
hook := logTest.NewGlobal()
beaconDB := testDB.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -145,10 +172,10 @@ func TestProcessDeposit_InvalidSignature(t *testing.T) {
func TestProcessDeposit_UnableToVerify(t *testing.T) {
hook := logTest.NewGlobal()
beaconDB := testDB.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
@ -177,10 +204,10 @@ func TestProcessDeposit_UnableToVerify(t *testing.T) {
func TestProcessDeposit_IncompleteDeposit(t *testing.T) {
beaconDB := testDB.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
require.NoError(t, web3Service.preGenesisState.SetValidators([]*ethpb.Validator{}))
@ -239,10 +266,10 @@ func TestProcessDeposit_IncompleteDeposit(t *testing.T) {
func TestProcessDeposit_AllDepositedSuccessfully(t *testing.T) {
beaconDB := testDB.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)

View File

@ -52,7 +52,7 @@ func (s *Service) Eth2GenesisPowchainInfo() (uint64, *big.Int) {
func (s *Service) ProcessETH1Block(ctx context.Context, blkNum *big.Int) error {
query := ethereum.FilterQuery{
Addresses: []common.Address{
s.cfg.DepositContract,
s.cfg.depositContractAddr,
},
FromBlock: blkNum,
ToBlock: blkNum,
@ -155,7 +155,7 @@ func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Lo
}
// We always store all historical deposits in the DB.
err = s.cfg.DepositCache.InsertDeposit(ctx, deposit, depositLog.BlockNumber, index, s.depositTrie.HashTreeRoot())
err = s.cfg.depositCache.InsertDeposit(ctx, deposit, depositLog.BlockNumber, index, s.depositTrie.HashTreeRoot())
if err != nil {
return errors.Wrap(err, "unable to insert deposit into cache")
}
@ -172,7 +172,7 @@ func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Lo
validData = false
}
} else {
s.cfg.DepositCache.InsertPendingDeposit(ctx, deposit, depositLog.BlockNumber, index, s.depositTrie.HashTreeRoot())
s.cfg.depositCache.InsertPendingDeposit(ctx, deposit, depositLog.BlockNumber, index, s.depositTrie.HashTreeRoot())
}
if validData {
log.WithFields(logrus.Fields{
@ -231,7 +231,7 @@ func (s *Service) ProcessChainStart(genesisTime uint64, eth1BlockHash [32]byte,
log.WithFields(logrus.Fields{
"ChainStartTime": chainStartTime,
}).Info("Minimum number of validators reached for beacon-chain to start")
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
s.cfg.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.ChainStarted,
Data: &statefeed.ChainStartedData{
StartTime: chainStartTime,
@ -288,7 +288,7 @@ func (s *Service) processPastLogs(ctx context.Context) error {
return err
}
batchSize := s.cfg.Eth1HeaderReqLimit
batchSize := s.cfg.eth1HeaderReqLimit
additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier)
for currentBlockNum < latestFollowHeight {
@ -301,7 +301,7 @@ func (s *Service) processPastLogs(ctx context.Context) error {
}
query := ethereum.FilterQuery{
Addresses: []common.Address{
s.cfg.DepositContract,
s.cfg.depositContractAddr,
},
FromBlock: big.NewInt(int64(start)),
ToBlock: big.NewInt(int64(end)),
@ -354,18 +354,18 @@ func (s *Service) processPastLogs(ctx context.Context) error {
}
currentBlockNum = end
if batchSize < s.cfg.Eth1HeaderReqLimit {
if batchSize < s.cfg.eth1HeaderReqLimit {
// update the batchSize with additive increase
batchSize += additiveFactor
if batchSize > s.cfg.Eth1HeaderReqLimit {
batchSize = s.cfg.Eth1HeaderReqLimit
if batchSize > s.cfg.eth1HeaderReqLimit {
batchSize = s.cfg.eth1HeaderReqLimit
}
}
}
s.latestEth1Data.LastRequestedBlock = currentBlockNum
c, err := s.cfg.BeaconDB.FinalizedCheckpoint(ctx)
c, err := s.cfg.beaconDB.FinalizedCheckpoint(ctx)
if err != nil {
return err
}
@ -374,12 +374,12 @@ func (s *Service) processPastLogs(ctx context.Context) error {
if fRoot == params.BeaconConfig().ZeroHash {
return nil
}
fState, err := s.cfg.StateGen.StateByRoot(ctx, fRoot)
fState, err := s.cfg.stateGen.StateByRoot(ctx, fRoot)
if err != nil {
return err
}
if fState != nil && !fState.IsNil() && fState.Eth1DepositIndex() > 0 {
s.cfg.DepositCache.PrunePendingDeposits(ctx, int64(fState.Eth1DepositIndex()))
s.cfg.depositCache.PrunePendingDeposits(ctx, int64(fState.Eth1DepositIndex()))
}
return nil
}
@ -500,7 +500,7 @@ func (s *Service) savePowchainData(ctx context.Context) error {
ChainstartData: s.chainStartData,
BeaconState: pbState, // I promise not to mutate it!
Trie: s.depositTrie.ToProto(),
DepositContainers: s.cfg.DepositCache.AllDepositContainers(ctx),
DepositContainers: s.cfg.depositCache.AllDepositContainers(ctx),
}
return s.cfg.BeaconDB.SavePowchainData(ctx, eth1Data)
return s.cfg.beaconDB.SavePowchainData(ctx, eth1Data)
}

View File

@ -33,12 +33,12 @@ func TestProcessDepositLog_OK(t *testing.T) {
depositCache, err := depositcache.New()
require.NoError(t, err)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
DepositCache: depositCache,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
WithDepositCache(depositCache),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@ -62,7 +62,7 @@ func TestProcessDepositLog_OK(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.cfg.DepositContract,
web3Service.cfg.depositContractAddr,
},
}
@ -97,12 +97,12 @@ func TestProcessDepositLog_InsertsPendingDeposit(t *testing.T) {
depositCache, err := depositcache.New()
require.NoError(t, err)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
DepositCache: depositCache,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
WithDepositCache(depositCache),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@ -129,7 +129,7 @@ func TestProcessDepositLog_InsertsPendingDeposit(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.cfg.DepositContract,
web3Service.cfg.depositContractAddr,
},
}
@ -143,7 +143,7 @@ func TestProcessDepositLog_InsertsPendingDeposit(t *testing.T) {
err = web3Service.ProcessDepositLog(context.Background(), logs[1])
require.NoError(t, err)
pendingDeposits := web3Service.cfg.DepositCache.PendingDeposits(context.Background(), nil /*blockNum*/)
pendingDeposits := web3Service.cfg.depositCache.PendingDeposits(context.Background(), nil /*blockNum*/)
require.Equal(t, 2, len(pendingDeposits), "Unexpected number of deposits")
hook.Reset()
@ -153,11 +153,11 @@ func TestUnpackDepositLogData_OK(t *testing.T) {
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := testDB.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
DepositContract: testAcc.ContractAddr,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@ -179,7 +179,7 @@ func TestUnpackDepositLogData_OK(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.cfg.DepositContract,
web3Service.cfg.depositContractAddr,
},
}
@ -203,12 +203,12 @@ func TestProcessETH2GenesisLog_8DuplicatePubkeys(t *testing.T) {
depositCache, err := depositcache.New()
require.NoError(t, err)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
DepositCache: depositCache,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
WithDepositCache(depositCache),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@ -244,7 +244,7 @@ func TestProcessETH2GenesisLog_8DuplicatePubkeys(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.cfg.DepositContract,
web3Service.cfg.depositContractAddr,
},
}
@ -273,12 +273,12 @@ func TestProcessETH2GenesisLog(t *testing.T) {
depositCache, err := depositcache.New()
require.NoError(t, err)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
DepositCache: depositCache,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
WithDepositCache(depositCache),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@ -311,7 +311,7 @@ func TestProcessETH2GenesisLog(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.cfg.DepositContract,
web3Service.cfg.depositContractAddr,
},
}
@ -321,7 +321,7 @@ func TestProcessETH2GenesisLog(t *testing.T) {
// Set up our subscriber now to listen for the chain started event.
stateChannel := make(chan *feed.Event, 1)
stateSub := web3Service.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
stateSub := web3Service.cfg.stateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
for _, log := range logs {
@ -359,12 +359,12 @@ func TestProcessETH2GenesisLog_CorrectNumOfDeposits(t *testing.T) {
depositCache, err := depositcache.New()
require.NoError(t, err)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: kvStore,
DepositCache: depositCache,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(kvStore),
WithDepositCache(depositCache),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@ -418,7 +418,7 @@ func TestProcessETH2GenesisLog_CorrectNumOfDeposits(t *testing.T) {
// Set up our subscriber now to listen for the chain started event.
stateChannel := make(chan *feed.Event, 1)
stateSub := web3Service.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
stateSub := web3Service.cfg.stateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
err = web3Service.processPastLogs(context.Background())
@ -452,12 +452,12 @@ func TestProcessETH2GenesisLog_LargePeriodOfNoLogs(t *testing.T) {
depositCache, err := depositcache.New()
require.NoError(t, err)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: kvStore,
DepositCache: depositCache,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(kvStore),
WithDepositCache(depositCache),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@ -522,7 +522,7 @@ func TestProcessETH2GenesisLog_LargePeriodOfNoLogs(t *testing.T) {
// Set up our subscriber now to listen for the chain started event.
stateChannel := make(chan *feed.Event, 1)
stateSub := web3Service.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
stateSub := web3Service.cfg.stateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
err = web3Service.processPastLogs(context.Background())
@ -560,13 +560,12 @@ func TestCheckForChainstart_NoValidator(t *testing.T) {
func newPowchainService(t *testing.T, eth1Backend *contracts.TestAccount, beaconDB db.Database) *Service {
depositCache, err := depositcache.New()
require.NoError(t, err)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: eth1Backend.ContractAddr,
BeaconDB: beaconDB,
DepositCache: depositCache,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(eth1Backend.ContractAddr),
WithDatabase(beaconDB),
WithDepositCache(depositCache),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(eth1Backend.ContractAddr, eth1Backend.Backend)

View File

@ -0,0 +1,88 @@
package powchain
import (
"github.com/ethereum/go-ethereum/common"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/network"
)
type Option func(s *Service) error
// WithHttpEndpoints deduplicates and parses http endpoints for the powchain service to use,
// and sets the "current" endpoint that will be used first.
func WithHttpEndpoints(endpointStrings []string) Option {
return func(s *Service) error {
stringEndpoints := dedupEndpoints(endpointStrings)
endpoints := make([]network.Endpoint, len(stringEndpoints))
for i, e := range stringEndpoints {
endpoints[i] = HttpEndpoint(e)
}
// Select first http endpoint in the provided list.
var currEndpoint network.Endpoint
if len(endpointStrings) > 0 {
currEndpoint = endpoints[0]
}
s.cfg.httpEndpoints = endpoints
s.cfg.currHttpEndpoint = currEndpoint
return nil
}
}
// WithDepositContractAddress for the deposit contract.
func WithDepositContractAddress(addr common.Address) Option {
return func(s *Service) error {
s.cfg.depositContractAddr = addr
return nil
}
}
// WithDatabase for the beacon chain database.
func WithDatabase(database db.HeadAccessDatabase) Option {
return func(s *Service) error {
s.cfg.beaconDB = database
return nil
}
}
// WithDepositCache for caching deposits.
func WithDepositCache(cache *depositcache.DepositCache) Option {
return func(s *Service) error {
s.cfg.depositCache = cache
return nil
}
}
// WithStateNotifier for subscribing to state changes.
func WithStateNotifier(notifier statefeed.Notifier) Option {
return func(s *Service) error {
s.cfg.stateNotifier = notifier
return nil
}
}
// WithStateGen to regenerate beacon states from checkpoints.
func WithStateGen(gen *stategen.State) Option {
return func(s *Service) error {
s.cfg.stateGen = gen
return nil
}
}
// WithEth1HeaderRequestLimit to set the upper limit of eth1 header requests.
func WithEth1HeaderRequestLimit(limit uint64) Option {
return func(s *Service) error {
s.cfg.eth1HeaderReqLimit = limit
return nil
}
}
// WithBeaconNodeStatsUpdater to set the beacon node stats updater.
func WithBeaconNodeStatsUpdater(updater BeaconNodeStatsUpdater) Option {
return func(s *Service) error {
s.cfg.beaconNodeStatsUpdater = updater
return nil
}
}

View File

@ -120,6 +120,19 @@ type RPCClient interface {
BatchCall(b []gethRPC.BatchElem) error
}
// config defines a config struct for dependencies into the service.
type config struct {
depositContractAddr common.Address
beaconDB db.HeadAccessDatabase
depositCache *depositcache.DepositCache
stateNotifier statefeed.Notifier
stateGen *stategen.State
eth1HeaderReqLimit uint64
beaconNodeStatsUpdater BeaconNodeStatsUpdater
httpEndpoints []network.Endpoint
currHttpEndpoint network.Endpoint
}
// Service fetches important information about the canonical
// Ethereum ETH1.0 chain via a web3 endpoint using an ethclient. The Random
// Beacon Chain requires synchronization with the ETH1.0 chain's current
@ -130,12 +143,10 @@ type Service struct {
connectedETH1 bool
isRunning bool
processingLock sync.RWMutex
cfg *Web3ServiceConfig
cfg *config
ctx context.Context
cancel context.CancelFunc
headTicker *time.Ticker
httpEndpoints []network.Endpoint
currHttpEndpoint network.Endpoint
httpLogger bind.ContractFilterer
eth1DataFetcher RPCDataFetcher
rpcClient RPCClient
@ -147,24 +158,10 @@ type Service struct {
lastReceivedMerkleIndex int64 // Keeps track of the last received index to prevent log spam.
runError error
preGenesisState state.BeaconState
bsUpdater BeaconNodeStatsUpdater
}
// Web3ServiceConfig defines a config struct for web3 service to use through its life cycle.
type Web3ServiceConfig struct {
HttpEndpoints []string
DepositContract common.Address
BeaconDB db.HeadAccessDatabase
DepositCache *depositcache.DepositCache
StateNotifier statefeed.Notifier
StateGen *stategen.State
Eth1HeaderReqLimit uint64
BeaconNodeStatsUpdater BeaconNodeStatsUpdater
}
// NewService sets up a new instance with an ethclient when
// given a web3 endpoint as a string in the config.
func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error) {
// NewService sets up a new instance with an ethclient when given a web3 endpoint as a string in the config.
func NewService(ctx context.Context, opts ...Option) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
_ = cancel // govet fix for lost cancel. Cancel is handled in service.Stop()
depositTrie, err := trie.NewTrie(params.BeaconConfig().DepositContractTreeDepth)
@ -177,27 +174,13 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
return nil, errors.Wrap(err, "could not setup genesis state")
}
if config.Eth1HeaderReqLimit == 0 {
config.Eth1HeaderReqLimit = defaultEth1HeaderReqLimit
}
stringEndpoints := dedupEndpoints(config.HttpEndpoints)
endpoints := make([]network.Endpoint, len(stringEndpoints))
for i, e := range stringEndpoints {
endpoints[i] = HttpEndpoint(e)
}
// Select first http endpoint in the provided list.
var currEndpoint network.Endpoint
if len(config.HttpEndpoints) > 0 {
currEndpoint = endpoints[0]
}
s := &Service{
ctx: ctx,
cancel: cancel,
cfg: config,
httpEndpoints: endpoints,
currHttpEndpoint: currEndpoint,
ctx: ctx,
cancel: cancel,
cfg: &config{
beaconNodeStatsUpdater: &NopBeaconNodeStatsUpdater{},
eth1HeaderReqLimit: defaultEth1HeaderReqLimit,
},
latestEth1Data: &protodb.LatestETH1Data{
BlockHeight: 0,
BlockTime: 0,
@ -213,26 +196,25 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
lastReceivedMerkleIndex: -1,
preGenesisState: genState,
headTicker: time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerETH1Block) * time.Second),
// use the nop updater by default, rely on upstream set up to pass in an appropriate impl
bsUpdater: config.BeaconNodeStatsUpdater,
}
if config.BeaconNodeStatsUpdater == nil {
s.bsUpdater = &NopBeaconNodeStatsUpdater{}
for _, opt := range opts {
if err := opt(s); err != nil {
return nil, err
}
}
if err := s.ensureValidPowchainData(ctx); err != nil {
return nil, errors.Wrap(err, "unable to validate powchain data")
}
eth1Data, err := config.BeaconDB.PowchainData(ctx)
eth1Data, err := s.cfg.beaconDB.PowchainData(ctx)
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve eth1 data")
}
if err := s.initializeEth1Data(ctx, eth1Data); err != nil {
return nil, err
}
return s, nil
}
@ -240,10 +222,10 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
func (s *Service) Start() {
// If the chain has not started already and we don't have access to eth1 nodes, we will not be
// able to generate the genesis state.
if !s.chainStartData.Chainstarted && s.currHttpEndpoint.Url == "" {
if !s.chainStartData.Chainstarted && s.cfg.currHttpEndpoint.Url == "" {
// check for genesis state before shutting down the node,
// if a genesis state exists, we can continue on.
genState, err := s.cfg.BeaconDB.GenesisState(s.ctx)
genState, err := s.cfg.beaconDB.GenesisState(s.ctx)
if err != nil {
log.Fatal(err)
}
@ -253,7 +235,7 @@ func (s *Service) Start() {
}
// Exit early if eth1 endpoint is not set.
if s.currHttpEndpoint.Url == "" {
if s.cfg.currHttpEndpoint.Url == "" {
return
}
go func() {
@ -314,7 +296,7 @@ func (s *Service) Status() error {
func (s *Service) updateBeaconNodeStats() {
bs := clientstats.BeaconNodeStats{}
if len(s.httpEndpoints) > 1 {
if len(s.cfg.httpEndpoints) > 1 {
bs.SyncEth1FallbackConfigured = true
}
if s.IsConnectedToETH1() {
@ -324,11 +306,11 @@ func (s *Service) updateBeaconNodeStats() {
bs.SyncEth1FallbackConnected = true
}
}
s.bsUpdater.Update(bs)
s.cfg.beaconNodeStatsUpdater.Update(bs)
}
func (s *Service) updateCurrHttpEndpoint(endpoint network.Endpoint) {
s.currHttpEndpoint = endpoint
s.cfg.currHttpEndpoint = endpoint
s.updateBeaconNodeStats()
}
@ -374,7 +356,7 @@ func (s *Service) AreAllDepositsProcessed() (bool, error) {
return false, errors.Wrap(err, "could not get deposit count")
}
count := bytesutil.FromBytes8(countByte)
deposits := s.cfg.DepositCache.AllDeposits(s.ctx, nil)
deposits := s.cfg.depositCache.AllDeposits(s.ctx, nil)
if count != uint64(len(deposits)) {
return false, nil
}
@ -392,12 +374,12 @@ func (s *Service) followBlockHeight(_ context.Context) (uint64, error) {
}
func (s *Service) connectToPowChain() error {
httpClient, rpcClient, err := s.dialETH1Nodes(s.currHttpEndpoint)
httpClient, rpcClient, err := s.dialETH1Nodes(s.cfg.currHttpEndpoint)
if err != nil {
return errors.Wrap(err, "could not dial eth1 nodes")
}
depositContractCaller, err := contracts.NewDepositContractCaller(s.cfg.DepositContract, httpClient)
depositContractCaller, err := contracts.NewDepositContractCaller(s.cfg.depositContractAddr, httpClient)
if err != nil {
return errors.Wrap(err, "could not create deposit contract caller")
}
@ -493,7 +475,7 @@ func (s *Service) waitForConnection() {
s.updateConnectedETH1(true)
s.runError = nil
log.WithFields(logrus.Fields{
"endpoint": logs.MaskCredentialsLogging(s.currHttpEndpoint.Url),
"endpoint": logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url),
}).Info("Connected to eth1 proof-of-work chain")
return
}
@ -522,7 +504,7 @@ func (s *Service) waitForConnection() {
for {
select {
case <-ticker.C:
log.Debugf("Trying to dial endpoint: %s", logs.MaskCredentialsLogging(s.currHttpEndpoint.Url))
log.Debugf("Trying to dial endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url))
errConnect := s.connectToPowChain()
if errConnect != nil {
errorLogger(errConnect, "Could not connect to powchain endpoint")
@ -541,7 +523,7 @@ func (s *Service) waitForConnection() {
s.updateConnectedETH1(true)
s.runError = nil
log.WithFields(logrus.Fields{
"endpoint": logs.MaskCredentialsLogging(s.currHttpEndpoint.Url),
"endpoint": logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url),
}).Info("Connected to eth1 proof-of-work chain")
return
}
@ -587,27 +569,27 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*protodb.Deposit
if len(ctrs) == 0 {
return nil
}
s.cfg.DepositCache.InsertDepositContainers(ctx, ctrs)
s.cfg.depositCache.InsertDepositContainers(ctx, ctrs)
if !s.chainStartData.Chainstarted {
// do not add to pending cache
// if no genesis state exists.
validDepositsCount.Add(float64(s.preGenesisState.Eth1DepositIndex()))
return nil
}
genesisState, err := s.cfg.BeaconDB.GenesisState(ctx)
genesisState, err := s.cfg.beaconDB.GenesisState(ctx)
if err != nil {
return err
}
// Default to all deposits post-genesis deposits in
// the event we cannot find a finalized state.
currIndex := genesisState.Eth1DepositIndex()
chkPt, err := s.cfg.BeaconDB.FinalizedCheckpoint(ctx)
chkPt, err := s.cfg.beaconDB.FinalizedCheckpoint(ctx)
if err != nil {
return err
}
rt := bytesutil.ToBytes32(chkPt.Root)
if rt != [32]byte{} {
fState, err := s.cfg.StateGen.StateByRoot(ctx, rt)
fState, err := s.cfg.stateGen.StateByRoot(ctx, rt)
if err != nil {
return errors.Wrap(err, "could not get finalized state")
}
@ -621,9 +603,9 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*protodb.Deposit
// accumulates. we finalize them here before we are ready to receive a block.
// Otherwise, the first few blocks will be slower to compute as we will
// hold the lock and be busy finalizing the deposits.
s.cfg.DepositCache.InsertFinalizedDeposits(ctx, int64(currIndex))
s.cfg.depositCache.InsertFinalizedDeposits(ctx, int64(currIndex))
// Deposit proofs are only used during state transition and can be safely removed to save space.
if err = s.cfg.DepositCache.PruneProofs(ctx, int64(currIndex)); err != nil {
if err = s.cfg.depositCache.PruneProofs(ctx, int64(currIndex)); err != nil {
return errors.Wrap(err, "could not prune deposit proofs")
}
}
@ -632,7 +614,7 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*protodb.Deposit
// is more than the current index in state.
if uint64(len(ctrs)) > currIndex {
for _, c := range ctrs[currIndex:] {
s.cfg.DepositCache.InsertPendingDeposit(ctx, c.Deposit, c.Eth1BlockHeight, c.Index, bytesutil.ToBytes32(c.DepositRoot))
s.cfg.depositCache.InsertPendingDeposit(ctx, c.Deposit, c.Eth1BlockHeight, c.Index, bytesutil.ToBytes32(c.DepositRoot))
}
}
return nil
@ -918,10 +900,10 @@ func (s *Service) determineEarliestVotingBlock(ctx context.Context, followBlock
// is ready to serve we connect to it again. This method is only
// relevant if we are on our backup endpoint.
func (s *Service) checkDefaultEndpoint() {
primaryEndpoint := s.httpEndpoints[0]
primaryEndpoint := s.cfg.httpEndpoints[0]
// Return early if we are running on our primary
// endpoint.
if s.currHttpEndpoint.Equals(primaryEndpoint) {
if s.cfg.currHttpEndpoint.Equals(primaryEndpoint) {
return
}
@ -947,11 +929,11 @@ func (s *Service) checkDefaultEndpoint() {
// This is an inefficient way to search for the next endpoint, but given N is expected to be
// small ( < 25), it is fine to search this way.
func (s *Service) fallbackToNextEndpoint() {
currEndpoint := s.currHttpEndpoint
currEndpoint := s.cfg.currHttpEndpoint
currIndex := 0
totalEndpoints := len(s.httpEndpoints)
totalEndpoints := len(s.cfg.httpEndpoints)
for i, endpoint := range s.httpEndpoints {
for i, endpoint := range s.cfg.httpEndpoints {
if endpoint.Equals(currEndpoint) {
currIndex = i
break
@ -961,9 +943,9 @@ func (s *Service) fallbackToNextEndpoint() {
if nextIndex >= totalEndpoints {
nextIndex = 0
}
s.updateCurrHttpEndpoint(s.httpEndpoints[nextIndex])
s.updateCurrHttpEndpoint(s.cfg.httpEndpoints[nextIndex])
if nextIndex != currIndex {
log.Infof("Falling back to alternative endpoint: %s", logs.MaskCredentialsLogging(s.currHttpEndpoint.Url))
log.Infof("Falling back to alternative endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url))
}
}
@ -1019,7 +1001,7 @@ func (s *Service) validateDepositContainers(ctrs []*protodb.DepositContainer) bo
// validates the current powchain data saved and makes sure that any
// embedded genesis state is correctly accounted for.
func (s *Service) ensureValidPowchainData(ctx context.Context) error {
genState, err := s.cfg.BeaconDB.GenesisState(ctx)
genState, err := s.cfg.beaconDB.GenesisState(ctx)
if err != nil {
return err
}
@ -1027,7 +1009,7 @@ func (s *Service) ensureValidPowchainData(ctx context.Context) error {
if genState == nil || genState.IsNil() {
return nil
}
eth1Data, err := s.cfg.BeaconDB.PowchainData(ctx)
eth1Data, err := s.cfg.beaconDB.PowchainData(ctx)
if err != nil {
return errors.Wrap(err, "unable to retrieve eth1 data")
}
@ -1048,9 +1030,9 @@ func (s *Service) ensureValidPowchainData(ctx context.Context) error {
ChainstartData: s.chainStartData,
BeaconState: pbState,
Trie: s.depositTrie.ToProto(),
DepositContainers: s.cfg.DepositCache.AllDepositContainers(ctx),
DepositContainers: s.cfg.depositCache.AllDepositContainers(ctx),
}
return s.cfg.BeaconDB.SavePowchainData(ctx, eth1Data)
return s.cfg.beaconDB.SavePowchainData(ctx, eth1Data)
}
return nil
}
@ -1077,5 +1059,5 @@ func eth1HeadIsBehind(timestamp uint64) bool {
}
func (s *Service) primaryConnected() bool {
return s.currHttpEndpoint.Equals(s.httpEndpoints[0])
return s.cfg.currHttpEndpoint.Equals(s.cfg.httpEndpoints[0])
}

View File

@ -129,11 +129,11 @@ func TestStart_OK(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.rpcClient = &mockPOW.RPCClient{Backend: testAcc.Backend}
@ -158,11 +158,11 @@ func TestStart_NoHttpEndpointDefinedFails_WithoutChainStarted(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
s, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{""}, // No endpoint defined!
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
s, err := NewService(context.Background(),
WithHttpEndpoints([]string{""}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err)
// Set custom exit func so test can proceed
log.Logger.ExitFunc = func(i int) {
@ -201,12 +201,12 @@ func TestStart_NoHttpEndpointDefinedSucceeds_WithGenesisState(t *testing.T) {
require.NoError(t, beaconDB.SaveGenesisBlockRoot(context.Background(), genRoot))
depositCache, err := depositcache.New()
require.NoError(t, err)
s, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{""}, // No endpoint defined!
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
DepositCache: depositCache,
})
s, err := NewService(context.Background(),
WithHttpEndpoints([]string{""}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
WithDepositCache(depositCache),
)
require.NoError(t, err)
wg := new(sync.WaitGroup)
@ -232,11 +232,11 @@ func TestStart_NoHttpEndpointDefinedSucceeds_WithChainStarted(t *testing.T) {
ChainstartData: &protodb.ChainStartData{Chainstarted: true},
Trie: &protodb.SparseMerkleTrie{},
}))
s, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{""}, // No endpoint defined!
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
s, err := NewService(context.Background(),
WithHttpEndpoints([]string{""}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err)
s.Start()
@ -249,11 +249,11 @@ func TestStop_OK(t *testing.T) {
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@ -274,11 +274,11 @@ func TestService_Eth1Synced(t *testing.T) {
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@ -299,11 +299,11 @@ func TestFollowBlock_OK(t *testing.T) {
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
// simulated backend sets eth1 block
@ -372,10 +372,10 @@ func TestStatus(t *testing.T) {
func TestHandlePanic_OK(t *testing.T) {
hook := logTest.NewGlobal()
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
// nil eth1DataFetcher would panic if cached value not used
web3Service.eth1DataFetcher = nil
@ -411,11 +411,11 @@ func TestLogTillGenesis_OK(t *testing.T) {
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
require.NoError(t, err)
@ -447,24 +447,24 @@ func TestInitDepositCache_OK(t *testing.T) {
s := &Service{
chainStartData: &protodb.ChainStartData{Chainstarted: false},
preGenesisState: gs,
cfg: &Web3ServiceConfig{BeaconDB: beaconDB},
cfg: &config{beaconDB: beaconDB},
}
var err error
s.cfg.DepositCache, err = depositcache.New()
s.cfg.depositCache, err = depositcache.New()
require.NoError(t, err)
require.NoError(t, s.initDepositCaches(context.Background(), ctrs))
require.Equal(t, 0, len(s.cfg.DepositCache.PendingContainers(context.Background(), nil)))
require.Equal(t, 0, len(s.cfg.depositCache.PendingContainers(context.Background(), nil)))
blockRootA := [32]byte{'a'}
emptyState, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, s.cfg.BeaconDB.SaveGenesisBlockRoot(context.Background(), blockRootA))
require.NoError(t, s.cfg.BeaconDB.SaveState(context.Background(), emptyState, blockRootA))
require.NoError(t, s.cfg.beaconDB.SaveGenesisBlockRoot(context.Background(), blockRootA))
require.NoError(t, s.cfg.beaconDB.SaveState(context.Background(), emptyState, blockRootA))
s.chainStartData.Chainstarted = true
require.NoError(t, s.initDepositCaches(context.Background(), ctrs))
require.Equal(t, 3, len(s.cfg.DepositCache.PendingContainers(context.Background(), nil)))
require.Equal(t, 3, len(s.cfg.depositCache.PendingContainers(context.Background(), nil)))
}
func TestInitDepositCacheWithFinalization_OK(t *testing.T) {
@ -508,14 +508,14 @@ func TestInitDepositCacheWithFinalization_OK(t *testing.T) {
s := &Service{
chainStartData: &protodb.ChainStartData{Chainstarted: false},
preGenesisState: gs,
cfg: &Web3ServiceConfig{BeaconDB: beaconDB},
cfg: &config{beaconDB: beaconDB},
}
var err error
s.cfg.DepositCache, err = depositcache.New()
s.cfg.depositCache, err = depositcache.New()
require.NoError(t, err)
require.NoError(t, s.initDepositCaches(context.Background(), ctrs))
require.Equal(t, 0, len(s.cfg.DepositCache.PendingContainers(context.Background(), nil)))
require.Equal(t, 0, len(s.cfg.depositCache.PendingContainers(context.Background(), nil)))
headBlock := util.NewBeaconBlock()
headRoot, err := headBlock.Block.HashTreeRoot()
@ -524,10 +524,10 @@ func TestInitDepositCacheWithFinalization_OK(t *testing.T) {
emptyState, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, s.cfg.BeaconDB.SaveGenesisBlockRoot(context.Background(), headRoot))
require.NoError(t, s.cfg.BeaconDB.SaveState(context.Background(), emptyState, headRoot))
require.NoError(t, s.cfg.beaconDB.SaveGenesisBlockRoot(context.Background(), headRoot))
require.NoError(t, s.cfg.beaconDB.SaveState(context.Background(), emptyState, headRoot))
require.NoError(t, stateGen.SaveState(context.Background(), headRoot, emptyState))
s.cfg.StateGen = stateGen
s.cfg.stateGen = stateGen
require.NoError(t, emptyState.SetEth1DepositIndex(2))
ctx := context.Background()
@ -539,7 +539,7 @@ func TestInitDepositCacheWithFinalization_OK(t *testing.T) {
s.chainStartData.Chainstarted = true
require.NoError(t, s.initDepositCaches(context.Background(), ctrs))
deps := s.cfg.DepositCache.NonFinalizedDeposits(context.Background(), nil)
deps := s.cfg.depositCache.NonFinalizedDeposits(context.Background(), nil)
assert.Equal(t, 0, len(deps))
}
@ -547,11 +547,11 @@ func TestNewService_EarliestVotingBlock(t *testing.T) {
testAcc, err := contracts.Setup()
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := dbutil.SetupDB(t)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
web3Service, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
web3Service.eth1DataFetcher = &goodFetcher{backend: testAcc.Backend}
// simulated backend sets eth1 block
@ -598,22 +598,21 @@ func TestNewService_Eth1HeaderRequLimit(t *testing.T) {
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := dbutil.SetupDB(t)
s1, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
})
s1, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
assert.Equal(t, defaultEth1HeaderReqLimit, s1.cfg.Eth1HeaderReqLimit, "default eth1 header request limit not set")
s2, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{endpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
Eth1HeaderReqLimit: uint64(150),
})
assert.Equal(t, defaultEth1HeaderReqLimit, s1.cfg.eth1HeaderReqLimit, "default eth1 header request limit not set")
s2, err := NewService(context.Background(),
WithHttpEndpoints([]string{endpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
WithEth1HeaderRequestLimit(uint64(150)),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
assert.Equal(t, uint64(150), s2.cfg.Eth1HeaderReqLimit, "unable to set eth1HeaderRequestLimit")
assert.Equal(t, uint64(150), s2.cfg.eth1HeaderReqLimit, "unable to set eth1HeaderRequestLimit")
}
type mockBSUpdater struct {
@ -635,41 +634,41 @@ func TestServiceFallbackCorrectly(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
mbs := &mockBSUpdater{}
s1, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{firstEndpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
BeaconNodeStatsUpdater: mbs,
})
s1.bsUpdater = mbs
s1, err := NewService(context.Background(),
WithHttpEndpoints([]string{firstEndpoint}),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
WithBeaconNodeStatsUpdater(mbs),
)
s1.cfg.beaconNodeStatsUpdater = mbs
require.NoError(t, err)
assert.Equal(t, firstEndpoint, s1.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, firstEndpoint, s1.cfg.currHttpEndpoint.Url, "Unexpected http endpoint")
// Stay at the first endpoint.
s1.fallbackToNextEndpoint()
assert.Equal(t, firstEndpoint, s1.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, firstEndpoint, s1.cfg.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, false, mbs.lastBS.SyncEth1FallbackConfigured, "SyncEth1FallbackConfigured in clientstats update should be false when only 1 endpoint is configured")
s1.httpEndpoints = append(s1.httpEndpoints, network.Endpoint{Url: secondEndpoint})
s1.cfg.httpEndpoints = append(s1.cfg.httpEndpoints, network.Endpoint{Url: secondEndpoint})
s1.fallbackToNextEndpoint()
assert.Equal(t, secondEndpoint, s1.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, secondEndpoint, s1.cfg.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, true, mbs.lastBS.SyncEth1FallbackConfigured, "SyncEth1FallbackConfigured in clientstats update should be true when > 1 endpoint is configured")
thirdEndpoint := "C"
fourthEndpoint := "D"
s1.httpEndpoints = append(s1.httpEndpoints, network.Endpoint{Url: thirdEndpoint}, network.Endpoint{Url: fourthEndpoint})
s1.cfg.httpEndpoints = append(s1.cfg.httpEndpoints, network.Endpoint{Url: thirdEndpoint}, network.Endpoint{Url: fourthEndpoint})
s1.fallbackToNextEndpoint()
assert.Equal(t, thirdEndpoint, s1.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, thirdEndpoint, s1.cfg.currHttpEndpoint.Url, "Unexpected http endpoint")
s1.fallbackToNextEndpoint()
assert.Equal(t, fourthEndpoint, s1.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, fourthEndpoint, s1.cfg.currHttpEndpoint.Url, "Unexpected http endpoint")
// Rollover correctly back to the first endpoint
s1.fallbackToNextEndpoint()
assert.Equal(t, firstEndpoint, s1.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, firstEndpoint, s1.cfg.currHttpEndpoint.Url, "Unexpected http endpoint")
}
func TestDedupEndpoints(t *testing.T) {
@ -697,19 +696,19 @@ func TestService_EnsureConsistentPowchainData(t *testing.T) {
cache, err := depositcache.New()
require.NoError(t, err)
s1, err := NewService(context.Background(), &Web3ServiceConfig{
BeaconDB: beaconDB,
DepositCache: cache,
})
s1, err := NewService(context.Background(),
WithDatabase(beaconDB),
WithDepositCache(cache),
)
require.NoError(t, err)
genState, err := util.NewBeaconState()
require.NoError(t, err)
assert.NoError(t, genState.SetSlot(1000))
require.NoError(t, s1.cfg.BeaconDB.SaveGenesisData(context.Background(), genState))
require.NoError(t, s1.cfg.beaconDB.SaveGenesisData(context.Background(), genState))
require.NoError(t, s1.ensureValidPowchainData(context.Background()))
eth1Data, err := s1.cfg.BeaconDB.PowchainData(context.Background())
eth1Data, err := s1.cfg.beaconDB.PowchainData(context.Background())
assert.NoError(t, err)
assert.NotNil(t, eth1Data)
@ -721,19 +720,19 @@ func TestService_InitializeCorrectly(t *testing.T) {
cache, err := depositcache.New()
require.NoError(t, err)
s1, err := NewService(context.Background(), &Web3ServiceConfig{
BeaconDB: beaconDB,
DepositCache: cache,
})
s1, err := NewService(context.Background(),
WithDatabase(beaconDB),
WithDepositCache(cache),
)
require.NoError(t, err)
genState, err := util.NewBeaconState()
require.NoError(t, err)
assert.NoError(t, genState.SetSlot(1000))
require.NoError(t, s1.cfg.BeaconDB.SaveGenesisData(context.Background(), genState))
require.NoError(t, s1.cfg.beaconDB.SaveGenesisData(context.Background(), genState))
require.NoError(t, s1.ensureValidPowchainData(context.Background()))
eth1Data, err := s1.cfg.BeaconDB.PowchainData(context.Background())
eth1Data, err := s1.cfg.beaconDB.PowchainData(context.Background())
assert.NoError(t, err)
assert.NoError(t, s1.initializeEth1Data(context.Background(), eth1Data))
@ -745,25 +744,25 @@ func TestService_EnsureValidPowchainData(t *testing.T) {
cache, err := depositcache.New()
require.NoError(t, err)
s1, err := NewService(context.Background(), &Web3ServiceConfig{
BeaconDB: beaconDB,
DepositCache: cache,
})
s1, err := NewService(context.Background(),
WithDatabase(beaconDB),
WithDepositCache(cache),
)
require.NoError(t, err)
genState, err := util.NewBeaconState()
require.NoError(t, err)
assert.NoError(t, genState.SetSlot(1000))
require.NoError(t, s1.cfg.BeaconDB.SaveGenesisData(context.Background(), genState))
require.NoError(t, s1.cfg.beaconDB.SaveGenesisData(context.Background(), genState))
err = s1.cfg.BeaconDB.SavePowchainData(context.Background(), &protodb.ETH1ChainData{
err = s1.cfg.beaconDB.SavePowchainData(context.Background(), &protodb.ETH1ChainData{
ChainstartData: &protodb.ChainStartData{Chainstarted: true},
DepositContainers: []*protodb.DepositContainer{{Index: 1}},
})
require.NoError(t, err)
require.NoError(t, s1.ensureValidPowchainData(context.Background()))
eth1Data, err := s1.cfg.BeaconDB.PowchainData(context.Background())
eth1Data, err := s1.cfg.beaconDB.PowchainData(context.Background())
assert.NoError(t, err)
assert.NotNil(t, eth1Data)
@ -775,10 +774,10 @@ func TestService_ValidateDepositContainers(t *testing.T) {
cache, err := depositcache.New()
require.NoError(t, err)
s1, err := NewService(context.Background(), &Web3ServiceConfig{
BeaconDB: beaconDB,
DepositCache: cache,
})
s1, err := NewService(context.Background(),
WithDatabase(beaconDB),
WithDepositCache(cache),
)
require.NoError(t, err)
var tt = []struct {

View File

@ -382,7 +382,7 @@ func (s *Service) logNewClientConnection(ctx context.Context) {
if !s.connectedRPCClients[clientInfo.Addr] {
log.WithFields(logrus.Fields{
"addr": clientInfo.Addr.String(),
}).Infof("New gRPC client connected to beacon node")
}).Infof("NewService gRPC client connected to beacon node")
s.connectedRPCClients[clientInfo.Addr] = true
}
}

View File

@ -20,6 +20,7 @@ go_library(
"//cmd/beacon-chain/blockchain:go_default_library",
"//cmd/beacon-chain/db:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//cmd/beacon-chain/powchain:go_default_library",
"//config/features:go_default_library",
"//io/file:go_default_library",
"//io/logs:go_default_library",

View File

@ -16,6 +16,7 @@ import (
blockchaincmd "github.com/prysmaticlabs/prysm/cmd/beacon-chain/blockchain"
dbcommands "github.com/prysmaticlabs/prysm/cmd/beacon-chain/db"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
powchaincmd "github.com/prysmaticlabs/prysm/cmd/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/config/features"
"github.com/prysmaticlabs/prysm/io/file"
"github.com/prysmaticlabs/prysm/io/logs"
@ -229,8 +230,13 @@ func startNode(ctx *cli.Context) error {
if err != nil {
return nil
}
powchainFlagOpts, err := powchaincmd.FlagOptions(ctx)
if err != nil {
return nil
}
opts := []node.Option{
node.WithBlockchainFlagOptions(blockchainFlagOpts),
node.WithPowchainFlagOptions(powchainFlagOpts),
}
beacon, err := node.New(ctx, opts...)
if err != nil {

View File

@ -0,0 +1,30 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["options.go"],
importpath = "github.com/prysmaticlabs/prysm/cmd/beacon-chain/powchain",
visibility = [
"//beacon-chain:__subpackages__",
"//cmd:__subpackages__",
],
deps = [
"//beacon-chain/powchain:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["options_test.go"],
embed = [":go_default_library"],
deps = [
"//cmd/beacon-chain/flags:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)

View File

@ -0,0 +1,39 @@
package powchaincmd
import (
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
var log = logrus.WithField("prefix", "cmd-powchain")
// FlagOptions for powchain service flag configurations.
func FlagOptions(c *cli.Context) ([]powchain.Option, error) {
endpoints := parseHttpEndpoints(c)
opts := []powchain.Option{
powchain.WithHttpEndpoints(endpoints),
powchain.WithEth1HeaderRequestLimit(c.Uint64(flags.Eth1HeaderReqLimit.Name)),
}
return opts, nil
}
func parseHttpEndpoints(c *cli.Context) []string {
if c.String(flags.HTTPWeb3ProviderFlag.Name) == "" && len(c.StringSlice(flags.FallbackWeb3ProviderFlag.Name)) == 0 {
log.Error(
"No ETH1 node specified to run with the beacon node. " +
"Please consider running your own Ethereum proof-of-work node for better uptime, " +
"security, and decentralization of Ethereum. Visit " +
"https://docs.prylabs.network/docs/prysm-usage/setup-eth1 for more information",
)
log.Error(
"You will need to specify --http-web3provider and/or --fallback-web3provider to attach " +
"an eth1 node to the prysm node. Without an eth1 node block proposals for your " +
"validator will be affected and the beacon node will not be able to initialize the genesis state",
)
}
endpoints := []string{c.String(flags.HTTPWeb3ProviderFlag.Name)}
endpoints = append(endpoints, c.StringSlice(flags.FallbackWeb3ProviderFlag.Name)...)
return endpoints
}

View File

@ -0,0 +1,40 @@
package powchaincmd
import (
"flag"
"testing"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/testing/assert"
"github.com/prysmaticlabs/prysm/testing/require"
logTest "github.com/sirupsen/logrus/hooks/test"
"github.com/urfave/cli/v2"
)
func TestPowchainCmd(t *testing.T) {
app := cli.App{}
set := flag.NewFlagSet("test", 0)
set.String(flags.HTTPWeb3ProviderFlag.Name, "primary", "")
fallback := cli.StringSlice{}
err := fallback.Set("fallback1")
require.NoError(t, err)
err = fallback.Set("fallback2")
require.NoError(t, err)
set.Var(&fallback, flags.FallbackWeb3ProviderFlag.Name, "")
ctx := cli.NewContext(&app, set, nil)
endpoints := parseHttpEndpoints(ctx)
assert.DeepEqual(t, []string{"primary", "fallback1", "fallback2"}, endpoints)
}
func TestPowchainPreregistration_EmptyWeb3Provider(t *testing.T) {
hook := logTest.NewGlobal()
app := cli.App{}
set := flag.NewFlagSet("test", 0)
set.String(flags.HTTPWeb3ProviderFlag.Name, "", "")
fallback := cli.StringSlice{}
set.Var(&fallback, flags.FallbackWeb3ProviderFlag.Name, "")
ctx := cli.NewContext(&app, set, nil)
parseHttpEndpoints(ctx)
assert.LogsContain(t, hook, "No ETH1 node specified to run with the beacon node")
}