Cancel Initial Sync Properly (#5529)

* fix cancelling
* Merge branch 'master' into cancelServices
* Merge refs/heads/master into cancelServices
* Merge refs/heads/master into cancelServices
This commit is contained in:
Nishant Das 2020-04-20 12:04:45 +08:00 committed by GitHub
parent 984644257e
commit 639e3072fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 32 deletions

View File

@ -47,6 +47,7 @@ type Config struct {
// Service service. // Service service.
type Service struct { type Service struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc
chain blockchainService chain blockchainService
p2p p2p.P2P p2p p2p.P2P
db db.ReadOnlyDatabase db db.ReadOnlyDatabase
@ -60,8 +61,10 @@ type Service struct {
// NewInitialSync configures the initial sync service responsible for bringing the node up to the // NewInitialSync configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain. // latest head of the blockchain.
func NewInitialSync(cfg *Config) *Service { func NewInitialSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
return &Service{ return &Service{
ctx: context.Background(), ctx: ctx,
cancel: cancel,
chain: cfg.Chain, chain: cfg.Chain,
p2p: cfg.P2P, p2p: cfg.P2P,
db: cfg.DB, db: cfg.DB,
@ -139,6 +142,7 @@ func (s *Service) Start() {
// Stop initial sync. // Stop initial sync.
func (s *Service) Stop() error { func (s *Service) Stop() error {
s.cancel()
return nil return nil
} }

View File

@ -47,6 +47,7 @@ type Config struct {
// Service service. // Service service.
type Service struct { type Service struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc
chain blockchainService chain blockchainService
p2p p2p.P2P p2p p2p.P2P
db db.ReadOnlyDatabase db db.ReadOnlyDatabase
@ -60,8 +61,10 @@ type Service struct {
// NewInitialSync configures the initial sync service responsible for bringing the node up to the // NewInitialSync configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain. // latest head of the blockchain.
func NewInitialSync(cfg *Config) *Service { func NewInitialSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
return &Service{ return &Service{
ctx: context.Background(), ctx: ctx,
cancel: cancel,
chain: cfg.Chain, chain: cfg.Chain,
p2p: cfg.P2P, p2p: cfg.P2P,
db: cfg.DB, db: cfg.DB,
@ -157,6 +160,7 @@ func (s *Service) Start() {
// Stop initial sync. // Stop initial sync.
func (s *Service) Stop() error { func (s *Service) Stop() error {
s.cancel()
return nil return nil
} }

View File

@ -62,36 +62,6 @@ type blockchainService interface {
blockchain.GenesisFetcher blockchain.GenesisFetcher
} }
// NewRegularSync service.
func NewRegularSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
ctx: ctx,
cancel: cancel,
db: cfg.DB,
p2p: cfg.P2P,
attPool: cfg.AttPool,
exitPool: cfg.ExitPool,
slashingPool: cfg.SlashingPool,
chain: cfg.Chain,
initialSync: cfg.InitialSync,
attestationNotifier: cfg.AttestationNotifier,
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
stateSummaryCache: cfg.StateSummaryCache,
stateGen: cfg.StateGen,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
}
r.registerRPCHandlers()
go r.registerSubscribers()
return r
}
// Service is responsible for handling all run time p2p related operations as the // Service is responsible for handling all run time p2p related operations as the
// main entry point for network messages. // main entry point for network messages.
type Service struct { type Service struct {
@ -129,6 +99,36 @@ type Service struct {
stateGen *stategen.State stateGen *stategen.State
} }
// NewRegularSync service.
func NewRegularSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
ctx: ctx,
cancel: cancel,
db: cfg.DB,
p2p: cfg.P2P,
attPool: cfg.AttPool,
exitPool: cfg.ExitPool,
slashingPool: cfg.SlashingPool,
chain: cfg.Chain,
initialSync: cfg.InitialSync,
attestationNotifier: cfg.AttestationNotifier,
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
stateSummaryCache: cfg.StateSummaryCache,
stateGen: cfg.StateGen,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
}
r.registerRPCHandlers()
go r.registerSubscribers()
return r
}
// Start the regular sync service. // Start the regular sync service.
func (r *Service) Start() { func (r *Service) Start() {
if err := r.initCaches(); err != nil { if err := r.initCaches(); err != nil {