mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-12 04:30:04 +00:00
d5ddd012bc
* Enforce error handling and checking type assertions * Reference issue #5404 in the TODO message * doc description * Merge branch 'master' into errcheck * fix tests and address @nisdas feedbacK * gaz * fix docker image
410 lines
12 KiB
Go
410 lines
12 KiB
Go
package initialsync
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
const (
|
|
// queueStopCallTimeout is time allowed for queue to release resources when quitting.
|
|
queueStopCallTimeout = 1 * time.Second
|
|
// pollingInterval defines how often state machine needs to check for new events.
|
|
pollingInterval = 200 * time.Millisecond
|
|
// staleEpochTimeout is an period after which epoch's state is considered stale.
|
|
staleEpochTimeout = 5 * pollingInterval
|
|
// lookaheadEpochs is a default limit on how many forward epochs are loaded into queue.
|
|
lookaheadEpochs = 4
|
|
)
|
|
|
|
var (
|
|
errQueueCtxIsDone = errors.New("queue's context is done, reinitialize")
|
|
errQueueTakesTooLongToStop = errors.New("queue takes too long to stop")
|
|
errNoEpochState = errors.New("epoch state not found")
|
|
errInputNotFetchRequestParams = errors.New("input data is not type *fetchRequestParams")
|
|
)
|
|
|
|
// blocksProvider exposes enough methods for queue to fetch incoming blocks.
|
|
type blocksProvider interface {
|
|
requestResponses() <-chan *fetchRequestResponse
|
|
scheduleRequest(ctx context.Context, start, count uint64) error
|
|
nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error)
|
|
bestFinalizedSlot() uint64
|
|
start() error
|
|
stop()
|
|
}
|
|
|
|
// blocksQueueConfig is a config to setup block queue service.
|
|
type blocksQueueConfig struct {
|
|
blocksFetcher blocksProvider
|
|
headFetcher blockchain.HeadFetcher
|
|
startSlot uint64
|
|
highestExpectedSlot uint64
|
|
p2p p2p.P2P
|
|
}
|
|
|
|
// blocksQueue is a priority queue that serves as a intermediary between block fetchers (producers)
|
|
// and block processing goroutine (consumer). Consumer can rely on order of incoming blocks.
|
|
type blocksQueue struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
highestExpectedSlot uint64
|
|
state *stateMachine
|
|
blocksFetcher blocksProvider
|
|
headFetcher blockchain.HeadFetcher
|
|
fetchedBlocks chan *eth.SignedBeaconBlock // output channel for ready blocks
|
|
quit chan struct{} // termination notifier
|
|
}
|
|
|
|
// newBlocksQueue creates initialized priority queue.
|
|
func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
blocksFetcher := cfg.blocksFetcher
|
|
if blocksFetcher == nil {
|
|
blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{
|
|
headFetcher: cfg.headFetcher,
|
|
p2p: cfg.p2p,
|
|
})
|
|
}
|
|
highestExpectedSlot := cfg.highestExpectedSlot
|
|
if highestExpectedSlot <= cfg.startSlot {
|
|
highestExpectedSlot = blocksFetcher.bestFinalizedSlot()
|
|
}
|
|
|
|
queue := &blocksQueue{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
highestExpectedSlot: highestExpectedSlot,
|
|
blocksFetcher: blocksFetcher,
|
|
headFetcher: cfg.headFetcher,
|
|
fetchedBlocks: make(chan *eth.SignedBeaconBlock, allowedBlocksPerSecond),
|
|
quit: make(chan struct{}),
|
|
}
|
|
|
|
// Configure state machine.
|
|
queue.state = newStateMachine()
|
|
queue.state.addHandler(stateNew, eventSchedule, queue.onScheduleEvent(ctx))
|
|
queue.state.addHandler(stateScheduled, eventDataReceived, queue.onDataReceivedEvent(ctx))
|
|
queue.state.addHandler(stateDataParsed, eventReadyToSend, queue.onReadyToSendEvent(ctx))
|
|
queue.state.addHandler(stateSkipped, eventExtendWindow, queue.onExtendWindowEvent(ctx))
|
|
queue.state.addHandler(stateSent, eventCheckStale, queue.onCheckStaleEvent(ctx))
|
|
|
|
return queue
|
|
}
|
|
|
|
// start boots up the queue processing.
|
|
func (q *blocksQueue) start() error {
|
|
select {
|
|
case <-q.ctx.Done():
|
|
return errQueueCtxIsDone
|
|
default:
|
|
go q.loop()
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// stop terminates all queue operations.
|
|
func (q *blocksQueue) stop() error {
|
|
q.cancel()
|
|
select {
|
|
case <-q.quit:
|
|
return nil
|
|
case <-time.After(queueStopCallTimeout):
|
|
return errQueueTakesTooLongToStop
|
|
}
|
|
}
|
|
|
|
// loop is a main queue loop.
|
|
func (q *blocksQueue) loop() {
|
|
defer close(q.quit)
|
|
|
|
defer func() {
|
|
q.blocksFetcher.stop()
|
|
close(q.fetchedBlocks)
|
|
}()
|
|
|
|
if err := q.blocksFetcher.start(); err != nil {
|
|
log.WithError(err).Debug("Can not start blocks provider")
|
|
}
|
|
|
|
startEpoch := helpers.SlotToEpoch(q.headFetcher.HeadSlot())
|
|
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
|
|
|
|
// Define epoch states as finite state machines.
|
|
for i := startEpoch; i < startEpoch+lookaheadEpochs; i++ {
|
|
q.state.addEpochState(i)
|
|
}
|
|
|
|
ticker := time.NewTicker(pollingInterval)
|
|
tickerEvents := []eventID{eventSchedule, eventReadyToSend, eventCheckStale, eventExtendWindow}
|
|
for {
|
|
|
|
if q.headFetcher.HeadSlot() >= q.highestExpectedSlot {
|
|
// By the time initial sync is complete, highest slot may increase, re-check.
|
|
if q.highestExpectedSlot < q.blocksFetcher.bestFinalizedSlot() {
|
|
q.highestExpectedSlot = q.blocksFetcher.bestFinalizedSlot()
|
|
continue
|
|
}
|
|
log.Debug("Highest expected slot reached")
|
|
q.cancel()
|
|
}
|
|
|
|
select {
|
|
case <-ticker.C:
|
|
for _, state := range q.state.epochs {
|
|
data := &fetchRequestParams{
|
|
start: helpers.StartSlot(state.epoch),
|
|
count: slotsPerEpoch,
|
|
}
|
|
|
|
// Trigger events on each epoch's state machine.
|
|
for _, event := range tickerEvents {
|
|
if err := q.state.trigger(event, state.epoch, data); err != nil {
|
|
log.WithFields(logrus.Fields{
|
|
"event": event,
|
|
"epoch": state.epoch,
|
|
"error": err.Error(),
|
|
}).Debug("Can not trigger event")
|
|
}
|
|
}
|
|
|
|
// Do garbage collection, and advance sliding window forward.
|
|
if q.headFetcher.HeadSlot() >= helpers.StartSlot(state.epoch+1) {
|
|
highestEpoch, err := q.state.highestEpoch()
|
|
if err != nil {
|
|
log.WithError(err).Debug("Cannot obtain highest epoch state number")
|
|
continue
|
|
}
|
|
if err := q.state.removeEpochState(state.epoch); err != nil {
|
|
log.WithError(err).Debug("Can not remove epoch state")
|
|
}
|
|
if len(q.state.epochs) < lookaheadEpochs {
|
|
q.state.addEpochState(highestEpoch + 1)
|
|
}
|
|
}
|
|
}
|
|
case response, ok := <-q.blocksFetcher.requestResponses():
|
|
if !ok {
|
|
log.Debug("Fetcher closed output channel")
|
|
q.cancel()
|
|
return
|
|
}
|
|
// Update state of an epoch for which data is received.
|
|
epoch := helpers.SlotToEpoch(response.start)
|
|
if ind, ok := q.state.findEpochState(epoch); ok {
|
|
state := q.state.epochs[ind]
|
|
if err := q.state.trigger(eventDataReceived, state.epoch, response); err != nil {
|
|
log.WithFields(logrus.Fields{
|
|
"event": eventDataReceived,
|
|
"epoch": state.epoch,
|
|
"error": err.Error(),
|
|
}).Debug("Can not trigger event")
|
|
state.setState(stateNew)
|
|
continue
|
|
}
|
|
}
|
|
case <-q.ctx.Done():
|
|
log.Debug("Context closed, exiting goroutine (blocks queue)")
|
|
ticker.Stop()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// onScheduleEvent is an event called on newly arrived epochs. Transforms state to scheduled.
|
|
func (q *blocksQueue) onScheduleEvent(ctx context.Context) eventHandlerFn {
|
|
return func(es *epochState, in interface{}) (stateID, error) {
|
|
data, ok := in.(*fetchRequestParams)
|
|
if !ok {
|
|
return 0, errInputNotFetchRequestParams
|
|
}
|
|
if err := q.blocksFetcher.scheduleRequest(ctx, data.start, data.count); err != nil {
|
|
return es.state, err
|
|
}
|
|
return stateScheduled, nil
|
|
}
|
|
}
|
|
|
|
// onDataReceivedEvent is an event called when data is received from fetcher.
|
|
func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
|
|
return func(es *epochState, in interface{}) (stateID, error) {
|
|
if ctx.Err() != nil {
|
|
return es.state, ctx.Err()
|
|
}
|
|
|
|
response, ok := in.(*fetchRequestResponse)
|
|
if !ok {
|
|
return 0, errInputNotFetchRequestParams
|
|
}
|
|
epoch := helpers.SlotToEpoch(response.start)
|
|
if response.err != nil {
|
|
// Current window is already too big, re-request previous epochs.
|
|
if response.err == errSlotIsTooHigh {
|
|
for _, state := range q.state.epochs {
|
|
isSkipped := state.state == stateSkipped || state.state == stateSkippedExt
|
|
if state.epoch < epoch && isSkipped {
|
|
state.setState(stateNew)
|
|
}
|
|
}
|
|
}
|
|
return es.state, response.err
|
|
}
|
|
|
|
ind, ok := q.state.findEpochState(epoch)
|
|
if !ok {
|
|
return es.state, errNoEpochState
|
|
}
|
|
q.state.epochs[ind].blocks = response.blocks
|
|
return stateDataParsed, nil
|
|
}
|
|
}
|
|
|
|
// onReadyToSendEvent is an event called to allow epochs with available blocks to send them downstream.
|
|
func (q *blocksQueue) onReadyToSendEvent(ctx context.Context) eventHandlerFn {
|
|
return func(es *epochState, in interface{}) (stateID, error) {
|
|
if ctx.Err() != nil {
|
|
return es.state, ctx.Err()
|
|
}
|
|
|
|
data, ok := in.(*fetchRequestParams)
|
|
if !ok {
|
|
return 0, errInputNotFetchRequestParams
|
|
}
|
|
epoch := helpers.SlotToEpoch(data.start)
|
|
ind, ok := q.state.findEpochState(epoch)
|
|
if !ok {
|
|
return es.state, errNoEpochState
|
|
}
|
|
if len(q.state.epochs[ind].blocks) == 0 {
|
|
return stateSkipped, nil
|
|
}
|
|
|
|
send := func() (stateID, error) {
|
|
for _, block := range q.state.epochs[ind].blocks {
|
|
select {
|
|
case <-ctx.Done():
|
|
return es.state, ctx.Err()
|
|
case q.fetchedBlocks <- block:
|
|
}
|
|
}
|
|
return stateSent, nil
|
|
}
|
|
|
|
// Make sure that we send epochs in a correct order.
|
|
if q.state.isLowestEpochState(epoch) {
|
|
return send()
|
|
}
|
|
|
|
// Make sure that previous epoch is already processed.
|
|
for _, state := range q.state.epochs {
|
|
// Review only previous slots.
|
|
if state.epoch < epoch {
|
|
switch state.state {
|
|
case stateNew, stateScheduled, stateDataParsed:
|
|
return es.state, nil
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
return send()
|
|
}
|
|
}
|
|
|
|
// onExtendWindowEvent is and event allowing handlers to extend sliding window,
|
|
// in case where progress is not possible otherwise.
|
|
func (q *blocksQueue) onExtendWindowEvent(ctx context.Context) eventHandlerFn {
|
|
return func(es *epochState, in interface{}) (stateID, error) {
|
|
if ctx.Err() != nil {
|
|
return es.state, ctx.Err()
|
|
}
|
|
|
|
data, ok := in.(*fetchRequestParams)
|
|
if !ok {
|
|
return 0, errInputNotFetchRequestParams
|
|
}
|
|
epoch := helpers.SlotToEpoch(data.start)
|
|
if _, ok := q.state.findEpochState(epoch); !ok {
|
|
return es.state, errNoEpochState
|
|
}
|
|
|
|
// Only the highest epoch with skipped state can trigger extension.
|
|
highestEpoch, err := q.state.highestEpoch()
|
|
if err != nil {
|
|
return es.state, err
|
|
}
|
|
if highestEpoch != epoch {
|
|
return es.state, nil
|
|
}
|
|
|
|
// Check if window is expanded recently, if so, time to reset and re-request the same blocks.
|
|
resetWindow := false
|
|
skippedEpochs := 0
|
|
for _, state := range q.state.epochs {
|
|
if state.state == stateSkippedExt {
|
|
resetWindow = true
|
|
break
|
|
}
|
|
if state.state == stateSkipped || state.state == stateSkippedExt {
|
|
skippedEpochs++
|
|
}
|
|
}
|
|
// Reset if everything is skipped or extension took place during previous iteration.
|
|
if resetWindow || (skippedEpochs == len(q.state.epochs)) {
|
|
for _, state := range q.state.epochs {
|
|
state.setState(stateNew)
|
|
}
|
|
// Fill the gaps between epochs.
|
|
start, err := q.state.lowestEpoch()
|
|
if err != nil {
|
|
return es.state, err
|
|
}
|
|
end, err := q.state.highestEpoch()
|
|
if err != nil {
|
|
return es.state, err
|
|
}
|
|
for i := start; i < end; i++ {
|
|
if _, ok := q.state.findEpochState(i); !ok {
|
|
q.state.addEpochState(i)
|
|
}
|
|
}
|
|
return stateNew, nil
|
|
}
|
|
|
|
// Extend sliding window.
|
|
nonSkippedSlot, err := q.blocksFetcher.nonSkippedSlotAfter(ctx, helpers.StartSlot(highestEpoch+1))
|
|
if err != nil {
|
|
return es.state, err
|
|
}
|
|
if nonSkippedSlot > q.highestExpectedSlot {
|
|
return es.state, nil
|
|
}
|
|
q.state.addEpochState(helpers.SlotToEpoch(nonSkippedSlot))
|
|
return stateSkippedExt, nil
|
|
}
|
|
}
|
|
|
|
// onCheckStaleEvent is an event that allows to mark stale epochs,
|
|
// so that they can be re-processed.
|
|
func (q *blocksQueue) onCheckStaleEvent(ctx context.Context) eventHandlerFn {
|
|
return func(es *epochState, in interface{}) (stateID, error) {
|
|
if ctx.Err() != nil {
|
|
return es.state, ctx.Err()
|
|
}
|
|
|
|
if time.Since(es.updated) > staleEpochTimeout {
|
|
return stateSkipped, nil
|
|
}
|
|
|
|
return es.state, nil
|
|
}
|
|
}
|