Mirror of https://gitlab.com/pulsechaincom/prysm-pulse.git
Remove redundant error return from filterPeers() (#7730)
parent 7acd73e1fe
commit c5a8363998
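Condensed from the diff below, for quick reference: filterPeers and filterScoredPeers used to return a ([]peer.ID, error) pair even though, as the diff shows, the returned error was always nil; after this change they return only the peer slice, and callers treat an empty slice as "no peers".

// Before (error value was always nil):
func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error)
func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error)

// After:
func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) []peer.ID
func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) []peer.ID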
@@ -276,25 +276,21 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
 	ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer")
 	defer span.End()
 
-	var blocks []*eth.SignedBeaconBlock
-	peers, err := f.filterPeers(ctx, peers, peersPercentagePerRequest)
-	if err != nil {
-		return blocks, "", err
-	}
+	peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
 	req := &p2ppb.BeaconBlocksByRangeRequest{
 		StartSlot: start,
 		Count:     count,
 		Step:      1,
 	}
 	for i := 0; i < len(peers); i++ {
-		if blocks, err = f.requestBlocks(ctx, req, peers[i]); err == nil {
+		if blocks, err := f.requestBlocks(ctx, req, peers[i]); err == nil {
 			if featureconfig.Get().EnablePeerScorer {
 				f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peers[i])
 			}
 			return blocks, peers[i], err
 		}
 	}
-	return blocks, "", errNoPeersAvailable
+	return nil, "", errNoPeersAvailable
 }
 
 // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
@@ -94,13 +94,13 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err
 // filterPeers returns transformed list of peers, weight ordered or randomized, constrained
 // if necessary (when only percentage of peers returned).
 // When peer scorer is enabled, fallbacks filterScoredPeers.
-func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
+func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) []peer.ID {
 	if featureconfig.Get().EnablePeerScorer {
 		return f.filterScoredPeers(ctx, peers, peersPercentagePerRequest)
 	}
 
 	if len(peers) == 0 {
-		return peers, nil
+		return peers
 	}
 
 	// Shuffle peers to prevent a bad peer from
@@ -123,17 +123,17 @@ func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersP
 		return cap1 > cap2
 	})
 
-	return peers, nil
+	return peers
 }
 
 // filterScoredPeers returns transformed list of peers, weight sorted by scores and capacity remaining.
 // List can be further constrained using peersPercentage, where only percentage of peers are returned.
-func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
+func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) []peer.ID {
 	ctx, span := trace.StartSpan(ctx, "initialsync.filterScoredPeers")
 	defer span.End()
 
 	if len(peers) == 0 {
-		return peers, nil
+		return peers
 	}
 
 	// Sort peers using both block provider score and, custom, capacity based score (see
@@ -153,9 +153,8 @@ func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID,
 		overallScore := blockProviderScore*(1.0-f.capacityWeight) + capScore*f.capacityWeight
 		return math.Round(overallScore*scorers.ScoreRoundingFactor) / scorers.ScoreRoundingFactor
 	})
-	peers = trimPeers(peers, peersPercentage)
 
-	return peers, nil
+	return trimPeers(peers, peersPercentage)
 }
 
 // trimPeers limits peer list, returning only specified percentage of peers.
@@ -181,12 +181,11 @@ func TestBlocksFetcher_filterPeers(t *testing.T) {
 				pids = append(pids, pid.ID)
 				fetcher.rateLimiter.Add(pid.ID.String(), pid.usedCapacity)
 			}
-			got, err := fetcher.filterPeers(context.Background(), pids, tt.args.peersPercentage)
-			require.NoError(t, err)
+			pids = fetcher.filterPeers(context.Background(), pids, tt.args.peersPercentage)
 			// Re-arrange peers with the same remaining capacity, deterministically .
 			// They are deliberately shuffled - so that on the same capacity any of
 			// such peers can be selected. That's why they are sorted here.
-			sort.SliceStable(got, func(i, j int) bool {
+			sort.SliceStable(pids, func(i, j int) bool {
 				cap1 := fetcher.rateLimiter.Remaining(pids[i].String())
 				cap2 := fetcher.rateLimiter.Remaining(pids[j].String())
 				if cap1 == cap2 {
@@ -194,7 +193,7 @@ func TestBlocksFetcher_filterPeers(t *testing.T) {
 				}
 				return i < j
 			})
-			assert.DeepEqual(t, tt.want, got)
+			assert.DeepEqual(t, tt.want, pids)
 		})
 	}
 }
@@ -345,7 +344,7 @@ func TestBlocksFetcher_filterScoredPeers(t *testing.T) {
 			var filteredPIDs []peer.ID
 			var err error
 			for i := 0; i < 1000; i++ {
-				filteredPIDs, err = fetcher.filterPeers(context.Background(), peerIDs, tt.args.peersPercentage)
+				filteredPIDs = fetcher.filterPeers(context.Background(), peerIDs, tt.args.peersPercentage)
 				if len(filteredPIDs) <= 1 {
 					break
 				}
@@ -35,10 +35,7 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u
 	}
 
 	// Transform peer list to avoid eclipsing (filter, shuffle, trim).
-	peers, err := f.filterPeers(ctx, peers, peersPercentagePerRequest)
-	if err != nil {
-		return 0, err
-	}
+	peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
 	if len(peers) == 0 {
 		return 0, errNoPeersAvailable
 	}
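The same pattern recurs at every call site: the filter itself can no longer fail, so "no usable peers" is detected with a length check and mapped to errNoPeersAvailable only where that actually matters (as in nonSkippedSlotAfter above). A minimal standalone sketch of that shape, using simplified stand-in types (plain strings instead of peer.ID, and a made-up fetchFromAny caller) rather than Prysm's actual blocksFetcher code:

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

var errNoPeersAvailable = errors.New("no peers available")

// filterPeers shuffles and trims the peer list; since it cannot fail,
// it returns only the slice -- no error value to thread through callers.
func filterPeers(peers []string, peersPercentage float64) []string {
	if len(peers) == 0 {
		return peers
	}
	rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
	limit := int(float64(len(peers)) * peersPercentage)
	if limit < 1 {
		limit = 1
	}
	if limit > len(peers) {
		limit = len(peers)
	}
	return peers[:limit]
}

// fetchFromAny shows the caller side: an empty filtered list is mapped
// to a sentinel error at the point of use.
func fetchFromAny(peers []string) (string, error) {
	peers = filterPeers(peers, 0.75)
	if len(peers) == 0 {
		return "", errNoPeersAvailable
	}
	return peers[0], nil
}

func main() {
	got, err := fetchFromAny([]string{"peerA", "peerB", "peerC"})
	fmt.Println(got, err)
}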