From 3134066bba4f5a514de827ad761a65cff4fbc6d9 Mon Sep 17 00:00:00 2001 From: Evgeny Danilenko <6655321@bk.ru> Date: Sat, 6 Jun 2020 18:09:57 +0300 Subject: [PATCH] Fix stage 3 shutdown (#629) * check stop on every block inside jobs * stop spawnRecoverSendersStage properly * fmt * close recoverSenders goroutines * fmt --- eth/fetcher/block_fetcher.go | 2 +- eth/stagedsync/stage_senders.go | 26 +++++++++++++++++--------- eth/stagedsync/state.go | 11 +++++------ 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 87610f0fb..82c2222b9 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -205,7 +205,7 @@ func (f *BlockFetcher) Start() { // Stop terminates the announcement based synchroniser, canceling all pending // operations. func (f *BlockFetcher) Stop() { - close(f.quit) + common.SafeClose(f.quit) } // Notify announces the fetcher of the potential availability of a new block in diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 506e9f5dd..4ae8efd01 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "runtime" + "sync" "github.com/pkg/errors" @@ -54,14 +55,16 @@ func spawnRecoverSendersStage(s *StageState, stateDB ethdb.Database, config *par jobs := make(chan *senderRecoveryJob, batchSize) out := make(chan *senderRecoveryJob, batchSize) + wg := &sync.WaitGroup{} + wg.Add(numOfGoroutines) defer func() { close(jobs) + wg.Wait() close(out) }() - for i := 0; i < numOfGoroutines; i++ { // each goroutine gets it's own crypto context to make sure they are really parallel - go recoverSenders(cryptoContexts[i], jobs, out, quitCh) + go recoverSenders(cryptoContexts[i], jobs, out, quitCh, wg) } log.Info("Sync (Senders): Started recoverer goroutines", "numOfGoroutines", numOfGoroutines) @@ -125,14 +128,10 @@ type senderRecoveryJob struct { err error } -func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob, out chan *senderRecoveryJob, quit chan struct{}) { - var job *senderRecoveryJob - for { - if err := common.Stopped(quit); err != nil { - return - } +func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob, out chan *senderRecoveryJob, quit chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() - job = <-in + for job := range in { if job == nil { return } @@ -148,7 +147,16 @@ func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob break } } + + // prevent sending to close channel + if err := common.Stopped(quit); err != nil { + job.err = err + } out <- job + + if job.err == common.ErrStopped { + return + } } } diff --git a/eth/stagedsync/state.go b/eth/stagedsync/state.go index bb8820347..06f5013aa 100644 --- a/eth/stagedsync/state.go +++ b/eth/stagedsync/state.go @@ -15,6 +15,9 @@ func (s *State) Len() int { } func (s *State) NextStage() { + if s == nil { + return + } s.currentStage++ } @@ -52,9 +55,7 @@ func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error { } func (s *StageState) Done() { - if s.state != nil { - s.state.NextStage() - } + s.state.NextStage() } func (s *StageState) ExecutionAt(db ethdb.Getter) (uint64, error) { @@ -63,8 +64,6 @@ func (s *StageState) ExecutionAt(db ethdb.Getter) (uint64, error) { func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error { err := stages.SaveStageProgress(db, s.Stage, newBlockNum) - if s.state != nil { - s.state.NextStage() - } + s.state.NextStage() return err }