Add Sync Checker (#13580)

* fix it

* add it in

* typo

* fix tests

* fix tests

* export and add test

* preston's review
This commit is contained in:
Nishant Das 2024-02-06 10:34:30 +08:00 committed by GitHub
parent 55a29a4670
commit 6fa656c1ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 81 additions and 35 deletions

View File

@ -557,17 +557,9 @@ func (s *Service) RecentBlockSlot(root [32]byte) (primitives.Slot, error) {
return s.cfg.ForkChoiceStore.Slot(root) return s.cfg.ForkChoiceStore.Slot(root)
} }
// inRegularSync applies the following heuristics to decide if the node is in // inRegularSync queries the initial sync service to
// regular sync mode vs init sync mode using only forkchoice. // determine if the node is in regular sync or is still
// It checks that the highest received block is behind the current time by at least 2 epochs // syncing to the head of the chain.
// and that it was imported at least one epoch late if both of these
// tests pass then the node is in init sync. The caller of this function MUST
// have a lock on forkchoice
func (s *Service) inRegularSync() bool { func (s *Service) inRegularSync() bool {
currentSlot := s.CurrentSlot() return s.cfg.SyncChecker.Synced()
fc := s.cfg.ForkChoiceStore
if currentSlot-fc.HighestReceivedBlockSlot() < 2*params.BeaconConfig().SlotsPerEpoch {
return true
}
return fc.HighestReceivedBlockDelay() < params.BeaconConfig().SlotsPerEpoch
} }

View File

@ -593,26 +593,3 @@ func TestService_IsFinalized(t *testing.T) {
require.Equal(t, true, c.IsFinalized(ctx, br)) require.Equal(t, true, c.IsFinalized(ctx, br))
require.Equal(t, false, c.IsFinalized(ctx, [32]byte{'c'})) require.Equal(t, false, c.IsFinalized(ctx, [32]byte{'c'}))
} }
func TestService_inRegularSync(t *testing.T) {
ctx := context.Background()
c := &Service{cfg: &config{ForkChoiceStore: doublylinkedtree.New()}, head: &head{root: [32]byte{'b'}}}
ojc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
ofc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
st, blkRoot, err := prepareForkchoiceState(ctx, 100, [32]byte{'a'}, [32]byte{}, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot))
require.Equal(t, false, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
st, blkRoot, err = prepareForkchoiceState(ctx, 128, [32]byte{'b'}, [32]byte{'a'}, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot))
require.Equal(t, false, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-5*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
require.Equal(t, true, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
c.cfg.ForkChoiceStore.SetGenesisTime(uint64(time.Now().Unix()))
require.Equal(t, true, c.inRegularSync())
}

View File

@ -198,3 +198,10 @@ func WithBlobStorage(b *filesystem.BlobStorage) Option {
return nil return nil
} }
} }
func WithSyncChecker(checker Checker) Option {
return func(s *Service) error {
s.cfg.SyncChecker = checker
return nil
}
}

View File

@ -93,6 +93,13 @@ type config struct {
BlockFetcher execution.POWBlockFetcher BlockFetcher execution.POWBlockFetcher
FinalizedStateAtStartUp state.BeaconState FinalizedStateAtStartUp state.BeaconState
ExecutionEngineCaller execution.EngineCaller ExecutionEngineCaller execution.EngineCaller
SyncChecker Checker
}
// Checker is an interface used to determine if a node is in initial sync
// or regular sync.
type Checker interface {
Synced() bool
} }
var ErrMissingClockSetter = errors.New("blockchain Service initialized without a startup.ClockSetter") var ErrMissingClockSetter = errors.New("blockchain Service initialized without a startup.ClockSetter")

View File

@ -6,6 +6,7 @@ import (
"testing" "testing"
"github.com/prysmaticlabs/prysm/v4/async/event" "github.com/prysmaticlabs/prysm/v4/async/event"
mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache/depositcache" "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache/depositcache"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state" statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
@ -118,6 +119,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq
WithDepositCache(dc), WithDepositCache(dc),
WithTrackedValidatorsCache(cache.NewTrackedValidatorsCache()), WithTrackedValidatorsCache(cache.NewTrackedValidatorsCache()),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithSyncChecker(mock.MockChecker{}),
} }
// append the variadic opts so they override the defaults by being processed afterwards // append the variadic opts so they override the defaults by being processed afterwards
opts = append(defOpts, opts...) opts = append(defOpts, opts...)

View File

