diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 923a015cc..03c2afb35 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -507,3 +507,10 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u return slot, nil } + +// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers. +func (f *blocksFetcher) bestFinalizedSlot() uint64 { + headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) + _, finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) + return helpers.StartSlot(finalizedEpoch) +} diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index 8e1e38b4c..ffcd28032 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -9,8 +9,8 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" - "github.com/prysmaticlabs/prysm/shared/mathutil" "github.com/prysmaticlabs/prysm/shared/params" + "github.com/sirupsen/logrus" ) const ( @@ -35,6 +35,7 @@ type blocksProvider interface { requestResponses() <-chan *fetchRequestResponse scheduleRequest(ctx context.Context, start, count uint64) error nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) + bestFinalizedSlot() uint64 start() error stop() } @@ -72,11 +73,15 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue { p2p: cfg.p2p, }) } + highestExpectedSlot := cfg.highestExpectedSlot + if highestExpectedSlot <= cfg.startSlot { + highestExpectedSlot = blocksFetcher.bestFinalizedSlot() + } queue := &blocksQueue{ ctx: ctx, cancel: cancel, - highestExpectedSlot: cfg.highestExpectedSlot, + highestExpectedSlot: highestExpectedSlot, blocksFetcher: blocksFetcher, headFetcher: cfg.headFetcher, fetchedBlocks: make(chan *eth.SignedBeaconBlock, allowedBlocksPerSecond), @@ -140,7 +145,13 @@ func (q *blocksQueue) loop() { ticker := time.NewTicker(pollingInterval) tickerEvents := []eventID{eventSchedule, eventReadyToSend, eventCheckStale, eventExtendWindow} for { + if q.headFetcher.HeadSlot() >= q.highestExpectedSlot { + // By the time initial sync is complete, highest slot may increase, re-check. + if q.highestExpectedSlot < q.blocksFetcher.bestFinalizedSlot() { + q.highestExpectedSlot = q.blocksFetcher.bestFinalizedSlot() + continue + } log.Debug("Highest expected slot reached") q.cancel() } @@ -156,7 +167,11 @@ func (q *blocksQueue) loop() { // Trigger events on each epoch's state machine. for _, event := range tickerEvents { if err := q.state.trigger(event, state.epoch, data); err != nil { - log.WithError(err).Debug("Can not trigger event") + log.WithFields(logrus.Fields{ + "event": event, + "epoch": state.epoch, + "error": err.Error(), + }).Debug("Can not trigger event") } } @@ -186,7 +201,11 @@ func (q *blocksQueue) loop() { if ind, ok := q.state.findEpochState(epoch); ok { state := q.state.epochs[ind] if err := q.state.trigger(eventDataReceived, state.epoch, response); err != nil { - log.WithError(err).Debug("Can not trigger event") + log.WithFields(logrus.Fields{ + "event": eventDataReceived, + "epoch": state.epoch, + "error": err.Error(), + }).Debug("Can not trigger event") state.setState(stateNew) continue } @@ -203,13 +222,7 @@ func (q *blocksQueue) loop() { func (q *blocksQueue) onScheduleEvent(ctx context.Context) eventHandlerFn { return func(es *epochState, in interface{}) (stateID, error) { data := in.(*fetchRequestParams) - start := data.start - count := mathutil.Min(data.count, q.highestExpectedSlot-start+1) - if count <= 0 { - return es.state, errSlotIsTooHigh - } - - if err := q.blocksFetcher.scheduleRequest(ctx, start, count); err != nil { + if err := q.blocksFetcher.scheduleRequest(ctx, data.start, data.count); err != nil { return es.state, err } return stateScheduled, nil diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 72793c75c..0dfb9cec9 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -157,8 +157,8 @@ func TestBlocksQueueLoop(t *testing.T) { }{ { name: "Single peer with all blocks", - highestExpectedSlot: 251, - expectedBlockSlots: makeSequence(1, 251), + highestExpectedSlot: 251, // will be auto-fixed to 256 (to 8th epoch), by queue + expectedBlockSlots: makeSequence(1, 256), peers: []*peerData{ { blocks: makeSequence(1, 320), @@ -169,8 +169,8 @@ func TestBlocksQueueLoop(t *testing.T) { }, { name: "Multiple peers with all blocks", - highestExpectedSlot: 251, - expectedBlockSlots: makeSequence(1, 251), + highestExpectedSlot: 256, + expectedBlockSlots: makeSequence(1, 256), peers: []*peerData{ { blocks: makeSequence(1, 320), @@ -224,7 +224,7 @@ func TestBlocksQueueLoop(t *testing.T) { { name: "Multiple peers with failures", highestExpectedSlot: 128, - expectedBlockSlots: makeSequence(1, 128), + expectedBlockSlots: makeSequence(1, 256), peers: []*peerData{ { blocks: makeSequence(1, 320), @@ -301,7 +301,7 @@ func TestBlocksQueueLoop(t *testing.T) { t.Error(err) } - if queue.headFetcher.HeadSlot() < uint64(len(tt.expectedBlockSlots)) { + if queue.headFetcher.HeadSlot() < tt.highestExpectedSlot { t.Errorf("Not enough slots synced, want: %v, got: %v", len(tt.expectedBlockSlots), queue.headFetcher.HeadSlot()) }