diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index 606549b60..13f0dc3c9 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -92,22 +92,24 @@ func (s *Service) spawnProcessAttestationsRoutine() { return } - st := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot) - pat := slots.NewSlotTickerWithOffset(s.genesisTime, -reorgLateBlockCountAttestations, params.BeaconConfig().SecondsPerSlot) + reorgInterval := time.Second*time.Duration(params.BeaconConfig().SecondsPerSlot) - reorgLateBlockCountAttestations + ticker := slots.NewSlotTickerWithIntervals(s.genesisTime, []time.Duration{0, reorgInterval}) for { select { case <-s.ctx.Done(): return - case <-pat.C(): - s.UpdateHead(s.ctx, s.CurrentSlot()+1) - case <-st.C(): - s.cfg.ForkChoiceStore.Lock() - if err := s.cfg.ForkChoiceStore.NewSlot(s.ctx, s.CurrentSlot()); err != nil { - log.WithError(err).Error("could not process new slot") - } - s.cfg.ForkChoiceStore.Unlock() + case slotInterval := <-ticker.C(): + if slotInterval.Interval > 0 { + s.UpdateHead(s.ctx, slotInterval.Slot+1) + } else { + s.cfg.ForkChoiceStore.Lock() + if err := s.cfg.ForkChoiceStore.NewSlot(s.ctx, slotInterval.Slot); err != nil { + log.WithError(err).Error("could not process new slot") + } + s.cfg.ForkChoiceStore.Unlock() - s.UpdateHead(s.ctx, s.CurrentSlot()) + s.UpdateHead(s.ctx, slotInterval.Slot) + } } } }() diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go index 9aca038ec..4af542eb7 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -34,14 +34,17 @@ func (s *Service) prepareForkChoiceAtts() { ticker := slots.NewSlotTickerWithIntervals(time.Unix(int64(s.genesisTime), 0), intervals[:]) for { select { - case <-ticker.C(): + case slotInterval := <-ticker.C(): t := time.Now() if err := s.batchForkChoiceAtts(s.ctx); err != nil { log.WithError(err).Error("Could not prepare attestations for fork choice") } - if slots.TimeIntoSlot(s.genesisTime) < intervals[1] { - batchForkChoiceAttsT1.Observe(float64(time.Since(t).Milliseconds())) - } else if slots.TimeIntoSlot(s.genesisTime) < intervals[2] { + switch slotInterval.Interval { + case 0: + duration := time.Since(t) + log.WithField("Duration", duration).Debug("aggregated unaggregated attestations") + batchForkChoiceAttsT1.Observe(float64(duration.Milliseconds())) + case 1: batchForkChoiceAttsT2.Observe(float64(time.Since(t).Milliseconds())) } case <-s.ctx.Done(): diff --git a/time/slots/slotticker.go b/time/slots/slotticker.go index 52f86c192..88b6dc55a 100644 --- a/time/slots/slotticker.go +++ b/time/slots/slotticker.go @@ -16,6 +16,20 @@ type Ticker interface { Done() } +// SlotInterval is a wrapper that contains a slot and the interval index that +// triggered the ticker +type SlotInterval struct { + Slot primitives.Slot + Interval int +} + +// The IntervalTicker is similar to the Ticker interface but +// exposes also the interval along with the slot number +type IntervalTicker interface { + C() <-chan SlotInterval + Done() +} + // SlotTicker is a special ticker for the beacon chain block. // The channel emits over the slot interval, and ensures that // the ticks are in line with the genesis time. This means that @@ -27,12 +41,25 @@ type SlotTicker struct { done chan struct{} } +// SlotIntervalTicker is similar to a slot ticker but it returns also +// the index of the interval that triggered the event +type SlotIntervalTicker struct { + c chan SlotInterval + done chan struct{} +} + // C returns the ticker channel. Call Cancel afterwards to ensure // that the goroutine exits cleanly. func (s *SlotTicker) C() <-chan primitives.Slot { return s.c } +// C returns the ticker channel. Call Cancel afterwards to ensure +// that the goroutine exits cleanly. +func (s *SlotIntervalTicker) C() <-chan SlotInterval { + return s.c +} + // Done should be called to clean up the ticker. func (s *SlotTicker) Done() { go func() { @@ -40,6 +67,13 @@ func (s *SlotTicker) Done() { }() } +// Done should be called to clean up the ticker. +func (s *SlotIntervalTicker) Done() { + go func() { + s.done <- struct{}{} + }() +} + // NewSlotTicker starts and returns a new SlotTicker instance. func NewSlotTicker(genesisTime time.Time, secondsPerSlot uint64) *SlotTicker { if genesisTime.IsZero() { @@ -109,7 +143,7 @@ func (s *SlotTicker) start( // startWithIntervals starts a ticker that emits a tick every slot at the // prescribed intervals. The caller is responsible to make these intervals increasing and // less than secondsPerSlot -func (s *SlotTicker) startWithIntervals( +func (s *SlotIntervalTicker) startWithIntervals( genesisTime time.Time, until func(time.Time) time.Duration, after func(time.Duration) <-chan time.Time, @@ -124,7 +158,7 @@ func (s *SlotTicker) startWithIntervals( waitTime := until(nextTickTime) select { case <-after(waitTime): - s.c <- slot + s.c <- SlotInterval{Slot: slot, Interval: interval} interval++ if interval == len(intervals) { interval = 0 @@ -142,7 +176,7 @@ func (s *SlotTicker) startWithIntervals( // several offsets of time from genesis, // Caller is responsible to input the intervals in increasing order and none bigger or equal than // SecondsPerSlot -func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration) *SlotTicker { +func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration) *SlotIntervalTicker { if genesisTime.Unix() == 0 { panic("zero genesis time") } @@ -160,8 +194,8 @@ func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration } lastOffset = offset } - ticker := &SlotTicker{ - c: make(chan primitives.Slot), + ticker := &SlotIntervalTicker{ + c: make(chan SlotInterval), done: make(chan struct{}), } ticker.startWithIntervals(genesisTime, prysmTime.Until, time.After, intervals)