From f6764fe62b9ca43d5777e3c476a9620c7207a940 Mon Sep 17 00:00:00 2001 From: Potuz Date: Wed, 10 May 2023 09:48:51 -0300 Subject: [PATCH] Add a new slot ticker and use it on attestation aggregation (#12377) * Add slot ticker with intervals * add flags for aggregation duration * misspelling * hide flags * fix flags and default durations * lint * wait for initial sync * deep source * add log * Preston's review * fix error message --------- Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/node/node.go | 3 +- .../operations/attestations/BUILD.bazel | 1 + .../attestations/prepare_forkchoice.go | 24 +++++-- .../operations/attestations/service.go | 20 +++++- config/features/config.go | 4 ++ config/features/flags.go | 21 +++++++ time/slots/BUILD.bazel | 1 + time/slots/slotticker.go | 63 +++++++++++++++++++ time/slots/slotticker_test.go | 48 ++++++++++++++ time/slots/slottime.go | 11 +++- 10 files changed, 184 insertions(+), 12 deletions(-) diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 057b27926..061112c4f 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -581,7 +581,8 @@ func (b *BeaconNode) fetchBuilderService() *builder.Service { func (b *BeaconNode) registerAttestationPool() error { s, err := attestations.NewService(b.ctx, &attestations.Config{ - Pool: b.attestationPool, + Pool: b.attestationPool, + InitialSyncComplete: b.initialSyncComplete, }) if err != nil { return errors.Wrap(err, "could not register atts pool service") diff --git a/beacon-chain/operations/attestations/BUILD.bazel b/beacon-chain/operations/attestations/BUILD.bazel index d3bf2fd32..4f8567513 100644 --- a/beacon-chain/operations/attestations/BUILD.bazel +++ b/beacon-chain/operations/attestations/BUILD.bazel @@ -18,6 +18,7 @@ go_library( deps = [ "//beacon-chain/operations/attestations/kv:go_default_library", "//cache/lru:go_default_library", + "//config/features:go_default_library", "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", "//crypto/hash:go_default_library", diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go index 67104e5f4..bd19cb071 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -7,6 +7,8 @@ import ( "time" "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/prysm/v4/config/features" + "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/crypto/hash" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations" @@ -14,20 +16,30 @@ import ( "go.opencensus.io/trace" ) -// Prepare attestations for fork choice three times per slot. -var prepareForkChoiceAttsPeriod = slots.DivideSlotBy(3 /* times-per-slot */) - // This prepares fork choice attestations by running batchForkChoiceAtts // every prepareForkChoiceAttsPeriod. func (s *Service) prepareForkChoiceAtts() { - ticker := time.NewTicker(prepareForkChoiceAttsPeriod) - defer ticker.Stop() + intervals := features.Get().AggregateIntervals + slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second + // Adjust intervals for networks with a lower slot duration (Hive, e2e, etc) + for { + if intervals[len(intervals)-1] >= slotDuration { + for i, offset := range intervals { + intervals[i] = offset / 2 + } + } else { + break + } + } + ticker := slots.NewSlotTickerWithIntervals(time.Unix(int64(s.genesisTime), 0), intervals) for { select { - case <-ticker.C: + case <-ticker.C(): + t := time.Now() if err := s.batchForkChoiceAtts(s.ctx); err != nil { log.WithError(err).Error("Could not prepare attestations for fork choice") } + log.WithField("latency", time.Since(t).Milliseconds()).Debug("batched forkchoice attestations") case <-s.ctx.Done(): log.Debug("Context closed, exiting routine") return diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go index 9a74b9f00..d8fcb042c 100644 --- a/beacon-chain/operations/attestations/service.go +++ b/beacon-chain/operations/attestations/service.go @@ -5,6 +5,7 @@ package attestations import ( "context" + "errors" "time" lru "github.com/hashicorp/golang-lru" @@ -26,8 +27,9 @@ type Service struct { // Config options for the service. type Config struct { - Pool Pool - pruneInterval time.Duration + Pool Pool + pruneInterval time.Duration + InitialSyncComplete chan struct{} } // NewService instantiates a new attestation pool service instance that will @@ -51,10 +53,24 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { // Start an attestation pool service's main event loop. func (s *Service) Start() { + if err := s.waitForSync(s.cfg.InitialSyncComplete); err != nil { + log.WithError(err).Error("failed to wait for initial sync") + return + } go s.prepareForkChoiceAtts() go s.pruneAttsPool() } +// waitForSync waits until the beacon node is synced to the latest head. +func (s *Service) waitForSync(syncChan chan struct{}) error { + select { + case <-syncChan: + return nil + case <-s.ctx.Done(): + return errors.New("context closed, exiting goroutine") + } +} + // Stop the beacon block attestation pool service's main event loop // and associated goroutines. func (s *Service) Stop() error { diff --git a/config/features/config.go b/config/features/config.go index 907e1b08a..d1bb3461b 100644 --- a/config/features/config.go +++ b/config/features/config.go @@ -72,6 +72,9 @@ type Flags struct { // KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have // changed on disk. This feature is for advanced use cases only. KeystoreImportDebounceInterval time.Duration + + // AggregateIntervals specifies the time durations at which we aggregate attestations preparing for forkchoice. + AggregateIntervals []time.Duration } var featureConfig *Flags @@ -218,6 +221,7 @@ func ConfigureBeaconChain(ctx *cli.Context) error { logEnabled(buildBlockParallel) cfg.BuildBlockParallel = true } + cfg.AggregateIntervals = []time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value} Init(cfg) return nil } diff --git a/config/features/flags.go b/config/features/flags.go index 4ebe55416..b85293e46 100644 --- a/config/features/flags.go +++ b/config/features/flags.go @@ -50,6 +50,24 @@ var ( Usage: "(Danger): Writes the wallet password to the wallet directory on completing Prysm web onboarding. " + "We recommend against this flag unless you are an advanced user.", } + aggregateFirstInterval = &cli.DurationFlag{ + Name: "aggregate-first-interval", + Usage: "(Advanced): Specifies the first interval in which attestations are aggregated in the slot (typically unnaggregated attestations are aggregated in this interval)", + Value: 7 * time.Second, + Hidden: true, + } + aggregateSecondInterval = &cli.DurationFlag{ + Name: "aggregate-second-interval", + Usage: "(Advanced): Specifies the second interval in which attestations are aggregated in the slot", + Value: 9 * time.Second, + Hidden: true, + } + aggregateThirdInterval = &cli.DurationFlag{ + Name: "aggregate-third-interval", + Usage: "(Advanced): Specifies the third interval in which attestations are aggregated in the slot", + Value: 11 * time.Second, + Hidden: true, + } dynamicKeyReloadDebounceInterval = &cli.DurationFlag{ Name: "dynamic-key-reload-debounce-interval", Usage: "(Advanced): Specifies the time duration the validator waits to reload new keys if they have " + @@ -168,6 +186,9 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c enableVerboseSigVerification, enableOptionalEngineMethods, prepareAllPayloads, + aggregateFirstInterval, + aggregateSecondInterval, + aggregateThirdInterval, buildBlockParallel, }...)...) diff --git a/time/slots/BUILD.bazel b/time/slots/BUILD.bazel index 4f1b0c78e..266a14962 100644 --- a/time/slots/BUILD.bazel +++ b/time/slots/BUILD.bazel @@ -37,5 +37,6 @@ go_test( "//time:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", + "@com_github_stretchr_testify//require:go_default_library", ], ) diff --git a/time/slots/slotticker.go b/time/slots/slotticker.go index a96604f4d..52f86c192 100644 --- a/time/slots/slotticker.go +++ b/time/slots/slotticker.go @@ -4,6 +4,7 @@ package slots import ( "time" + "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" prysmTime "github.com/prysmaticlabs/prysm/v4/time" ) @@ -104,3 +105,65 @@ func (s *SlotTicker) start( } }() } + +// startWithIntervals starts a ticker that emits a tick every slot at the +// prescribed intervals. The caller is responsible to make these intervals increasing and +// less than secondsPerSlot +func (s *SlotTicker) startWithIntervals( + genesisTime time.Time, + until func(time.Time) time.Duration, + after func(time.Duration) <-chan time.Time, + intervals []time.Duration) { + go func() { + slot := Since(genesisTime) + slot++ + interval := 0 + nextTickTime := startFromTime(genesisTime, slot).Add(intervals[0]) + + for { + waitTime := until(nextTickTime) + select { + case <-after(waitTime): + s.c <- slot + interval++ + if interval == len(intervals) { + interval = 0 + slot++ + } + nextTickTime = startFromTime(genesisTime, slot).Add(intervals[interval]) + case <-s.done: + return + } + } + }() +} + +// NewSlotTickerWithIntervals starts and returns a SlotTicker instance that allows +// several offsets of time from genesis, +// Caller is responsible to input the intervals in increasing order and none bigger or equal than +// SecondsPerSlot +func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration) *SlotTicker { + if genesisTime.Unix() == 0 { + panic("zero genesis time") + } + if len(intervals) == 0 { + panic("at least one interval has to be entered") + } + slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second + lastOffset := time.Duration(0) + for _, offset := range intervals { + if offset < lastOffset { + panic("invalid decreasing offsets") + } + if offset >= slotDuration { + panic("invalid ticker offset") + } + lastOffset = offset + } + ticker := &SlotTicker{ + c: make(chan primitives.Slot), + done: make(chan struct{}), + } + ticker.startWithIntervals(genesisTime, prysmTime.Until, time.After, intervals) + return ticker +} diff --git a/time/slots/slotticker_test.go b/time/slots/slotticker_test.go index 7b173fc5e..b615f4505 100644 --- a/time/slots/slotticker_test.go +++ b/time/slots/slotticker_test.go @@ -4,7 +4,9 @@ import ( "testing" "time" + "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/stretchr/testify/require" ) var _ Ticker = (*SlotTicker)(nil) @@ -136,3 +138,49 @@ func TestGetSlotTickerWithOffset_OK(t *testing.T) { } } } + +func TestGetSlotTickerWitIntervals(t *testing.T) { + genesisTime := time.Now() + offset := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 3 + intervals := []time.Duration{offset, 2 * offset} + + intervalTicker := NewSlotTickerWithIntervals(genesisTime, intervals) + normalTicker := NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot) + + firstTicked := 0 + for { + select { + case <-intervalTicker.C(): + // interval ticks starts in second slot + if firstTicked < 2 { + t.Fatal("Expected other ticker to tick first") + } + return + case <-normalTicker.C(): + if firstTicked > 1 { + t.Fatal("Expected normal ticker to tick first") + } + firstTicked++ + } + } +} + +func TestSlotTickerWithIntervalsInputValidation(t *testing.T) { + var genesisTime time.Time + offset := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 3 + intervals := make([]time.Duration, 0) + panicCall := func() { + NewSlotTickerWithIntervals(genesisTime, intervals) + } + require.Panics(t, panicCall, "zero genesis time") + genesisTime = time.Now() + require.Panics(t, panicCall, "at least one interval has to be entered") + intervals = []time.Duration{2 * offset, offset} + require.Panics(t, panicCall, "invalid decreasing offsets") + intervals = []time.Duration{offset, 4 * offset} + require.Panics(t, panicCall, "invalid ticker offset") + intervals = []time.Duration{4 * offset, offset} + require.Panics(t, panicCall, "invalid ticker offset") + intervals = []time.Duration{offset, 2 * offset} + require.NotPanics(t, panicCall) +} diff --git a/time/slots/slottime.go b/time/slots/slottime.go index 63e482f97..84387e3ef 100644 --- a/time/slots/slottime.go +++ b/time/slots/slottime.go @@ -16,12 +16,17 @@ import ( // incoming objects. (24 mins with mainnet spec) const MaxSlotBuffer = uint64(1 << 7) +// startFromTime returns the slot start in terms of genesis time.Time +func startFromTime(genesis time.Time, slot primitives.Slot) time.Time { + duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot)) + return genesis.Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime. +} + // StartTime returns the start time in terms of its unix epoch // value. func StartTime(genesis uint64, slot primitives.Slot) time.Time { - duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot)) - startTime := time.Unix(int64(genesis), 0).Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime. - return startTime + genesisTime := time.Unix(int64(genesis), 0) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime. + return startFromTime(genesisTime, slot) } // SinceGenesis returns the number of slots since