From 4de0b9fe69e293d31b07473311fa4ea5b5505386 Mon Sep 17 00:00:00 2001
From: Victor Farazdagi
Date: Thu, 13 Aug 2020 20:33:57 +0300
Subject: [PATCH] Peer scoring: init sync (#6709)

* refactors redundant part of varname
* introduces score_block_providers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* gazelle
* adds comment
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* removes redundant checks
* add block provider decay test
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* adds case
* penalize inactive peers
* adds scorebadresponses test
* introduces no-activity penalty
* gazelle
* gofmt
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* expanded tests
* implement SortBlockProviders
* change -> update
* updates block fetcher peer filter
* fixes test
* allows to keep track of peer id
* updates scoring coefficients
* fixes test
* block fetcher update
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* disables empty batch penalty
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* removes outdated code
* update filterPeers
* gazelle
* updates var
* revert changes to var name
* updates blocks_fetcher
* minor fix to import name
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* add max processed blocks cap
* improves scoring of stale peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* fixes test
* adds weight sorting to scored peers
* return pid when fetching batches
* updates round robin
* gazelle
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* updates block provider decay count
* go tidy
* cherry pick
* fixes test
* go tidy
* Merge branch 'peer-scorer-weighted-sorter' into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* refactors blocks fetcher: repackage methods
* Merge branch 'refactor-blocks-fetcher' into init-sync-peer-scoring-slow-peers
* minor fixes
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* allow scores in range (0;1) in weighted filter
* filterScoredPeers improve test suite
* puts feature behind the flag
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* fixes tests
* Merge branch 'init-sync-peer-scoring-slow-peers' of github.com:prysmaticlabs/prysm into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Merge branch 'master' into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Update beacon-chain/sync/initial-sync/blocks_fetcher_test.go
Co-authored-by: Shay Zluf
* Nishant's suggestion on peer limit variable
* better explanation of non-blocking peer scoring
* Shay's suggestion on peer naming
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Update beacon-chain/sync/initial-sync/blocks_fetcher.go
Co-authored-by: Nishant Das
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* gofmt
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
* Merge refs/heads/master into init-sync-peer-scoring-slow-peers
---
 beacon-chain/p2p/peers/BUILD.bazel | 1 +
beacon-chain/p2p/peers/peers_test.go | 4 +- .../p2p/peers/score_block_providers.go | 4 + beacon-chain/sync/initial-sync/BUILD.bazel | 1 + .../sync/initial-sync/blocks_fetcher.go | 26 +- .../sync/initial-sync/blocks_fetcher_peers.go | 62 +++- .../sync/initial-sync/blocks_fetcher_test.go | 268 +++++++++++++++--- .../sync/initial-sync/blocks_fetcher_utils.go | 7 +- .../sync/initial-sync/initial_sync_test.go | 1 + beacon-chain/sync/initial-sync/round_robin.go | 9 + .../sync/initial-sync/round_robin_test.go | 82 ++++++ shared/featureconfig/config.go | 5 + shared/featureconfig/flags.go | 6 + 13 files changed, 426 insertions(+), 50 deletions(-) diff --git a/beacon-chain/p2p/peers/BUILD.bazel b/beacon-chain/p2p/peers/BUILD.bazel index 459cfbd41..3d7422796 100644 --- a/beacon-chain/p2p/peers/BUILD.bazel +++ b/beacon-chain/p2p/peers/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/flags:go_default_library", "//proto/beacon/p2p/v1:go_default_library", + "//shared/featureconfig:go_default_library", "//shared/rand:go_default_library", "//shared/roughtime:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", diff --git a/beacon-chain/p2p/peers/peers_test.go b/beacon-chain/p2p/peers/peers_test.go index 2e765b993..7538350bb 100644 --- a/beacon-chain/p2p/peers/peers_test.go +++ b/beacon-chain/p2p/peers/peers_test.go @@ -16,7 +16,9 @@ func TestMain(m *testing.M) { logrus.SetLevel(logrus.DebugLevel) logrus.SetOutput(ioutil.Discard) - resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{}) + resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{ + EnablePeerScorer: true, + }) defer resetCfg() resetFlags := flags.Get() diff --git a/beacon-chain/p2p/peers/score_block_providers.go b/beacon-chain/p2p/peers/score_block_providers.go index 3129a101e..0329c283e 100644 --- a/beacon-chain/p2p/peers/score_block_providers.go +++ b/beacon-chain/p2p/peers/score_block_providers.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/prysmaticlabs/prysm/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/rand" "github.com/prysmaticlabs/prysm/shared/roughtime" ) @@ -281,6 +282,9 @@ func (s *BlockProviderScorer) mapScoresAndPeers( func (s *BlockProviderScorer) FormatScorePretty(pid peer.ID) string { s.store.RLock() defer s.store.RUnlock() + if !featureconfig.Get().EnablePeerScorer { + return "disabled" + } score := s.score(pid) return fmt.Sprintf("[%0.1f%%, raw: %0.2f, blocks: %d/%d]", (score/s.MaxScore())*100, score, s.processedBlocks(pid), s.config.ProcessedBlocksCap) diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index 721a3550f..bf85b947a 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//beacon-chain/db:go_default_library", "//beacon-chain/flags:go_default_library", "//beacon-chain/p2p:go_default_library", + "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/state/stateutil:go_default_library", "//beacon-chain/sync:go_default_library", "//proto/beacon/p2p/v1:go_default_library", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 499771936..5e663bf77 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -19,6 +19,7 @@ import ( 
"github.com/prysmaticlabs/prysm/beacon-chain/p2p" prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/rand" "github.com/sirupsen/logrus" @@ -39,6 +40,9 @@ const ( // nonSkippedSlotsFullSearchEpochs how many epochs to check in full, before resorting to random // sampling of slots once per epoch nonSkippedSlotsFullSearchEpochs = 10 + // peerFilterCapacityWeight defines how peer's capacity affects peer's score. Provided as + // percentage, i.e. 0.3 means capacity will determine 30% of peer's score. + peerFilterCapacityWeight = 0.2 ) var ( @@ -49,8 +53,9 @@ var ( // blocksFetcherConfig is a config to setup the block fetcher. type blocksFetcherConfig struct { - headFetcher blockchain.HeadFetcher - p2p p2p.P2P + headFetcher blockchain.HeadFetcher + p2p p2p.P2P + peerFilterCapacityWeight float64 } // blocksFetcher is a service to fetch chain data from peers. @@ -68,6 +73,7 @@ type blocksFetcher struct { peerLocks map[peer.ID]*peerLock fetchRequests chan *fetchRequestParams fetchResponses chan *fetchRequestResponse + capacityWeight float64 // how remaining capacity affects peer selection quit chan struct{} // termination notifier } @@ -102,6 +108,11 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc float64(blocksPerSecond), int64(allowedBlocksBurst-blocksPerSecond), false /* deleteEmptyBuckets */) + capacityWeight := cfg.peerFilterCapacityWeight + if capacityWeight >= 1 { + capacityWeight = peerFilterCapacityWeight + } + ctx, cancel := context.WithCancel(ctx) return &blocksFetcher{ ctx: ctx, @@ -114,6 +125,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc peerLocks: make(map[peer.ID]*peerLock), fetchRequests: make(chan *fetchRequestParams, maxPendingRequests), fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests), + capacityWeight: capacityWeight, quit: make(chan struct{}), } } @@ -261,7 +273,11 @@ func (f *blocksFetcher) fetchBlocksFromPeer( var blocks []*eth.SignedBeaconBlock var err error - peers, err = f.filterPeers(peers, peersPercentagePerRequest) + if featureconfig.Get().EnablePeerScorer { + peers, err = f.filterScoredPeers(ctx, peers, peersPercentagePerRequest) + } else { + peers, err = f.filterPeers(peers, peersPercentagePerRequest) + } if err != nil { return blocks, "", err } @@ -272,6 +288,9 @@ func (f *blocksFetcher) fetchBlocksFromPeer( } for i := 0; i < len(peers); i++ { if blocks, err = f.requestBlocks(ctx, req, peers[i]); err == nil { + if featureconfig.Get().EnablePeerScorer { + f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peers[i]) + } return blocks, peers[i], err } } @@ -298,6 +317,7 @@ func (f *blocksFetcher) requestBlocks( "count": req.Count, "step": req.Step, "capacity": f.rateLimiter.Remaining(pid.String()), + "score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid), }).Debug("Requesting blocks") if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) { log.WithField("peer", pid).Debug("Slowing down for rate limit") diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go b/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go index b39af3954..57b7332b4 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go @@ -10,10 +10,12 @@ import ( 
"github.com/libp2p/go-libp2p-core/peer" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/flags" + scorers "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/shared/mathutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/roughtime" "github.com/sirupsen/logrus" + "go.opencensus.io/trace" ) // getPeerLock returns peer lock for a given peer. If lock is not found, it is created. @@ -83,7 +85,7 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err } // filterPeers returns transformed list of peers, -// weight ordered or randomized, constrained if necessary. +// weight ordered or randomized, constrained if necessary (only percentage of peers returned). func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) { if len(peers) == 0 { return peers, nil @@ -96,14 +98,7 @@ func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([ }) // Select sub-sample from peers (honoring min-max invariants). - required := params.BeaconConfig().MaxPeersToSync - if flags.Get().MinimumSyncPeers < required { - required = flags.Get().MinimumSyncPeers - } - limit := uint64(math.Round(float64(len(peers)) * peersPercentage)) - limit = mathutil.Max(limit, uint64(required)) - limit = mathutil.Min(limit, uint64(len(peers))) - peers = peers[:limit] + peers = trimPeers(peers, peersPercentage) // Order peers by remaining capacity, effectively turning in-order // round robin peer processing into a weighted one (peers with higher @@ -118,3 +113,52 @@ func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([ return peers, nil } + +// filterScoredPeers returns transformed list of peers, +// weight sorted by scores and capacity remaining. List can be constrained using peersPercentage, +// where only percentage of peers are returned. +func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error) { + ctx, span := trace.StartSpan(ctx, "initialsync.filterScoredPeers") + defer span.End() + + if len(peers) == 0 { + return peers, nil + } + + // Sort peers using both block provider score and, custom, capacity based score (see + // peerFilterCapacityWeight if you want to give different weights to provider's and capacity + // scores). + // Scores produced are used as weights, so peers are ordered probabilistically i.e. peer with + // a higher score has higher chance to end up higher in the list. + scorer := f.p2p.Peers().Scorers().BlockProviderScorer() + peers = scorer.WeightSorted(f.rand, peers, func(peerID peer.ID, blockProviderScore float64) float64 { + remaining, capacity := float64(f.rateLimiter.Remaining(peerID.String())), float64(f.rateLimiter.Capacity()) + // When capacity is close to exhaustion, allow less performant peer to take a chance. + // Otherwise, there's a good chance system will be forced to wait for rate limiter. + if remaining < float64(f.blocksPerSecond) { + return 0.0 + } + capScore := remaining / capacity + overallScore := blockProviderScore*(1.0-f.capacityWeight) + capScore*f.capacityWeight + return math.Round(overallScore*scorers.ScoreRoundingFactor) / scorers.ScoreRoundingFactor + }) + peers = trimPeers(peers, peersPercentage) + + return peers, nil +} + +// trimPeers limits peer list, returning only specified percentage of peers. +// Takes system constraints into account (min/max peers to sync). 
+func trimPeers(peers []peer.ID, peersPercentage float64) []peer.ID { + required := params.BeaconConfig().MaxPeersToSync + if flags.Get().MinimumSyncPeers < required { + required = flags.Get().MinimumSyncPeers + } + // Weak/slow peers will be pushed down the list and trimmed since only percentage of peers is selected. + limit := uint64(math.Round(float64(len(peers)) * peersPercentage)) + // Limit cannot be less that minimum peers required by sync mechanism. + limit = mathutil.Max(limit, uint64(required)) + // Limit cannot be higher than number of peers available (safe-guard). + limit = mathutil.Min(limit, uint64(len(peers))) + return peers[:limit] +} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 91ad63b95..f776fc2f8 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -3,6 +3,7 @@ package initialsync import ( "context" "fmt" + "math" "sort" "sync" "testing" @@ -18,6 +19,7 @@ import ( dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/flags" p2pm "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -501,7 +503,7 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) { { name: "No peers provided", args: args{ - excludedPID: "abc", + excludedPID: "a", peers: []peer.ID{}, }, want: "", @@ -510,9 +512,9 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) { { name: "Single peer which needs to be excluded", args: args{ - excludedPID: "abc", + excludedPID: "a", peers: []peer.ID{ - "abc", + "a", }, }, want: "", @@ -521,7 +523,7 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) { { name: "Single peer available", args: args{ - excludedPID: "abc", + excludedPID: "a", peers: []peer.ID{ "cde", }, @@ -532,9 +534,9 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) { { name: "Two peers available, excluded first", args: args{ - excludedPID: "abc", + excludedPID: "a", peers: []peer.ID{ - "abc", "cde", + "a", "cde", }, }, want: "cde", @@ -543,9 +545,9 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) { { name: "Two peers available, excluded second", args: args{ - excludedPID: "abc", + excludedPID: "a", peers: []peer.ID{ - "cde", "abc", + "cde", "a", }, }, want: "cde", @@ -554,9 +556,9 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) { { name: "Multiple peers available", args: args{ - excludedPID: "abc", + excludedPID: "a", peers: []peer.ID{ - "abc", "cde", "cde", "cde", + "a", "cde", "cde", "cde", }, }, want: "cde", @@ -683,37 +685,37 @@ func TestBlocksFetcher_filterPeers(t *testing.T) { name: "single peer", args: args{ peers: []weightedPeer{ - {"abc", 10}, + {"a", 10}, }, peersPercentage: 1.0, }, - want: []peer.ID{"abc"}, + want: []peer.ID{"a"}, }, { name: "multiple peers same capacity", args: args{ peers: []weightedPeer{ - {"abc", 10}, - {"def", 10}, - {"xyz", 10}, + {"a", 10}, + {"b", 10}, + {"c", 10}, }, peersPercentage: 1.0, }, - want: []peer.ID{"abc", "def", "xyz"}, + want: []peer.ID{"a", "b", "c"}, }, { name: "multiple peers different capacity", args: args{ peers: []weightedPeer{ - {"abc", 20}, - {"def", 15}, - {"ghi", 10}, - {"jkl", 90}, - {"xyz", 20}, + {"a", 20}, + {"b", 15}, + {"c", 10}, + {"d", 90}, 
+ {"e", 20}, }, peersPercentage: 1.0, }, - want: []peer.ID{"ghi", "def", "abc", "xyz", "jkl"}, + want: []peer.ID{"c", "b", "a", "e", "d"}, }, } for _, tt := range tests { @@ -743,6 +745,200 @@ func TestBlocksFetcher_filterPeers(t *testing.T) { } } +func TestBlocksFetcher_filterScoredPeers(t *testing.T) { + type weightedPeer struct { + peer.ID + usedCapacity int64 + } + type args struct { + peers []weightedPeer + peersPercentage float64 + capacityWeight float64 + } + + batchSize := uint64(flags.Get().BlockBatchLimit) + tests := []struct { + name string + args args + update func(s *peers.BlockProviderScorer) + want []peer.ID + }{ + { + name: "no peers available", + args: args{ + peers: []weightedPeer{}, + peersPercentage: 1.0, + capacityWeight: 0.2, + }, + want: []peer.ID{}, + }, + { + name: "single peer", + args: args{ + peers: []weightedPeer{ + {"a", 1200}, + }, + peersPercentage: 1.0, + capacityWeight: 0.2, + }, + want: []peer.ID{"a"}, + }, + { + name: "multiple peers same capacity", + args: args{ + peers: []weightedPeer{ + {"a", 2400}, + {"b", 2400}, + {"c", 2400}, + }, + peersPercentage: 1.0, + capacityWeight: 0.2, + }, + want: []peer.ID{"a", "b", "c"}, + }, + { + name: "multiple peers capacity as tie-breaker", + args: args{ + peers: []weightedPeer{ + {"a", 6000}, + {"b", 3000}, + {"c", 0}, + {"d", 9000}, + {"e", 6000}, + }, + peersPercentage: 1.0, + capacityWeight: 0.2, + }, + update: func(s *peers.BlockProviderScorer) { + s.IncrementProcessedBlocks("a", batchSize*2) + s.IncrementProcessedBlocks("b", batchSize*2) + s.IncrementProcessedBlocks("c", batchSize*2) + s.IncrementProcessedBlocks("d", batchSize*2) + s.IncrementProcessedBlocks("e", batchSize*2) + }, + want: []peer.ID{"c", "b", "a", "e", "d"}, + }, + { + name: "multiple peers same capacity different scores", + args: args{ + peers: []weightedPeer{ + {"a", 9000}, + {"b", 9000}, + {"c", 9000}, + {"d", 9000}, + {"e", 9000}, + }, + peersPercentage: 0.8, + capacityWeight: 0.2, + }, + update: func(s *peers.BlockProviderScorer) { + s.IncrementProcessedBlocks("e", s.Params().ProcessedBlocksCap) + s.IncrementProcessedBlocks("b", s.Params().ProcessedBlocksCap/2) + s.IncrementProcessedBlocks("c", s.Params().ProcessedBlocksCap/4) + s.IncrementProcessedBlocks("a", s.Params().ProcessedBlocksCap/8) + s.IncrementProcessedBlocks("d", 0) + }, + want: []peer.ID{"e", "b", "c", "a"}, + }, + { + name: "multiple peers different capacities and scores", + args: args{ + peers: []weightedPeer{ + {"a", 6500}, + {"b", 2500}, + {"c", 1000}, + {"d", 9000}, + {"e", 6500}, + }, + peersPercentage: 0.8, + capacityWeight: 0.2, + }, + update: func(s *peers.BlockProviderScorer) { + // Make sure that score takes priority over capacity. + s.IncrementProcessedBlocks("c", batchSize*5) + s.IncrementProcessedBlocks("b", batchSize*15) + // Break tie using capacity as a tie-breaker (a and ghi have the same score). + s.IncrementProcessedBlocks("a", batchSize*3) + s.IncrementProcessedBlocks("e", batchSize*3) + // Exclude peer (peers percentage is 80%). + s.IncrementProcessedBlocks("d", batchSize) + }, + want: []peer.ID{"b", "c", "a", "e"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mc, p2p, _ := initializeTestServices(t, []uint64{}, []*peerData{}) + fetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{ + headFetcher: mc, + p2p: p2p, + peerFilterCapacityWeight: tt.args.capacityWeight, + }) + // Non-leaking bucket, with initial capacity of 10000. 
+ fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 10000, false) + peerIDs := make([]peer.ID, 0) + for _, pid := range tt.args.peers { + peerIDs = append(peerIDs, pid.ID) + fetcher.rateLimiter.Add(pid.ID.String(), pid.usedCapacity) + } + if tt.update != nil { + tt.update(fetcher.p2p.Peers().Scorers().BlockProviderScorer()) + } + // Since peer selection is probabilistic (weighted, with high scorers having higher + // chance of being selected), we need multiple rounds of filtering to test the order: + // over multiple attempts, top scorers should be picked on high positions more often. + peerStats := make(map[peer.ID]int, len(tt.want)) + var filteredPIDs []peer.ID + var err error + for i := 0; i < 1000; i++ { + filteredPIDs, err = fetcher.filterScoredPeers(context.Background(), peerIDs, tt.args.peersPercentage) + if len(filteredPIDs) <= 1 { + break + } + require.NoError(t, err) + for j, pid := range filteredPIDs { + // The higher peer in the list, the more "points" will it get. + peerStats[pid] += len(tt.want) - j + } + } + + // If percentage of peers was requested, rebuild combined filtered peers list. + if len(filteredPIDs) != len(peerStats) && len(peerStats) > 0 { + filteredPIDs = []peer.ID{} + for pid := range peerStats { + filteredPIDs = append(filteredPIDs, pid) + } + } + + // Sort by frequency of appearance in high positions on filtering. + sort.Slice(filteredPIDs, func(i, j int) bool { + return peerStats[filteredPIDs[i]] > peerStats[filteredPIDs[j]] + }) + if tt.args.peersPercentage < 1.0 { + limit := uint64(math.Round(float64(len(filteredPIDs)) * tt.args.peersPercentage)) + filteredPIDs = filteredPIDs[:limit] + } + + // Re-arrange peers with the same remaining capacity, deterministically . + // They are deliberately shuffled - so that on the same capacity any of + // such peers can be selected. That's why they are sorted here. 
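+ // Only peers tied on both score and remaining capacity are reordered (by peer ID);
+ // all other pairs keep the order produced by the filtering rounds above.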
+ sort.SliceStable(filteredPIDs, func(i, j int) bool { + score1 := fetcher.p2p.Peers().Scorers().BlockProviderScorer().Score(filteredPIDs[i]) + score2 := fetcher.p2p.Peers().Scorers().BlockProviderScorer().Score(filteredPIDs[j]) + if score1 == score2 { + cap1 := fetcher.rateLimiter.Remaining(filteredPIDs[i].String()) + cap2 := fetcher.rateLimiter.Remaining(filteredPIDs[j].String()) + if cap1 == cap2 { + return filteredPIDs[i].String() < filteredPIDs[j].String() + } + } + return i < j + }) + assert.DeepEqual(t, tt.want, filteredPIDs) + }) + } +} + func TestBlocksFetcher_RequestBlocksRateLimitingLocks(t *testing.T) { p1 := p2pt.NewTestP2P(t) p2 := p2pt.NewTestP2P(t) @@ -831,29 +1027,29 @@ func TestBlocksFetcher_removeStalePeerLocks(t *testing.T) { age: peerLockMaxAge, peersIn: []peerData{ { - peerID: "abc", + peerID: "a", accessed: roughtime.Now(), }, { - peerID: "def", + peerID: "b", accessed: roughtime.Now(), }, { - peerID: "ghi", + peerID: "c", accessed: roughtime.Now(), }, }, peersOut: []peerData{ { - peerID: "abc", + peerID: "a", accessed: roughtime.Now(), }, { - peerID: "def", + peerID: "b", accessed: roughtime.Now(), }, { - peerID: "ghi", + peerID: "c", accessed: roughtime.Now(), }, }, @@ -863,25 +1059,25 @@ func TestBlocksFetcher_removeStalePeerLocks(t *testing.T) { age: peerLockMaxAge, peersIn: []peerData{ { - peerID: "abc", + peerID: "a", accessed: roughtime.Now(), }, { - peerID: "def", + peerID: "b", accessed: roughtime.Now().Add(-peerLockMaxAge), }, { - peerID: "ghi", + peerID: "c", accessed: roughtime.Now(), }, }, peersOut: []peerData{ { - peerID: "abc", + peerID: "a", accessed: roughtime.Now(), }, { - peerID: "ghi", + peerID: "c", accessed: roughtime.Now(), }, }, @@ -891,15 +1087,15 @@ func TestBlocksFetcher_removeStalePeerLocks(t *testing.T) { age: peerLockMaxAge, peersIn: []peerData{ { - peerID: "abc", + peerID: "a", accessed: roughtime.Now().Add(-peerLockMaxAge), }, { - peerID: "def", + peerID: "b", accessed: roughtime.Now().Add(-peerLockMaxAge), }, { - peerID: "ghi", + peerID: "c", accessed: roughtime.Now().Add(-peerLockMaxAge), }, }, diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index 0bd0bf4e2..8ee6c758e 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -33,7 +34,11 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u return 0, errSlotIsTooHigh } var err error - peers, err = f.filterPeers(peers, peersPercentagePerRequest) + if featureconfig.Get().EnablePeerScorer { + peers, err = f.filterScoredPeers(ctx, peers, peersPercentagePerRequest) + } else { + peers, err = f.filterPeers(peers, peersPercentagePerRequest) + } if err != nil { return 0, err } diff --git a/beacon-chain/sync/initial-sync/initial_sync_test.go b/beacon-chain/sync/initial-sync/initial_sync_test.go index 0345c9d6f..ca74de726 100644 --- a/beacon-chain/sync/initial-sync/initial_sync_test.go +++ b/beacon-chain/sync/initial-sync/initial_sync_test.go @@ -57,6 +57,7 @@ func TestMain(m *testing.M) { resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{ NewStateMgmt: true, BatchBlockVerify: true, + 
EnablePeerScorer: true, }) defer resetCfg() diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 37c8e3ab7..8e7f5d0b5 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -128,6 +128,15 @@ func (s *Service) roundRobinSync(genesis time.Time) error { // processFetchedData processes data received from queue. func (s *Service) processFetchedData( ctx context.Context, genesis time.Time, startSlot uint64, data *blocksQueueFetchedData) { + defer func() { + if !featureconfig.Get().EnablePeerScorer || data.pid == "" { + return + } + scorer := s.p2p.Peers().Scorers().BlockProviderScorer() + if diff := s.chain.HeadSlot() - startSlot; diff > 0 { + scorer.IncrementProcessedBlocks(data.pid, diff) + } + }() blockReceiver := s.chain.ReceiveBlockInitialSync batchReceiver := s.chain.ReceiveBlockBatch diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index 920af54d1..261a7b397 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -476,3 +476,85 @@ func TestService_processBlockBatch(t *testing.T) { assert.Equal(t, uint64(19), s.chain.HeadSlot(), "Unexpected head slot") }) } + +func TestService_blockProviderScoring(t *testing.T) { + cache.initializeRootCache(makeSequence(1, 320), t) + + p := p2pt.NewTestP2P(t) + beaconDB, _ := dbtest.SetupDB(t) + + peerData := []*peerData{ + { + // The slowest peer, only a single block in couple of epochs. + blocks: []uint64{1, 65, 129}, + finalizedEpoch: 5, + headSlot: 160, + }, + { + // A relatively slow peer, still should perform better than the slowest peer. + blocks: append([]uint64{1, 2, 3, 4, 65, 66, 67, 68, 129, 130}, makeSequence(131, 160)...), + finalizedEpoch: 5, + headSlot: 160, + }, + { + // This peer has all blocks - should be a preferred one. 
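+ // (With the scorer enabled, this peer is expected to finish with the highest
+ // block-provider score - see the assertions at the end of this test.)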
+ blocks: makeSequence(1, 160), + finalizedEpoch: 5, + headSlot: 160, + }, + } + + peer1 := connectPeer(t, p, peerData[0], p.Peers()) + peer2 := connectPeer(t, p, peerData[1], p.Peers()) + peer3 := connectPeer(t, p, peerData[2], p.Peers()) + + cache.RLock() + genesisRoot := cache.rootCache[0] + cache.RUnlock() + + err := beaconDB.SaveBlock(context.Background(), ð.SignedBeaconBlock{Block: ð.BeaconBlock{Slot: 0}}) + require.NoError(t, err) + + st, err := stateTrie.InitializeFromProto(&p2ppb.BeaconState{}) + require.NoError(t, err) + mc := &mock.ChainService{ + State: st, + Root: genesisRoot[:], + DB: beaconDB, + } // no-op mock + s := &Service{ + chain: mc, + p2p: p, + db: beaconDB, + synced: false, + chainStarted: true, + } + scorer := s.p2p.Peers().Scorers().BlockProviderScorer() + expectedBlockSlots := makeSequence(1, 160) + currentSlot := uint64(160) + + assert.Equal(t, scorer.MaxScore(), scorer.Score(peer1)) + assert.Equal(t, scorer.MaxScore(), scorer.Score(peer2)) + assert.Equal(t, scorer.MaxScore(), scorer.Score(peer3)) + + assert.NoError(t, s.roundRobinSync(makeGenesisTime(currentSlot))) + if s.chain.HeadSlot() != currentSlot { + t.Errorf("Head slot (%d) is not currentSlot (%d)", s.chain.HeadSlot(), currentSlot) + } + assert.Equal(t, len(expectedBlockSlots), len(mc.BlocksReceived), "Processes wrong number of blocks") + var receivedBlockSlots []uint64 + for _, blk := range mc.BlocksReceived { + receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot) + } + missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(expectedBlockSlots, receivedBlockSlots), expectedBlockSlots) + if len(missing) > 0 { + t.Errorf("Missing blocks at slots %v", missing) + } + + score1 := scorer.Score(peer1) + score2 := scorer.Score(peer2) + score3 := scorer.Score(peer3) + assert.Equal(t, true, score1 < score3, "Incorrect score (%v) for peer: %v (must be lower than %v)", score1, peer1, score3) + assert.Equal(t, true, score2 < score3, "Incorrect score (%v) for peer: %v (must be lower than %v)", score2, peer2, score3) + assert.Equal(t, true, scorer.ProcessedBlocks(peer3) > 100, "Not enough blocks returned by healthy peer") +} diff --git a/shared/featureconfig/config.go b/shared/featureconfig/config.go index b26b61061..bbf619af3 100644 --- a/shared/featureconfig/config.go +++ b/shared/featureconfig/config.go @@ -61,6 +61,7 @@ type Flags struct { EnableFinalizedDepositsCache bool // EnableFinalizedDepositsCache enables utilization of cached finalized deposits. EnableEth1DataMajorityVote bool // EnableEth1DataMajorityVote uses the Voting With The Majority algorithm to vote for eth1data. EnableAttBroadcastDiscoveryAttempts bool // EnableAttBroadcastDiscoveryAttempts allows the p2p service to attempt to ensure a subnet peer is present before broadcasting an attestation. + EnablePeerScorer bool // EnablePeerScorer enables experimental peer scoring in p2p. 
// DisableForkChoice disables using LMD-GHOST fork choice to update // the head of the chain based on attestations and instead accepts any valid received block @@ -255,6 +256,10 @@ func ConfigureBeaconChain(ctx *cli.Context) { if ctx.Bool(enableAttBroadcastDiscoveryAttempts.Name) { cfg.EnableAttBroadcastDiscoveryAttempts = true } + if ctx.Bool(enablePeerScorer.Name) { + log.Warn("Enabling peer scoring in P2P") + cfg.EnablePeerScorer = true + } Init(cfg) } diff --git a/shared/featureconfig/flags.go b/shared/featureconfig/flags.go index 08513b3eb..f9251f816 100644 --- a/shared/featureconfig/flags.go +++ b/shared/featureconfig/flags.go @@ -166,12 +166,17 @@ var ( Name: "enable-att-broadcast-discovery-attempts", Usage: "Enable experimental attestation subnet discovery before broadcasting.", } + enablePeerScorer = &cli.BoolFlag{ + Name: "enable-peer-scorer", + Usage: "Enable experimental P2P peer scorer", + } ) // devModeFlags holds list of flags that are set when development mode is on. var devModeFlags = []cli.Flag{ batchBlockVerify, enableAttBroadcastDiscoveryAttempts, + enablePeerScorer, } // Deprecated flags list. @@ -654,6 +659,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{ enableFinalizedDepositsCache, enableEth1DataMajorityVote, enableAttBroadcastDiscoveryAttempts, + enablePeerScorer, }...) // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
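A minimal standalone sketch (not part of this patch) of how filterScoredPeers combines the block-provider score with the remaining rate-limiter capacity. The rounding factor, the blocks-per-second value, and the sample numbers below are illustrative assumptions; only the weighting formula and the early exit on low remaining capacity mirror the code above.

package main

import (
	"fmt"
	"math"
)

// overallScore mirrors the weighting in filterScoredPeers: the block-provider score
// contributes (1 - weight) of the result, the share of remaining rate-limiter capacity
// contributes weight, and a peer with less than one second's worth of blocks remaining
// is zeroed out so the fetcher is unlikely to stall on it.
func overallScore(providerScore, remaining, capacity, blocksPerSecond, weight, roundingFactor float64) float64 {
	if remaining < blocksPerSecond {
		return 0.0
	}
	capScore := remaining / capacity
	score := providerScore*(1.0-weight) + capScore*weight
	return math.Round(score*roundingFactor) / roundingFactor
}

func main() {
	// A peer with a perfect provider score that has used half of its capacity,
	// with the default capacity weight of 0.2: 1.0*0.8 + 0.5*0.2 = 0.9.
	fmt.Println(overallScore(1.0, 5000, 10000, 64, 0.2, 10000))
	// The same peer once its remaining capacity drops below the assumed blocks-per-second value.
	fmt.Println(overallScore(1.0, 32, 10000, 64, 0.2, 10000))
}

The scored list is then weight-sorted, trimmed via trimPeers, and each successfully processed batch credits its provider through IncrementProcessedBlocks once the head slot advances. The whole path is gated behind the new --enable-peer-scorer flag (also added to devModeFlags), so default behavior still falls back to filterPeers.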