fix(headers): signal cleanly shutdown headers and allow exit (#5286)

This commit is contained in:
Max Revitt 2022-09-06 08:02:24 +01:00 committed by GitHub
parent f8c20afe8d
commit 488121f669
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 2 deletions

View File

@ -3,7 +3,6 @@ package stagedsync
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"errors"
"fmt" "fmt"
"math/big" "math/big"
"runtime" "runtime"
@ -706,7 +705,8 @@ func forkingPoint(
func handleInterrupt(interrupt engineapi.Interrupt, cfg HeadersCfg, tx kv.RwTx, headerInserter *headerdownload.HeaderInserter, useExternalTx bool) (bool, error) { func handleInterrupt(interrupt engineapi.Interrupt, cfg HeadersCfg, tx kv.RwTx, headerInserter *headerdownload.HeaderInserter, useExternalTx bool) (bool, error) {
if interrupt != engineapi.None { if interrupt != engineapi.None {
if interrupt == engineapi.Stopping { if interrupt == engineapi.Stopping {
cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: errors.New("server is stopping")} close(cfg.hd.ShutdownCh)
return false, fmt.Errorf("server is stopping")
} }
if interrupt == engineapi.Synced && cfg.hd.HeadersCollector() != nil { if interrupt == engineapi.Synced && cfg.hd.HeadersCollector() != nil {
verifyAndSaveDownloadedPoSHeaders(tx, cfg, headerInserter) verifyAndSaveDownloadedPoSHeaders(tx, cfg, headerInserter)

View File

@ -309,6 +309,7 @@ type HeaderDownload struct {
headersCollector *etl.Collector // ETL collector for headers headersCollector *etl.Collector // ETL collector for headers
BeaconRequestList *engineapi.RequestList // Requests from ethbackend to staged sync BeaconRequestList *engineapi.RequestList // Requests from ethbackend to staged sync
PayloadStatusCh chan engineapi.PayloadStatus // Responses (validation/execution status) PayloadStatusCh chan engineapi.PayloadStatus // Responses (validation/execution status)
ShutdownCh chan struct{} // Channel to signal shutdown
pendingPayloadHash common.Hash // Header whose status we still should send to PayloadStatusCh pendingPayloadHash common.Hash // Header whose status we still should send to PayloadStatusCh
pendingPayloadStatus *engineapi.PayloadStatus // Alternatively, there can be an already prepared response to send to PayloadStatusCh pendingPayloadStatus *engineapi.PayloadStatus // Alternatively, there can be an already prepared response to send to PayloadStatusCh
unsettledForkChoice *engineapi.ForkChoiceMessage // Forkchoice to process after unwind unsettledForkChoice *engineapi.ForkChoiceMessage // Forkchoice to process after unwind
@ -344,6 +345,7 @@ func NewHeaderDownload(
QuitPoWMining: make(chan struct{}), QuitPoWMining: make(chan struct{}),
BeaconRequestList: engineapi.NewRequestList(), BeaconRequestList: engineapi.NewRequestList(),
PayloadStatusCh: make(chan engineapi.PayloadStatus, 1), PayloadStatusCh: make(chan engineapi.PayloadStatus, 1),
ShutdownCh: make(chan struct{}),
headerReader: headerReader, headerReader: headerReader,
badPoSHeaders: make(map[common.Hash]common.Hash), badPoSHeaders: make(map[common.Hash]common.Hash),
} }

View File

@ -79,6 +79,13 @@ func StageLoop(
for { for {
start := time.Now() start := time.Now()
select {
case <-hd.ShutdownCh:
return
default:
// continue
}
// Estimate the current top height seen from the peer // Estimate the current top height seen from the peer
height := hd.TopSeenHeight() height := hd.TopSeenHeight()
headBlockHash, err := StageLoopStep(ctx, db, sync, height, notifications, initialCycle, updateHead, nil) headBlockHash, err := StageLoopStep(ctx, db, sync, height, notifications, initialCycle, updateHead, nil)