@ -180,6 +180,14 @@ func (mon *MockOperationNotifier) OperationFeed() *event.Feed {
return mon.feed return mon.feed
} }
// MockChecker is a mock sync checker.
type MockChecker struct{}
// Synced returns true.
func (_ MockChecker) Synced() bool {
return true
}
// ReceiveBlockInitialSync mocks ReceiveBlockInitialSync method in chain service. // ReceiveBlockInitialSync mocks ReceiveBlockInitialSync method in chain service.
func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, _ [32]byte) error { func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, _ [32]byte) error {
if s.State == nil { if s.State == nil {

View File

@ -120,6 +120,7 @@ type BeaconNode struct {
BlobStorage *filesystem.BlobStorage BlobStorage *filesystem.BlobStorage
blobRetentionEpochs primitives.Epoch blobRetentionEpochs primitives.Epoch
verifyInitWaiter *verification.InitializerWaiter verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
} }
// New creates a new node instance, sets up configuration options, and registers // New creates a new node instance, sets up configuration options, and registers
@ -192,6 +193,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
} }
beacon.initialSyncComplete = make(chan struct{}) beacon.initialSyncComplete = make(chan struct{})
beacon.syncChecker = &initialsync.SyncChecker{}
for _, opt := range opts { for _, opt := range opts {
if err := opt(beacon); err != nil { if err := opt(beacon); err != nil {
return nil, err return nil, err
@ -674,6 +676,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithBlobStorage(b.BlobStorage), blockchain.WithBlobStorage(b.BlobStorage),
blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache), blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache),
blockchain.WithPayloadIDCache(b.payloadIDCache), blockchain.WithPayloadIDCache(b.payloadIDCache),
blockchain.WithSyncChecker(b.syncChecker),
) )
blockchainService, err := blockchain.NewService(b.ctx, opts...) blockchainService, err := blockchain.NewService(b.ctx, opts...)
@ -767,6 +770,7 @@ func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error {
opts := []initialsync.Option{ opts := []initialsync.Option{
initialsync.WithVerifierWaiter(b.verifyInitWaiter), initialsync.WithVerifierWaiter(b.verifyInitWaiter),
initialsync.WithSyncChecker(b.syncChecker),
} }
is := initialsync.NewService(b.ctx, &initialsync.Config{ is := initialsync.NewService(b.ctx, &initialsync.Config{
DB: b.db, DB: b.db,

View File

@ -27,6 +27,7 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime" "github.com/prysmaticlabs/prysm/v4/runtime"
"github.com/prysmaticlabs/prysm/v4/runtime/interop" "github.com/prysmaticlabs/prysm/v4/runtime/interop"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require" "github.com/prysmaticlabs/prysm/v4/testing/require"
logTest "github.com/sirupsen/logrus/hooks/test" logTest "github.com/sirupsen/logrus/hooks/test"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@ -91,6 +92,30 @@ func TestNodeStart_Ok(t *testing.T) {
require.LogsContain(t, hook, "Starting beacon node") require.LogsContain(t, hook, "Starting beacon node")
} }
func TestNodeStart_SyncChecker(t *testing.T) {
hook := logTest.NewGlobal()
app := cli.App{}
tmp := fmt.Sprintf("%s/datadirtest2", t.TempDir())
set := flag.NewFlagSet("test", 0)
set.String("datadir", tmp, "node data directory")
set.String("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A", "fee recipient")
require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A"))
ctx, cancel := newCliContextWithCancel(&app, set)
node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}),
WithBuilderFlagOptions([]builder.Option{}),
WithExecutionChainOptions([]execution.Option{}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
require.NoError(t, err)
go func() {
node.Start()
}()
time.Sleep(3 * time.Second)
assert.NotNil(t, node.syncChecker.Svc)
node.Close()
require.LogsContain(t, hook, "Starting beacon node")
}
func TestNodeStart_Ok_registerDeterministicGenesisService(t *testing.T) { func TestNodeStart_Ok_registerDeterministicGenesisService(t *testing.T) {
numValidators := uint64(1) numValidators := uint64(1)
hook := logTest.NewGlobal() hook := logTest.NewGlobal()

View File

@ -71,6 +71,29 @@ func WithVerifierWaiter(viw *verification.InitializerWaiter) Option {
} }
} }
// WithSyncChecker registers the initial sync service
// in the checker.
func WithSyncChecker(checker *SyncChecker) Option {
return func(service *Service) {
checker.Svc = service
}
}
// SyncChecker allows other services to check the current status of
// initial-sync and use that internally in their service.
type SyncChecker struct {
Svc *Service
}
// Synced returns the status of the service.
func (s *SyncChecker) Synced() bool {
if s.Svc == nil {
log.Warn("Calling sync checker with a nil service initialized")
return false
}
return s.Svc.Synced()
}
// NewService configures the initial sync service responsible for bringing the node up to the // NewService configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain. // latest head of the blockchain.
func NewService(ctx context.Context, cfg *Config, opts ...Option) *Service { func NewService(ctx context.Context, cfg *Config, opts ...Option) *Service {

View File

@ -72,6 +72,7 @@ func startChainService(t testing.TB,
blockchain.WithPayloadIDCache(cache.NewPayloadIDCache()), blockchain.WithPayloadIDCache(cache.NewPayloadIDCache()),
blockchain.WithClockSynchronizer(startup.NewClockSynchronizer()), blockchain.WithClockSynchronizer(startup.NewClockSynchronizer()),
blockchain.WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), blockchain.WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
blockchain.WithSyncChecker(mock.MockChecker{}),
) )
service, err := blockchain.NewService(context.Background(), opts...) service, err := blockchain.NewService(context.Background(), opts...)
require.NoError(t, err) require.NoError(t, err)