diff --git a/core/blockchain.go b/core/blockchain.go index 1b3499994..c9717bb55 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -177,6 +177,7 @@ type BlockChain struct { // procInterrupt must be atomically called procInterrupt int32 // interrupt signaler for block processing wg sync.WaitGroup // chain processing wait group for shutting down + quitMu sync.RWMutex engine consensus.Engine validator Validator // Block and state validator interface @@ -874,8 +875,9 @@ func (bc *BlockChain) Stop() { // Unsubscribe all subscriptions registered from blockchain bc.scope.Close() close(bc.quit) - atomic.StoreInt32(&bc.procInterrupt, 1) - bc.wg.Wait() + + bc.waitJobs() + if bc.pruner != nil { bc.pruner.Stop() } @@ -987,8 +989,10 @@ type numberHash struct { func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) { // We don't require the chainMu here since we want to maximize the // concurrency of header insertion and receipt insertion. - bc.wg.Add(1) - defer bc.wg.Done() + if err := bc.addJob(); err != nil { + return 0, nil + } + defer bc.doneJob() var ( ancientBlocks, liveBlocks types.Blocks @@ -1056,7 +1060,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var deleted []*numberHash for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.getProcInterrupt() { return 0, errInsertionInterrupted } // Short circuit insertion if it is required(used in testing only) @@ -1129,7 +1133,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ batch := bc.db.NewBatch() for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.getProcInterrupt() { return 0, errInsertionInterrupted } // Short circuit if the owner header is unknown @@ -1196,14 +1200,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, nil } -var lastWrite uint64 - // writeBlockWithoutState writes only the block and its metadata to the database, // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if err := bc.addJob(); err != nil { + return nil + } + defer bc.doneJob() if err := bc.hc.WriteTd(bc.db, block.Hash(), block.NumberU64(), td); err != nil { return err @@ -1216,8 +1220,10 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e // writeKnownBlock updates the head block flag with a known block // and introduces chain reorg if necessary. func (bc *BlockChain) writeKnownBlock(block *types.Block) error { - bc.wg.Add(1) - defer bc.wg.Done() + if err := bc.addJob(); err != nil { + return nil + } + defer bc.doneJob() current := bc.CurrentBlock() if block.ParentHash() != current.Hash() { @@ -1244,8 +1250,10 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.IntraBlockState, tds *state.TrieDbState) (status WriteStatus, err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if err = bc.addJob(); err != nil { + return NonStatTy, nil + } + defer bc.doneJob() // Make sure no inconsistent state is leaked during insertion currentBlock := bc.CurrentBlock() @@ -1363,12 +1371,14 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { } // Only insert if the difficulty of the inserted chain is bigger than existing chain // Pre-checks passed, start the full block imports - bc.wg.Add(1) + if err := bc.addJob(); err != nil { + return 0, nil + } + defer bc.doneJob() ctx := bc.WithContext(context.Background(), chain[0].Number()) bc.chainmu.Lock() n, events, logs, err := bc.insertChain(ctx, chain, true) bc.chainmu.Unlock() - bc.wg.Done() bc.PostChainEvents(events, logs) return n, err @@ -1385,7 +1395,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { log.Info("Inserting chain", "start", chain[0].NumberU64(), "end", chain[len(chain)-1].NumberU64()) // If the chain is terminating, don't even bother starting u - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.getProcInterrupt() { return 0, nil, nil, nil } // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) @@ -1487,7 +1497,7 @@ func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verif k = i - offset } // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.getProcInterrupt() { log.Debug("Premature abort during blocks processing") break } @@ -2016,8 +2026,10 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i bc.chainmu.Lock() defer bc.chainmu.Unlock() - bc.wg.Add(1) - defer bc.wg.Done() + if err := bc.addJob(); err != nil { + return 0, nil + } + defer bc.doneJob() whFunc := func(header *types.Header) error { _, err := bc.hc.WriteHeader(header) @@ -2178,3 +2190,25 @@ type Pruner interface { Start() error Stop() } + +func (bc *BlockChain) addJob() error { + bc.quitMu.RLock() + defer bc.quitMu.RUnlock() + if bc.getProcInterrupt() { + return errors.New("blockchain is stopped") + } + bc.wg.Add(1) + + return nil +} + +func (bc *BlockChain) doneJob() { + bc.wg.Done() +} + +func (bc *BlockChain) waitJobs() { + bc.quitMu.Lock() + atomic.StoreInt32(&bc.procInterrupt, 1) + bc.wg.Wait() + bc.quitMu.Unlock() +}