Peer scoring: init sync (#6709)

* refactors redundant part of varname
* introduces score_block_providers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* gazelle
* adds comment
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* removes redundant checks
* add block provider decay test
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* adds case
* penalize inactive peers
* adds scorebadresponses test
* introduces no-activity penalty
* gazelle
* gofmt
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* expanded tests
* implement SortBlockProviders
* change -> update
* updates block fetcher peer filter
* fixes test
* allows keeping track of peer id
* updates scoring coefficients
* fixes test
* block fetcher update
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* disables empty batch penalty
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* removes outdated code
* update filterPeers
* gazelle
* updates var
* revert changes to var name
* updates blocks_fetcher
* minor fix to import name
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* add max processed blocks cap
* improves scoring of stale peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* fixes test
* adds weight sorting to scored peers
* return pid when fetching batches
* updates round robin
* gazelle
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* updates block provider decay count
* go tidy
* cherry pick
* fixes test
* go tidy
* Merge branch 'peer-scorer-weighted-sorter' into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* refactors blocks fetcher: repackage methods
* Merge branch 'refactor-blocks-fetcher' into init-sync-peer-scoring-slow-peers
* minor fixes
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* allow scores in range (0;1) in weighted filter
* filterScoredPeers improve test suite
* puts feature behind the flag
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* fixes tests
* Merge branch 'init-sync-peer-scoring-slow-peers' of github.com:prysmaticlabs/prysm into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Update beacon-chain/sync/initial-sync/blocks_fetcher_test.go

Co-authored-by: Shay Zluf <thezluf@gmail.com>
* Nishant's suggestion on peer limit variable
* better explanation of non-blocking peer scoring
* Shay's suggestion on peer naming
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Update beacon-chain/sync/initial-sync/blocks_fetcher.go

Co-authored-by: Nishant Das <nishdas93@gmail.com>
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* gofmt
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
Victor Farazdagi, 2020-08-13 20:33:57 +03:00 (committed by GitHub)
parent 282398fd13
commit 4de0b9fe69
13 changed files with 426 additions and 50 deletions

View File

@ -16,6 +16,7 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/flags:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/rand:go_default_library",
"//shared/roughtime:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",

View File

@ -16,7 +16,9 @@ func TestMain(m *testing.M) {
logrus.SetLevel(logrus.DebugLevel)
logrus.SetOutput(ioutil.Discard)
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{})
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
EnablePeerScorer: true,
})
defer resetCfg()
resetFlags := flags.Get()

View File

@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/roughtime"
)
@ -281,6 +282,9 @@ func (s *BlockProviderScorer) mapScoresAndPeers(
func (s *BlockProviderScorer) FormatScorePretty(pid peer.ID) string {
s.store.RLock()
defer s.store.RUnlock()
if !featureconfig.Get().EnablePeerScorer {
return "disabled"
}
score := s.score(pid)
return fmt.Sprintf("[%0.1f%%, raw: %0.2f, blocks: %d/%d]",
(score/s.MaxScore())*100, score, s.processedBlocks(pid), s.config.ProcessedBlocksCap)
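
Aside (not part of the diff): the format string above produces a compact per-peer summary in logs. A minimal standalone sketch with made-up numbers, assuming a raw score of 0.85 against a max score of 1.0 and 320 of 512 processed blocks:

package main

import "fmt"

func main() {
	// Hypothetical values, for illustration only.
	score, maxScore := 0.85, 1.0
	processed, blocksCap := uint64(320), uint64(512)
	fmt.Printf("[%0.1f%%, raw: %0.2f, blocks: %d/%d]\n",
		(score/maxScore)*100, score, processed, blocksCap)
	// Prints: [85.0%, raw: 0.85, blocks: 320/512]
}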

View File

@ -25,6 +25,7 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/flags:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//beacon-chain/sync:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",

View File

@ -19,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/sirupsen/logrus"
@ -39,6 +40,9 @@ const (
// nonSkippedSlotsFullSearchEpochs defines how many epochs to check in full before resorting to random
// sampling of slots once per epoch
nonSkippedSlotsFullSearchEpochs = 10
// peerFilterCapacityWeight defines how a peer's capacity affects its score. Provided as a
// percentage, i.e. 0.3 means capacity will determine 30% of a peer's score.
peerFilterCapacityWeight = 0.2
)
var (
@ -49,8 +53,9 @@ var (
// blocksFetcherConfig is a config to setup the block fetcher.
type blocksFetcherConfig struct {
headFetcher blockchain.HeadFetcher
p2p p2p.P2P
headFetcher blockchain.HeadFetcher
p2p p2p.P2P
peerFilterCapacityWeight float64
}
// blocksFetcher is a service to fetch chain data from peers.
@ -68,6 +73,7 @@ type blocksFetcher struct {
peerLocks map[peer.ID]*peerLock
fetchRequests chan *fetchRequestParams
fetchResponses chan *fetchRequestResponse
capacityWeight float64 // how remaining capacity affects peer selection
quit chan struct{} // termination notifier
}
@ -102,6 +108,11 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
float64(blocksPerSecond), int64(allowedBlocksBurst-blocksPerSecond),
false /* deleteEmptyBuckets */)
capacityWeight := cfg.peerFilterCapacityWeight
if capacityWeight >= 1 {
capacityWeight = peerFilterCapacityWeight
}
ctx, cancel := context.WithCancel(ctx)
return &blocksFetcher{
ctx: ctx,
@ -114,6 +125,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
peerLocks: make(map[peer.ID]*peerLock),
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
capacityWeight: capacityWeight,
quit: make(chan struct{}),
}
}
@ -261,7 +273,11 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
var blocks []*eth.SignedBeaconBlock
var err error
peers, err = f.filterPeers(peers, peersPercentagePerRequest)
if featureconfig.Get().EnablePeerScorer {
peers, err = f.filterScoredPeers(ctx, peers, peersPercentagePerRequest)
} else {
peers, err = f.filterPeers(peers, peersPercentagePerRequest)
}
if err != nil {
return blocks, "", err
}
@ -272,6 +288,9 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
}
for i := 0; i < len(peers); i++ {
if blocks, err = f.requestBlocks(ctx, req, peers[i]); err == nil {
if featureconfig.Get().EnablePeerScorer {
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peers[i])
}
return blocks, peers[i], err
}
}
@ -298,6 +317,7 @@ func (f *blocksFetcher) requestBlocks(
"count": req.Count,
"step": req.Step,
"capacity": f.rateLimiter.Remaining(pid.String()),
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
}).Debug("Requesting blocks")
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
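
Aside (not part of the diff): the constructor above keeps any configured capacityWeight below 1.0 as given (including 0, which effectively disables the capacity component) and only falls back to the 0.2 default when the configured value is 1.0 or larger. A minimal sketch of that guard, with an illustrative function name:

package main

import "fmt"

// sanitizeCapacityWeight mirrors the guard in newBlocksFetcher; the name is illustrative.
func sanitizeCapacityWeight(configured float64) float64 {
	const defaultCapacityWeight = 0.2 // same default as peerFilterCapacityWeight above
	if configured >= 1 {
		return defaultCapacityWeight // out-of-range values fall back to the default
	}
	return configured // values in [0, 1), including 0, are used as-is
}

func main() {
	fmt.Println(sanitizeCapacityWeight(0))   // 0
	fmt.Println(sanitizeCapacityWeight(0.5)) // 0.5
	fmt.Println(sanitizeCapacityWeight(1.5)) // 0.2
}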

View File

@ -10,10 +10,12 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
scorers "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// getPeerLock returns peer lock for a given peer. If lock is not found, it is created.
@ -83,7 +85,7 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err
}
// filterPeers returns transformed list of peers,
// weight ordered or randomized, constrained if necessary.
// weight ordered or randomized, constrained if necessary (only percentage of peers returned).
func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
if len(peers) == 0 {
return peers, nil
@ -96,14 +98,7 @@ func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([
})
// Select sub-sample from peers (honoring min-max invariants).
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
limit := uint64(math.Round(float64(len(peers)) * peersPercentage))
limit = mathutil.Max(limit, uint64(required))
limit = mathutil.Min(limit, uint64(len(peers)))
peers = peers[:limit]
peers = trimPeers(peers, peersPercentage)
// Order peers by remaining capacity, effectively turning in-order
// round robin peer processing into a weighted one (peers with higher
@ -118,3 +113,52 @@ func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([
return peers, nil
}
// filterScoredPeers returns a transformed list of peers,
// weight-sorted by scores and remaining capacity. The list can be constrained using peersPercentage,
// where only a percentage of peers is returned.
func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "initialsync.filterScoredPeers")
defer span.End()
if len(peers) == 0 {
return peers, nil
}
// Sort peers using both the block provider score and a custom capacity-based score (see
// peerFilterCapacityWeight if you want to give different weights to the provider and capacity
// scores).
// Scores produced are used as weights, so peers are ordered probabilistically, i.e. a peer with
// a higher score has a higher chance to end up higher in the list.
scorer := f.p2p.Peers().Scorers().BlockProviderScorer()
peers = scorer.WeightSorted(f.rand, peers, func(peerID peer.ID, blockProviderScore float64) float64 {
remaining, capacity := float64(f.rateLimiter.Remaining(peerID.String())), float64(f.rateLimiter.Capacity())
// When capacity is close to exhaustion, allow a less performant peer to take a chance.
// Otherwise, there's a good chance the system will be forced to wait for the rate limiter.
if remaining < float64(f.blocksPerSecond) {
return 0.0
}
capScore := remaining / capacity
overallScore := blockProviderScore*(1.0-f.capacityWeight) + capScore*f.capacityWeight
return math.Round(overallScore*scorers.ScoreRoundingFactor) / scorers.ScoreRoundingFactor
})
peers = trimPeers(peers, peersPercentage)
return peers, nil
}
// trimPeers limits the peer list, returning only the specified percentage of peers.
// Takes system constraints into account (min/max peers to sync).
func trimPeers(peers []peer.ID, peersPercentage float64) []peer.ID {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
// Weak/slow peers will be pushed down the list and trimmed, since only a percentage of peers is selected.
limit := uint64(math.Round(float64(len(peers)) * peersPercentage))
// Limit cannot be less than the minimum number of peers required by the sync mechanism.
limit = mathutil.Max(limit, uint64(required))
// Limit cannot be higher than number of peers available (safe-guard).
limit = mathutil.Min(limit, uint64(len(peers)))
return peers[:limit]
}
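
Aside (not part of the diff): to make the weighting concrete, here is an illustrative calculation of the combined score computed inside filterScoredPeers. The numbers are made up, and the rounding factor below is a local stand-in, not the actual scorers.ScoreRoundingFactor value:

package main

import (
	"fmt"
	"math"
)

// combinedScore mirrors the weighting above: the block-provider score contributes
// (1 - w) of the result and the remaining rate-limiter capacity contributes w.
func combinedScore(providerScore, remaining, capacity, blocksPerSecond, w float64) float64 {
	// Peers whose remaining capacity is below the per-second block rate go to the bottom.
	if remaining < blocksPerSecond {
		return 0.0
	}
	capScore := remaining / capacity
	overall := providerScore*(1.0-w) + capScore*w
	const roundingFactor = 10000 // stand-in for scorers.ScoreRoundingFactor
	return math.Round(overall*roundingFactor) / roundingFactor
}

func main() {
	// Hypothetical peer: provider score 0.8, 7500 of 10000 tokens remaining,
	// 64 blocks/sec rate, capacity weight 0.2.
	fmt.Println(combinedScore(0.8, 7500, 10000, 64, 0.2)) // 0.8*0.8 + 0.75*0.2 = 0.79
}

After weighting, trimPeers keeps roughly len(peers) * peersPercentage of the highest-weighted peers, never fewer than the configured sync minimum and never more than the peers actually available.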

View File

@ -3,6 +3,7 @@ package initialsync
import (
"context"
"fmt"
"math"
"sort"
"sync"
"testing"
@ -18,6 +19,7 @@ import (
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2pm "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@ -501,7 +503,7 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) {
{
name: "No peers provided",
args: args{
excludedPID: "abc",
excludedPID: "a",
peers: []peer.ID{},
},
want: "",
@ -510,9 +512,9 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) {
{
name: "Single peer which needs to be excluded",
args: args{
excludedPID: "abc",
excludedPID: "a",
peers: []peer.ID{
"abc",
"a",
},
},
want: "",
@ -521,7 +523,7 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) {
{
name: "Single peer available",
args: args{
excludedPID: "abc",
excludedPID: "a",
peers: []peer.ID{
"cde",
},
@ -532,9 +534,9 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) {
{
name: "Two peers available, excluded first",
args: args{
excludedPID: "abc",
excludedPID: "a",
peers: []peer.ID{
"abc", "cde",
"a", "cde",
},
},
want: "cde",
@ -543,9 +545,9 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) {
{
name: "Two peers available, excluded second",
args: args{
excludedPID: "abc",
excludedPID: "a",
peers: []peer.ID{
"cde", "abc",
"cde", "a",
},
},
want: "cde",
@ -554,9 +556,9 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) {
{
name: "Multiple peers available",
args: args{
excludedPID: "abc",
excludedPID: "a",
peers: []peer.ID{
"abc", "cde", "cde", "cde",
"a", "cde", "cde", "cde",
},
},
want: "cde",
@ -683,37 +685,37 @@ func TestBlocksFetcher_filterPeers(t *testing.T) {
name: "single peer",
args: args{
peers: []weightedPeer{
{"abc", 10},
{"a", 10},
},
peersPercentage: 1.0,
},
want: []peer.ID{"abc"},
want: []peer.ID{"a"},
},
{
name: "multiple peers same capacity",
args: args{
peers: []weightedPeer{
{"abc", 10},
{"def", 10},
{"xyz", 10},
{"a", 10},
{"b", 10},
{"c", 10},
},
peersPercentage: 1.0,
},
want: []peer.ID{"abc", "def", "xyz"},
want: []peer.ID{"a", "b", "c"},
},
{
name: "multiple peers different capacity",
args: args{
peers: []weightedPeer{
{"abc", 20},
{"def", 15},
{"ghi", 10},
{"jkl", 90},
{"xyz", 20},
{"a", 20},
{"b", 15},
{"c", 10},
{"d", 90},
{"e", 20},
},
peersPercentage: 1.0,
},
want: []peer.ID{"ghi", "def", "abc", "xyz", "jkl"},
want: []peer.ID{"c", "b", "a", "e", "d"},
},
}
for _, tt := range tests {
@ -743,6 +745,200 @@ func TestBlocksFetcher_filterPeers(t *testing.T) {
}
}
func TestBlocksFetcher_filterScoredPeers(t *testing.T) {
type weightedPeer struct {
peer.ID
usedCapacity int64
}
type args struct {
peers []weightedPeer
peersPercentage float64
capacityWeight float64
}
batchSize := uint64(flags.Get().BlockBatchLimit)
tests := []struct {
name string
args args
update func(s *peers.BlockProviderScorer)
want []peer.ID
}{
{
name: "no peers available",
args: args{
peers: []weightedPeer{},
peersPercentage: 1.0,
capacityWeight: 0.2,
},
want: []peer.ID{},
},
{
name: "single peer",
args: args{
peers: []weightedPeer{
{"a", 1200},
},
peersPercentage: 1.0,
capacityWeight: 0.2,
},
want: []peer.ID{"a"},
},
{
name: "multiple peers same capacity",
args: args{
peers: []weightedPeer{
{"a", 2400},
{"b", 2400},
{"c", 2400},
},
peersPercentage: 1.0,
capacityWeight: 0.2,
},
want: []peer.ID{"a", "b", "c"},
},
{
name: "multiple peers capacity as tie-breaker",
args: args{
peers: []weightedPeer{
{"a", 6000},
{"b", 3000},
{"c", 0},
{"d", 9000},
{"e", 6000},
},
peersPercentage: 1.0,
capacityWeight: 0.2,
},
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("a", batchSize*2)
s.IncrementProcessedBlocks("b", batchSize*2)
s.IncrementProcessedBlocks("c", batchSize*2)
s.IncrementProcessedBlocks("d", batchSize*2)
s.IncrementProcessedBlocks("e", batchSize*2)
},
want: []peer.ID{"c", "b", "a", "e", "d"},
},
{
name: "multiple peers same capacity different scores",
args: args{
peers: []weightedPeer{
{"a", 9000},
{"b", 9000},
{"c", 9000},
{"d", 9000},
{"e", 9000},
},
peersPercentage: 0.8,
capacityWeight: 0.2,
},
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("e", s.Params().ProcessedBlocksCap)
s.IncrementProcessedBlocks("b", s.Params().ProcessedBlocksCap/2)
s.IncrementProcessedBlocks("c", s.Params().ProcessedBlocksCap/4)
s.IncrementProcessedBlocks("a", s.Params().ProcessedBlocksCap/8)
s.IncrementProcessedBlocks("d", 0)
},
want: []peer.ID{"e", "b", "c", "a"},
},
{
name: "multiple peers different capacities and scores",
args: args{
peers: []weightedPeer{
{"a", 6500},
{"b", 2500},
{"c", 1000},
{"d", 9000},
{"e", 6500},
},
peersPercentage: 0.8,
capacityWeight: 0.2,
},
update: func(s *peers.BlockProviderScorer) {
// Make sure that score takes priority over capacity.
s.IncrementProcessedBlocks("c", batchSize*5)
s.IncrementProcessedBlocks("b", batchSize*15)
// Break tie using capacity as a tie-breaker (a and e have the same score).
s.IncrementProcessedBlocks("a", batchSize*3)
s.IncrementProcessedBlocks("e", batchSize*3)
// Exclude peer (peers percentage is 80%).
s.IncrementProcessedBlocks("d", batchSize)
},
want: []peer.ID{"b", "c", "a", "e"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mc, p2p, _ := initializeTestServices(t, []uint64{}, []*peerData{})
fetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{
headFetcher: mc,
p2p: p2p,
peerFilterCapacityWeight: tt.args.capacityWeight,
})
// Non-leaking bucket, with initial capacity of 10000.
fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 10000, false)
peerIDs := make([]peer.ID, 0)
for _, pid := range tt.args.peers {
peerIDs = append(peerIDs, pid.ID)
fetcher.rateLimiter.Add(pid.ID.String(), pid.usedCapacity)
}
if tt.update != nil {
tt.update(fetcher.p2p.Peers().Scorers().BlockProviderScorer())
}
// Since peer selection is probabilistic (weighted, with high scorers having a higher
// chance of being selected), we need multiple rounds of filtering to test the order:
// over multiple attempts, top scorers should be picked in high positions more often.
peerStats := make(map[peer.ID]int, len(tt.want))
var filteredPIDs []peer.ID
var err error
for i := 0; i < 1000; i++ {
filteredPIDs, err = fetcher.filterScoredPeers(context.Background(), peerIDs, tt.args.peersPercentage)
if len(filteredPIDs) <= 1 {
break
}
require.NoError(t, err)
for j, pid := range filteredPIDs {
// The higher a peer is in the list, the more "points" it gets.
peerStats[pid] += len(tt.want) - j
}
}
// If a percentage of peers was requested, rebuild the combined filtered peers list.
if len(filteredPIDs) != len(peerStats) && len(peerStats) > 0 {
filteredPIDs = []peer.ID{}
for pid := range peerStats {
filteredPIDs = append(filteredPIDs, pid)
}
}
// Sort by frequency of appearance in high positions on filtering.
sort.Slice(filteredPIDs, func(i, j int) bool {
return peerStats[filteredPIDs[i]] > peerStats[filteredPIDs[j]]
})
if tt.args.peersPercentage < 1.0 {
limit := uint64(math.Round(float64(len(filteredPIDs)) * tt.args.peersPercentage))
filteredPIDs = filteredPIDs[:limit]
}
// Re-arrange peers with the same remaining capacity deterministically.
// They are deliberately shuffled, so that any peer with the same capacity
// can be selected. That's why they are sorted here.
sort.SliceStable(filteredPIDs, func(i, j int) bool {
score1 := fetcher.p2p.Peers().Scorers().BlockProviderScorer().Score(filteredPIDs[i])
score2 := fetcher.p2p.Peers().Scorers().BlockProviderScorer().Score(filteredPIDs[j])
if score1 == score2 {
cap1 := fetcher.rateLimiter.Remaining(filteredPIDs[i].String())
cap2 := fetcher.rateLimiter.Remaining(filteredPIDs[j].String())
if cap1 == cap2 {
return filteredPIDs[i].String() < filteredPIDs[j].String()
}
}
return i < j
})
assert.DeepEqual(t, tt.want, filteredPIDs)
})
}
}
func TestBlocksFetcher_RequestBlocksRateLimitingLocks(t *testing.T) {
p1 := p2pt.NewTestP2P(t)
p2 := p2pt.NewTestP2P(t)
@ -831,29 +1027,29 @@ func TestBlocksFetcher_removeStalePeerLocks(t *testing.T) {
age: peerLockMaxAge,
peersIn: []peerData{
{
peerID: "abc",
peerID: "a",
accessed: roughtime.Now(),
},
{
peerID: "def",
peerID: "b",
accessed: roughtime.Now(),
},
{
peerID: "ghi",
peerID: "c",
accessed: roughtime.Now(),
},
},
peersOut: []peerData{
{
peerID: "abc",
peerID: "a",
accessed: roughtime.Now(),
},
{
peerID: "def",
peerID: "b",
accessed: roughtime.Now(),
},
{
peerID: "ghi",
peerID: "c",
accessed: roughtime.Now(),
},
},
@ -863,25 +1059,25 @@ func TestBlocksFetcher_removeStalePeerLocks(t *testing.T) {
age: peerLockMaxAge,
peersIn: []peerData{
{
peerID: "abc",
peerID: "a",
accessed: roughtime.Now(),
},
{
peerID: "def",
peerID: "b",
accessed: roughtime.Now().Add(-peerLockMaxAge),
},
{
peerID: "ghi",
peerID: "c",
accessed: roughtime.Now(),
},
},
peersOut: []peerData{
{
peerID: "abc",
peerID: "a",
accessed: roughtime.Now(),
},
{
peerID: "ghi",
peerID: "c",
accessed: roughtime.Now(),
},
},
@ -891,15 +1087,15 @@ func TestBlocksFetcher_removeStalePeerLocks(t *testing.T) {
age: peerLockMaxAge,
peersIn: []peerData{
{
peerID: "abc",
peerID: "a",
accessed: roughtime.Now().Add(-peerLockMaxAge),
},
{
peerID: "def",
peerID: "b",
accessed: roughtime.Now().Add(-peerLockMaxAge),
},
{
peerID: "ghi",
peerID: "c",
accessed: roughtime.Now().Add(-peerLockMaxAge),
},
},

View File

@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
@ -33,7 +34,11 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u
return 0, errSlotIsTooHigh
}
var err error
peers, err = f.filterPeers(peers, peersPercentagePerRequest)
if featureconfig.Get().EnablePeerScorer {
peers, err = f.filterScoredPeers(ctx, peers, peersPercentagePerRequest)
} else {
peers, err = f.filterPeers(peers, peersPercentagePerRequest)
}
if err != nil {
return 0, err
}

View File

@ -57,6 +57,7 @@ func TestMain(m *testing.M) {
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
NewStateMgmt: true,
BatchBlockVerify: true,
EnablePeerScorer: true,
})
defer resetCfg()

View File

@ -128,6 +128,15 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
// processFetchedData processes data received from queue.
func (s *Service) processFetchedData(
ctx context.Context, genesis time.Time, startSlot uint64, data *blocksQueueFetchedData) {
defer func() {
if !featureconfig.Get().EnablePeerScorer || data.pid == "" {
return
}
scorer := s.p2p.Peers().Scorers().BlockProviderScorer()
if diff := s.chain.HeadSlot() - startSlot; diff > 0 {
scorer.IncrementProcessedBlocks(data.pid, diff)
}
}()
blockReceiver := s.chain.ReceiveBlockInitialSync
batchReceiver := s.chain.ReceiveBlockBatch
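
Aside (not part of the diff): the deferred function above credits the peer that served the batch with however many slots the head advanced while the batch was being processed, but only when the scorer feature is enabled and the providing peer is known. A minimal sketch of that accounting, with a hypothetical scorer stub standing in for Scorers().BlockProviderScorer():

package main

import "fmt"

// blockProviderScorer is a stand-in for the real block provider scorer interface.
type blockProviderScorer interface {
	IncrementProcessedBlocks(pid string, count uint64)
}

type stubScorer struct{ credited map[string]uint64 }

func (s *stubScorer) IncrementProcessedBlocks(pid string, count uint64) { s.credited[pid] += count }

// creditProvider mirrors the deferred logic in processFetchedData: credit the peer only if the
// scorer is enabled, the peer id is known, and the head advanced past the starting slot.
// The pid is a plain string here for simplicity; the real code uses peer.ID.
func creditProvider(scorer blockProviderScorer, enabled bool, pid string, startSlot, headSlot uint64) {
	if !enabled || pid == "" {
		return
	}
	if headSlot > startSlot {
		scorer.IncrementProcessedBlocks(pid, headSlot-startSlot)
	}
}

func main() {
	s := &stubScorer{credited: make(map[string]uint64)}
	creditProvider(s, true, "peerA", 96, 160) // head advanced by 64 slots
	fmt.Println(s.credited["peerA"])          // 64
}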

View File

@ -476,3 +476,85 @@ func TestService_processBlockBatch(t *testing.T) {
assert.Equal(t, uint64(19), s.chain.HeadSlot(), "Unexpected head slot")
})
}
func TestService_blockProviderScoring(t *testing.T) {
cache.initializeRootCache(makeSequence(1, 320), t)
p := p2pt.NewTestP2P(t)
beaconDB, _ := dbtest.SetupDB(t)
peerData := []*peerData{
{
// The slowest peer, only a single block in a couple of epochs.
blocks: []uint64{1, 65, 129},
finalizedEpoch: 5,
headSlot: 160,
},
{
// A relatively slow peer, still should perform better than the slowest peer.
blocks: append([]uint64{1, 2, 3, 4, 65, 66, 67, 68, 129, 130}, makeSequence(131, 160)...),
finalizedEpoch: 5,
headSlot: 160,
},
{
// This peer has all blocks - should be a preferred one.
blocks: makeSequence(1, 160),
finalizedEpoch: 5,
headSlot: 160,
},
}
peer1 := connectPeer(t, p, peerData[0], p.Peers())
peer2 := connectPeer(t, p, peerData[1], p.Peers())
peer3 := connectPeer(t, p, peerData[2], p.Peers())
cache.RLock()
genesisRoot := cache.rootCache[0]
cache.RUnlock()
err := beaconDB.SaveBlock(context.Background(), &eth.SignedBeaconBlock{Block: &eth.BeaconBlock{Slot: 0}})
require.NoError(t, err)
st, err := stateTrie.InitializeFromProto(&p2ppb.BeaconState{})
require.NoError(t, err)
mc := &mock.ChainService{
State: st,
Root: genesisRoot[:],
DB: beaconDB,
} // no-op mock
s := &Service{
chain: mc,
p2p: p,
db: beaconDB,
synced: false,
chainStarted: true,
}
scorer := s.p2p.Peers().Scorers().BlockProviderScorer()
expectedBlockSlots := makeSequence(1, 160)
currentSlot := uint64(160)
assert.Equal(t, scorer.MaxScore(), scorer.Score(peer1))
assert.Equal(t, scorer.MaxScore(), scorer.Score(peer2))
assert.Equal(t, scorer.MaxScore(), scorer.Score(peer3))
assert.NoError(t, s.roundRobinSync(makeGenesisTime(currentSlot)))
if s.chain.HeadSlot() != currentSlot {
t.Errorf("Head slot (%d) is not currentSlot (%d)", s.chain.HeadSlot(), currentSlot)
}
assert.Equal(t, len(expectedBlockSlots), len(mc.BlocksReceived), "Processes wrong number of blocks")
var receivedBlockSlots []uint64
for _, blk := range mc.BlocksReceived {
receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot)
}
missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(expectedBlockSlots, receivedBlockSlots), expectedBlockSlots)
if len(missing) > 0 {
t.Errorf("Missing blocks at slots %v", missing)
}
score1 := scorer.Score(peer1)
score2 := scorer.Score(peer2)
score3 := scorer.Score(peer3)
assert.Equal(t, true, score1 < score3, "Incorrect score (%v) for peer: %v (must be lower than %v)", score1, peer1, score3)
assert.Equal(t, true, score2 < score3, "Incorrect score (%v) for peer: %v (must be lower than %v)", score2, peer2, score3)
assert.Equal(t, true, scorer.ProcessedBlocks(peer3) > 100, "Not enough blocks returned by healthy peer")
}

View File

@ -61,6 +61,7 @@ type Flags struct {
EnableFinalizedDepositsCache bool // EnableFinalizedDepositsCache enables utilization of cached finalized deposits.
EnableEth1DataMajorityVote bool // EnableEth1DataMajorityVote uses the Voting With The Majority algorithm to vote for eth1data.
EnableAttBroadcastDiscoveryAttempts bool // EnableAttBroadcastDiscoveryAttempts allows the p2p service to attempt to ensure a subnet peer is present before broadcasting an attestation.
EnablePeerScorer bool // EnablePeerScorer enables experimental peer scoring in p2p.
// DisableForkChoice disables using LMD-GHOST fork choice to update
// the head of the chain based on attestations and instead accepts any valid received block
@ -255,6 +256,10 @@ func ConfigureBeaconChain(ctx *cli.Context) {
if ctx.Bool(enableAttBroadcastDiscoveryAttempts.Name) {
cfg.EnableAttBroadcastDiscoveryAttempts = true
}
if ctx.Bool(enablePeerScorer.Name) {
log.Warn("Enabling peer scoring in P2P")
cfg.EnablePeerScorer = true
}
Init(cfg)
}

View File

@ -166,12 +166,17 @@ var (
Name: "enable-att-broadcast-discovery-attempts",
Usage: "Enable experimental attestation subnet discovery before broadcasting.",
}
enablePeerScorer = &cli.BoolFlag{
Name: "enable-peer-scorer",
Usage: "Enable experimental P2P peer scorer",
}
)
// devModeFlags holds list of flags that are set when development mode is on.
var devModeFlags = []cli.Flag{
batchBlockVerify,
enableAttBroadcastDiscoveryAttempts,
enablePeerScorer,
}
// Deprecated flags list.
@ -654,6 +659,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
enableFinalizedDepositsCache,
enableEth1DataMajorityVote,
enableAttBroadcastDiscoveryAttempts,
enablePeerScorer,
}...)
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.