refactor slot tickers with intervals (#12440)

* refactor slot tickers with intervals

* GoDoc

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
This commit is contained in:
Potuz 2023-08-03 20:55:16 -03:00 committed by GitHub
parent 3a09405bb7
commit dd14d5cef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 20 deletions

View File

@ -92,22 +92,24 @@ func (s *Service) spawnProcessAttestationsRoutine() {
return return
} }
st := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot) reorgInterval := time.Second*time.Duration(params.BeaconConfig().SecondsPerSlot) - reorgLateBlockCountAttestations
pat := slots.NewSlotTickerWithOffset(s.genesisTime, -reorgLateBlockCountAttestations, params.BeaconConfig().SecondsPerSlot) ticker := slots.NewSlotTickerWithIntervals(s.genesisTime, []time.Duration{0, reorgInterval})
for { for {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
return return
case <-pat.C(): case slotInterval := <-ticker.C():
s.UpdateHead(s.ctx, s.CurrentSlot()+1) if slotInterval.Interval > 0 {
case <-st.C(): s.UpdateHead(s.ctx, slotInterval.Slot+1)
s.cfg.ForkChoiceStore.Lock() } else {
if err := s.cfg.ForkChoiceStore.NewSlot(s.ctx, s.CurrentSlot()); err != nil { s.cfg.ForkChoiceStore.Lock()
log.WithError(err).Error("could not process new slot") if err := s.cfg.ForkChoiceStore.NewSlot(s.ctx, slotInterval.Slot); err != nil {
} log.WithError(err).Error("could not process new slot")
s.cfg.ForkChoiceStore.Unlock() }
s.cfg.ForkChoiceStore.Unlock()
s.UpdateHead(s.ctx, s.CurrentSlot()) s.UpdateHead(s.ctx, slotInterval.Slot)
}
} }
} }
}() }()

View File

