mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 03:30:35 +00:00
backfill waits for init-sync before starting (#13623)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
parent
f09fe4f038
commit
e5394fe081
@ -244,7 +244,8 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
|
||||
beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen)
|
||||
|
||||
pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer)
|
||||
beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter))
|
||||
beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter),
|
||||
backfill.WithInitSyncWaiter(initSyncWaiter(ctx, beacon.initialSyncComplete)))
|
||||
bf, err := backfill.NewService(ctx, bfs, beacon.BlobStorage, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error initializing backfill service")
|
||||
@ -327,6 +328,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
|
||||
|
||||
return beacon, nil
|
||||
}
|
||||
func initSyncWaiter(ctx context.Context, complete chan struct{}) func() error {
|
||||
return func() error {
|
||||
select {
|
||||
case <-complete:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newRouter(cliCtx *cli.Context) *mux.Router {
|
||||
var allowedOrigins []string
|
||||
|
@ -39,6 +39,7 @@ type Service struct {
|
||||
pa PeerAssigner
|
||||
batchImporter batchImporter
|
||||
blobStore *filesystem.BlobStorage
|
||||
initSyncWaiter func() error
|
||||
}
|
||||
|
||||
var _ runtime.Service = (*Service)(nil)
|
||||
@ -93,6 +94,15 @@ func WithBatchSize(n uint64) ServiceOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithInitSyncWaiter sets a function on the service which will block until init-sync
|
||||
// completes for the first time, or returns an error if context is canceled.
|
||||
func WithInitSyncWaiter(w func() error) ServiceOption {
|
||||
return func(s *Service) error {
|
||||
s.initSyncWaiter = w
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// InitializerWaiter is an interface that is satisfied by verification.InitializerWaiter.
|
||||
// Using this interface enables node init to satisfy this requirement for the backfill service
|
||||
// while also allowing backfill to mock it in tests.
|
||||
@ -261,8 +271,15 @@ func (s *Service) Start() {
|
||||
log.WithError(err).Error("Unable to initialize backfill verifier.")
|
||||
return
|
||||
}
|
||||
s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore)
|
||||
|
||||
if s.initSyncWaiter != nil {
|
||||
log.Info("Backfill service waiting for initial-sync to reach head before starting.")
|
||||
if err := s.initSyncWaiter(); err != nil {
|
||||
log.WithError(err).Error("Error waiting for init-sync to complete.")
|
||||
return
|
||||
}
|
||||
}
|
||||
s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore)
|
||||
s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
|
||||
if err = s.initBatches(); err != nil {
|
||||
log.WithError(err).Error("Non-recoverable error in backfill service.")
|
||||
|
Loading…
Reference in New Issue
Block a user