wg data race

This commit is contained in:
Evgeny Danienko 2019-11-07 13:32:00 +03:00
parent c9e1cb8b77
commit e2605ba0ee

View File

@ -177,6 +177,7 @@ type BlockChain struct {
// procInterrupt must be atomically called
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup // chain processing wait group for shutting down
quitMu sync.RWMutex
engine consensus.Engine
validator Validator // Block and state validator interface
@ -874,8 +875,9 @@ func (bc *BlockChain) Stop() {
// Unsubscribe all subscriptions registered from blockchain
bc.scope.Close()
close(bc.quit)
atomic.StoreInt32(&bc.procInterrupt, 1)
bc.wg.Wait()
bc.waitJobs()
if bc.pruner != nil {
bc.pruner.Stop()
}
@ -987,8 +989,10 @@ type numberHash struct {
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) {
// We don't require the chainMu here since we want to maximize the
// concurrency of header insertion and receipt insertion.
bc.wg.Add(1)
defer bc.wg.Done()
if err := bc.addJob(); err != nil {
return 0, nil
}
defer bc.doneJob()
var (
ancientBlocks, liveBlocks types.Blocks
@ -1056,7 +1060,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var deleted []*numberHash
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.getProcInterrupt() {
return 0, errInsertionInterrupted
}
// Short circuit insertion if it is required(used in testing only)
@ -1129,7 +1133,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
batch := bc.db.NewBatch()
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.getProcInterrupt() {
return 0, errInsertionInterrupted
}
// Short circuit if the owner header is unknown
@ -1196,14 +1200,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return 0, nil
}
var lastWrite uint64
// writeBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) {
bc.wg.Add(1)
defer bc.wg.Done()
if err := bc.addJob(); err != nil {
return nil
}
defer bc.doneJob()
if err := bc.hc.WriteTd(bc.db, block.Hash(), block.NumberU64(), td); err != nil {
return err
@ -1216,8 +1220,10 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
// writeKnownBlock updates the head block flag with a known block
// and introduces chain reorg if necessary.
func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
bc.wg.Add(1)
defer bc.wg.Done()
if err := bc.addJob(); err != nil {
return nil
}
defer bc.doneJob()
current := bc.CurrentBlock()
if block.ParentHash() != current.Hash() {
@ -1244,8 +1250,10 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
// writeBlockWithState writes the block and all associated state to the database,
// but is expects the chain mutex to be held.
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.IntraBlockState, tds *state.TrieDbState) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()
if err = bc.addJob(); err != nil {
return NonStatTy, nil
}
defer bc.doneJob()
// Make sure no inconsistent state is leaked during insertion
currentBlock := bc.CurrentBlock()
@ -1363,12 +1371,14 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
}
// Only insert if the difficulty of the inserted chain is bigger than existing chain
// Pre-checks passed, start the full block imports
bc.wg.Add(1)
if err := bc.addJob(); err != nil {
return 0, nil
}
defer bc.doneJob()
ctx := bc.WithContext(context.Background(), chain[0].Number())
bc.chainmu.Lock()
n, events, logs, err := bc.insertChain(ctx, chain, true)
bc.chainmu.Unlock()
bc.wg.Done()
bc.PostChainEvents(events, logs)
return n, err
@ -1385,7 +1395,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
log.Info("Inserting chain", "start", chain[0].NumberU64(), "end", chain[len(chain)-1].NumberU64())
// If the chain is terminating, don't even bother starting u
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.getProcInterrupt() {
return 0, nil, nil, nil
}
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
@ -1487,7 +1497,7 @@ func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verif
k = i - offset
}
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.getProcInterrupt() {
log.Debug("Premature abort during blocks processing")
break
}
@ -2016,8 +2026,10 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
bc.wg.Add(1)
defer bc.wg.Done()
if err := bc.addJob(); err != nil {
return 0, nil
}
defer bc.doneJob()
whFunc := func(header *types.Header) error {
_, err := bc.hc.WriteHeader(header)
@ -2178,3 +2190,25 @@ type Pruner interface {
Start() error
Stop()
}
func (bc *BlockChain) addJob() error {
bc.quitMu.RLock()
defer bc.quitMu.RUnlock()
if bc.getProcInterrupt() {
return errors.New("blockchain is stopped")
}
bc.wg.Add(1)
return nil
}
func (bc *BlockChain) doneJob() {
bc.wg.Done()
}
func (bc *BlockChain) waitJobs() {
bc.quitMu.Lock()
atomic.StoreInt32(&bc.procInterrupt, 1)
bc.wg.Wait()
bc.quitMu.Unlock()
}