prysm-pulse/beacon-chain/sync/initial-sync/blocks_fetcher.go
Raul Jordan 9145310647
Eliminate Proto V2 Namespace (#9297)
* get rid of v2 in prysm codebase

* replace block2

* builds

* terence comments

* gazelle
2021-07-28 21:23:44 +00:00

370 lines
12 KiB
Go

package initialsync
import (
"context"
"fmt"
"sync"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
p2ppb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
const (
// maxPendingRequests limits how many concurrent fetch request one can initiate.
maxPendingRequests = 64
// peersPercentagePerRequest caps percentage of peers to be used in a request.
peersPercentagePerRequest = 0.75
// handshakePollingInterval is a polling interval for checking the number of received handshakes.
handshakePollingInterval = 5 * time.Second
// peerLocksPollingInterval is a polling interval for checking if there are stale peer locks.
peerLocksPollingInterval = 5 * time.Minute
// peerLockMaxAge is maximum time before stale lock is purged.
peerLockMaxAge = 60 * time.Minute
// nonSkippedSlotsFullSearchEpochs how many epochs to check in full, before resorting to random
// sampling of slots once per epoch
nonSkippedSlotsFullSearchEpochs = 10
// peerFilterCapacityWeight defines how peer's capacity affects peer's score. Provided as
// percentage, i.e. 0.3 means capacity will determine 30% of peer's score.
peerFilterCapacityWeight = 0.2
// backtrackingMaxHops how many hops (during search for common ancestor in backtracking) to do
// before giving up.
backtrackingMaxHops = 128
)
var (
errNoPeersAvailable = errors.New("no peers available, waiting for reconnect")
errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize")
errSlotIsTooHigh = errors.New("slot is higher than the finalized slot")
errBlockAlreadyProcessed = errors.New("block is already processed")
errParentDoesNotExist = errors.New("beacon node doesn't have a parent in db with root")
errNoPeersWithAltBlocks = errors.New("no peers with alternative blocks found")
)
// blocksFetcherConfig is a config to setup the block fetcher.
type blocksFetcherConfig struct {
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
peerFilterCapacityWeight float64
mode syncMode
}
// blocksFetcher is a service to fetch chain data from peers.
// On an incoming requests, requested block range is evenly divided
// among available peers (for fair network load distribution).
type blocksFetcher struct {
sync.Mutex
ctx context.Context
cancel context.CancelFunc
rand *rand.Rand
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
blocksPerSecond uint64
rateLimiter *leakybucket.Collector
peerLocks map[peer.ID]*peerLock
fetchRequests chan *fetchRequestParams
fetchResponses chan *fetchRequestResponse
capacityWeight float64 // how remaining capacity affects peer selection
mode syncMode // allows to use fetcher in different sync scenarios
quit chan struct{} // termination notifier
}
// peerLock restricts fetcher actions on per peer basis. Currently, used for rate limiting.
type peerLock struct {
sync.Mutex
accessed time.Time
}
// fetchRequestParams holds parameters necessary to schedule a fetch request.
type fetchRequestParams struct {
ctx context.Context // if provided, it is used instead of global fetcher's context
start types.Slot // starting slot
count uint64 // how many slots to receive (fetcher may return fewer slots)
}
// fetchRequestResponse is a combined type to hold results of both successful executions and errors.
// Valid usage pattern will be to check whether result's `err` is nil, before using `blocks`.
type fetchRequestResponse struct {
pid peer.ID
start types.Slot
count uint64
blocks []block.SignedBeaconBlock
err error
}
// newBlocksFetcher creates ready to use fetcher.
func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
blocksPerSecond := flags.Get().BlockBatchLimit
allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit
// Allow fetcher to go almost to the full burst capacity (less a single batch).
rateLimiter := leakybucket.NewCollector(
float64(blocksPerSecond), int64(allowedBlocksBurst-blocksPerSecond),
false /* deleteEmptyBuckets */)
capacityWeight := cfg.peerFilterCapacityWeight
if capacityWeight >= 1 {
capacityWeight = peerFilterCapacityWeight
}
ctx, cancel := context.WithCancel(ctx)
return &blocksFetcher{
ctx: ctx,
cancel: cancel,
rand: rand.NewGenerator(),
chain: cfg.chain,
p2p: cfg.p2p,
db: cfg.db,
blocksPerSecond: uint64(blocksPerSecond),
rateLimiter: rateLimiter,
peerLocks: make(map[peer.ID]*peerLock),
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
capacityWeight: capacityWeight,
mode: cfg.mode,
quit: make(chan struct{}),
}
}
// start boots up the fetcher, which starts listening for incoming fetch requests.
func (f *blocksFetcher) start() error {
select {
case <-f.ctx.Done():
return errFetcherCtxIsDone
default:
go f.loop()
return nil
}
}
// stop terminates all fetcher operations.
func (f *blocksFetcher) stop() {
defer func() {
if f.rateLimiter != nil {
f.rateLimiter.Free()
f.rateLimiter = nil
}
}()
f.cancel()
<-f.quit // make sure that loop() is done
}
// requestResponses exposes a channel into which fetcher pushes generated request responses.
func (f *blocksFetcher) requestResponses() <-chan *fetchRequestResponse {
return f.fetchResponses
}
// loop is a main fetcher loop, listens for incoming requests/cancellations, forwards outgoing responses.
func (f *blocksFetcher) loop() {
defer close(f.quit)
// Wait for all loop's goroutines to finish, and safely release resources.
wg := &sync.WaitGroup{}
defer func() {
wg.Wait()
close(f.fetchResponses)
}()
// Periodically remove stale peer locks.
go func() {
ticker := time.NewTicker(peerLocksPollingInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
f.removeStalePeerLocks(peerLockMaxAge)
case <-f.ctx.Done():
return
}
}
}()
// Main loop.
for {
// Make sure there is are available peers before processing requests.
if _, err := f.waitForMinimumPeers(f.ctx); err != nil {
log.Error(err)
}
select {
case <-f.ctx.Done():
log.Debug("Context closed, exiting goroutine (blocks fetcher)")
return
case req := <-f.fetchRequests:
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-f.ctx.Done():
case f.fetchResponses <- f.handleRequest(req.ctx, req.start, req.count):
}
}()
}
}
}
// scheduleRequest adds request to incoming queue.
func (f *blocksFetcher) scheduleRequest(ctx context.Context, start types.Slot, count uint64) error {
if ctx.Err() != nil {
return ctx.Err()
}
request := &fetchRequestParams{
ctx: ctx,
start: start,
count: count,
}
select {
case <-f.ctx.Done():
return errFetcherCtxIsDone
case f.fetchRequests <- request:
}
return nil
}
// handleRequest parses fetch request and forwards it to response builder.
func (f *blocksFetcher) handleRequest(ctx context.Context, start types.Slot, count uint64) *fetchRequestResponse {
ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest")
defer span.End()
response := &fetchRequestResponse{
start: start,
count: count,
blocks: []block.SignedBeaconBlock{},
err: nil,
}
if ctx.Err() != nil {
response.err = ctx.Err()
return response
}
_, targetEpoch, peers := f.calculateHeadAndTargetEpochs()
if len(peers) == 0 {
response.err = errNoPeersAvailable
return response
}
// Short circuit start far exceeding the highest finalized epoch in some infinite loop.
if f.mode == modeStopOnFinalizedEpoch {
highestFinalizedSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(targetEpoch + 1))
if start > highestFinalizedSlot {
response.err = fmt.Errorf("%w, slot: %d, highest finalized slot: %d",
errSlotIsTooHigh, start, highestFinalizedSlot)
return response
}
}
response.blocks, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
return response
}
// fetchBlocksFromPeer fetches blocks from a single randomly selected peer.
func (f *blocksFetcher) fetchBlocksFromPeer(
ctx context.Context,
start types.Slot, count uint64,
peers []peer.ID,
) ([]block.SignedBeaconBlock, peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer")
defer span.End()
peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
req := &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: start,
Count: count,
Step: 1,
}
for i := 0; i < len(peers); i++ {
if blocks, err := f.requestBlocks(ctx, req, peers[i]); err == nil {
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peers[i])
return blocks, peers[i], err
}
}
return nil, "", errNoPeersAvailable
}
// requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
func (f *blocksFetcher) requestBlocks(
ctx context.Context,
req *p2ppb.BeaconBlocksByRangeRequest,
pid peer.ID,
) ([]block.SignedBeaconBlock, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
l := f.peerLock(pid)
l.Lock()
log.WithFields(logrus.Fields{
"peer": pid,
"start": req.StartSlot,
"count": req.Count,
"step": req.Step,
"capacity": f.rateLimiter.Remaining(pid.String()),
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
}).Debug("Requesting blocks")
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
if err := f.waitForBandwidth(pid); err != nil {
return nil, err
}
}
f.rateLimiter.Add(pid.String(), int64(req.Count))
l.Unlock()
return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.chain, f.p2p, pid, req, nil)
}
// requestBlocksByRoot is a wrapper for handling BeaconBlockByRootsReq requests/streams.
func (f *blocksFetcher) requestBlocksByRoot(
ctx context.Context,
req *p2pTypes.BeaconBlockByRootsReq,
pid peer.ID,
) ([]block.SignedBeaconBlock, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
l := f.peerLock(pid)
l.Lock()
log.WithFields(logrus.Fields{
"peer": pid,
"numRoots": len(*req),
"capacity": f.rateLimiter.Remaining(pid.String()),
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
}).Debug("Requesting blocks (by roots)")
if f.rateLimiter.Remaining(pid.String()) < int64(len(*req)) {
if err := f.waitForBandwidth(pid); err != nil {
return nil, err
}
}
f.rateLimiter.Add(pid.String(), int64(len(*req)))
l.Unlock()
return prysmsync.SendBeaconBlocksByRootRequest(ctx, f.chain, f.p2p, pid, req, nil)
}
// waitForBandwidth blocks up until peer's bandwidth is restored.
func (f *blocksFetcher) waitForBandwidth(pid peer.ID) error {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
timer := time.NewTimer(f.rateLimiter.TillEmpty(pid.String()))
defer timer.Stop()
select {
case <-f.ctx.Done():
return errFetcherCtxIsDone
case <-timer.C:
// Peer has gathered enough capacity to be polled again.
}
return nil
}