2020-03-04 17:19:09 +00:00
|
|
|
package initialsync
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2020-05-18 18:59:03 +00:00
|
|
|
"fmt"
|
2020-03-04 17:19:09 +00:00
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/kevinms/leakybucket-go"
|
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
"github.com/pkg/errors"
|
2021-02-16 07:45:34 +00:00
|
|
|
types "github.com/prysmaticlabs/eth2-types"
|
2020-10-31 21:33:57 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
2020-03-04 17:19:09 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
2020-11-05 07:27:46 +00:00
|
|
|
p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
2020-03-04 17:19:09 +00:00
|
|
|
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
|
2021-03-02 19:36:03 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
|
2021-09-21 19:59:25 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/config/params"
|
2021-09-15 22:55:11 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/crypto/rand"
|
2021-07-28 21:23:44 +00:00
|
|
|
p2ppb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
|
|
|
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
|
2020-03-04 17:19:09 +00:00
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"go.opencensus.io/trace"
|
|
|
|
)
|
|
|
|
|
2020-03-27 06:54:57 +00:00
|
|
|
const (
|
|
|
|
// maxPendingRequests limits how many concurrent fetch request one can initiate.
|
2020-05-18 18:59:03 +00:00
|
|
|
maxPendingRequests = 64
|
2020-03-27 06:54:57 +00:00
|
|
|
// peersPercentagePerRequest caps percentage of peers to be used in a request.
|
|
|
|
peersPercentagePerRequest = 0.75
|
2020-05-06 21:29:50 +00:00
|
|
|
// handshakePollingInterval is a polling interval for checking the number of received handshakes.
|
|
|
|
handshakePollingInterval = 5 * time.Second
|
2020-05-19 17:48:32 +00:00
|
|
|
// peerLocksPollingInterval is a polling interval for checking if there are stale peer locks.
|
|
|
|
peerLocksPollingInterval = 5 * time.Minute
|
|
|
|
// peerLockMaxAge is maximum time before stale lock is purged.
|
|
|
|
peerLockMaxAge = 60 * time.Minute
|
2020-05-25 16:43:59 +00:00
|
|
|
// nonSkippedSlotsFullSearchEpochs how many epochs to check in full, before resorting to random
|
|
|
|
// sampling of slots once per epoch
|
|
|
|
nonSkippedSlotsFullSearchEpochs = 10
|
2020-08-13 17:33:57 +00:00
|
|
|
// peerFilterCapacityWeight defines how peer's capacity affects peer's score. Provided as
|
|
|
|
// percentage, i.e. 0.3 means capacity will determine 30% of peer's score.
|
|
|
|
peerFilterCapacityWeight = 0.2
|
2020-11-05 07:27:46 +00:00
|
|
|
// backtrackingMaxHops how many hops (during search for common ancestor in backtracking) to do
|
|
|
|
// before giving up.
|
|
|
|
backtrackingMaxHops = 128
|
2020-03-27 06:54:57 +00:00
|
|
|
)
|
|
|
|
|
2020-03-04 17:19:09 +00:00
|
|
|
var (
|
2020-08-15 15:14:59 +00:00
|
|
|
errNoPeersAvailable = errors.New("no peers available, waiting for reconnect")
|
|
|
|
errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize")
|
|
|
|
errSlotIsTooHigh = errors.New("slot is higher than the finalized slot")
|
|
|
|
errBlockAlreadyProcessed = errors.New("block is already processed")
|
2020-10-31 11:10:08 +00:00
|
|
|
errParentDoesNotExist = errors.New("beacon node doesn't have a parent in db with root")
|
2020-11-05 07:27:46 +00:00
|
|
|
errNoPeersWithAltBlocks = errors.New("no peers with alternative blocks found")
|
2020-03-04 17:19:09 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// blocksFetcherConfig is a config to setup the block fetcher.
|
|
|
|
type blocksFetcherConfig struct {
|
2020-10-31 21:33:57 +00:00
|
|
|
chain blockchainService
|
2020-08-13 17:33:57 +00:00
|
|
|
p2p p2p.P2P
|
2020-10-31 21:33:57 +00:00
|
|
|
db db.ReadOnlyDatabase
|
2020-08-13 17:33:57 +00:00
|
|
|
peerFilterCapacityWeight float64
|
2020-08-16 17:51:14 +00:00
|
|
|
mode syncMode
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// blocksFetcher is a service to fetch chain data from peers.
|
|
|
|
// On an incoming requests, requested block range is evenly divided
|
|
|
|
// among available peers (for fair network load distribution).
|
|
|
|
type blocksFetcher struct {
|
2020-03-27 06:54:57 +00:00
|
|
|
sync.Mutex
|
2020-10-31 21:33:57 +00:00
|
|
|
ctx context.Context
|
|
|
|
cancel context.CancelFunc
|
|
|
|
rand *rand.Rand
|
|
|
|
chain blockchainService
|
|
|
|
p2p p2p.P2P
|
|
|
|
db db.ReadOnlyDatabase
|
|
|
|
blocksPerSecond uint64
|
|
|
|
rateLimiter *leakybucket.Collector
|
|
|
|
peerLocks map[peer.ID]*peerLock
|
|
|
|
fetchRequests chan *fetchRequestParams
|
|
|
|
fetchResponses chan *fetchRequestResponse
|
|
|
|
capacityWeight float64 // how remaining capacity affects peer selection
|
|
|
|
mode syncMode // allows to use fetcher in different sync scenarios
|
|
|
|
quit chan struct{} // termination notifier
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
|
2020-05-19 17:48:32 +00:00
|
|
|
// peerLock restricts fetcher actions on per peer basis. Currently, used for rate limiting.
|
|
|
|
type peerLock struct {
|
|
|
|
sync.Mutex
|
|
|
|
accessed time.Time
|
|
|
|
}
|
|
|
|
|
2020-03-04 17:19:09 +00:00
|
|
|
// fetchRequestParams holds parameters necessary to schedule a fetch request.
|
|
|
|
type fetchRequestParams struct {
|
|
|
|
ctx context.Context // if provided, it is used instead of global fetcher's context
|
2021-02-16 07:45:34 +00:00
|
|
|
start types.Slot // starting slot
|
2020-03-04 17:19:09 +00:00
|
|
|
count uint64 // how many slots to receive (fetcher may return fewer slots)
|
|
|
|
}
|
|
|
|
|
|
|
|
// fetchRequestResponse is a combined type to hold results of both successful executions and errors.
|
|
|
|
// Valid usage pattern will be to check whether result's `err` is nil, before using `blocks`.
|
|
|
|
type fetchRequestResponse struct {
|
2021-02-16 07:45:34 +00:00
|
|
|
pid peer.ID
|
|
|
|
start types.Slot
|
|
|
|
count uint64
|
2021-07-23 20:10:15 +00:00
|
|
|
blocks []block.SignedBeaconBlock
|
2021-02-16 07:45:34 +00:00
|
|
|
err error
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// newBlocksFetcher creates ready to use fetcher.
|
|
|
|
func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
|
2020-05-18 18:59:03 +00:00
|
|
|
blocksPerSecond := flags.Get().BlockBatchLimit
|
|
|
|
allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit
|
|
|
|
// Allow fetcher to go almost to the full burst capacity (less a single batch).
|
2020-03-04 17:19:09 +00:00
|
|
|
rateLimiter := leakybucket.NewCollector(
|
2020-05-18 18:59:03 +00:00
|
|
|
float64(blocksPerSecond), int64(allowedBlocksBurst-blocksPerSecond),
|
2020-05-11 22:03:15 +00:00
|
|
|
false /* deleteEmptyBuckets */)
|
2020-03-04 17:19:09 +00:00
|
|
|
|
2020-08-13 17:33:57 +00:00
|
|
|
capacityWeight := cfg.peerFilterCapacityWeight
|
|
|
|
if capacityWeight >= 1 {
|
|
|
|
capacityWeight = peerFilterCapacityWeight
|
|
|
|
}
|
|
|
|
|
2020-05-18 18:59:03 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
2020-03-04 17:19:09 +00:00
|
|
|
return &blocksFetcher{
|
2020-10-31 21:33:57 +00:00
|
|
|
ctx: ctx,
|
|
|
|
cancel: cancel,
|
|
|
|
rand: rand.NewGenerator(),
|
|
|
|
chain: cfg.chain,
|
|
|
|
p2p: cfg.p2p,
|
|
|
|
db: cfg.db,
|
|
|
|
blocksPerSecond: uint64(blocksPerSecond),
|
|
|
|
rateLimiter: rateLimiter,
|
|
|
|
peerLocks: make(map[peer.ID]*peerLock),
|
|
|
|
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
|
|
|
|
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
|
|
|
|
capacityWeight: capacityWeight,
|
|
|
|
mode: cfg.mode,
|
|
|
|
quit: make(chan struct{}),
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// start boots up the fetcher, which starts listening for incoming fetch requests.
|
|
|
|
func (f *blocksFetcher) start() error {
|
|
|
|
select {
|
|
|
|
case <-f.ctx.Done():
|
2020-03-11 11:21:41 +00:00
|
|
|
return errFetcherCtxIsDone
|
2020-03-04 17:19:09 +00:00
|
|
|
default:
|
|
|
|
go f.loop()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// stop terminates all fetcher operations.
|
|
|
|
func (f *blocksFetcher) stop() {
|
2020-06-22 11:23:23 +00:00
|
|
|
defer func() {
|
|
|
|
if f.rateLimiter != nil {
|
|
|
|
f.rateLimiter.Free()
|
|
|
|
f.rateLimiter = nil
|
|
|
|
}
|
|
|
|
}()
|
2020-03-04 17:19:09 +00:00
|
|
|
f.cancel()
|
|
|
|
<-f.quit // make sure that loop() is done
|
|
|
|
}
|
|
|
|
|
|
|
|
// requestResponses exposes a channel into which fetcher pushes generated request responses.
|
|
|
|
func (f *blocksFetcher) requestResponses() <-chan *fetchRequestResponse {
|
2020-03-11 11:21:41 +00:00
|
|
|
return f.fetchResponses
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// loop is a main fetcher loop, listens for incoming requests/cancellations, forwards outgoing responses.
|
|
|
|
func (f *blocksFetcher) loop() {
|
|
|
|
defer close(f.quit)
|
|
|
|
|
2020-03-11 11:21:41 +00:00
|
|
|
// Wait for all loop's goroutines to finish, and safely release resources.
|
|
|
|
wg := &sync.WaitGroup{}
|
|
|
|
defer func() {
|
|
|
|
wg.Wait()
|
|
|
|
close(f.fetchResponses)
|
|
|
|
}()
|
|
|
|
|
2020-05-19 17:48:32 +00:00
|
|
|
// Periodically remove stale peer locks.
|
|
|
|
go func() {
|
|
|
|
ticker := time.NewTicker(peerLocksPollingInterval)
|
2020-09-08 18:05:38 +00:00
|
|
|
defer ticker.Stop()
|
2020-05-19 17:48:32 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ticker.C:
|
|
|
|
f.removeStalePeerLocks(peerLockMaxAge)
|
|
|
|
case <-f.ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Main loop.
|
2020-03-04 17:19:09 +00:00
|
|
|
for {
|
2020-03-27 06:54:57 +00:00
|
|
|
// Make sure there is are available peers before processing requests.
|
|
|
|
if _, err := f.waitForMinimumPeers(f.ctx); err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
}
|
|
|
|
|
2020-03-04 17:19:09 +00:00
|
|
|
select {
|
|
|
|
case <-f.ctx.Done():
|
2020-03-11 11:21:41 +00:00
|
|
|
log.Debug("Context closed, exiting goroutine (blocks fetcher)")
|
2020-03-04 17:19:09 +00:00
|
|
|
return
|
2020-03-11 11:21:41 +00:00
|
|
|
case req := <-f.fetchRequests:
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
2020-03-14 18:21:07 +00:00
|
|
|
select {
|
|
|
|
case <-f.ctx.Done():
|
|
|
|
case f.fetchResponses <- f.handleRequest(req.ctx, req.start, req.count):
|
|
|
|
}
|
2020-03-11 11:21:41 +00:00
|
|
|
}()
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// scheduleRequest adds request to incoming queue.
|
2021-02-16 07:45:34 +00:00
|
|
|
func (f *blocksFetcher) scheduleRequest(ctx context.Context, start types.Slot, count uint64) error {
|
2020-03-14 18:21:07 +00:00
|
|
|
if ctx.Err() != nil {
|
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
request := &fetchRequestParams{
|
|
|
|
ctx: ctx,
|
|
|
|
start: start,
|
|
|
|
count: count,
|
|
|
|
}
|
2020-03-04 17:19:09 +00:00
|
|
|
select {
|
|
|
|
case <-f.ctx.Done():
|
2020-03-11 11:21:41 +00:00
|
|
|
return errFetcherCtxIsDone
|
2020-03-14 18:21:07 +00:00
|
|
|
case f.fetchRequests <- request:
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// handleRequest parses fetch request and forwards it to response builder.
|
2021-02-16 07:45:34 +00:00
|
|
|
func (f *blocksFetcher) handleRequest(ctx context.Context, start types.Slot, count uint64) *fetchRequestResponse {
|
2020-03-04 17:19:09 +00:00
|
|
|
ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest")
|
|
|
|
defer span.End()
|
|
|
|
|
2020-03-14 18:21:07 +00:00
|
|
|
response := &fetchRequestResponse{
|
|
|
|
start: start,
|
|
|
|
count: count,
|
2021-07-23 20:10:15 +00:00
|
|
|
blocks: []block.SignedBeaconBlock{},
|
2020-03-14 18:21:07 +00:00
|
|
|
err: nil,
|
2020-03-11 11:21:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if ctx.Err() != nil {
|
2020-03-14 18:21:07 +00:00
|
|
|
response.err = ctx.Err()
|
|
|
|
return response
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
|
2020-10-31 21:33:57 +00:00
|
|
|
_, targetEpoch, peers := f.calculateHeadAndTargetEpochs()
|
2020-03-04 17:19:09 +00:00
|
|
|
if len(peers) == 0 {
|
2020-03-14 18:21:07 +00:00
|
|
|
response.err = errNoPeersAvailable
|
|
|
|
return response
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Short circuit start far exceeding the highest finalized epoch in some infinite loop.
|
2020-08-16 17:51:14 +00:00
|
|
|
if f.mode == modeStopOnFinalizedEpoch {
|
2021-02-16 07:45:34 +00:00
|
|
|
highestFinalizedSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(targetEpoch + 1))
|
2020-08-16 17:51:14 +00:00
|
|
|
if start > highestFinalizedSlot {
|
2021-02-12 23:44:46 +00:00
|
|
|
response.err = fmt.Errorf("%w, slot: %d, highest finalized slot: %d",
|
2020-08-16 17:51:14 +00:00
|
|
|
errSlotIsTooHigh, start, highestFinalizedSlot)
|
|
|
|
return response
|
|
|
|
}
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
|
2020-08-13 08:42:21 +00:00
|
|
|
response.blocks, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
|
2020-03-14 18:21:07 +00:00
|
|
|
return response
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
|
2020-06-25 15:26:30 +00:00
|
|
|
// fetchBlocksFromPeer fetches blocks from a single randomly selected peer.
|
|
|
|
func (f *blocksFetcher) fetchBlocksFromPeer(
|
2020-05-18 18:59:03 +00:00
|
|
|
ctx context.Context,
|
2021-02-16 07:45:34 +00:00
|
|
|
start types.Slot, count uint64,
|
2020-05-18 18:59:03 +00:00
|
|
|
peers []peer.ID,
|
2021-07-23 20:10:15 +00:00
|
|
|
) ([]block.SignedBeaconBlock, peer.ID, error) {
|
2020-06-25 15:26:30 +00:00
|
|
|
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer")
|
2020-05-18 18:59:03 +00:00
|
|
|
defer span.End()
|
|
|
|
|
2020-11-04 23:29:50 +00:00
|
|
|
peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
|
2020-05-18 18:59:03 +00:00
|
|
|
req := &p2ppb.BeaconBlocksByRangeRequest{
|
|
|
|
StartSlot: start,
|
|
|
|
Count: count,
|
|
|
|
Step: 1,
|
|
|
|
}
|
|
|
|
for i := 0; i < len(peers); i++ {
|
2020-11-04 23:29:50 +00:00
|
|
|
if blocks, err := f.requestBlocks(ctx, req, peers[i]); err == nil {
|
2020-12-03 00:45:20 +00:00
|
|
|
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peers[i])
|
2020-08-13 08:42:21 +00:00
|
|
|
return blocks, peers[i], err
|
2020-05-18 18:59:03 +00:00
|
|
|
}
|
|
|
|
}
|
2020-11-04 23:29:50 +00:00
|
|
|
return nil, "", errNoPeersAvailable
|
2020-05-18 18:59:03 +00:00
|
|
|
}
|
|
|
|
|
2020-03-04 17:19:09 +00:00
|
|
|
// requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
|
|
|
|
func (f *blocksFetcher) requestBlocks(
|
|
|
|
ctx context.Context,
|
|
|
|
req *p2ppb.BeaconBlocksByRangeRequest,
|
|
|
|
pid peer.ID,
|
2021-07-23 20:10:15 +00:00
|
|
|
) ([]block.SignedBeaconBlock, error) {
|
2020-06-25 15:26:30 +00:00
|
|
|
if ctx.Err() != nil {
|
|
|
|
return nil, ctx.Err()
|
|
|
|
}
|
2021-01-25 21:27:30 +00:00
|
|
|
l := f.peerLock(pid)
|
2020-05-19 17:48:32 +00:00
|
|
|
l.Lock()
|
2020-05-18 18:59:03 +00:00
|
|
|
log.WithFields(logrus.Fields{
|
2020-05-19 17:48:32 +00:00
|
|
|
"peer": pid,
|
|
|
|
"start": req.StartSlot,
|
|
|
|
"count": req.Count,
|
|
|
|
"step": req.Step,
|
|
|
|
"capacity": f.rateLimiter.Remaining(pid.String()),
|
2020-08-13 17:33:57 +00:00
|
|
|
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
|
2020-05-18 18:59:03 +00:00
|
|
|
}).Debug("Requesting blocks")
|
2020-03-04 17:19:09 +00:00
|
|
|
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
|
2020-11-04 20:09:19 +00:00
|
|
|
if err := f.waitForBandwidth(pid); err != nil {
|
|
|
|
return nil, err
|
2020-05-19 17:48:32 +00:00
|
|
|
}
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
|
|
|
f.rateLimiter.Add(pid.String(), int64(req.Count))
|
2020-05-19 17:48:32 +00:00
|
|
|
l.Unlock()
|
2021-05-17 19:25:59 +00:00
|
|
|
return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.chain, f.p2p, pid, req, nil)
|
2020-11-04 20:09:19 +00:00
|
|
|
}
|
|
|
|
|
2020-11-05 07:27:46 +00:00
|
|
|
// requestBlocksByRoot is a wrapper for handling BeaconBlockByRootsReq requests/streams.
|
|
|
|
func (f *blocksFetcher) requestBlocksByRoot(
|
|
|
|
ctx context.Context,
|
|
|
|
req *p2pTypes.BeaconBlockByRootsReq,
|
|
|
|
pid peer.ID,
|
2021-07-23 20:10:15 +00:00
|
|
|
) ([]block.SignedBeaconBlock, error) {
|
2020-11-05 07:27:46 +00:00
|
|
|
if ctx.Err() != nil {
|
|
|
|
return nil, ctx.Err()
|
|
|
|
}
|
2021-01-25 21:27:30 +00:00
|
|
|
l := f.peerLock(pid)
|
2020-11-05 07:27:46 +00:00
|
|
|
l.Lock()
|
|
|
|
log.WithFields(logrus.Fields{
|
|
|
|
"peer": pid,
|
|
|
|
"numRoots": len(*req),
|
|
|
|
"capacity": f.rateLimiter.Remaining(pid.String()),
|
|
|
|
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
|
|
|
|
}).Debug("Requesting blocks (by roots)")
|
|
|
|
if f.rateLimiter.Remaining(pid.String()) < int64(len(*req)) {
|
|
|
|
if err := f.waitForBandwidth(pid); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
f.rateLimiter.Add(pid.String(), int64(len(*req)))
|
|
|
|
l.Unlock()
|
|
|
|
|
2021-05-17 19:25:59 +00:00
|
|
|
return prysmsync.SendBeaconBlocksByRootRequest(ctx, f.chain, f.p2p, pid, req, nil)
|
2020-11-05 07:27:46 +00:00
|
|
|
}
|
|
|
|
|
2020-11-04 20:09:19 +00:00
|
|
|
// waitForBandwidth blocks up until peer's bandwidth is restored.
|
|
|
|
func (f *blocksFetcher) waitForBandwidth(pid peer.ID) error {
|
|
|
|
log.WithField("peer", pid).Debug("Slowing down for rate limit")
|
|
|
|
timer := time.NewTimer(f.rateLimiter.TillEmpty(pid.String()))
|
|
|
|
defer timer.Stop()
|
|
|
|
select {
|
|
|
|
case <-f.ctx.Done():
|
|
|
|
return errFetcherCtxIsDone
|
|
|
|
case <-timer.C:
|
|
|
|
// Peer has gathered enough capacity to be polled again.
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|
2020-11-04 20:09:19 +00:00
|
|
|
return nil
|
2020-03-04 17:19:09 +00:00
|
|
|
}
|