mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-12 04:30:04 +00:00
790bf03123
* use uints instead of ints * fix method * fix * fix * builds * deepsource * deep source
138 lines
4.9 KiB
Go
138 lines
4.9 KiB
Go
package initialsync
|
|
|
|
import (
|
|
"context"
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
|
|
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
|
|
"github.com/prysmaticlabs/prysm/config/params"
|
|
mathutil "github.com/prysmaticlabs/prysm/math"
|
|
prysmTime "github.com/prysmaticlabs/prysm/time"
|
|
"github.com/prysmaticlabs/prysm/time/slots"
|
|
"github.com/sirupsen/logrus"
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
// peerLock returns peer lock for a given peer. If lock is not found, it is created.
|
|
func (f *blocksFetcher) peerLock(pid peer.ID) *peerLock {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
if lock, ok := f.peerLocks[pid]; ok && lock != nil {
|
|
lock.accessed = prysmTime.Now()
|
|
return lock
|
|
}
|
|
f.peerLocks[pid] = &peerLock{
|
|
Mutex: sync.Mutex{},
|
|
accessed: prysmTime.Now(),
|
|
}
|
|
return f.peerLocks[pid]
|
|
}
|
|
|
|
// removeStalePeerLocks is a cleanup procedure which removes stale locks.
|
|
func (f *blocksFetcher) removeStalePeerLocks(age time.Duration) {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
for peerID, lock := range f.peerLocks {
|
|
if time.Since(lock.accessed) >= age {
|
|
lock.Lock()
|
|
delete(f.peerLocks, peerID)
|
|
lock.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// selectFailOverPeer randomly selects fail over peer from the list of available peers.
|
|
func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) {
|
|
if len(peers) == 0 {
|
|
return "", errNoPeersAvailable
|
|
}
|
|
if len(peers) == 1 && peers[0] == excludedPID {
|
|
return "", errNoPeersAvailable
|
|
}
|
|
|
|
ind := f.rand.Int() % len(peers)
|
|
if peers[ind] == excludedPID {
|
|
return f.selectFailOverPeer(excludedPID, append(peers[:ind], peers[ind+1:]...))
|
|
}
|
|
return peers[ind], nil
|
|
}
|
|
|
|
// waitForMinimumPeers spins and waits up until enough peers are available.
|
|
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) {
|
|
required := uint64(params.BeaconConfig().MaxPeersToSync)
|
|
if flags.Get().MinimumSyncPeers < required {
|
|
required = flags.Get().MinimumSyncPeers
|
|
}
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
var peers []peer.ID
|
|
if f.mode == modeStopOnFinalizedEpoch {
|
|
headEpoch := f.chain.FinalizedCheckpt().Epoch
|
|
_, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
|
} else {
|
|
headEpoch := slots.ToEpoch(f.chain.HeadSlot())
|
|
_, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch)
|
|
}
|
|
if uint64(len(peers)) >= required {
|
|
return peers, nil
|
|
}
|
|
log.WithFields(logrus.Fields{
|
|
"suitable": len(peers),
|
|
"required": required}).Info("Waiting for enough suitable peers before syncing")
|
|
time.Sleep(handshakePollingInterval)
|
|
}
|
|
}
|
|
|
|
// filterPeers returns transformed list of peers, weight sorted by scores and capacity remaining.
|
|
// List can be further constrained using peersPercentage, where only percentage of peers are returned.
|
|
func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) []peer.ID {
|
|
ctx, span := trace.StartSpan(ctx, "initialsync.filterPeers")
|
|
defer span.End()
|
|
|
|
if len(peers) == 0 {
|
|
return peers
|
|
}
|
|
|
|
// Sort peers using both block provider score and, custom, capacity based score (see
|
|
// peerFilterCapacityWeight if you want to give different weights to provider's and capacity
|
|
// scores).
|
|
// Scores produced are used as weights, so peers are ordered probabilistically i.e. peer with
|
|
// a higher score has higher chance to end up higher in the list.
|
|
scorer := f.p2p.Peers().Scorers().BlockProviderScorer()
|
|
peers = scorer.WeightSorted(f.rand, peers, func(peerID peer.ID, blockProviderScore float64) float64 {
|
|
remaining, capacity := float64(f.rateLimiter.Remaining(peerID.String())), float64(f.rateLimiter.Capacity())
|
|
// When capacity is close to exhaustion, allow less performant peer to take a chance.
|
|
// Otherwise, there's a good chance system will be forced to wait for rate limiter.
|
|
if remaining < float64(f.blocksPerSecond) {
|
|
return 0.0
|
|
}
|
|
capScore := remaining / capacity
|
|
overallScore := blockProviderScore*(1.0-f.capacityWeight) + capScore*f.capacityWeight
|
|
return math.Round(overallScore*scorers.ScoreRoundingFactor) / scorers.ScoreRoundingFactor
|
|
})
|
|
|
|
return trimPeers(peers, peersPercentage)
|
|
}
|
|
|
|
// trimPeers limits peer list, returning only specified percentage of peers.
|
|
// Takes system constraints into account (min/max peers to sync).
|
|
func trimPeers(peers []peer.ID, peersPercentage float64) []peer.ID {
|
|
required := uint64(params.BeaconConfig().MaxPeersToSync)
|
|
if flags.Get().MinimumSyncPeers < required {
|
|
required = flags.Get().MinimumSyncPeers
|
|
}
|
|
// Weak/slow peers will be pushed down the list and trimmed since only percentage of peers is selected.
|
|
limit := uint64(math.Round(float64(len(peers)) * peersPercentage))
|
|
// Limit cannot be less that minimum peers required by sync mechanism.
|
|
limit = mathutil.Max(limit, required)
|
|
// Limit cannot be higher than number of peers available (safe-guard).
|
|
limit = mathutil.Min(limit, uint64(len(peers)))
|
|
return peers[:limit]
|
|
}
|