Retry Initialization of ETH1 Connection (#5730)

* retry failures
* Merge branch 'master' into retryInitialization
* fix test
* Merge branch 'retryInitialization' of https://github.com/prysmaticlabs/geth-sharding into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* fix method
* Merge branch 'retryInitialization' of https://github.com/prysmaticlabs/geth-sharding into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
This commit is contained in:
Nishant Das 2020-05-05 17:06:43 +08:00 committed by GitHub
parent bbde2a6820
commit 2eac24cb79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 29 deletions

View File

@ -555,35 +555,73 @@ func (s *Service) handleDelayTicker() {
}
}
func (s *Service) initPOWService() ethereum.Subscription {
// initialize a nil subscription
headSub := ethereum.Subscription(nil)
// reconnect to eth1 node in case of any failure
retryETH1Node := func(err error) {
s.runError = err
s.connectedETH1 = false
s.waitForConnection()
// reset value in the event of a successful connection.
s.runError = nil
}
// run in a select loop to retry in the event of any failures.
for {
select {
case <-s.ctx.Done():
return headSub
default:
err := s.initDataFromContract()
if err != nil {
log.Errorf("Unable to retrieve data from deposit contract %v", err)
retryETH1Node(err)
continue
}
headSub, err = s.reader.SubscribeNewHead(s.ctx, s.headerChan)
if err != nil {
log.Errorf("Unable to subscribe to incoming ETH1.0 chain headers: %v", err)
retryETH1Node(err)
continue
}
if headSub == nil {
log.Errorf("Nil head subscription received: %v", err)
retryETH1Node(err)
continue
}
header, err := s.blockFetcher.HeaderByNumber(context.Background(), nil)
if err != nil {
log.Errorf("Unable to retrieve latest ETH1.0 chain header: %v", err)
retryETH1Node(err)
continue
}
s.latestEth1Data.BlockHeight = header.Number.Uint64()
s.latestEth1Data.BlockHash = header.Hash().Bytes()
if err := s.processPastLogs(context.Background()); err != nil {
log.Errorf("Unable to process past logs %v", err)
retryETH1Node(err)
continue
}
return headSub
}
}
}
// run subscribes to all the services for the ETH1.0 chain.
func (s *Service) run(done <-chan struct{}) {
var err error
s.isRunning = true
s.runError = nil
if err := s.initDataFromContract(); err != nil {
log.Errorf("Unable to retrieve data from deposit contract %v", err)
return
}
headSub, err := s.reader.SubscribeNewHead(s.ctx, s.headerChan)
if err != nil {
log.Errorf("Unable to subscribe to incoming ETH1.0 chain headers: %v", err)
s.runError = err
return
}
header, err := s.blockFetcher.HeaderByNumber(context.Background(), nil)
if err != nil {
log.Errorf("Unable to retrieve latest ETH1.0 chain header: %v", err)
s.runError = err
return
}
s.latestEth1Data.BlockHeight = header.Number.Uint64()
s.latestEth1Data.BlockHash = header.Hash().Bytes()
if err := s.processPastLogs(context.Background()); err != nil {
log.Errorf("Unable to process past logs %v", err)
s.runError = err
headSub := s.initPOWService()
if headSub == nil {
log.Error("Received a nil head subscription, exiting service")
return
}

View File

@ -310,12 +310,11 @@ func TestWeb3Service_BadReader(t *testing.T) {
testAcc.Backend.Commit()
web3Service.reader = &badReader{}
web3Service.logger = &goodLogger{}
web3Service.run(web3Service.ctx.Done())
msg := hook.LastEntry().Message
go web3Service.initPOWService()
time.Sleep(200 * time.Millisecond)
web3Service.cancel()
want := "Unable to subscribe to incoming ETH1.0 chain headers: subscription has failed"
if msg != want {
t.Errorf("incorrect log, expected %s, got %s", want, msg)
}
testutil.AssertLogsContain(t, hook, want)
hook.Reset()
}