prysm-pulse/beacon-chain/startup/synchronizer.go
kasey 918129cf36
Replace statefeed Initialize (#12285)
* refactor initialization to blocking startup method

* require genesisSetter in blockchain, fix tests

* work-around gazelle weirdness

* fix dep gazelle ignores

* only call SetGenesis once

* fix typo

* validator test setup and fix to return right error

* move waitForChainStart to Start

* wire up sync Service.genesisWaiter

* fix p2p genesisWaiter plumbing

* remove extra clock type, integrate into genesis

and rename

* use time.Now when no Nower is specified

* remove unused ClockSetter

* simplify rpc context checking

* fix typo

* use clock everywhere in sync; [32]byte val root

* don't use DeepEqual to compare [32]byte and []byte

* don't use clock in init sync, not wired up yet

* use clock waiter in blockchain as well

* use cancelable contexts in tests with goroutines

* missed a reference to WithClockSetter

* Update beacon-chain/startup/genesis.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/blockchain/service_test.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* more clear docs

* doc for NewClock

* move clock typedef to more logical file name

* adding documentation

* gaz

* fixes for capella

* reducing test raciness

* fix races in committee cache tests

* lint

* add tests on Duration slot math helper

* startup package test coverage

* fix bad merge

* set non-zero genesis time in tests that call Start

* happy deepsource, happy me-epsource

* replace Synced event with channel

* remove unused error

* remove accidental wip commit

* gaz!

* remove unused event constants

* remove sync statefeed subscription to fix deadlock

* remove state notifier

* fix build

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: nisdas <nishdas93@gmail.com>
2023-05-03 04:34:01 +00:00

package startup

import (
	"context"

	"github.com/pkg/errors"
)

var errClockSet = errors.New("refusing to change clock after it is set")

// ClockSynchronizer provides a synchronization mechanism for services that rely on the genesis time and validator root
// being known before getting to work.
type ClockSynchronizer struct {
	ready chan struct{}
	c     *Clock
}

// ClockWaiter specifies the WaitForClock method. ClockSynchronizer works in a 1:N pattern, with 1 thread calling
// SetClock, and the others blocking on a call to WaitForClock until the expected *Clock value is set.
type ClockWaiter interface {
	WaitForClock(context.Context) (*Clock, error)
}

// ClockSetter specifies the SetClock method. ClockSynchronizer works in a 1:N pattern, so in a given graph of services,
// only one service should be given the ClockSetter, and all others relying on the service's activation should use
// ClockWaiter.
type ClockSetter interface {
	SetClock(c *Clock) error
}

// SetClock sets the Clock value `c` and unblocks all threads waiting for `c` via WaitForClock.
// Calling SetClock more than once will return an error, as calling this function is meant to be a signal
// that the system is ready to start.
func (w *ClockSynchronizer) SetClock(c *Clock) error {
	if w.c != nil {
		return errors.Wrapf(errClockSet, "when SetClock called, Clock already set to time=%d", w.c.GenesisTime().Unix())
	}
	w.c = c
	close(w.ready)
	return nil
}

// WaitForClock will block the caller until the *Clock value is available. If the provided context is canceled (eg via
// a deadline set upstream), the function will return the error given by ctx.Err().
func (w *ClockSynchronizer) WaitForClock(ctx context.Context) (*Clock, error) {
	select {
	case <-w.ready:
		return w.c, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// NewClockSynchronizer initializes a single instance of ClockSynchronizer that must be used by all ClockWaiters that
// need to be synchronized to a ClockSetter (ie blockchain service).
func NewClockSynchronizer() *ClockSynchronizer {
	return &ClockSynchronizer{
		ready: make(chan struct{}),
	}
}