diff --git a/beacon-chain/sync/initial-sync-old/service.go b/beacon-chain/sync/initial-sync-old/service.go index adc89b71e..14abc9386 100644 --- a/beacon-chain/sync/initial-sync-old/service.go +++ b/beacon-chain/sync/initial-sync-old/service.go @@ -47,6 +47,7 @@ type Config struct { // Service service. type Service struct { ctx context.Context + cancel context.CancelFunc chain blockchainService p2p p2p.P2P db db.ReadOnlyDatabase @@ -60,8 +61,10 @@ type Service struct { // NewInitialSync configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewInitialSync(cfg *Config) *Service { + ctx, cancel := context.WithCancel(context.Background()) return &Service{ - ctx: context.Background(), + ctx: ctx, + cancel: cancel, chain: cfg.Chain, p2p: cfg.P2P, db: cfg.DB, @@ -139,6 +142,7 @@ func (s *Service) Start() { // Stop initial sync. func (s *Service) Stop() error { + s.cancel() return nil } diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 1fa04e8ad..10501d903 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -47,6 +47,7 @@ type Config struct { // Service service. type Service struct { ctx context.Context + cancel context.CancelFunc chain blockchainService p2p p2p.P2P db db.ReadOnlyDatabase @@ -60,8 +61,10 @@ type Service struct { // NewInitialSync configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewInitialSync(cfg *Config) *Service { + ctx, cancel := context.WithCancel(context.Background()) return &Service{ - ctx: context.Background(), + ctx: ctx, + cancel: cancel, chain: cfg.Chain, p2p: cfg.P2P, db: cfg.DB, @@ -157,6 +160,7 @@ func (s *Service) Start() { // Stop initial sync. func (s *Service) Stop() error { + s.cancel() return nil } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index cf3967ce1..362d1f74d 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -62,36 +62,6 @@ type blockchainService interface { blockchain.GenesisFetcher } -// NewRegularSync service. -func NewRegularSync(cfg *Config) *Service { - ctx, cancel := context.WithCancel(context.Background()) - r := &Service{ - ctx: ctx, - cancel: cancel, - db: cfg.DB, - p2p: cfg.P2P, - attPool: cfg.AttPool, - exitPool: cfg.ExitPool, - slashingPool: cfg.SlashingPool, - chain: cfg.Chain, - initialSync: cfg.InitialSync, - attestationNotifier: cfg.AttestationNotifier, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), - seenPendingBlocks: make(map[[32]byte]bool), - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), - stateNotifier: cfg.StateNotifier, - blockNotifier: cfg.BlockNotifier, - stateSummaryCache: cfg.StateSummaryCache, - stateGen: cfg.StateGen, - blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), - } - - r.registerRPCHandlers() - go r.registerSubscribers() - - return r -} - // Service is responsible for handling all run time p2p related operations as the // main entry point for network messages. type Service struct { @@ -129,6 +99,36 @@ type Service struct { stateGen *stategen.State } +// NewRegularSync service. +func NewRegularSync(cfg *Config) *Service { + ctx, cancel := context.WithCancel(context.Background()) + r := &Service{ + ctx: ctx, + cancel: cancel, + db: cfg.DB, + p2p: cfg.P2P, + attPool: cfg.AttPool, + exitPool: cfg.ExitPool, + slashingPool: cfg.SlashingPool, + chain: cfg.Chain, + initialSync: cfg.InitialSync, + attestationNotifier: cfg.AttestationNotifier, + slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + seenPendingBlocks: make(map[[32]byte]bool), + blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + stateNotifier: cfg.StateNotifier, + blockNotifier: cfg.BlockNotifier, + stateSummaryCache: cfg.StateSummaryCache, + stateGen: cfg.StateGen, + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), + } + + r.registerRPCHandlers() + go r.registerSubscribers() + + return r +} + // Start the regular sync service. func (r *Service) Start() { if err := r.initCaches(); err != nil {