package initialsync

import (
	"context"
	"fmt"
	"io"
	"math"
	"sort"
	"sync"
	"time"

	"github.com/kevinms/leakybucket-go"
	streamhelpers "github.com/libp2p/go-libp2p-core/helpers"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/pkg/errors"
	eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
	"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
	"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
	"github.com/prysmaticlabs/prysm/beacon-chain/flags"
	"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
	prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
	p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
	"github.com/prysmaticlabs/prysm/shared/mathutil"
	"github.com/prysmaticlabs/prysm/shared/params"
	"github.com/prysmaticlabs/prysm/shared/rand"
	"github.com/prysmaticlabs/prysm/shared/roughtime"
	"github.com/sirupsen/logrus"
	"go.opencensus.io/trace"
)

const (
	// maxPendingRequests limits how many concurrent fetch requests one can initiate.
	// It is used as the buffer size of both the request and the response channels.
	maxPendingRequests = 64
	// peersPercentagePerRequest caps the percentage of peers to be used in a request.
	peersPercentagePerRequest = 0.75
	// handshakePollingInterval is the polling interval for checking the number of received handshakes
	// (i.e. how often the fetcher re-checks whether enough suitable peers are available).
	handshakePollingInterval = 5 * time.Second
	// peerLocksPollingInterval is the polling interval for checking if there are stale peer locks.
	peerLocksPollingInterval = 5 * time.Minute
	// peerLockMaxAge is the maximum time since last access before a stale lock is purged.
	peerLockMaxAge = 60 * time.Minute
	// nonSkippedSlotsFullSearchEpochs is how many epochs to check in full, before resorting to
	// random sampling of slots once per epoch.
	nonSkippedSlotsFullSearchEpochs = 10
)

var (
	// errNoPeersAvailable is returned when no suitable peer is left to serve a request.
	errNoPeersAvailable = errors.New("no peers available, waiting for reconnect")
	// errFetcherCtxIsDone is returned when the fetcher's own context has been cancelled.
	errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize")
	// errSlotIsTooHigh is reported when a requested start slot lies beyond the finalized range.
	errSlotIsTooHigh = errors.New("slot is higher than the finalized slot")
)

