Init sync update highest slot (#5298)

* updates highest slot before wrapping up
* more verbose error message
* error w/o stack
* revert back
This commit is contained in:
Victor Farazdagi 2020-04-04 17:11:38 +03:00 committed by GitHub
parent 9aac572c21
commit f440c815f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 17 deletions

View File

@ -507,3 +507,10 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u
return slot, nil
}
// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
func (f *blocksFetcher) bestFinalizedSlot() uint64 {
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
_, finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
return helpers.StartSlot(finalizedEpoch)
}

View File

@ -9,8 +9,8 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)
const (
@ -35,6 +35,7 @@ type blocksProvider interface {
requestResponses() <-chan *fetchRequestResponse
scheduleRequest(ctx context.Context, start, count uint64) error
nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error)
bestFinalizedSlot() uint64
start() error
stop()
}
@ -72,11 +73,15 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
p2p: cfg.p2p,
})
}
highestExpectedSlot := cfg.highestExpectedSlot
if highestExpectedSlot <= cfg.startSlot {
highestExpectedSlot = blocksFetcher.bestFinalizedSlot()
}
queue := &blocksQueue{
ctx: ctx,
cancel: cancel,
highestExpectedSlot: cfg.highestExpectedSlot,
highestExpectedSlot: highestExpectedSlot,
blocksFetcher: blocksFetcher,
headFetcher: cfg.headFetcher,
fetchedBlocks: make(chan *eth.SignedBeaconBlock, allowedBlocksPerSecond),
@ -140,7 +145,13 @@ func (q *blocksQueue) loop() {
ticker := time.NewTicker(pollingInterval)
tickerEvents := []eventID{eventSchedule, eventReadyToSend, eventCheckStale, eventExtendWindow}
for {
if q.headFetcher.HeadSlot() >= q.highestExpectedSlot {
// By the time initial sync is complete, highest slot may increase, re-check.
if q.highestExpectedSlot < q.blocksFetcher.bestFinalizedSlot() {
q.highestExpectedSlot = q.blocksFetcher.bestFinalizedSlot()
continue
}
log.Debug("Highest expected slot reached")
q.cancel()
}
@ -156,7 +167,11 @@ func (q *blocksQueue) loop() {
// Trigger events on each epoch's state machine.
for _, event := range tickerEvents {
if err := q.state.trigger(event, state.epoch, data); err != nil {
log.WithError(err).Debug("Can not trigger event")
log.WithFields(logrus.Fields{
"event": event,
"epoch": state.epoch,
"error": err.Error(),
}).Debug("Can not trigger event")
}
}
@ -186,7 +201,11 @@ func (q *blocksQueue) loop() {
if ind, ok := q.state.findEpochState(epoch); ok {
state := q.state.epochs[ind]
if err := q.state.trigger(eventDataReceived, state.epoch, response); err != nil {
log.WithError(err).Debug("Can not trigger event")
log.WithFields(logrus.Fields{
"event": eventDataReceived,
"epoch": state.epoch,
"error": err.Error(),
}).Debug("Can not trigger event")
state.setState(stateNew)
continue
}
@ -203,13 +222,7 @@ func (q *blocksQueue) loop() {
func (q *blocksQueue) onScheduleEvent(ctx context.Context) eventHandlerFn {
return func(es *epochState, in interface{}) (stateID, error) {
data := in.(*fetchRequestParams)
start := data.start
count := mathutil.Min(data.count, q.highestExpectedSlot-start+1)
if count <= 0 {
return es.state, errSlotIsTooHigh
}
if err := q.blocksFetcher.scheduleRequest(ctx, start, count); err != nil {
if err := q.blocksFetcher.scheduleRequest(ctx, data.start, data.count); err != nil {
return es.state, err
}
return stateScheduled, nil

View File

@ -157,8 +157,8 @@ func TestBlocksQueueLoop(t *testing.T) {
}{
{
name: "Single peer with all blocks",
highestExpectedSlot: 251,
expectedBlockSlots: makeSequence(1, 251),
highestExpectedSlot: 251, // will be auto-fixed to 256 (to 8th epoch), by queue
expectedBlockSlots: makeSequence(1, 256),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
@ -169,8 +169,8 @@ func TestBlocksQueueLoop(t *testing.T) {
},
{
name: "Multiple peers with all blocks",
highestExpectedSlot: 251,
expectedBlockSlots: makeSequence(1, 251),
highestExpectedSlot: 256,
expectedBlockSlots: makeSequence(1, 256),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
@ -224,7 +224,7 @@ func TestBlocksQueueLoop(t *testing.T) {
{
name: "Multiple peers with failures",
highestExpectedSlot: 128,
expectedBlockSlots: makeSequence(1, 128),
expectedBlockSlots: makeSequence(1, 256),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
@ -301,7 +301,7 @@ func TestBlocksQueueLoop(t *testing.T) {
t.Error(err)
}
if queue.headFetcher.HeadSlot() < uint64(len(tt.expectedBlockSlots)) {
if queue.headFetcher.HeadSlot() < tt.highestExpectedSlot {
t.Errorf("Not enough slots synced, want: %v, got: %v",
len(tt.expectedBlockSlots), queue.headFetcher.HeadSlot())
}