mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 04:47:18 +00:00
196f4c6222
* refactors failover peer selection in init-sync * Merge refs/heads/master into init-sync-more-effective-failover
546 lines
16 KiB
Go
546 lines
16 KiB
Go
package initialsync
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/kevinms/leakybucket-go"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/pkg/errors"
|
|
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
|
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
|
|
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
|
"github.com/prysmaticlabs/prysm/shared/mathutil"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
"github.com/prysmaticlabs/prysm/shared/rand"
|
|
"github.com/prysmaticlabs/prysm/shared/roughtime"
|
|
"github.com/sirupsen/logrus"
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
const (
	// maxPendingRequests bounds the number of fetch requests that can be queued at once;
	// it is used as the buffer size of the fetcher's request and response channels.
	maxPendingRequests = 64
	// peersPercentagePerRequest is the fraction of suitable peers considered for a single request.
	peersPercentagePerRequest = 0.75
	// handshakePollingInterval is the pause between checks for enough suitable peers.
	handshakePollingInterval = 5 * time.Second
	// peerLocksPollingInterval is how often the fetcher looks for stale peer locks.
	peerLocksPollingInterval = 5 * time.Minute
	// peerLockMaxAge is how long a peer lock may go unused before it is purged.
	peerLockMaxAge = 60 * time.Minute
	// nonSkippedSlotsFullSearchEpochs is the number of epochs that are scanned slot by slot
	// before the search for a non-skipped slot falls back to random sampling.
	nonSkippedSlotsFullSearchEpochs = 10
)
|
|
|
|
// Sentinel errors reported by the blocks fetcher.
var (
	// errNoPeersAvailable is returned when no suitable peer can serve a request.
	errNoPeersAvailable = errors.New("no peers available, waiting for reconnect")
	// errFetcherCtxIsDone is returned when the fetcher's own context has been cancelled.
	errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize")
	// errSlotIsTooHigh is returned when a request starts beyond the allowed finalized range.
	errSlotIsTooHigh = errors.New("slot is higher than the finalized slot")
)
|
|
|
|
// blocksFetcherConfig is a config to setup the block fetcher.
|
|
type blocksFetcherConfig struct {
|
|
headFetcher blockchain.HeadFetcher
|
|
p2p p2p.P2P
|
|
}
|
|
|
|
// blocksFetcher is a service to fetch chain data from peers.
|
|
// On an incoming requests, requested block range is evenly divided
|
|
// among available peers (for fair network load distribution).
|
|
type blocksFetcher struct {
|
|
sync.Mutex
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
rand *rand.Rand
|
|
headFetcher blockchain.HeadFetcher
|
|
p2p p2p.P2P
|
|
blocksPerSecond uint64
|
|
rateLimiter *leakybucket.Collector
|
|
peerLocks map[peer.ID]*peerLock
|
|
fetchRequests chan *fetchRequestParams
|
|
fetchResponses chan *fetchRequestResponse
|
|
quit chan struct{} // termination notifier
|
|
}
|
|
|
|
// peerLock serializes fetcher actions on a per peer basis. At the moment it is
// only held while the rate limiter of the peer is consulted and updated.
type peerLock struct {
	sync.Mutex
	// accessed is the last time the lock was handed out; it drives stale lock cleanup.
	accessed time.Time
}
|
|
|
|
// fetchRequestParams describes a single scheduled fetch request.
type fetchRequestParams struct {
	// ctx is the context of this particular request; the fetcher loop passes it to the request handler.
	ctx context.Context
	// start is the first slot of the requested range.
	start uint64
	// count is the number of slots asked for (the fetcher may return fewer).
	count uint64
}
|
|
|
|
// fetchRequestResponse is a combined type to hold results of both successful executions and errors.
|
|
// Valid usage pattern will be to check whether result's `err` is nil, before using `blocks`.
|
|
type fetchRequestResponse struct {
|
|
start, count uint64
|
|
blocks []*eth.SignedBeaconBlock
|
|
err error
|
|
}
|
|
|
|
// newBlocksFetcher creates ready to use fetcher.
|
|
func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
|
|
blocksPerSecond := flags.Get().BlockBatchLimit
|
|
allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit
|
|
// Allow fetcher to go almost to the full burst capacity (less a single batch).
|
|
rateLimiter := leakybucket.NewCollector(
|
|
float64(blocksPerSecond), int64(allowedBlocksBurst-blocksPerSecond),
|
|
false /* deleteEmptyBuckets */)
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
return &blocksFetcher{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
rand: rand.NewGenerator(),
|
|
headFetcher: cfg.headFetcher,
|
|
p2p: cfg.p2p,
|
|
blocksPerSecond: uint64(blocksPerSecond),
|
|
rateLimiter: rateLimiter,
|
|
peerLocks: make(map[peer.ID]*peerLock),
|
|
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
|
|
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// start boots up the fetcher, which starts listening for incoming fetch requests.
|
|
func (f *blocksFetcher) start() error {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return errFetcherCtxIsDone
|
|
default:
|
|
go f.loop()
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// stop terminates all fetcher operations.
|
|
func (f *blocksFetcher) stop() {
|
|
defer func() {
|
|
if f.rateLimiter != nil {
|
|
f.rateLimiter.Free()
|
|
f.rateLimiter = nil
|
|
}
|
|
}()
|
|
f.cancel()
|
|
<-f.quit // make sure that loop() is done
|
|
}
|
|
|
|
// requestResponses exposes a channel into which fetcher pushes generated request responses.
|
|
func (f *blocksFetcher) requestResponses() <-chan *fetchRequestResponse {
|
|
return f.fetchResponses
|
|
}
|
|
|
|
// loop is a main fetcher loop, listens for incoming requests/cancellations, forwards outgoing responses.
|
|
func (f *blocksFetcher) loop() {
|
|
defer close(f.quit)
|
|
|
|
// Wait for all loop's goroutines to finish, and safely release resources.
|
|
wg := &sync.WaitGroup{}
|
|
defer func() {
|
|
wg.Wait()
|
|
close(f.fetchResponses)
|
|
}()
|
|
|
|
// Periodically remove stale peer locks.
|
|
go func() {
|
|
ticker := time.NewTicker(peerLocksPollingInterval)
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
f.removeStalePeerLocks(peerLockMaxAge)
|
|
case <-f.ctx.Done():
|
|
ticker.Stop()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Main loop.
|
|
for {
|
|
// Make sure there is are available peers before processing requests.
|
|
if _, err := f.waitForMinimumPeers(f.ctx); err != nil {
|
|
log.Error(err)
|
|
}
|
|
|
|
select {
|
|
case <-f.ctx.Done():
|
|
log.Debug("Context closed, exiting goroutine (blocks fetcher)")
|
|
return
|
|
case req := <-f.fetchRequests:
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
select {
|
|
case <-f.ctx.Done():
|
|
case f.fetchResponses <- f.handleRequest(req.ctx, req.start, req.count):
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
// scheduleRequest adds request to incoming queue.
|
|
func (f *blocksFetcher) scheduleRequest(ctx context.Context, start, count uint64) error {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
request := &fetchRequestParams{
|
|
ctx: ctx,
|
|
start: start,
|
|
count: count,
|
|
}
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return errFetcherCtxIsDone
|
|
case f.fetchRequests <- request:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// handleRequest parses fetch request and forwards it to response builder.
|
|
func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64) *fetchRequestResponse {
|
|
ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest")
|
|
defer span.End()
|
|
|
|
response := &fetchRequestResponse{
|
|
start: start,
|
|
count: count,
|
|
blocks: []*eth.SignedBeaconBlock{},
|
|
err: nil,
|
|
}
|
|
|
|
if ctx.Err() != nil {
|
|
response.err = ctx.Err()
|
|
return response
|
|
}
|
|
|
|
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
|
finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
if len(peers) == 0 {
|
|
response.err = errNoPeersAvailable
|
|
return response
|
|
}
|
|
|
|
// Short circuit start far exceeding the highest finalized epoch in some infinite loop.
|
|
highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1)
|
|
if start > highestFinalizedSlot {
|
|
response.err = fmt.Errorf("%v, slot: %d, higest finilized slot: %d",
|
|
errSlotIsTooHigh, start, highestFinalizedSlot)
|
|
return response
|
|
}
|
|
|
|
response.blocks, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
|
|
return response
|
|
}
|
|
|
|
// fetchBlocksFromPeer fetches blocks from a single randomly selected peer.
|
|
func (f *blocksFetcher) fetchBlocksFromPeer(
|
|
ctx context.Context,
|
|
start, count uint64,
|
|
peers []peer.ID,
|
|
) ([]*eth.SignedBeaconBlock, error) {
|
|
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer")
|
|
defer span.End()
|
|
|
|
blocks := []*eth.SignedBeaconBlock{}
|
|
var err error
|
|
peers, err = f.filterPeers(peers, peersPercentagePerRequest)
|
|
if err != nil {
|
|
return blocks, err
|
|
}
|
|
if len(peers) == 0 {
|
|
return blocks, errNoPeersAvailable
|
|
}
|
|
req := &p2ppb.BeaconBlocksByRangeRequest{
|
|
StartSlot: start,
|
|
Count: count,
|
|
Step: 1,
|
|
}
|
|
for i := 0; i < len(peers); i++ {
|
|
if blocks, err = f.requestBlocks(ctx, req, peers[i]); err == nil {
|
|
return blocks, err
|
|
}
|
|
}
|
|
return blocks, nil
|
|
}
|
|
|
|
// requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
|
|
func (f *blocksFetcher) requestBlocks(
|
|
ctx context.Context,
|
|
req *p2ppb.BeaconBlocksByRangeRequest,
|
|
pid peer.ID,
|
|
) ([]*eth.SignedBeaconBlock, error) {
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
l := f.getPeerLock(pid)
|
|
if l == nil {
|
|
return nil, errors.New("cannot obtain lock")
|
|
}
|
|
l.Lock()
|
|
log.WithFields(logrus.Fields{
|
|
"peer": pid,
|
|
"start": req.StartSlot,
|
|
"count": req.Count,
|
|
"step": req.Step,
|
|
"capacity": f.rateLimiter.Remaining(pid.String()),
|
|
}).Debug("Requesting blocks")
|
|
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
|
|
log.WithField("peer", pid).Debug("Slowing down for rate limit")
|
|
timer := time.NewTimer(f.rateLimiter.TillEmpty(pid.String()))
|
|
select {
|
|
case <-f.ctx.Done():
|
|
timer.Stop()
|
|
return nil, errFetcherCtxIsDone
|
|
case <-timer.C:
|
|
// Peer has gathered enough capacity to be polled again.
|
|
}
|
|
}
|
|
f.rateLimiter.Add(pid.String(), int64(req.Count))
|
|
l.Unlock()
|
|
stream, err := f.p2p.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
if err := stream.Reset(); err != nil {
|
|
log.WithError(err).Errorf("Failed to close stream with protocol %s", stream.Protocol())
|
|
}
|
|
}()
|
|
|
|
resp := make([]*eth.SignedBeaconBlock, 0, req.Count)
|
|
for i := uint64(0); ; i++ {
|
|
blk, err := prysmsync.ReadChunkedBlock(stream, f.p2p)
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
// exit if more than max request blocks are returned
|
|
if i >= params.BeaconNetworkConfig().MaxRequestBlocks {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
resp = append(resp, blk)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// getPeerLock returns peer lock for a given peer. If lock is not found, it is created.
|
|
func (f *blocksFetcher) getPeerLock(pid peer.ID) *peerLock {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
if lock, ok := f.peerLocks[pid]; ok {
|
|
lock.accessed = roughtime.Now()
|
|
return lock
|
|
}
|
|
f.peerLocks[pid] = &peerLock{
|
|
Mutex: sync.Mutex{},
|
|
accessed: roughtime.Now(),
|
|
}
|
|
return f.peerLocks[pid]
|
|
}
|
|
|
|
// removeStalePeerLocks is a cleanup procedure which removes stale locks.
|
|
func (f *blocksFetcher) removeStalePeerLocks(age time.Duration) {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
for peerID, lock := range f.peerLocks {
|
|
if time.Since(lock.accessed) >= age {
|
|
lock.Lock()
|
|
delete(f.peerLocks, peerID)
|
|
lock.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// selectFailOverPeer randomly selects fail over peer from the list of available peers.
|
|
func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) {
|
|
if len(peers) == 0 {
|
|
return "", errNoPeersAvailable
|
|
}
|
|
if len(peers) == 1 && peers[0] == excludedPID {
|
|
return "", errNoPeersAvailable
|
|
}
|
|
|
|
ind := f.rand.Int() % len(peers)
|
|
if peers[ind] == excludedPID {
|
|
return f.selectFailOverPeer(excludedPID, append(peers[:ind], peers[ind+1:]...))
|
|
}
|
|
return peers[ind], nil
|
|
}
|
|
|
|
// waitForMinimumPeers spins and waits up until enough peers are available.
|
|
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) {
|
|
required := params.BeaconConfig().MaxPeersToSync
|
|
if flags.Get().MinimumSyncPeers < required {
|
|
required = flags.Get().MinimumSyncPeers
|
|
}
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
|
_, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
if len(peers) >= required {
|
|
return peers, nil
|
|
}
|
|
log.WithFields(logrus.Fields{
|
|
"suitable": len(peers),
|
|
"required": required}).Info("Waiting for enough suitable peers before syncing")
|
|
time.Sleep(handshakePollingInterval)
|
|
}
|
|
}
|
|
|
|
// filterPeers returns transformed list of peers,
|
|
// weight ordered or randomized, constrained if necessary.
|
|
func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
|
|
if len(peers) == 0 {
|
|
return peers, nil
|
|
}
|
|
|
|
// Shuffle peers to prevent a bad peer from
|
|
// stalling sync with invalid blocks.
|
|
f.rand.Shuffle(len(peers), func(i, j int) {
|
|
peers[i], peers[j] = peers[j], peers[i]
|
|
})
|
|
|
|
// Select sub-sample from peers (honoring min-max invariants).
|
|
required := params.BeaconConfig().MaxPeersToSync
|
|
if flags.Get().MinimumSyncPeers < required {
|
|
required = flags.Get().MinimumSyncPeers
|
|
}
|
|
limit := uint64(math.Round(float64(len(peers)) * peersPercentage))
|
|
limit = mathutil.Max(limit, uint64(required))
|
|
limit = mathutil.Min(limit, uint64(len(peers)))
|
|
peers = peers[:limit]
|
|
|
|
// Order peers by remaining capacity, effectively turning in-order
|
|
// round robin peer processing into a weighted one (peers with higher
|
|
// remaining capacity are preferred). Peers with the same capacity
|
|
// are selected at random, since we have already shuffled peers
|
|
// at this point.
|
|
sort.SliceStable(peers, func(i, j int) bool {
|
|
cap1 := f.rateLimiter.Remaining(peers[i].String())
|
|
cap2 := f.rateLimiter.Remaining(peers[j].String())
|
|
return cap1 > cap2
|
|
})
|
|
|
|
return peers, nil
|
|
}
|
|
|
|
// nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot.
|
|
// For efficiency only one random slot is checked per epoch, so returned slot might not be the first
|
|
// non-skipped slot. This shouldn't be a problem, as in case of adversary peer, we might get incorrect
|
|
// data anyway, so code that relies on this function must be robust enough to re-request, if no progress
|
|
// is possible with a returned value.
|
|
func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) {
|
|
ctx, span := trace.StartSpan(ctx, "initialsync.nonSkippedSlotAfter")
|
|
defer span.End()
|
|
|
|
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
|
epoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
var err error
|
|
peers, err = f.filterPeers(peers, peersPercentagePerRequest)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if len(peers) == 0 {
|
|
return 0, errNoPeersAvailable
|
|
}
|
|
|
|
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
|
|
pidInd := 0
|
|
|
|
fetch := func(pid peer.ID, start, count, step uint64) (uint64, error) {
|
|
req := &p2ppb.BeaconBlocksByRangeRequest{
|
|
StartSlot: start,
|
|
Count: count,
|
|
Step: step,
|
|
}
|
|
blocks, err := f.requestBlocks(ctx, req, pid)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if len(blocks) > 0 {
|
|
for _, block := range blocks {
|
|
if block.Block.Slot > slot {
|
|
return block.Block.Slot, nil
|
|
}
|
|
}
|
|
}
|
|
return 0, nil
|
|
}
|
|
|
|
// Start by checking several epochs fully, w/o resorting to random sampling.
|
|
start := slot + 1
|
|
end := start + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch
|
|
for ind := start; ind < end; ind += slotsPerEpoch {
|
|
nextSlot, err := fetch(peers[pidInd%len(peers)], ind, slotsPerEpoch, 1)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if nextSlot > slot {
|
|
return nextSlot, nil
|
|
}
|
|
pidInd++
|
|
}
|
|
|
|
// Quickly find the close enough epoch where a non-empty slot definitely exists.
|
|
// Only single random slot per epoch is checked - allowing to move forward relatively quickly.
|
|
slot = slot + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch
|
|
upperBoundSlot := helpers.StartSlot(epoch + 1)
|
|
for ind := slot + 1; ind < upperBoundSlot; ind += (slotsPerEpoch * slotsPerEpoch) / 2 {
|
|
start := ind + uint64(f.rand.Intn(int(slotsPerEpoch)))
|
|
nextSlot, err := fetch(peers[pidInd%len(peers)], start, slotsPerEpoch/2, slotsPerEpoch)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
pidInd++
|
|
if nextSlot > slot && upperBoundSlot >= nextSlot {
|
|
upperBoundSlot = nextSlot
|
|
break
|
|
}
|
|
}
|
|
|
|
// Epoch with non-empty slot is located. Check all slots within two nearby epochs.
|
|
if upperBoundSlot > slotsPerEpoch {
|
|
upperBoundSlot -= slotsPerEpoch
|
|
}
|
|
upperBoundSlot = helpers.StartSlot(helpers.SlotToEpoch(upperBoundSlot))
|
|
nextSlot, err := fetch(peers[pidInd%len(peers)], upperBoundSlot, slotsPerEpoch*2, 1)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if nextSlot < slot || helpers.StartSlot(epoch+1) < nextSlot {
|
|
return 0, errors.New("invalid range for non-skipped slot")
|
|
}
|
|
return nextSlot, nil
|
|
}
|
|
|
|
// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
|
|
func (f *blocksFetcher) bestFinalizedSlot() uint64 {
|
|
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
|
finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
return helpers.StartSlot(finalizedEpoch)
|
|
}
|