Fix stage 3 shutdown (#629)

* check stop on every block inside jobs

* stop spawnRecoverSendersStage properly

* fmt

* close recoverSenders goroutines

* fmt
This commit is contained in:
Evgeny Danilenko 2020-06-06 18:09:57 +03:00 committed by GitHub
parent ec3a5f6d38
commit 3134066bba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 16 deletions

View File

@ -205,7 +205,7 @@ func (f *BlockFetcher) Start() {
// Stop terminates the announcement based synchroniser, canceling all pending
// operations.
func (f *BlockFetcher) Stop() {
close(f.quit)
common.SafeClose(f.quit)
}
// Notify announces the fetcher of the potential availability of a new block in

View File

@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"runtime"
"sync"
"github.com/pkg/errors"
@ -54,14 +55,16 @@ func spawnRecoverSendersStage(s *StageState, stateDB ethdb.Database, config *par
jobs := make(chan *senderRecoveryJob, batchSize)
out := make(chan *senderRecoveryJob, batchSize)
wg := &sync.WaitGroup{}
wg.Add(numOfGoroutines)
defer func() {
close(jobs)
wg.Wait()
close(out)
}()
for i := 0; i < numOfGoroutines; i++ {
// each goroutine gets it's own crypto context to make sure they are really parallel
go recoverSenders(cryptoContexts[i], jobs, out, quitCh)
go recoverSenders(cryptoContexts[i], jobs, out, quitCh, wg)
}
log.Info("Sync (Senders): Started recoverer goroutines", "numOfGoroutines", numOfGoroutines)
@ -125,14 +128,10 @@ type senderRecoveryJob struct {
err error
}
func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob, out chan *senderRecoveryJob, quit chan struct{}) {
var job *senderRecoveryJob
for {
if err := common.Stopped(quit); err != nil {
return
}
func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob, out chan *senderRecoveryJob, quit chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
job = <-in
for job := range in {
if job == nil {
return
}
@ -148,7 +147,16 @@ func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob
break
}
}
// prevent sending to close channel
if err := common.Stopped(quit); err != nil {
job.err = err
}
out <- job
if job.err == common.ErrStopped {
return
}
}
}

View File

@ -15,6 +15,9 @@ func (s *State) Len() int {
}
func (s *State) NextStage() {
if s == nil {
return
}
s.currentStage++
}
@ -52,9 +55,7 @@ func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error {
}
func (s *StageState) Done() {
if s.state != nil {
s.state.NextStage()
}
}
func (s *StageState) ExecutionAt(db ethdb.Getter) (uint64, error) {
@ -63,8 +64,6 @@ func (s *StageState) ExecutionAt(db ethdb.Getter) (uint64, error) {
func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error {
err := stages.SaveStageProgress(db, s.Stage, newBlockNum)
if s.state != nil {
s.state.NextStage()
}
return err
}