prysm-pulse/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go

138 lines
4.9 KiB
Go
Raw Normal View History

package initialsync
import (
"context"
"math"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/timeutils"
"github.com/sirupsen/logrus"
Peer scoring: init sync (#6709) * refactors redundant part of varname * introduces score_block_providers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * gazelle * adds comment * Merge branch 'master' into init-sync-peer-scoring-slow-peers * removes redundant checks * add block provider decay test * Merge branch 'master' into init-sync-peer-scoring-slow-peers * adds case * penalize inactive peers * adds scorebadresponses test * inroduces no-activity penalty * gazelle * gofmt * Merge branch 'master' into init-sync-peer-scoring-slow-peers * expanded tests * implement SortBlockProviders * change -> update * updates block fetcher peer filter * fixes test * allows to keep track of peer id * updates scoring coefficients * fixes test * block fetcher update * Merge branch 'master' into init-sync-peer-scoring-slow-peers * disables empty batch penalty * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * removes outdated code * update filterPeers * gazelle * updates var * revert changes to var name * updates blocks_fetcher * minor fix to import name * Merge branch 'master' into init-sync-peer-scoring-slow-peers * add max processed blocks cap * impoves scoring of stale peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * fixes test * adds weight sorting to scored peers * return pid when fetching batches * updates round robin * gazelle * Merge branch 'master' into init-sync-peer-scoring-slow-peers * updates block provider decay count * go tidy * cherry pick * fixes test * go tidy * Merge branch 'peer-scorer-weighted-sorter' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * refactors blocks fetcher: repackage methods * Merge branch 'refactor-blocks-fetcher' into init-sync-peer-scoring-slow-peers * minor fixes * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * allow scores in range (0;1) in weighted filter * filterScoredPeers improve test suite * puts feature behind the flag * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * fixes tests * Merge branch 'init-sync-peer-scoring-slow-peers' of github.com:prysmaticlabs/prysm into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Update beacon-chain/sync/initial-sync/blocks_fetcher_test.go Co-authored-by: Shay Zluf <thezluf@gmail.com> * Nishant's suggestion on peer limit variable * better explanation of non-blocking peer scoring * Shay's sugession on peer naming * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Update beacon-chain/sync/initial-sync/blocks_fetcher.go Co-authored-by: Nishant Das <nishdas93@gmail.com> * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * gofmt * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers
2020-08-13 17:33:57 +00:00
"go.opencensus.io/trace"
)
// peerLock returns peer lock for a given peer. If lock is not found, it is created.
func (f *blocksFetcher) peerLock(pid peer.ID) *peerLock {
f.Lock()
defer f.Unlock()
if lock, ok := f.peerLocks[pid]; ok && lock != nil {
lock.accessed = timeutils.Now()
return lock
}
f.peerLocks[pid] = &peerLock{
Mutex: sync.Mutex{},
accessed: timeutils.Now(),
}
return f.peerLocks[pid]
}
// removeStalePeerLocks is a cleanup procedure which removes stale locks.
func (f *blocksFetcher) removeStalePeerLocks(age time.Duration) {
f.Lock()
defer f.Unlock()
for peerID, lock := range f.peerLocks {
if time.Since(lock.accessed) >= age {
lock.Lock()
delete(f.peerLocks, peerID)
lock.Unlock()
}
}
}
// selectFailOverPeer randomly selects fail over peer from the list of available peers.
func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) {
if len(peers) == 0 {
return "", errNoPeersAvailable
}
if len(peers) == 1 && peers[0] == excludedPID {
return "", errNoPeersAvailable
}
ind := f.rand.Int() % len(peers)
if peers[ind] == excludedPID {
return f.selectFailOverPeer(excludedPID, append(peers[:ind], peers[ind+1:]...))
}
return peers[ind], nil
}
// waitForMinimumPeers spins and waits up until enough peers are available.
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
var peers []peer.ID
if f.mode == modeStopOnFinalizedEpoch {
headEpoch := f.chain.FinalizedCheckpt().Epoch
_, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
} else {
headEpoch := helpers.SlotToEpoch(f.chain.HeadSlot())
_, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch)
}
if len(peers) >= required {
return peers, nil
}
log.WithFields(logrus.Fields{
"suitable": len(peers),
"required": required}).Info("Waiting for enough suitable peers before syncing")
time.Sleep(handshakePollingInterval)
}
}
// filterPeers returns transformed list of peers, weight sorted by scores and capacity remaining.
// List can be further constrained using peersPercentage, where only percentage of peers are returned.
func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) []peer.ID {
ctx, span := trace.StartSpan(ctx, "initialsync.filterPeers")
Peer scoring: init sync (#6709) * refactors redundant part of varname * introduces score_block_providers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * gazelle * adds comment * Merge branch 'master' into init-sync-peer-scoring-slow-peers * removes redundant checks * add block provider decay test * Merge branch 'master' into init-sync-peer-scoring-slow-peers * adds case * penalize inactive peers * adds scorebadresponses test * inroduces no-activity penalty * gazelle * gofmt * Merge branch 'master' into init-sync-peer-scoring-slow-peers * expanded tests * implement SortBlockProviders * change -> update * updates block fetcher peer filter * fixes test * allows to keep track of peer id * updates scoring coefficients * fixes test * block fetcher update * Merge branch 'master' into init-sync-peer-scoring-slow-peers * disables empty batch penalty * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * removes outdated code * update filterPeers * gazelle * updates var * revert changes to var name * updates blocks_fetcher * minor fix to import name * Merge branch 'master' into init-sync-peer-scoring-slow-peers * add max processed blocks cap * impoves scoring of stale peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * fixes test * adds weight sorting to scored peers * return pid when fetching batches * updates round robin * gazelle * Merge branch 'master' into init-sync-peer-scoring-slow-peers * updates block provider decay count * go tidy * cherry pick * fixes test * go tidy * Merge branch 'peer-scorer-weighted-sorter' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * refactors blocks fetcher: repackage methods * Merge branch 'refactor-blocks-fetcher' into init-sync-peer-scoring-slow-peers * minor fixes * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * allow scores in range (0;1) in weighted filter * filterScoredPeers improve test suite * puts feature behind the flag * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * fixes tests * Merge branch 'init-sync-peer-scoring-slow-peers' of github.com:prysmaticlabs/prysm into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Update beacon-chain/sync/initial-sync/blocks_fetcher_test.go Co-authored-by: Shay Zluf <thezluf@gmail.com> * Nishant's suggestion on peer limit variable * better explanation of non-blocking peer scoring * Shay's sugession on peer naming * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Update beacon-chain/sync/initial-sync/blocks_fetcher.go Co-authored-by: Nishant Das <nishdas93@gmail.com> * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * gofmt * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers
2020-08-13 17:33:57 +00:00
defer span.End()
if len(peers) == 0 {
return peers
Peer scoring: init sync (#6709) * refactors redundant part of varname * introduces score_block_providers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * gazelle * adds comment * Merge branch 'master' into init-sync-peer-scoring-slow-peers * removes redundant checks * add block provider decay test * Merge branch 'master' into init-sync-peer-scoring-slow-peers * adds case * penalize inactive peers * adds scorebadresponses test * inroduces no-activity penalty * gazelle * gofmt * Merge branch 'master' into init-sync-peer-scoring-slow-peers * expanded tests * implement SortBlockProviders * change -> update * updates block fetcher peer filter * fixes test * allows to keep track of peer id * updates scoring coefficients * fixes test * block fetcher update * Merge branch 'master' into init-sync-peer-scoring-slow-peers * disables empty batch penalty * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * removes outdated code * update filterPeers * gazelle * updates var * revert changes to var name * updates blocks_fetcher * minor fix to import name * Merge branch 'master' into init-sync-peer-scoring-slow-peers * add max processed blocks cap * impoves scoring of stale peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * fixes test * adds weight sorting to scored peers * return pid when fetching batches * updates round robin * gazelle * Merge branch 'master' into init-sync-peer-scoring-slow-peers * updates block provider decay count * go tidy * cherry pick * fixes test * go tidy * Merge branch 'peer-scorer-weighted-sorter' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * refactors blocks fetcher: repackage methods * Merge branch 'refactor-blocks-fetcher' into init-sync-peer-scoring-slow-peers * minor fixes * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * allow scores in range (0;1) in weighted filter * filterScoredPeers improve test suite * puts feature behind the flag * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * fixes tests * Merge branch 'init-sync-peer-scoring-slow-peers' of github.com:prysmaticlabs/prysm into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Update beacon-chain/sync/initial-sync/blocks_fetcher_test.go Co-authored-by: Shay Zluf <thezluf@gmail.com> * Nishant's suggestion on peer limit variable * better explanation of non-blocking peer scoring * Shay's sugession on peer naming * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Update beacon-chain/sync/initial-sync/blocks_fetcher.go Co-authored-by: Nishant Das <nishdas93@gmail.com> * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * gofmt * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers
2020-08-13 17:33:57 +00:00
}
// Sort peers using both block provider score and, custom, capacity based score (see
// peerFilterCapacityWeight if you want to give different weights to provider's and capacity
// scores).
// Scores produced are used as weights, so peers are ordered probabilistically i.e. peer with
// a higher score has higher chance to end up higher in the list.
scorer := f.p2p.Peers().Scorers().BlockProviderScorer()
peers = scorer.WeightSorted(f.rand, peers, func(peerID peer.ID, blockProviderScore float64) float64 {
remaining, capacity := float64(f.rateLimiter.Remaining(peerID.String())), float64(f.rateLimiter.Capacity())
// When capacity is close to exhaustion, allow less performant peer to take a chance.
// Otherwise, there's a good chance system will be forced to wait for rate limiter.
if remaining < float64(f.blocksPerSecond) {
return 0.0
}
capScore := remaining / capacity
overallScore := blockProviderScore*(1.0-f.capacityWeight) + capScore*f.capacityWeight
return math.Round(overallScore*scorers.ScoreRoundingFactor) / scorers.ScoreRoundingFactor
})
return trimPeers(peers, peersPercentage)
Peer scoring: init sync (#6709) * refactors redundant part of varname * introduces score_block_providers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * gazelle * adds comment * Merge branch 'master' into init-sync-peer-scoring-slow-peers * removes redundant checks * add block provider decay test * Merge branch 'master' into init-sync-peer-scoring-slow-peers * adds case * penalize inactive peers * adds scorebadresponses test * inroduces no-activity penalty * gazelle * gofmt * Merge branch 'master' into init-sync-peer-scoring-slow-peers * expanded tests * implement SortBlockProviders * change -> update * updates block fetcher peer filter * fixes test * allows to keep track of peer id * updates scoring coefficients * fixes test * block fetcher update * Merge branch 'master' into init-sync-peer-scoring-slow-peers * disables empty batch penalty * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * removes outdated code * update filterPeers * gazelle * updates var * revert changes to var name * updates blocks_fetcher * minor fix to import name * Merge branch 'master' into init-sync-peer-scoring-slow-peers * add max processed blocks cap * impoves scoring of stale peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * fixes test * adds weight sorting to scored peers * return pid when fetching batches * updates round robin * gazelle * Merge branch 'master' into init-sync-peer-scoring-slow-peers * updates block provider decay count * go tidy * cherry pick * fixes test * go tidy * Merge branch 'peer-scorer-weighted-sorter' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * refactors blocks fetcher: repackage methods * Merge branch 'refactor-blocks-fetcher' into init-sync-peer-scoring-slow-peers * minor fixes * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * allow scores in range (0;1) in weighted filter * filterScoredPeers improve test suite * puts feature behind the flag * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * fixes tests * Merge branch 'init-sync-peer-scoring-slow-peers' of github.com:prysmaticlabs/prysm into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge branch 'master' into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Update beacon-chain/sync/initial-sync/blocks_fetcher_test.go Co-authored-by: Shay Zluf <thezluf@gmail.com> * Nishant's suggestion on peer limit variable * better explanation of non-blocking peer scoring * Shay's sugession on peer naming * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Update beacon-chain/sync/initial-sync/blocks_fetcher.go Co-authored-by: Nishant Das <nishdas93@gmail.com> * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * gofmt * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers * Merge refs/heads/master into init-sync-peer-scoring-slow-peers
2020-08-13 17:33:57 +00:00
}
// trimPeers limits peer list, returning only specified percentage of peers.
// Takes system constraints into account (min/max peers to sync).
func trimPeers(peers []peer.ID, peersPercentage float64) []peer.ID {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
// Weak/slow peers will be pushed down the list and trimmed since only percentage of peers is selected.
limit := uint64(math.Round(float64(len(peers)) * peersPercentage))
// Limit cannot be less that minimum peers required by sync mechanism.
limit = mathutil.Max(limit, uint64(required))
// Limit cannot be higher than number of peers available (safe-guard).
limit = mathutil.Min(limit, uint64(len(peers)))
return peers[:limit]
}