mirror of https://gitlab.com/pulsechaincom/prysm-pulse.git, synced 2024-12-22 03:30:35 +00:00
Maximize Peer Capacity When Syncing (#13820)
* maximize it
* fix it
* lint
* add test
* Update beacon-chain/sync/initial-sync/blocks_fetcher.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* logs
* kasey's review
* kasey's review

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
parent f3b49d4eaf
commit 65b90abdda
@@ -312,7 +312,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot
 	response.bwb, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
 	if response.err == nil {
-		bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid)
+		bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid, peers)
 		if err != nil {
 			response.err = err
 		}
@@ -336,6 +336,11 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
 		Count: count,
 		Step:  1,
 	}
+	bestPeers := f.hasSufficientBandwidth(peers, req.Count)
+	// We append the best peers to the front so that higher capacity
+	// peers are dialed first.
+	peers = append(bestPeers, peers...)
+	peers = dedupPeers(peers)
 	for i := 0; i < len(peers); i++ {
 		p := peers[i]
 		blocks, err := f.requestBlocks(ctx, req, p)
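This hunk is the heart of the block-side change: peers whose remaining rate-limiter allowance covers the whole request are moved to the front of the dial order, while the original list is kept behind them as a fallback. A minimal, self-contained sketch of that ordering, assuming a plain map in place of the rate limiter and string-backed IDs in place of peer.ID:

package main

import "fmt"

// peerID stands in for libp2p's peer.ID; the capacities below are
// invented numbers for illustration, not Prysm's accounting.
type peerID string

var remaining = map[peerID]int{"a": 200, "b": 40, "c": 150, "d": 10}

// orderByCapacity mirrors the shape of the change: collect peers that can
// serve `count` items, prepend them, then deduplicate while keeping order.
func orderByCapacity(peers []peerID, count int) []peerID {
	best := make([]peerID, 0, len(peers))
	for _, p := range peers {
		if remaining[p] >= count {
			best = append(best, p)
		}
	}
	merged := append(best, peers...)
	seen := make(map[peerID]bool)
	out := make([]peerID, 0, len(merged))
	for _, p := range merged {
		if !seen[p] {
			seen[p] = true
			out = append(out, p)
		}
	}
	return out
}

func main() {
	fmt.Println(orderByCapacity([]peerID{"a", "b", "c", "d"}, 100))
	// [a c b d]: high-capacity peers first, the rest kept as fallbacks.
}

Prepending rather than filtering is deliberate: if every high-capacity peer fails, the loop still falls through to the lower-capacity ones.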
@@ -472,7 +477,7 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e
 }
 
 // fetchBlobsFromPeer fetches blocks from a single randomly selected peer.
-func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID) ([]blocks2.BlockWithROBlobs, error) {
+func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID, peers []peer.ID) ([]blocks2.BlockWithROBlobs, error) {
 	ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer")
 	defer span.End()
 	if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch {
@@ -487,13 +492,30 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl
 	if req == nil {
 		return bwb, nil
 	}
-	// Request blobs from the same peer that gave us the blob batch.
-	blobs, err := f.requestBlobs(ctx, req, pid)
-	if err != nil {
-		return nil, errors.Wrap(err, "could not request blobs by range")
-	}
-	f.p2p.Peers().Scorers().BlockProviderScorer().Touch(pid)
-	return verifyAndPopulateBlobs(bwb, blobs, blobWindowStart)
+	peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
+	// We dial the initial peer first to ensure that we get the desired set of blobs.
+	wantedPeers := append([]peer.ID{pid}, peers...)
+	bestPeers := f.hasSufficientBandwidth(wantedPeers, req.Count)
+	// We append the best peers to the front so that higher capacity
+	// peers are dialed first. If all of them fail, we fallback to the
+	// initial peer we wanted to request blobs from.
+	peers = append(bestPeers, pid)
+	for i := 0; i < len(peers); i++ {
+		p := peers[i]
+		blobs, err := f.requestBlobs(ctx, req, p)
+		if err != nil {
+			log.WithField("peer", p).WithError(err).Debug("Could not request blobs by range from peer")
+			continue
+		}
+		f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p)
+		robs, err := verifyAndPopulateBlobs(bwb, blobs, blobWindowStart)
+		if err != nil {
+			log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlobsByRange response")
+			continue
+		}
+		return robs, err
+	}
+	return nil, errNoPeersAvailable
 }
 
 // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
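The rewritten body above turns a single hard failure into a walk over an ordered candidate list: each peer is tried in turn, request and verification failures are logged at debug level and skipped, and errNoPeersAvailable is returned only once every candidate has failed. A compressed sketch of that log-and-continue pattern, with request and verify as hypothetical stand-ins for requestBlobs and verifyAndPopulateBlobs:

package main

import (
	"errors"
	"fmt"
)

var errNoPeersAvailable = errors.New("no peers available")

// tryPeers dials each candidate in order; a transient request error or an
// invalid response moves on to the next peer instead of aborting the batch.
func tryPeers(peers []string, request func(string) (string, error), verify func(string) error) (string, error) {
	for _, p := range peers {
		resp, err := request(p)
		if err != nil {
			continue // the real code logs "Could not request blobs by range from peer"
		}
		if err := verify(resp); err != nil {
			continue // the real code logs "Invalid BeaconBlobsByRange response"
		}
		return resp, nil
	}
	return "", errNoPeersAvailable
}

func main() {
	request := func(p string) (string, error) {
		if p == "flaky" {
			return "", errors.New("timeout")
		}
		return "blobs-from-" + p, nil
	}
	verify := func(string) error { return nil }
	fmt.Println(tryPeers([]string{"flaky", "healthy"}, request, verify))
	// blobs-from-healthy <nil>
}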
@@ -606,6 +628,18 @@ func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error {
 	return nil
 }
 
+func (f *blocksFetcher) hasSufficientBandwidth(peers []peer.ID, count uint64) []peer.ID {
+	filteredPeers := []peer.ID{}
+	for _, p := range peers {
+		if uint64(f.rateLimiter.Remaining(p.String())) < count {
+			continue
+		}
+		copiedP := p
+		filteredPeers = append(filteredPeers, copiedP)
+	}
+	return filteredPeers
+}
+
 // Determine how long it will take for us to have the required number of blocks allowed by our rate limiter.
 // We do this by calculating the duration till the rate limiter can request these blocks without exceeding
 // the provided bandwidth limits per peer.
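hasSufficientBandwidth admits a peer only when its remaining allowance covers the entire request, the same quantity waitForBandwidth reasons about. A toy token-bucket model of that check; Add and Remaining echo the limiter's method names used in this file, but the implementation below is an illustration, not Prysm's leaky-bucket collection:

package main

import "fmt"

// bucket tracks per-peer token usage against one shared capacity.
type bucket struct {
	capacity int64
	used     map[string]int64
}

func (b *bucket) Add(id string, n int64)    { b.used[id] += n }
func (b *bucket) Remaining(id string) int64 { return b.capacity - b.used[id] }

// hasSufficientBandwidth keeps only peers that can absorb `count` more items.
func hasSufficientBandwidth(b *bucket, peers []string, count int64) []string {
	filtered := []string{}
	for _, p := range peers {
		if b.Remaining(p) < count {
			continue
		}
		filtered = append(filtered, p)
	}
	return filtered
}

func main() {
	b := &bucket{capacity: 640, used: map[string]int64{}}
	b.Add("a", 600) // 40 tokens left, not enough for a 64-block batch
	fmt.Println(hasSufficientBandwidth(b, []string{"a", "b"}, 64))
	// [b]
}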
@@ -626,3 +660,18 @@ func timeToWait(wanted, rem, capacity int64, timeTillEmpty time.Duration) time.D
 	expectedTime := int64(timeTillEmpty) * blocksNeeded / currentNumBlks
 	return time.Duration(expectedTime)
 }
+
+// deduplicates the provided peer list.
+func dedupPeers(peers []peer.ID) []peer.ID {
+	newPeerList := make([]peer.ID, 0, len(peers))
+	peerExists := make(map[peer.ID]bool)
+
+	for i := range peers {
+		if peerExists[peers[i]] {
+			continue
+		}
+		newPeerList = append(newPeerList, peers[i])
+		peerExists[peers[i]] = true
+	}
+	return newPeerList
+}
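dedupPeers exists because fetchBlocksFromPeer prepends bestPeers to a slice that still contains those same entries; without deduplication a failing high-capacity peer would be dialed twice. A quick demonstration of the order-preserving behavior, using strings in place of peer.ID:

package main

import "fmt"

// dedup keeps the first occurrence of each ID, preserving dial order.
func dedup(peers []string) []string {
	out := make([]string, 0, len(peers))
	seen := make(map[string]bool)
	for _, p := range peers {
		if seen[p] {
			continue
		}
		out = append(out, p)
		seen[p] = true
	}
	return out
}

func main() {
	// "c" and "a" were prepended as best peers and also remain in the tail.
	fmt.Println(dedup([]string{"c", "a", "a", "b", "c", "d"}))
	// [c a b d]
}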
@@ -12,6 +12,7 @@ import (
 
 	libp2pcore "github.com/libp2p/go-libp2p/core"
 	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
 	mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
 	dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
 	p2pm "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
@@ -1166,3 +1167,22 @@ func TestBatchLimit(t *testing.T) {
 
 	assert.Equal(t, params.BeaconConfig().MaxRequestBlocksDeneb, uint64(maxBatchLimit()))
 }
+
+func TestBlockFetcher_HasSufficientBandwidth(t *testing.T) {
+	bf := newBlocksFetcher(context.Background(), &blocksFetcherConfig{})
+	currCap := bf.rateLimiter.Capacity()
+	wantedAmt := currCap - 100
+	bf.rateLimiter.Add(peer.ID("a").String(), wantedAmt)
+	bf.rateLimiter.Add(peer.ID("c").String(), wantedAmt)
+	bf.rateLimiter.Add(peer.ID("f").String(), wantedAmt)
+	bf.rateLimiter.Add(peer.ID("d").String(), wantedAmt)
+
+	receivedPeers := bf.hasSufficientBandwidth([]peer.ID{"a", "b", "c", "d", "e", "f"}, 110)
+	for _, p := range receivedPeers {
+		switch p {
+		case "a", "c", "f", "d":
+			t.Errorf("peer has exceeded capacity: %s", p)
+		}
+	}
+	assert.Equal(t, 2, len(receivedPeers))
+}
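The test's arithmetic, spelled out: four peers are charged capacity minus 100 tokens, leaving each of them 100 remaining, which is below the 110 requested, so only the untouched peers "b" and "e" survive the filter, hence the expected length of 2. A standalone check of that reasoning under an assumed capacity of 1000 (the real default comes from Prysm's configuration, not this number):

package main

import "fmt"

func main() {
	const capacity = 1000 // assumed; stands in for bf.rateLimiter.Capacity()
	used := map[string]int64{
		"a": capacity - 100, "c": capacity - 100,
		"f": capacity - 100, "d": capacity - 100,
	}
	var pass []string
	for _, p := range []string{"a", "b", "c", "d", "e", "f"} {
		if capacity-used[p] >= 110 { // the Remaining >= count check
			pass = append(pass, p)
		}
	}
	fmt.Println(pass)
	// [b e]
}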
@@ -280,7 +280,7 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
 	}
 	// We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import
 	// the blocks.
-	bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid)
+	bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
 	}
@@ -302,7 +302,7 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
 	if err != nil {
 		return nil, errors.Wrap(err, "received invalid blocks in findAncestor")
 	}
-	bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid)
+	bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid})
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor")
 	}