diff --git a/beacon-chain/blockchain/chain_info.go b/beacon-chain/blockchain/chain_info.go index c7dbf96c2..fb678c3f2 100644 --- a/beacon-chain/blockchain/chain_info.go +++ b/beacon-chain/blockchain/chain_info.go @@ -557,17 +557,9 @@ func (s *Service) RecentBlockSlot(root [32]byte) (primitives.Slot, error) { return s.cfg.ForkChoiceStore.Slot(root) } -// inRegularSync applies the following heuristics to decide if the node is in -// regular sync mode vs init sync mode using only forkchoice. -// It checks that the highest received block is behind the current time by at least 2 epochs -// and that it was imported at least one epoch late if both of these -// tests pass then the node is in init sync. The caller of this function MUST -// have a lock on forkchoice +// inRegularSync queries the initial sync service to +// determine if the node is in regular sync or is still +// syncing to the head of the chain. func (s *Service) inRegularSync() bool { - currentSlot := s.CurrentSlot() - fc := s.cfg.ForkChoiceStore - if currentSlot-fc.HighestReceivedBlockSlot() < 2*params.BeaconConfig().SlotsPerEpoch { - return true - } - return fc.HighestReceivedBlockDelay() < params.BeaconConfig().SlotsPerEpoch + return s.cfg.SyncChecker.Synced() } diff --git a/beacon-chain/blockchain/chain_info_test.go b/beacon-chain/blockchain/chain_info_test.go index 5d1187e73..044eb89c8 100644 --- a/beacon-chain/blockchain/chain_info_test.go +++ b/beacon-chain/blockchain/chain_info_test.go @@ -593,26 +593,3 @@ func TestService_IsFinalized(t *testing.T) { require.Equal(t, true, c.IsFinalized(ctx, br)) require.Equal(t, false, c.IsFinalized(ctx, [32]byte{'c'})) } - -func TestService_inRegularSync(t *testing.T) { - ctx := context.Background() - c := &Service{cfg: &config{ForkChoiceStore: doublylinkedtree.New()}, head: &head{root: [32]byte{'b'}}} - ojc := ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]} - ofc := ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]} - st, blkRoot, err := prepareForkchoiceState(ctx, 100, [32]byte{'a'}, [32]byte{}, params.BeaconConfig().ZeroHash, ojc, ofc) - require.NoError(t, err) - require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot)) - require.Equal(t, false, c.inRegularSync()) - c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)))) - st, blkRoot, err = prepareForkchoiceState(ctx, 128, [32]byte{'b'}, [32]byte{'a'}, params.BeaconConfig().ZeroHash, ojc, ofc) - require.NoError(t, err) - require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot)) - require.Equal(t, false, c.inRegularSync()) - - c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-5*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)))) - require.Equal(t, true, c.inRegularSync()) - - c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)))) - c.cfg.ForkChoiceStore.SetGenesisTime(uint64(time.Now().Unix())) - require.Equal(t, true, c.inRegularSync()) -} diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index 72110b3c7..0be880264 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -198,3 +198,10 @@ func WithBlobStorage(b *filesystem.BlobStorage) Option { return nil } } + +func WithSyncChecker(checker Checker) Option { + return func(s *Service) error { + s.cfg.SyncChecker = checker + return nil + } +} diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 1825d9009..64a15f0bd 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -93,6 +93,13 @@ type config struct { BlockFetcher execution.POWBlockFetcher FinalizedStateAtStartUp state.BeaconState ExecutionEngineCaller execution.EngineCaller + SyncChecker Checker +} + +// Checker is an interface used to determine if a node is in initial sync +// or regular sync. +type Checker interface { + Synced() bool } var ErrMissingClockSetter = errors.New("blockchain Service initialized without a startup.ClockSetter") diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index 746d6e25f..a07f1aff4 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/prysmaticlabs/prysm/v4/async/event" + mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache/depositcache" statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state" @@ -118,6 +119,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq WithDepositCache(dc), WithTrackedValidatorsCache(cache.NewTrackedValidatorsCache()), WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), + WithSyncChecker(mock.MockChecker{}), } // append the variadic opts so they override the defaults by being processed afterwards opts = append(defOpts, opts...) diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index c9b21ce43..28cc0ba0b 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -180,6 +180,14 @@ func (mon *MockOperationNotifier) OperationFeed() *event.Feed { return mon.feed } +// MockChecker is a mock sync checker. +type MockChecker struct{} + +// Synced returns true. +func (_ MockChecker) Synced() bool { + return true +} + // ReceiveBlockInitialSync mocks ReceiveBlockInitialSync method in chain service. func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, _ [32]byte) error { if s.State == nil { diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index e0747898f..8c59aed2c 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -120,6 +120,7 @@ type BeaconNode struct { BlobStorage *filesystem.BlobStorage blobRetentionEpochs primitives.Epoch verifyInitWaiter *verification.InitializerWaiter + syncChecker *initialsync.SyncChecker } // New creates a new node instance, sets up configuration options, and registers @@ -192,6 +193,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco } beacon.initialSyncComplete = make(chan struct{}) + beacon.syncChecker = &initialsync.SyncChecker{} for _, opt := range opts { if err := opt(beacon); err != nil { return nil, err @@ -674,6 +676,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st blockchain.WithBlobStorage(b.BlobStorage), blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache), blockchain.WithPayloadIDCache(b.payloadIDCache), + blockchain.WithSyncChecker(b.syncChecker), ) blockchainService, err := blockchain.NewService(b.ctx, opts...) @@ -767,6 +770,7 @@ func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error { opts := []initialsync.Option{ initialsync.WithVerifierWaiter(b.verifyInitWaiter), + initialsync.WithSyncChecker(b.syncChecker), } is := initialsync.NewService(b.ctx, &initialsync.Config{ DB: b.db, diff --git a/beacon-chain/node/node_test.go b/beacon-chain/node/node_test.go index d18b8ff97..14dee9182 100644 --- a/beacon-chain/node/node_test.go +++ b/beacon-chain/node/node_test.go @@ -27,6 +27,7 @@ import ( ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/runtime" "github.com/prysmaticlabs/prysm/v4/runtime/interop" + "github.com/prysmaticlabs/prysm/v4/testing/assert" "github.com/prysmaticlabs/prysm/v4/testing/require" logTest "github.com/sirupsen/logrus/hooks/test" "github.com/urfave/cli/v2" @@ -91,6 +92,30 @@ func TestNodeStart_Ok(t *testing.T) { require.LogsContain(t, hook, "Starting beacon node") } +func TestNodeStart_SyncChecker(t *testing.T) { + hook := logTest.NewGlobal() + app := cli.App{} + tmp := fmt.Sprintf("%s/datadirtest2", t.TempDir()) + set := flag.NewFlagSet("test", 0) + set.String("datadir", tmp, "node data directory") + set.String("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A", "fee recipient") + require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A")) + + ctx, cancel := newCliContextWithCancel(&app, set) + node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}), + WithBuilderFlagOptions([]builder.Option{}), + WithExecutionChainOptions([]execution.Option{}), + WithBlobStorage(filesystem.NewEphemeralBlobStorage(t))) + require.NoError(t, err) + go func() { + node.Start() + }() + time.Sleep(3 * time.Second) + assert.NotNil(t, node.syncChecker.Svc) + node.Close() + require.LogsContain(t, hook, "Starting beacon node") +} + func TestNodeStart_Ok_registerDeterministicGenesisService(t *testing.T) { numValidators := uint64(1) hook := logTest.NewGlobal() diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 540a7b4a4..b22ad60df 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -71,6 +71,29 @@ func WithVerifierWaiter(viw *verification.InitializerWaiter) Option { } } +// WithSyncChecker registers the initial sync service +// in the checker. +func WithSyncChecker(checker *SyncChecker) Option { + return func(service *Service) { + checker.Svc = service + } +} + +// SyncChecker allows other services to check the current status of +// initial-sync and use that internally in their service. +type SyncChecker struct { + Svc *Service +} + +// Synced returns the status of the service. +func (s *SyncChecker) Synced() bool { + if s.Svc == nil { + log.Warn("Calling sync checker with a nil service initialized") + return false + } + return s.Svc.Synced() +} + // NewService configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewService(ctx context.Context, cfg *Config, opts ...Option) *Service { diff --git a/testing/spectest/shared/common/forkchoice/service.go b/testing/spectest/shared/common/forkchoice/service.go index f506224d0..f3165381c 100644 --- a/testing/spectest/shared/common/forkchoice/service.go +++ b/testing/spectest/shared/common/forkchoice/service.go @@ -72,6 +72,7 @@ func startChainService(t testing.TB, blockchain.WithPayloadIDCache(cache.NewPayloadIDCache()), blockchain.WithClockSynchronizer(startup.NewClockSynchronizer()), blockchain.WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), + blockchain.WithSyncChecker(mock.MockChecker{}), ) service, err := blockchain.NewService(context.Background(), opts...) require.NoError(t, err)