Handle ETH1 Node Failure (#3914)

* refactor powchain config

* send mock eth1 votes when proposing blocks

* fix all tests

* lint

* fix all tests

* fix blockchain test

* Apply suggestions from code review

Co-Authored-By: Raul Jordan <raul@prysmaticlabs.com>

* raul's review

* add warning
This commit is contained in:
Nishant Das 2019-11-03 03:56:03 +08:00 committed by Raul Jordan
parent cc16a10a33
commit 8977e5088e
11 changed files with 240 additions and 204 deletions

View File

@ -199,13 +199,9 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
ctx := context.Background()
var web3Service *powchain.Service
var err error
client := &mockClient{}
web3Service, err = powchain.NewService(ctx, &powchain.Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: common.Address{},
Reader: client,
Client: client,
Logger: client,
})
if err != nil {
t.Fatalf("unable to set up web3 service: %v", err)

View File

@ -33,8 +33,6 @@ go_library(
"//shared/tracing:go_default_library",
"//shared/version:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//ethclient:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli//:go_default_library",

View File

@ -15,8 +15,6 @@ import (
"syscall"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
gethRPC "github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/archiver"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
@ -311,28 +309,11 @@ func (b *BeaconNode) registerPOWChainService(cliCtx *cli.Context) error {
log.Fatalf("Invalid deposit contract address given: %s", depAddress)
}
httpRPCClient, err := gethRPC.Dial(cliCtx.GlobalString(flags.HTTPWeb3ProviderFlag.Name))
if err != nil {
log.Fatalf("Access to PoW chain is required for validator. Unable to connect to Geth node: %v", err)
}
httpClient := ethclient.NewClient(httpRPCClient)
rpcClient, err := gethRPC.Dial(cliCtx.GlobalString(flags.Web3ProviderFlag.Name))
if err != nil {
log.Fatalf("Access to PoW chain is required for validator. Unable to connect to Geth node: %v", err)
}
powClient := ethclient.NewClient(rpcClient)
ctx := context.Background()
cfg := &powchain.Web3ServiceConfig{
Endpoint: cliCtx.GlobalString(flags.Web3ProviderFlag.Name),
ETH1Endpoint: cliCtx.GlobalString(flags.Web3ProviderFlag.Name),
HTTPEndPoint: cliCtx.GlobalString(flags.HTTPWeb3ProviderFlag.Name),
DepositContract: common.HexToAddress(depAddress),
Client: httpClient,
Reader: powClient,
Logger: powClient,
HTTPLogger: httpClient,
BlockFetcher: httpClient,
ContractBackend: httpClient,
BeaconDB: b.db,
DepositCache: b.depositCache,
}

View File

@ -28,6 +28,8 @@ go_library(
"@com_github_ethereum_go_ethereum//accounts/abi/bind:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//ethclient:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",

View File

@ -15,6 +15,14 @@ import (
var endpoint = "ws://127.0.0.1"
func setDefaultMocks(service *Service) *Service {
service.reader = &goodReader{}
service.blockFetcher = &goodFetcher{}
service.logger = &goodLogger{}
service.httpLogger = &goodLogger{}
return service
}
func TestLatestMainchainInfo_OK(t *testing.T) {
testAcc, err := contracts.Setup()
if err != nil {
@ -23,18 +31,18 @@ func TestLatestMainchainInfo_OK(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
defer dbutil.TeardownDB(t, beaconDB)
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
BlockFetcher: &goodFetcher{},
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ContractBackend: testAcc.Backend,
BeaconDB: beaconDB,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
testAcc.Backend.Commit()
exitRoutine := make(chan bool)
@ -83,12 +91,12 @@ func TestLatestMainchainInfo_OK(t *testing.T) {
func TestBlockHashByHeight_ReturnsHash(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
BlockFetcher: &goodFetcher{},
ETH1Endpoint: endpoint,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
ctx := context.Background()
block := gethTypes.NewBlock(
@ -122,12 +130,12 @@ func TestBlockHashByHeight_ReturnsHash(t *testing.T) {
func TestBlockExists_ValidHash(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
BlockFetcher: &goodFetcher{},
ETH1Endpoint: endpoint,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
block := gethTypes.NewBlock(
&gethTypes.Header{
@ -161,12 +169,12 @@ func TestBlockExists_ValidHash(t *testing.T) {
func TestBlockExists_InvalidHash(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
BlockFetcher: &goodFetcher{},
ETH1Endpoint: endpoint,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
_, _, err = web3Service.BlockExists(context.Background(), common.BytesToHash([]byte{0}))
if err == nil {
@ -176,12 +184,13 @@ func TestBlockExists_InvalidHash(t *testing.T) {
func TestBlockExists_UsesCachedBlockInfo(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
BlockFetcher: nil, // nil blockFetcher would panic if cached value not used
ETH1Endpoint: endpoint,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
// nil blockFetcher would panic if cached value not used
web3Service.blockFetcher = nil
block := gethTypes.NewBlock(
&gethTypes.Header{
@ -211,13 +220,14 @@ func TestBlockExists_UsesCachedBlockInfo(t *testing.T) {
func TestBlockNumberByTimestamp(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
BlockFetcher: &goodFetcher{},
Client: nil,
ETH1Endpoint: endpoint,
})
if err != nil {
t.Fatal(err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.client = nil
ctx := context.Background()
bn, err := web3Service.BlockNumberByTimestamp(ctx, 150000 /* time */)
if err != nil {

View File

@ -21,16 +21,13 @@ const pubKeyErr = "could not deserialize validator public key"
func TestProcessDeposit_OK(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ETH1Endpoint: endpoint,
BeaconDB: &kv.Store{},
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("Unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
deposits, depositDataRoots, _ := testutil.SetupInitialDeposits(t, 1)
@ -57,16 +54,13 @@ func TestProcessDeposit_OK(t *testing.T) {
func TestProcessDeposit_InvalidMerkleBranch(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ETH1Endpoint: endpoint,
BeaconDB: &kv.Store{},
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("Unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
deposits, depositDataRoots, _ := testutil.SetupInitialDeposits(t, 1)
@ -99,16 +93,13 @@ func TestProcessDeposit_InvalidMerkleBranch(t *testing.T) {
func TestProcessDeposit_InvalidPublicKey(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ETH1Endpoint: endpoint,
BeaconDB: &kv.Store{},
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("Unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
deposits, _, _ := testutil.SetupInitialDeposits(t, 1)
deposits[0].Data.PublicKey = []byte("junk")
@ -142,16 +133,13 @@ func TestProcessDeposit_InvalidPublicKey(t *testing.T) {
func TestProcessDeposit_InvalidSignature(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ETH1Endpoint: endpoint,
BeaconDB: &kv.Store{},
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("Unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
deposits, _, _ := testutil.SetupInitialDeposits(t, 1)
var fakeSig [96]byte
@ -189,16 +177,13 @@ func TestProcessDeposit_InvalidSignature(t *testing.T) {
func TestProcessDeposit_UnableToVerify(t *testing.T) {
helpers.ClearAllCaches()
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ETH1Endpoint: endpoint,
BeaconDB: &kv.Store{},
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("Unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
testutil.ResetCache()
deposits, _, keys := testutil.SetupInitialDeposits(t, 1)
@ -221,16 +206,13 @@ func TestProcessDeposit_UnableToVerify(t *testing.T) {
func TestProcessDeposit_IncompleteDeposit(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ETH1Endpoint: endpoint,
BeaconDB: &kv.Store{},
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("Unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
deposit := &ethpb.Deposit{
Data: &ethpb.Deposit_Data{
@ -275,16 +257,13 @@ func TestProcessDeposit_IncompleteDeposit(t *testing.T) {
func TestProcessDeposit_AllDepositedSuccessfully(t *testing.T) {
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ETH1Endpoint: endpoint,
BeaconDB: &kv.Store{},
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("Unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
testutil.ResetCache()
deposits, _, keys := testutil.SetupInitialDeposits(t, 10)

View File

@ -33,19 +33,19 @@ func TestProcessDepositLog_OK(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ContractBackend: testAcc.Backend,
BeaconDB: &kv.Store{},
DepositCache: depositcache.NewDepositCache(),
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
testAcc.Backend.Commit()
deposits, _, _ := testutil.SetupInitialDeposits(t, 1)
@ -95,18 +95,19 @@ func TestProcessDepositLog_InsertsPendingDeposit(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ContractBackend: testAcc.Backend,
BeaconDB: &kv.Store{},
DepositCache: depositcache.NewDepositCache(),
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
testAcc.Backend.Commit()
@ -154,16 +155,17 @@ func TestUnpackDepositLogData_OK(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ContractBackend: testAcc.Backend,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
testAcc.Backend.Commit()
@ -222,19 +224,19 @@ func TestProcessETH2GenesisLog_8DuplicatePubkeys(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ContractBackend: testAcc.Backend,
BeaconDB: &kv.Store{},
DepositCache: depositcache.NewDepositCache(),
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
bConfig := params.MinimalSpecConfig()
bConfig.MinGenesisTime = 0
@ -291,19 +293,19 @@ func TestProcessETH2GenesisLog(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ContractBackend: testAcc.Backend,
BeaconDB: &kv.Store{},
DepositCache: depositcache.NewDepositCache(),
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
bConfig := params.MinimalSpecConfig()
bConfig.MinGenesisTime = 0
params.OverrideBeaconConfig(bConfig)
@ -376,19 +378,20 @@ func TestWeb3ServiceProcessDepositLog_RequestMissedDeposits(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: testAcc.Backend,
ContractBackend: testAcc.Backend,
BeaconDB: &kv.Store{},
DepositCache: depositcache.NewDepositCache(),
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
web3Service.httpLogger = testAcc.Backend
bConfig := params.MinimalSpecConfig()
bConfig.MinGenesisTime = 0
params.OverrideBeaconConfig(bConfig)

View File

@ -14,6 +14,8 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
gethRPC "github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -45,6 +47,9 @@ var (
})
)
// time to wait before trying to reconnect with the eth1 node.
var backOffPeriod = 6 * time.Second
// Reader defines a struct that can fetch latest header events from a web3 endpoint.
type Reader interface {
SubscribeNewHead(ctx context.Context, ch chan<- *gethTypes.Header) (ethereum.Subscription, error)
@ -106,7 +111,8 @@ type Service struct {
cancel context.CancelFunc
client Client
headerChan chan *gethTypes.Header
endpoint string
eth1Endpoint string
httpEndpoint string
depositContractAddress common.Address
chainStartFeed *event.Feed
reader Reader
@ -138,14 +144,9 @@ type Service struct {
// Web3ServiceConfig defines a config struct for web3 service to use through its life cycle.
type Web3ServiceConfig struct {
Endpoint string
ETH1Endpoint string
HTTPEndPoint string
DepositContract common.Address
Client Client
Reader Reader
Logger bind.ContractFilterer
HTTPLogger bind.ContractFilterer
BlockFetcher RPCBlockFetcher
ContractBackend bind.ContractBackend
BeaconDB db.Database
DepositCache *depositcache.DepositCache
}
@ -153,18 +154,13 @@ type Web3ServiceConfig struct {
// NewService sets up a new instance with an ethclient when
// given a web3 endpoint as a string in the config.
func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error) {
if !strings.HasPrefix(config.Endpoint, "ws") && !strings.HasPrefix(config.Endpoint, "ipc") {
if !strings.HasPrefix(config.ETH1Endpoint, "ws") && !strings.HasPrefix(config.ETH1Endpoint, "ipc") {
return nil, fmt.Errorf(
"powchain service requires either an IPC or WebSocket endpoint, provided %s",
config.Endpoint,
config.ETH1Endpoint,
)
}
depositContractCaller, err := contracts.NewDepositContractCaller(config.DepositContract, config.ContractBackend)
if err != nil {
return nil, errors.Wrap(err, "could not create deposit contract caller")
}
ctx, cancel := context.WithCancel(ctx)
depositTrie, err := trieutil.NewTrie(int(params.BeaconConfig().DepositContractTreeDepth))
if err != nil {
@ -175,19 +171,14 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
ctx: ctx,
cancel: cancel,
headerChan: make(chan *gethTypes.Header),
endpoint: config.Endpoint,
eth1Endpoint: config.ETH1Endpoint,
httpEndpoint: config.HTTPEndPoint,
blockHeight: nil,
blockHash: common.BytesToHash([]byte{}),
blockCache: newBlockCache(),
depositContractAddress: config.DepositContract,
chainStartFeed: new(event.Feed),
client: config.Client,
depositTrie: depositTrie,
reader: config.Reader,
logger: config.Logger,
httpLogger: config.HTTPLogger,
blockFetcher: config.BlockFetcher,
depositContractCaller: depositContractCaller,
chainStartDeposits: make([]*ethpb.Deposit, 0),
beaconDB: config.BeaconDB,
depositCache: config.DepositCache,
@ -200,10 +191,10 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
// Start a web3 service's main event loop.
func (s *Service) Start() {
log.WithFields(logrus.Fields{
"endpoint": s.endpoint,
}).Info("Connected to eth1 proof-of-work chain")
go s.run(s.ctx.Done())
go func() {
s.waitForConnection()
s.run(s.ctx.Done())
}()
}
// Stop the web3 service's main event loop and associated goroutines.
@ -298,6 +289,78 @@ func (s *Service) AreAllDepositsProcessed() (bool, error) {
return true, nil
}
func (s *Service) connectToPowChain() error {
powClient, httpClient, err := s.dialETH1Nodes()
if err != nil {
return errors.Wrap(err, "could not dial eth1 nodes")
}
depositContractCaller, err := contracts.NewDepositContractCaller(s.depositContractAddress, httpClient)
if err != nil {
return errors.Wrap(err, "could not create deposit contract caller")
}
s.initializeConnection(powClient, httpClient, depositContractCaller)
return nil
}
func (s *Service) dialETH1Nodes() (*ethclient.Client, *ethclient.Client, error) {
httpRPCClient, err := gethRPC.Dial(s.httpEndpoint)
if err != nil {
return nil, nil, err
}
httpClient := ethclient.NewClient(httpRPCClient)
rpcClient, err := gethRPC.Dial(s.eth1Endpoint)
if err != nil {
httpClient.Close()
return nil, nil, err
}
powClient := ethclient.NewClient(rpcClient)
return powClient, httpClient, nil
}
func (s *Service) initializeConnection(powClient *ethclient.Client,
httpClient *ethclient.Client, contractCaller *contracts.DepositContractCaller) {
s.reader = powClient
s.logger = powClient
s.client = httpClient
s.httpLogger = httpClient
s.blockFetcher = httpClient
s.depositContractCaller = contractCaller
}
func (s *Service) waitForConnection() {
err := s.connectToPowChain()
if err == nil {
log.WithFields(logrus.Fields{
"endpoint": s.eth1Endpoint,
}).Info("Connected to eth1 proof-of-work chain")
return
}
log.WithError(err).Error("Could not connect to powchain endpoint")
ticker := time.NewTicker(backOffPeriod)
for {
select {
case <-ticker.C:
err := s.connectToPowChain()
if err == nil {
log.WithFields(logrus.Fields{
"endpoint": s.eth1Endpoint,
}).Info("Connected to eth1 proof-of-work chain")
ticker.Stop()
break
}
log.WithError(err).Error("Could not connect to powchain endpoint")
case <-s.ctx.Done():
ticker.Stop()
log.Debug("Received cancelled context, existing powchain service")
}
}
}
// initDataFromContract calls the deposit contract and finds the deposit count
// and deposit root.
func (s *Service) initDataFromContract() error {

View File

@ -103,37 +103,29 @@ func TestNewWeb3Service_OK(t *testing.T) {
ctx := context.Background()
var err error
if _, err = NewService(ctx, &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: common.Address{},
Reader: &goodReader{},
Logger: &goodLogger{},
}); err == nil {
t.Errorf("passing in an HTTP endpoint should throw an error, received nil")
}
endpoint = "ftp://127.0.0.1"
if _, err = NewService(ctx, &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: common.Address{},
Reader: &goodReader{},
Logger: &goodLogger{},
}); err == nil {
t.Errorf("passing in a non-ws, wss, or ipc endpoint should throw an error, received nil")
}
endpoint = "ws://127.0.0.1"
if _, err = NewService(ctx, &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: common.Address{},
Reader: &goodReader{},
Logger: &goodLogger{},
}); err != nil {
t.Errorf("passing in as ws endpoint should not throw error, received %v", err)
}
endpoint = "ipc://geth.ipc"
if _, err = NewService(ctx, &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: common.Address{},
Reader: &goodReader{},
Logger: &goodLogger{},
}); err != nil {
t.Errorf("passing in an ipc endpoint should not throw error, received %v", err)
}
@ -148,26 +140,27 @@ func TestStart_OK(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
BlockFetcher: &goodFetcher{},
ContractBackend: testAcc.Backend,
BeaconDB: beaconDB,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
testAcc.Backend.Commit()
web3Service.Start()
msg := hook.LastEntry().Message
want := "Could not connect to ETH1.0 chain RPC client"
if strings.Contains(want, msg) {
t.Errorf("incorrect log, expected %s, got %s", want, msg)
if len(hook.Entries) > 0 {
msg := hook.LastEntry().Message
want := "Could not connect to ETH1.0 chain RPC client"
if strings.Contains(want, msg) {
t.Errorf("incorrect log, expected %s, got %s", want, msg)
}
}
hook.Reset()
web3Service.cancel()
@ -181,16 +174,17 @@ func TestStop_OK(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
BlockFetcher: &goodFetcher{},
ContractBackend: testAcc.Backend,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
testAcc.Backend.Commit()
@ -212,16 +206,17 @@ func TestInitDataFromContract_OK(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ContractBackend: testAcc.Backend,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
testAcc.Backend.Commit()
if err := web3Service.initDataFromContract(); err != nil {
@ -237,16 +232,17 @@ func TestWeb3Service_BadReader(t *testing.T) {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
ETH1Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &badReader{},
Logger: &goodLogger{},
HTTPLogger: &goodLogger{},
ContractBackend: testAcc.Backend,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
}
testAcc.Backend.Commit()
web3Service.reader = &badReader{}
@ -296,12 +292,13 @@ func TestHandlePanic_OK(t *testing.T) {
hook := logTest.NewGlobal()
web3Service, err := NewService(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
BlockFetcher: nil, // nil blockFetcher would panic if cached value not used
ETH1Endpoint: endpoint,
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
// nil blockFetcher would panic if cached value not used
web3Service.blockFetcher = nil
web3Service.processSubscribedHeaders(nil)
testutil.AssertLogsContain(t, hook, "Panicked when handling data from ETH 1.0 Chain!")

View File

@ -134,28 +134,7 @@ func (ps *ProposerServer) ProposeBlock(ctx context.Context, blk *ethpb.BeaconBlo
// - This is the eth1block to use for the block proposal.
func (ps *ProposerServer) eth1Data(ctx context.Context, slot uint64) (*ethpb.Eth1Data, error) {
if ps.mockEth1Votes {
// If a mock eth1 data votes is specified, we use the following for the
// eth1data we provide to every proposer based on https://github.com/ethereum/eth2.0-pm/issues/62:
//
// slot_in_voting_period = current_slot % SLOTS_PER_ETH1_VOTING_PERIOD
// Eth1Data(
// DepositRoot = hash(current_epoch + slot_in_voting_period),
// DepositCount = state.eth1_deposit_index,
// BlockHash = hash(hash(current_epoch + slot_in_voting_period)),
// )
slotInVotingPeriod := slot % params.BeaconConfig().SlotsPerEth1VotingPeriod
headState := ps.headFetcher.HeadState()
enc, err := ssz.Marshal(helpers.SlotToEpoch(slot) + slotInVotingPeriod)
if err != nil {
return nil, err
}
depRoot := hashutil.Hash(enc)
blockHash := hashutil.Hash(depRoot[:])
return &ethpb.Eth1Data{
DepositRoot: depRoot[:],
DepositCount: headState.Eth1DepositIndex,
BlockHash: blockHash[:],
}, nil
return ps.mockETH1DataVote(slot)
}
eth1VotingPeriodStartTime, _ := ps.eth1InfoFetcher.Eth2GenesisPowchainInfo()
@ -164,12 +143,39 @@ func (ps *ProposerServer) eth1Data(ctx context.Context, slot uint64) (*ethpb.Eth
// Look up most recent block up to timestamp
blockNumber, err := ps.eth1BlockFetcher.BlockNumberByTimestamp(ctx, eth1VotingPeriodStartTime)
if err != nil {
return nil, err
return ps.mockETH1DataVote(slot)
}
return ps.defaultEth1DataResponse(ctx, blockNumber)
}
func (ps *ProposerServer) mockETH1DataVote(slot uint64) (*ethpb.Eth1Data, error) {
log.Warn("Beacon Node is no longer connected to an ETH1 Chain, so " +
"ETH1 Data votes are now mocked.")
// If a mock eth1 data votes is specified, we use the following for the
// eth1data we provide to every proposer based on https://github.com/ethereum/eth2.0-pm/issues/62:
//
// slot_in_voting_period = current_slot % SLOTS_PER_ETH1_VOTING_PERIOD
// Eth1Data(
// DepositRoot = hash(current_epoch + slot_in_voting_period),
// DepositCount = state.eth1_deposit_index,
// BlockHash = hash(hash(current_epoch + slot_in_voting_period)),
// )
slotInVotingPeriod := slot % params.BeaconConfig().SlotsPerEth1VotingPeriod
headState := ps.headFetcher.HeadState()
enc, err := ssz.Marshal(helpers.SlotToEpoch(slot) + slotInVotingPeriod)
if err != nil {
return nil, err
}
depRoot := hashutil.Hash(enc)
blockHash := hashutil.Hash(depRoot[:])
return &ethpb.Eth1Data{
DepositRoot: depRoot[:],
DepositCount: headState.Eth1DepositIndex,
BlockHash: blockHash[:],
}, nil
}
// computeStateRoot computes the state root after a block has been processed through a state transition and
// returns it to the validator client.
func (ps *ProposerServer) computeStateRoot(ctx context.Context, block *ethpb.BeaconBlock) ([]byte, error) {

View File

@ -5,9 +5,10 @@ package db
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
proto "github.com/gogo/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.