@ -34,14 +34,17 @@ func (s *Service) prepareForkChoiceAtts() {
ticker := slots.NewSlotTickerWithIntervals(time.Unix(int64(s.genesisTime), 0), intervals[:]) ticker := slots.NewSlotTickerWithIntervals(time.Unix(int64(s.genesisTime), 0), intervals[:])
for { for {
select { select {
case <-ticker.C(): case slotInterval := <-ticker.C():
t := time.Now() t := time.Now()
if err := s.batchForkChoiceAtts(s.ctx); err != nil { if err := s.batchForkChoiceAtts(s.ctx); err != nil {
log.WithError(err).Error("Could not prepare attestations for fork choice") log.WithError(err).Error("Could not prepare attestations for fork choice")
} }
if slots.TimeIntoSlot(s.genesisTime) < intervals[1] { switch slotInterval.Interval {
batchForkChoiceAttsT1.Observe(float64(time.Since(t).Milliseconds())) case 0:
} else if slots.TimeIntoSlot(s.genesisTime) < intervals[2] { duration := time.Since(t)
log.WithField("Duration", duration).Debug("aggregated unaggregated attestations")
batchForkChoiceAttsT1.Observe(float64(duration.Milliseconds()))
case 1:
batchForkChoiceAttsT2.Observe(float64(time.Since(t).Milliseconds())) batchForkChoiceAttsT2.Observe(float64(time.Since(t).Milliseconds()))
} }
case <-s.ctx.Done(): case <-s.ctx.Done():

View File

@ -16,6 +16,20 @@ type Ticker interface {
Done() Done()
} }
// SlotInterval is a wrapper that contains a slot and the interval index that
// triggered the ticker
type SlotInterval struct {
Slot primitives.Slot
Interval int
}
// The IntervalTicker is similar to the Ticker interface but
// exposes also the interval along with the slot number
type IntervalTicker interface {
C() <-chan SlotInterval
Done()
}
// SlotTicker is a special ticker for the beacon chain block. // SlotTicker is a special ticker for the beacon chain block.
// The channel emits over the slot interval, and ensures that // The channel emits over the slot interval, and ensures that
// the ticks are in line with the genesis time. This means that // the ticks are in line with the genesis time. This means that
@ -27,12 +41,25 @@ type SlotTicker struct {
done chan struct{} done chan struct{}
} }
// SlotIntervalTicker is similar to a slot ticker but it returns also
// the index of the interval that triggered the event
type SlotIntervalTicker struct {
c chan SlotInterval
done chan struct{}
}
// C returns the ticker channel. Call Cancel afterwards to ensure // C returns the ticker channel. Call Cancel afterwards to ensure
// that the goroutine exits cleanly. // that the goroutine exits cleanly.
func (s *SlotTicker) C() <-chan primitives.Slot { func (s *SlotTicker) C() <-chan primitives.Slot {
return s.c return s.c
} }
// C returns the ticker channel. Call Cancel afterwards to ensure
// that the goroutine exits cleanly.
func (s *SlotIntervalTicker) C() <-chan SlotInterval {
return s.c
}
// Done should be called to clean up the ticker. // Done should be called to clean up the ticker.
func (s *SlotTicker) Done() { func (s *SlotTicker) Done() {
go func() { go func() {
@ -40,6 +67,13 @@ func (s *SlotTicker) Done() {
}() }()
} }
// Done should be called to clean up the ticker.
func (s *SlotIntervalTicker) Done() {
go func() {
s.done <- struct{}{}
}()
}
// NewSlotTicker starts and returns a new SlotTicker instance. // NewSlotTicker starts and returns a new SlotTicker instance.
func NewSlotTicker(genesisTime time.Time, secondsPerSlot uint64) *SlotTicker { func NewSlotTicker(genesisTime time.Time, secondsPerSlot uint64) *SlotTicker {
if genesisTime.IsZero() { if genesisTime.IsZero() {
@ -109,7 +143,7 @@ func (s *SlotTicker) start(
// startWithIntervals starts a ticker that emits a tick every slot at the // startWithIntervals starts a ticker that emits a tick every slot at the
// prescribed intervals. The caller is responsible to make these intervals increasing and // prescribed intervals. The caller is responsible to make these intervals increasing and
// less than secondsPerSlot // less than secondsPerSlot
func (s *SlotTicker) startWithIntervals( func (s *SlotIntervalTicker) startWithIntervals(
genesisTime time.Time, genesisTime time.Time,
until func(time.Time) time.Duration, until func(time.Time) time.Duration,
after func(time.Duration) <-chan time.Time, after func(time.Duration) <-chan time.Time,
@ -124,7 +158,7 @@ func (s *SlotTicker) startWithIntervals(
waitTime := until(nextTickTime) waitTime := until(nextTickTime)
select { select {
case <-after(waitTime): case <-after(waitTime):
s.c <- slot s.c <- SlotInterval{Slot: slot, Interval: interval}
interval++ interval++
if interval == len(intervals) { if interval == len(intervals) {
interval = 0 interval = 0
@ -142,7 +176,7 @@ func (s *SlotTicker) startWithIntervals(
// several offsets of time from genesis, // several offsets of time from genesis,
// Caller is responsible to input the intervals in increasing order and none bigger or equal than // Caller is responsible to input the intervals in increasing order and none bigger or equal than
// SecondsPerSlot // SecondsPerSlot
func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration) *SlotTicker { func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration) *SlotIntervalTicker {
if genesisTime.Unix() == 0 { if genesisTime.Unix() == 0 {
panic("zero genesis time") panic("zero genesis time")
} }
@ -160,8 +194,8 @@ func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration
} }
lastOffset = offset lastOffset = offset
} }
ticker := &SlotTicker{ ticker := &SlotIntervalTicker{
c: make(chan primitives.Slot), c: make(chan SlotInterval),
done: make(chan struct{}), done: make(chan struct{}),
} }
ticker.startWithIntervals(genesisTime, prysmTime.Until, time.After, intervals) ticker.startWithIntervals(genesisTime, prysmTime.Until, time.After, intervals)