// blocksFetcherConfig is a config to setup the block fetcher.
type blocksFetcherConfig struct { headFetcher blockchain.HeadFetcher p2p p2p.P2P } // blocksFetcher is a service to fetch chain data from peers. // On an incoming requests, requested block range is evenly divided // among available peers (for fair network load distribution). type blocksFetcher struct { sync.Mutex ctx context.Context cancel context.CancelFunc rand *rand.Rand headFetcher blockchain.HeadFetcher p2p p2p.P2P blocksPerSecond uint64 rateLimiter *leakybucket.Collector peerLocks map[peer.ID]*peerLock fetchRequests chan *fetchRequestParams fetchResponses chan *fetchRequestResponse quit chan struct{} // termination notifier } // peerLock restricts fetcher actions on per peer basis. Currently, used for rate limiting. type peerLock struct { sync.Mutex accessed time.Time } // fetchRequestParams holds parameters necessary to schedule a fetch request. type fetchRequestParams struct { ctx context.Context // if provided, it is used instead of global fetcher's context start uint64 // starting slot count uint64 // how many slots to receive (fetcher may return fewer slots) } // fetchRequestResponse is a combined type to hold results of both successful executions and errors. // Valid usage pattern will be to check whether result's `err` is nil, before using `blocks`. type fetchRequestResponse struct { start, count uint64 blocks []*eth.SignedBeaconBlock err error } // newBlocksFetcher creates ready to use fetcher. func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher { blocksPerSecond := flags.Get().BlockBatchLimit allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit // Allow fetcher to go almost to the full burst capacity (less a single batch). 
rateLimiter := leakybucket.NewCollector( float64(blocksPerSecond), int64(allowedBlocksBurst-blocksPerSecond), false /* deleteEmptyBuckets */) ctx, cancel := context.WithCancel(ctx) return &blocksFetcher{ ctx: ctx, cancel: cancel, rand: rand.NewGenerator(), headFetcher: cfg.headFetcher, p2p: cfg.p2p, blocksPerSecond: uint64(blocksPerSecond), rateLimiter: rateLimiter, peerLocks: make(map[peer.ID]*peerLock), fetchRequests: make(chan *fetchRequestParams, maxPendingRequests), fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests), quit: make(chan struct{}), } } // start boots up the fetcher, which starts listening for incoming fetch requests. func (f *blocksFetcher) start() error { select { case <-f.ctx.Done(): return errFetcherCtxIsDone default: go f.loop() return nil } } // stop terminates all fetcher operations. func (f *blocksFetcher) stop() { defer func() { if f.rateLimiter != nil { f.rateLimiter.Free() f.rateLimiter = nil } }() f.cancel() <-f.quit // make sure that loop() is done } // requestResponses exposes a channel into which fetcher pushes generated request responses. func (f *blocksFetcher) requestResponses() <-chan *fetchRequestResponse { return f.fetchResponses } // loop is a main fetcher loop, listens for incoming requests/cancellations, forwards outgoing responses. func (f *blocksFetcher) loop() { defer close(f.quit) // Wait for all loop's goroutines to finish, and safely release resources. wg := &sync.WaitGroup{} defer func() { wg.Wait() close(f.fetchResponses) }() // Periodically remove stale peer locks. go func() { ticker := time.NewTicker(peerLocksPollingInterval) for { select { case <-ticker.C: f.removeStalePeerLocks(peerLockMaxAge) case <-f.ctx.Done(): ticker.Stop() return } } }() // Main loop. for { // Make sure there is are available peers before processing requests. 
if _, err := f.waitForMinimumPeers(f.ctx); err != nil { log.Error(err) } select { case <-f.ctx.Done(): log.Debug("Context closed, exiting goroutine (blocks fetcher)") return case req := <-f.fetchRequests: wg.Add(1) go func() { defer wg.Done() select { case <-f.ctx.Done(): case f.fetchResponses <- f.handleRequest(req.ctx, req.start, req.count): } }() } } } // scheduleRequest adds request to incoming queue. func (f *blocksFetcher) scheduleRequest(ctx context.Context, start, count uint64) error { if ctx.Err() != nil { return ctx.Err() } request := &fetchRequestParams{ ctx: ctx, start: start, count: count, } select { case <-f.ctx.Done(): return errFetcherCtxIsDone case f.fetchRequests <- request: } return nil } // handleRequest parses fetch request and forwards it to response builder. func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64) *fetchRequestResponse { ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest") defer span.End() response := &fetchRequestResponse{ start: start, count: count, blocks: []*eth.SignedBeaconBlock{}, err: nil, } if ctx.Err() != nil { response.err = ctx.Err() return response } headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) if len(peers) == 0 { response.err = errNoPeersAvailable return response } // Short circuit start far exceeding the highest finalized epoch in some infinite loop. highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1) if start > highestFinalizedSlot { response.err = fmt.Errorf("%v, slot: %d, higest finilized slot: %d", errSlotIsTooHigh, start, highestFinalizedSlot) return response } response.blocks, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers) return response } // fetchBlocksFromPeer fetches blocks from a single randomly selected peer. 
func (f *blocksFetcher) fetchBlocksFromPeer( ctx context.Context, start, count uint64, peers []peer.ID, ) ([]*eth.SignedBeaconBlock, error) { ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer") defer span.End() blocks := []*eth.SignedBeaconBlock{} var err error peers, err = f.filterPeers(peers, peersPercentagePerRequest) if err != nil { return blocks, err } if len(peers) == 0 { return blocks, errNoPeersAvailable } req := &p2ppb.BeaconBlocksByRangeRequest{ StartSlot: start, Count: count, Step: 1, } for i := 0; i < len(peers); i++ { if blocks, err = f.requestBlocks(ctx, req, peers[i]); err == nil { return blocks, err } } return blocks, nil } // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams. func (f *blocksFetcher) requestBlocks( ctx context.Context, req *p2ppb.BeaconBlocksByRangeRequest, pid peer.ID, ) ([]*eth.SignedBeaconBlock, error) { if ctx.Err() != nil { return nil, ctx.Err() } l := f.getPeerLock(pid) if l == nil { return nil, errors.New("cannot obtain lock") } l.Lock() log.WithFields(logrus.Fields{ "peer": pid, "start": req.StartSlot, "count": req.Count, "step": req.Step, "capacity": f.rateLimiter.Remaining(pid.String()), }).Debug("Requesting blocks") if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) { log.WithField("peer", pid).Debug("Slowing down for rate limit") timer := time.NewTimer(f.rateLimiter.TillEmpty(pid.String())) select { case <-f.ctx.Done(): timer.Stop() return nil, errFetcherCtxIsDone case <-timer.C: // Peer has gathered enough capacity to be polled again. 
} } f.rateLimiter.Add(pid.String(), int64(req.Count)) l.Unlock() stream, err := f.p2p.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid) if err != nil { return nil, err } defer func() { if err := streamhelpers.FullClose(stream); err != nil { log.WithError(err).Errorf("Failed to close stream with protocol %s", stream.Protocol()) } }() resp := make([]*eth.SignedBeaconBlock, 0, req.Count) for i := uint64(0); ; i++ { blk, err := prysmsync.ReadChunkedBlock(stream, f.p2p) if err == io.EOF { break } // exit if more than max request blocks are returned if i >= params.BeaconNetworkConfig().MaxRequestBlocks { break } if err != nil { return nil, err } resp = append(resp, blk) } return resp, nil } // getPeerLock returns peer lock for a given peer. If lock is not found, it is created. func (f *blocksFetcher) getPeerLock(pid peer.ID) *peerLock { f.Lock() defer f.Unlock() if lock, ok := f.peerLocks[pid]; ok { lock.accessed = roughtime.Now() return lock } f.peerLocks[pid] = &peerLock{ Mutex: sync.Mutex{}, accessed: roughtime.Now(), } return f.peerLocks[pid] } // removeStalePeerLocks is a cleanup procedure which removes stale locks. func (f *blocksFetcher) removeStalePeerLocks(age time.Duration) { f.Lock() defer f.Unlock() for peerID, lock := range f.peerLocks { if time.Since(lock.accessed) >= age { lock.Lock() delete(f.peerLocks, peerID) lock.Unlock() } } } // selectFailOverPeer randomly selects fail over peer from the list of available peers. func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) { if len(peers) == 0 { return "", errNoPeersAvailable } if len(peers) == 1 && peers[0] == excludedPID { return "", errNoPeersAvailable } ind := f.rand.Int() % len(peers) if peers[ind] == excludedPID { return f.selectFailOverPeer(excludedPID, append(peers[:ind], peers[ind+1:]...)) } return peers[ind], nil } // waitForMinimumPeers spins and waits up until enough peers are available. 
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) { required := params.BeaconConfig().MaxPeersToSync if flags.Get().MinimumSyncPeers < required { required = flags.Get().MinimumSyncPeers } for { if ctx.Err() != nil { return nil, ctx.Err() } headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) _, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) if len(peers) >= required { return peers, nil } log.WithFields(logrus.Fields{ "suitable": len(peers), "required": required}).Info("Waiting for enough suitable peers before syncing") time.Sleep(handshakePollingInterval) } } // filterPeers returns transformed list of peers, // weight ordered or randomized, constrained if necessary. func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) { if len(peers) == 0 { return peers, nil } // Shuffle peers to prevent a bad peer from // stalling sync with invalid blocks. f.rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) // Select sub-sample from peers (honoring min-max invariants). required := params.BeaconConfig().MaxPeersToSync if flags.Get().MinimumSyncPeers < required { required = flags.Get().MinimumSyncPeers } limit := uint64(math.Round(float64(len(peers)) * peersPercentage)) limit = mathutil.Max(limit, uint64(required)) limit = mathutil.Min(limit, uint64(len(peers))) peers = peers[:limit] // Order peers by remaining capacity, effectively turning in-order // round robin peer processing into a weighted one (peers with higher // remaining capacity are preferred). Peers with the same capacity // are selected at random, since we have already shuffled peers // at this point. 
sort.SliceStable(peers, func(i, j int) bool { cap1 := f.rateLimiter.Remaining(peers[i].String()) cap2 := f.rateLimiter.Remaining(peers[j].String()) return cap1 > cap2 }) return peers, nil } // nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot. // For efficiency only one random slot is checked per epoch, so returned slot might not be the first // non-skipped slot. This shouldn't be a problem, as in case of adversary peer, we might get incorrect // data anyway, so code that relies on this function must be robust enough to re-request, if no progress // is possible with a returned value. func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) { ctx, span := trace.StartSpan(ctx, "initialsync.nonSkippedSlotAfter") defer span.End() headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) epoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) var err error peers, err = f.filterPeers(peers, peersPercentagePerRequest) if err != nil { return 0, err } if len(peers) == 0 { return 0, errNoPeersAvailable } slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch pidInd := 0 fetch := func(pid peer.ID, start, count, step uint64) (uint64, error) { req := &p2ppb.BeaconBlocksByRangeRequest{ StartSlot: start, Count: count, Step: step, } blocks, err := f.requestBlocks(ctx, req, pid) if err != nil { return 0, err } if len(blocks) > 0 { for _, block := range blocks { if block.Block.Slot > slot { return block.Block.Slot, nil } } } return 0, nil } // Start by checking several epochs fully, w/o resorting to random sampling. 
start := slot + 1 end := start + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch for ind := start; ind < end; ind += slotsPerEpoch { nextSlot, err := fetch(peers[pidInd%len(peers)], ind, slotsPerEpoch, 1) if err != nil { return 0, err } if nextSlot > slot { return nextSlot, nil } pidInd++ } // Quickly find the close enough epoch where a non-empty slot definitely exists. // Only single random slot per epoch is checked - allowing to move forward relatively quickly. slot = slot + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch upperBoundSlot := helpers.StartSlot(epoch + 1) for ind := slot + 1; ind < upperBoundSlot; ind += (slotsPerEpoch * slotsPerEpoch) / 2 { start := ind + uint64(f.rand.Intn(int(slotsPerEpoch))) nextSlot, err := fetch(peers[pidInd%len(peers)], start, slotsPerEpoch/2, slotsPerEpoch) if err != nil { return 0, err } pidInd++ if nextSlot > slot && upperBoundSlot >= nextSlot { upperBoundSlot = nextSlot break } } // Epoch with non-empty slot is located. Check all slots within two nearby epochs. if upperBoundSlot > slotsPerEpoch { upperBoundSlot -= slotsPerEpoch } upperBoundSlot = helpers.StartSlot(helpers.SlotToEpoch(upperBoundSlot)) nextSlot, err := fetch(peers[pidInd%len(peers)], upperBoundSlot, slotsPerEpoch*2, 1) if err != nil { return 0, err } if nextSlot < slot || helpers.StartSlot(epoch+1) < nextSlot { return 0, errors.New("invalid range for non-skipped slot") } return nextSlot, nil } // bestFinalizedSlot returns the highest finalized slot of the majority of connected peers. func (f *blocksFetcher) bestFinalizedSlot() uint64 { headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) return helpers.StartSlot(finalizedEpoch) }