mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-31 16:21:21 +00:00
eth: interrupt chain insertion on shutdown (#21114)
This adds a new API method on core.BlockChain to allow interrupting running data inserts, and calls the method before shutting down the downloader. The BlockChain interrupt checks are now done through a method instead of inlining the atomic load everywhere. There is no loss of efficiency from this and it makes the interrupt protocol a lot clearer because the check is defined next to the method that sets the flag. # Conflicts: # core/blockchain.go # light/lightchain.go
This commit is contained in:
parent
0550e2a362
commit
a9f5546d2e
@ -176,11 +176,10 @@ type BlockChain struct {
|
||||
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
|
||||
futureBlocks *lru.Cache // future blocks are blocks added for later processing
|
||||
|
||||
quit chan struct{} // blockchain quit channel
|
||||
running int32 // running must be called atomically
|
||||
// procInterrupt must be atomically called
|
||||
procInterrupt int32 // interrupt signaler for block processing
|
||||
quit chan struct{} // blockchain quit channel
|
||||
wg sync.WaitGroup // chain processing wait group for shutting down
|
||||
running int32 // 0 if chain is running, 1 when stopped (must be called atomically)
|
||||
procInterrupt int32 // interrupt signaler for block processing
|
||||
quitMu sync.RWMutex
|
||||
|
||||
engine consensus.Engine
|
||||
@ -257,7 +256,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
|
||||
bc.processor = NewStateProcessor(chainConfig, bc, engine)
|
||||
|
||||
var err error
|
||||
bc.hc, err = NewHeaderChain(cdb, chainConfig, engine, bc.getProcInterrupt)
|
||||
bc.hc, err = NewHeaderChain(cdb, chainConfig, engine, bc.insertStopped)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -391,10 +390,6 @@ func (bc *BlockChain) GetTrieDbStateByBlock(root common.Hash, blockNr uint64) (*
|
||||
return bc.trieDbState, nil
|
||||
}
|
||||
|
||||
func (bc *BlockChain) getProcInterrupt() bool {
|
||||
return atomic.LoadInt32(&bc.procInterrupt) == 1
|
||||
}
|
||||
|
||||
// GetVMConfig returns the block chain VM config.
|
||||
func (bc *BlockChain) GetVMConfig() *vm.Config {
|
||||
return &bc.vmConfig
|
||||
@ -884,7 +879,10 @@ func (bc *BlockChain) Stop() {
|
||||
bc.scope.Close()
|
||||
close(bc.quit)
|
||||
|
||||
bc.waitJobs()
|
||||
bc.quitMu.Lock()
|
||||
bc.StopInsert()
|
||||
bc.wg.Wait()
|
||||
bc.quitMu.Unlock()
|
||||
|
||||
if bc.pruner != nil {
|
||||
bc.pruner.Stop()
|
||||
@ -892,6 +890,18 @@ func (bc *BlockChain) Stop() {
|
||||
log.Info("Blockchain stopped")
|
||||
}
|
||||
|
||||
// StopInsert interrupts all insertion methods, causing them to return
|
||||
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
|
||||
// calling this method.
|
||||
func (bc *BlockChain) StopInsert() {
|
||||
atomic.StoreInt32(&bc.procInterrupt, 1)
|
||||
}
|
||||
|
||||
// insertStopped returns true after StopInsert has been called.
|
||||
func (bc *BlockChain) insertStopped() bool {
|
||||
return atomic.LoadInt32(&bc.procInterrupt) == 1
|
||||
}
|
||||
|
||||
func (bc *BlockChain) procFutureBlocks() {
|
||||
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
|
||||
for _, hash := range bc.futureBlocks.Keys() {
|
||||
@ -1154,7 +1164,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
|
||||
batch := bc.db.NewBatch()
|
||||
for i, block := range blockChain {
|
||||
// Short circuit insertion if shutting down or processing failed
|
||||
if bc.getProcInterrupt() {
|
||||
if bc.insertStopped() {
|
||||
return 0, errInsertionInterrupted
|
||||
}
|
||||
// Short circuit if the owner header is unknown
|
||||
@ -1615,8 +1625,8 @@ func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verif
|
||||
}
|
||||
|
||||
// If the chain is terminating, stop processing blocks
|
||||
if bc.getProcInterrupt() {
|
||||
log.Debug("Premature abort during blocks processing")
|
||||
if bc.insertStopped() {
|
||||
log.Debug("Abort during block processing")
|
||||
break
|
||||
}
|
||||
|
||||
@ -2366,13 +2376,6 @@ func (bc *BlockChain) doneJob() {
|
||||
bc.wg.Done()
|
||||
}
|
||||
|
||||
func (bc *BlockChain) waitJobs() {
|
||||
bc.quitMu.Lock()
|
||||
atomic.StoreInt32(&bc.procInterrupt, 1)
|
||||
bc.wg.Wait()
|
||||
bc.quitMu.Unlock()
|
||||
}
|
||||
|
||||
// ExecuteBlockEphemerally runs a block from provided stateReader and
|
||||
// writes the result to the provided stateWriter
|
||||
func ExecuteBlockEphemerally(
|
||||
|
@ -198,7 +198,6 @@ func (cs *chainSyncer) loop() {
|
||||
cs.pm.txFetcher.Start()
|
||||
defer cs.pm.blockFetcher.Stop()
|
||||
defer cs.pm.txFetcher.Stop()
|
||||
defer cs.pm.downloader.Terminate()
|
||||
|
||||
// The force timer lowers the peer count threshold down to one when it fires.
|
||||
// This ensures we'll always start sync even if there aren't enough peers.
|
||||
@ -221,8 +220,13 @@ func (cs *chainSyncer) loop() {
|
||||
cs.forced = true
|
||||
|
||||
case <-cs.pm.quitSync:
|
||||
// Disable all insertion on the blockchain. This needs to happen before
|
||||
// terminating the downloader because the downloader waits for blockchain
|
||||
// inserts, and these can take a long time to finish.
|
||||
cs.pm.blockchain.StopInsert()
|
||||
cs.pm.downloader.Terminate()
|
||||
if cs.doneCh != nil {
|
||||
cs.pm.downloader.Terminate() // Double term is fine, Cancel would block until queue is emptied
|
||||
// Wait for the current sync to end.
|
||||
<-cs.doneCh
|
||||
}
|
||||
return
|
||||
|
Loading…
Reference in New Issue
Block a user