Use RunEvery in place of custom tickers (#4290)

This commit is contained in:
Jim McDonald 2019-12-16 17:00:15 +00:00 committed by Raul Jordan
parent 3f344aee55
commit d9062a7e30
5 changed files with 22 additions and 32 deletions

View File

@ -6,6 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/connmgr",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//shared/runutil:go_default_library",
"@com_github_ipfs_go_log//:go_default_library",
"@com_github_libp2p_go_libp2p_core//connmgr:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",

View File

@ -35,6 +35,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/shared/runutil"
)
// SilencePeriod refers to the period in which a connection is given leeway by the connection
@ -139,7 +140,13 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
}(),
}
go cm.background()
// Check every TickerPeriod to see if we should trim the number of active connections.
runutil.RunEvery(cm.ctx, TickerPeriod, func() {
if atomic.LoadInt32(&cm.connCount) > int32(cm.highWater) {
cm.TrimOpenConns(cm.ctx)
}
})
return cm
}
@ -220,23 +227,6 @@ func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
cm.lastTrim = time.Now()
}
func (cm *BasicConnMgr) background() {
ticker := time.NewTicker(TickerPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if atomic.LoadInt32(&cm.connCount) > int32(cm.highWater) {
cm.TrimOpenConns(cm.ctx)
}
case <-cm.ctx.Done():
return
}
}
}
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []network.Conn {

View File

@ -213,6 +213,7 @@ func (s *Service) Start() {
// Stop the p2p service and terminate all peer connections.
func (s *Service) Stop() error {
defer s.cancel()
s.started = false
if s.dv5Listener != nil {
s.dv5Listener.Close()

View File

@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
@ -21,17 +22,10 @@ var processPendingBlocksPeriod = time.Duration(params.BeaconConfig().SecondsPerS
// processes pending blocks queue on every processPendingBlocksPeriod
func (r *RegularSync) processPendingBlocksQueue() {
ticker := time.NewTicker(processPendingBlocksPeriod)
for {
ctx := context.TODO()
select {
case <-ticker.C:
r.processPendingBlocks(ctx)
case <-r.ctx.Done():
log.Debug("Context closed, exiting routine")
return
}
}
ctx := context.Background()
runutil.RunEvery(r.ctx, processPendingBlocksPeriod, func() {
r.processPendingBlocks(ctx)
})
}
// processes the block tree inside the queue

View File

@ -38,8 +38,10 @@ type blockchainService interface {
// NewRegularSync service.
func NewRegularSync(cfg *Config) *RegularSync {
ctx, cancel := context.WithCancel(context.Background())
r := &RegularSync{
ctx: context.Background(),
ctx: ctx,
cancel: cancel,
db: cfg.DB,
p2p: cfg.P2P,
operations: cfg.Operations,
@ -60,6 +62,7 @@ func NewRegularSync(cfg *Config) *RegularSync {
// main entry point for network messages.
type RegularSync struct {
ctx context.Context
cancel context.CancelFunc
p2p p2p.P2P
db db.Database
operations *operations.Service
@ -77,12 +80,13 @@ type RegularSync struct {
func (r *RegularSync) Start() {
r.p2p.AddConnectionHandler(r.sendRPCStatusRequest)
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
go r.processPendingBlocksQueue()
go r.maintainPeerStatuses()
r.processPendingBlocksQueue()
r.maintainPeerStatuses()
}
// Stop the regular sync service.
func (r *RegularSync) Stop() error {
defer r.cancel()
return nil
}