mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-17 07:18:46 +00:00
7a5010ecea
* Add roughtime static code analysis and fix all violations * Merge branch 'master' into enforce-roughtime
518 lines
15 KiB
Go
518 lines
15 KiB
Go
package initialsync
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"math/rand"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/kevinms/leakybucket-go"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/pkg/errors"
|
|
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
|
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
|
|
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
|
"github.com/prysmaticlabs/prysm/shared/mathutil"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
"github.com/prysmaticlabs/prysm/shared/roughtime"
|
|
"github.com/sirupsen/logrus"
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
const (
|
|
// maxPendingRequests limits how many concurrent fetch request one can initiate.
|
|
maxPendingRequests = 8
|
|
// peersPercentagePerRequest caps percentage of peers to be used in a request.
|
|
peersPercentagePerRequest = 0.75
|
|
)
|
|
|
|
var (
|
|
errNoPeersAvailable = errors.New("no peers available, waiting for reconnect")
|
|
errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize")
|
|
errSlotIsTooHigh = errors.New("slot is higher than the finalized slot")
|
|
)
|
|
|
|
// blocksFetcherConfig is a config to setup the block fetcher.
|
|
type blocksFetcherConfig struct {
|
|
headFetcher blockchain.HeadFetcher
|
|
p2p p2p.P2P
|
|
}
|
|
|
|
// blocksFetcher is a service to fetch chain data from peers.
|
|
// On an incoming requests, requested block range is evenly divided
|
|
// among available peers (for fair network load distribution).
|
|
type blocksFetcher struct {
|
|
sync.Mutex
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
headFetcher blockchain.HeadFetcher
|
|
p2p p2p.P2P
|
|
rateLimiter *leakybucket.Collector
|
|
fetchRequests chan *fetchRequestParams
|
|
fetchResponses chan *fetchRequestResponse
|
|
quit chan struct{} // termination notifier
|
|
}
|
|
|
|
// fetchRequestParams holds parameters necessary to schedule a fetch request.
|
|
type fetchRequestParams struct {
|
|
ctx context.Context // if provided, it is used instead of global fetcher's context
|
|
start uint64 // starting slot
|
|
count uint64 // how many slots to receive (fetcher may return fewer slots)
|
|
}
|
|
|
|
// fetchRequestResponse is a combined type to hold results of both successful executions and errors.
|
|
// Valid usage pattern will be to check whether result's `err` is nil, before using `blocks`.
|
|
type fetchRequestResponse struct {
|
|
start, count uint64
|
|
blocks []*eth.SignedBeaconBlock
|
|
err error
|
|
peers []peer.ID
|
|
}
|
|
|
|
// newBlocksFetcher creates ready to use fetcher.
|
|
func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
rateLimiter := leakybucket.NewCollector(
|
|
allowedBlocksPerSecond, /* rate */
|
|
allowedBlocksPerSecond, /* capacity */
|
|
false /* deleteEmptyBuckets */)
|
|
|
|
return &blocksFetcher{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
headFetcher: cfg.headFetcher,
|
|
p2p: cfg.p2p,
|
|
rateLimiter: rateLimiter,
|
|
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
|
|
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// start boots up the fetcher, which starts listening for incoming fetch requests.
|
|
func (f *blocksFetcher) start() error {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return errFetcherCtxIsDone
|
|
default:
|
|
go f.loop()
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// stop terminates all fetcher operations.
|
|
func (f *blocksFetcher) stop() {
|
|
f.cancel()
|
|
<-f.quit // make sure that loop() is done
|
|
}
|
|
|
|
// requestResponses exposes a channel into which fetcher pushes generated request responses.
|
|
func (f *blocksFetcher) requestResponses() <-chan *fetchRequestResponse {
|
|
return f.fetchResponses
|
|
}
|
|
|
|
// loop is a main fetcher loop, listens for incoming requests/cancellations, forwards outgoing responses.
|
|
func (f *blocksFetcher) loop() {
|
|
defer close(f.quit)
|
|
|
|
// Wait for all loop's goroutines to finish, and safely release resources.
|
|
wg := &sync.WaitGroup{}
|
|
defer func() {
|
|
wg.Wait()
|
|
close(f.fetchResponses)
|
|
}()
|
|
|
|
for {
|
|
// Make sure there is are available peers before processing requests.
|
|
if _, err := f.waitForMinimumPeers(f.ctx); err != nil {
|
|
log.Error(err)
|
|
}
|
|
|
|
select {
|
|
case <-f.ctx.Done():
|
|
log.Debug("Context closed, exiting goroutine (blocks fetcher)")
|
|
return
|
|
case req := <-f.fetchRequests:
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
select {
|
|
case <-f.ctx.Done():
|
|
case f.fetchResponses <- f.handleRequest(req.ctx, req.start, req.count):
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
// scheduleRequest adds request to incoming queue.
|
|
func (f *blocksFetcher) scheduleRequest(ctx context.Context, start, count uint64) error {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
request := &fetchRequestParams{
|
|
ctx: ctx,
|
|
start: start,
|
|
count: count,
|
|
}
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return errFetcherCtxIsDone
|
|
case f.fetchRequests <- request:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// handleRequest parses fetch request and forwards it to response builder.
|
|
func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64) *fetchRequestResponse {
|
|
ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest")
|
|
defer span.End()
|
|
|
|
response := &fetchRequestResponse{
|
|
start: start,
|
|
count: count,
|
|
blocks: []*eth.SignedBeaconBlock{},
|
|
err: nil,
|
|
peers: []peer.ID{},
|
|
}
|
|
|
|
if ctx.Err() != nil {
|
|
response.err = ctx.Err()
|
|
return response
|
|
}
|
|
|
|
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
|
root, finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
|
|
if len(peers) == 0 {
|
|
response.err = errNoPeersAvailable
|
|
return response
|
|
}
|
|
|
|
// Short circuit start far exceeding the highest finalized epoch in some infinite loop.
|
|
highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1)
|
|
if start > highestFinalizedSlot {
|
|
response.err = errSlotIsTooHigh
|
|
return response
|
|
}
|
|
|
|
blocks, err := f.collectPeerResponses(ctx, root, finalizedEpoch, start, 1, count, peers)
|
|
if err != nil {
|
|
response.err = err
|
|
return response
|
|
}
|
|
|
|
response.blocks = blocks
|
|
response.peers = peers
|
|
return response
|
|
}
|
|
|
|
// collectPeerResponses orchestrates block fetching from the available peers.
|
|
// In each request a range of blocks is to be requested from multiple peers.
|
|
// Example:
|
|
// - number of peers = 4
|
|
// - range of block slots is 64...128
|
|
// Four requests will be spread across the peers using step argument to distribute the load
|
|
// i.e. the first peer is asked for block 64, 68, 72... while the second peer is asked for
|
|
// 65, 69, 73... and so on for other peers.
|
|
func (f *blocksFetcher) collectPeerResponses(
|
|
ctx context.Context,
|
|
root []byte,
|
|
finalizedEpoch, start, step, count uint64,
|
|
peers []peer.ID,
|
|
) ([]*eth.SignedBeaconBlock, error) {
|
|
ctx, span := trace.StartSpan(ctx, "initialsync.collectPeerResponses")
|
|
defer span.End()
|
|
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
peers = f.selectPeers(peers)
|
|
if len(peers) == 0 {
|
|
return nil, errNoPeersAvailable
|
|
}
|
|
|
|
p2pRequests := new(sync.WaitGroup)
|
|
errChan := make(chan error)
|
|
blocksChan := make(chan []*eth.SignedBeaconBlock)
|
|
|
|
p2pRequests.Add(len(peers))
|
|
go func() {
|
|
p2pRequests.Wait()
|
|
close(blocksChan)
|
|
}()
|
|
|
|
// Short circuit start far exceeding the highest finalized epoch in some infinite loop.
|
|
highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1)
|
|
if start > highestFinalizedSlot {
|
|
return nil, errSlotIsTooHigh
|
|
}
|
|
|
|
// Spread load evenly among available peers.
|
|
perPeerCount := mathutil.Min(count/uint64(len(peers)), allowedBlocksPerSecond)
|
|
remainder := int(count % uint64(len(peers)))
|
|
for i, pid := range peers {
|
|
start, step := start+uint64(i)*step, step*uint64(len(peers))
|
|
|
|
// If the count was divided by an odd number of peers, there will be some blocks
|
|
// missing from the first requests so we accommodate that scenario.
|
|
count := perPeerCount
|
|
if i < remainder {
|
|
count++
|
|
}
|
|
// Asking for no blocks may cause the client to hang.
|
|
if count == 0 {
|
|
p2pRequests.Done()
|
|
continue
|
|
}
|
|
|
|
go func(ctx context.Context, pid peer.ID) {
|
|
defer p2pRequests.Done()
|
|
|
|
blocks, err := f.requestBeaconBlocksByRange(ctx, pid, root, start, step, count)
|
|
if err != nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
case errChan <- err:
|
|
return
|
|
}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
case blocksChan <- blocks:
|
|
}
|
|
}(ctx, pid)
|
|
}
|
|
|
|
var unionRespBlocks []*eth.SignedBeaconBlock
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case err := <-errChan:
|
|
return nil, err
|
|
case resp, ok := <-blocksChan:
|
|
if ok {
|
|
unionRespBlocks = append(unionRespBlocks, resp...)
|
|
} else {
|
|
sort.Slice(unionRespBlocks, func(i, j int) bool {
|
|
return unionRespBlocks[i].Block.Slot < unionRespBlocks[j].Block.Slot
|
|
})
|
|
return unionRespBlocks, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// requestBeaconBlocksByRange prepares BeaconBlocksByRange request, and handles possible stale peers
|
|
// (by resending the request).
|
|
func (f *blocksFetcher) requestBeaconBlocksByRange(
|
|
ctx context.Context,
|
|
pid peer.ID,
|
|
root []byte,
|
|
start, step, count uint64,
|
|
) ([]*eth.SignedBeaconBlock, error) {
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
req := &p2ppb.BeaconBlocksByRangeRequest{
|
|
HeadBlockRoot: root,
|
|
StartSlot: start,
|
|
Count: count,
|
|
Step: step,
|
|
}
|
|
|
|
resp, respErr := f.requestBlocks(ctx, req, pid)
|
|
if respErr != nil {
|
|
// Fail over to some other, randomly selected, peer.
|
|
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
|
root1, _, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
if bytes.Compare(root, root1) != 0 {
|
|
return nil, errors.Errorf("can not resend, root mismatch: %x:%x", root, root1)
|
|
}
|
|
newPID, err := selectFailOverPeer(pid, peers)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.WithError(respErr).WithFields(logrus.Fields{
|
|
"numPeers": len(peers),
|
|
"failedPeer": pid.Pretty(),
|
|
"newPeer": newPID.Pretty(),
|
|
}).Debug("Request failed, trying to forward request to another peer")
|
|
|
|
return f.requestBeaconBlocksByRange(ctx, newPID, root, start, step, count)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
|
|
func (f *blocksFetcher) requestBlocks(
|
|
ctx context.Context,
|
|
req *p2ppb.BeaconBlocksByRangeRequest,
|
|
pid peer.ID,
|
|
) ([]*eth.SignedBeaconBlock, error) {
|
|
f.Lock()
|
|
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
|
|
log.WithField("peer", pid).Debug("Slowing down for rate limit")
|
|
time.Sleep(f.rateLimiter.TillEmpty(pid.String()))
|
|
}
|
|
f.rateLimiter.Add(pid.String(), int64(req.Count))
|
|
log.WithFields(logrus.Fields{
|
|
"peer": pid,
|
|
"start": req.StartSlot,
|
|
"count": req.Count,
|
|
"step": req.Step,
|
|
"head": fmt.Sprintf("%#x", req.HeadBlockRoot),
|
|
}).Debug("Requesting blocks")
|
|
f.Unlock()
|
|
stream, err := f.p2p.Send(ctx, req, pid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer stream.Close()
|
|
|
|
resp := make([]*eth.SignedBeaconBlock, 0, req.Count)
|
|
for {
|
|
blk, err := prysmsync.ReadChunkedBlock(stream, f.p2p)
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
resp = append(resp, blk)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// selectFailOverPeer randomly selects fail over peer from the list of available peers.
|
|
func selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) {
|
|
for i, pid := range peers {
|
|
if pid == excludedPID {
|
|
peers = append(peers[:i], peers[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
|
|
if len(peers) == 0 {
|
|
return "", errNoPeersAvailable
|
|
}
|
|
|
|
randGenerator := rand.New(rand.NewSource(roughtime.Now().Unix()))
|
|
randGenerator.Shuffle(len(peers), func(i, j int) {
|
|
peers[i], peers[j] = peers[j], peers[i]
|
|
})
|
|
|
|
return peers[0], nil
|
|
}
|
|
|
|
// waitForMinimumPeers spins and waits up until enough peers are available.
|
|
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) {
|
|
required := params.BeaconConfig().MaxPeersToSync
|
|
if flags.Get().MinimumSyncPeers < required {
|
|
required = flags.Get().MinimumSyncPeers
|
|
}
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
|
_, _, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
if len(peers) >= required {
|
|
return peers, nil
|
|
}
|
|
log.WithFields(logrus.Fields{
|
|
"suitable": len(peers),
|
|
"required": required}).Info("Waiting for enough suitable peers before syncing")
|
|
time.Sleep(handshakePollingInterval)
|
|
}
|
|
}
|
|
|
|
// selectPeers returns transformed list of peers (randomized, constrained if necessary).
|
|
func (f *blocksFetcher) selectPeers(peers []peer.ID) []peer.ID {
|
|
if len(peers) == 0 {
|
|
return peers
|
|
}
|
|
|
|
// Shuffle peers to prevent a bad peer from
|
|
// stalling sync with invalid blocks.
|
|
randGenerator := rand.New(rand.NewSource(roughtime.Now().Unix()))
|
|
randGenerator.Shuffle(len(peers), func(i, j int) {
|
|
peers[i], peers[j] = peers[j], peers[i]
|
|
})
|
|
|
|
required := params.BeaconConfig().MaxPeersToSync
|
|
if flags.Get().MinimumSyncPeers < required {
|
|
required = flags.Get().MinimumSyncPeers
|
|
}
|
|
|
|
limit := uint64(math.Round(float64(len(peers)) * peersPercentagePerRequest))
|
|
limit = mathutil.Max(limit, uint64(required))
|
|
limit = mathutil.Min(limit, uint64(len(peers)))
|
|
return peers[:limit]
|
|
}
|
|
|
|
// nonSkippedSlotAfter checks slots after the given one in an attempt to find non-empty future slot.
|
|
func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) {
|
|
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
|
root, epoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
if len(peers) == 0 {
|
|
return 0, errNoPeersAvailable
|
|
}
|
|
|
|
randGenerator := rand.New(rand.NewSource(roughtime.Now().Unix()))
|
|
nextPID := func() peer.ID {
|
|
randGenerator.Shuffle(len(peers), func(i, j int) {
|
|
peers[i], peers[j] = peers[j], peers[i]
|
|
})
|
|
return peers[0]
|
|
}
|
|
|
|
for slot <= helpers.StartSlot(epoch+1) {
|
|
req := &p2ppb.BeaconBlocksByRangeRequest{
|
|
HeadBlockRoot: root,
|
|
StartSlot: slot + 1,
|
|
Count: blockBatchSize,
|
|
Step: 1,
|
|
}
|
|
|
|
blocks, err := f.requestBlocks(ctx, req, nextPID())
|
|
if err != nil {
|
|
return slot, err
|
|
}
|
|
|
|
if len(blocks) > 0 {
|
|
slots := make([]uint64, len(blocks))
|
|
for i, block := range blocks {
|
|
slots[i] = block.Block.Slot
|
|
}
|
|
return blocks[0].Block.Slot, nil
|
|
}
|
|
slot += blockBatchSize
|
|
}
|
|
|
|
return slot, nil
|
|
}
|
|
|
|
// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
|
|
func (f *blocksFetcher) bestFinalizedSlot() uint64 {
|
|
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
|
_, finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
return helpers.StartSlot(finalizedEpoch)
|
|
}
|