mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 21:07:18 +00:00
Use a client side rate limit to reduce chance of getting banned (#4637)
* Use a client side rate limit to reduce chance of getting banned * fix test Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
parent
6d6e8be10a
commit
62a5931843
@ -25,6 +25,7 @@ go_library(
|
|||||||
"//shared/mathutil:go_default_library",
|
"//shared/mathutil:go_default_library",
|
||||||
"//shared/params:go_default_library",
|
"//shared/params:go_default_library",
|
||||||
"//shared/roughtime:go_default_library",
|
"//shared/roughtime:go_default_library",
|
||||||
|
"@com_github_kevinms_leakybucket_go//:go_default_library",
|
||||||
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
|
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
|
||||||
"@com_github_paulbellamy_ratecounter//:go_default_library",
|
"@com_github_paulbellamy_ratecounter//:go_default_library",
|
||||||
"@com_github_pkg_errors//:go_default_library",
|
"@com_github_pkg_errors//:go_default_library",
|
||||||
@ -51,6 +52,7 @@ go_test(
|
|||||||
"//shared/params:go_default_library",
|
"//shared/params:go_default_library",
|
||||||
"//shared/roughtime:go_default_library",
|
"//shared/roughtime:go_default_library",
|
||||||
"//shared/sliceutil:go_default_library",
|
"//shared/sliceutil:go_default_library",
|
||||||
|
"@com_github_kevinms_leakybucket_go//:go_default_library",
|
||||||
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
|
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
|
||||||
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
|
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
|
||||||
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
|
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
|
||||||
|
@ -274,6 +274,11 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
|||||||
|
|
||||||
// requestBlocks by range to a specific peer.
|
// requestBlocks by range to a specific peer.
|
||||||
func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRangeRequest, pid peer.ID) ([]*eth.SignedBeaconBlock, error) {
|
func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRangeRequest, pid peer.ID) ([]*eth.SignedBeaconBlock, error) {
|
||||||
|
if s.blocksRateLimiter.Remaining(pid.String()) < int64(req.Count) {
|
||||||
|
log.WithField("peer", pid).Debug("Slowing down for rate limit")
|
||||||
|
time.Sleep(s.blocksRateLimiter.TillEmpty(pid.String()))
|
||||||
|
}
|
||||||
|
s.blocksRateLimiter.Add(pid.String(), int64(req.Count))
|
||||||
log.WithFields(logrus.Fields{
|
log.WithFields(logrus.Fields{
|
||||||
"peer": pid,
|
"peer": pid,
|
||||||
"start": req.StartSlot,
|
"start": req.StartSlot,
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/kevinms/leakybucket-go"
|
||||||
"github.com/libp2p/go-libp2p-core/network"
|
"github.com/libp2p/go-libp2p-core/network"
|
||||||
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||||
"github.com/prysmaticlabs/go-ssz"
|
"github.com/prysmaticlabs/go-ssz"
|
||||||
@ -255,11 +256,12 @@ func TestRoundRobinSync(t *testing.T) {
|
|||||||
DB: beaconDB,
|
DB: beaconDB,
|
||||||
} // no-op mock
|
} // no-op mock
|
||||||
s := &Service{
|
s := &Service{
|
||||||
chain: mc,
|
chain: mc,
|
||||||
p2p: p,
|
p2p: p,
|
||||||
db: beaconDB,
|
db: beaconDB,
|
||||||
synced: false,
|
synced: false,
|
||||||
chainStarted: true,
|
chainStarted: true,
|
||||||
|
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */),
|
||||||
}
|
}
|
||||||
if err := s.roundRobinSync(makeGenesisTime(tt.currentSlot)); err != nil {
|
if err := s.roundRobinSync(makeGenesisTime(tt.currentSlot)); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/kevinms/leakybucket-go"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
||||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||||
@ -27,6 +28,8 @@ type blockchainService interface {
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes.
|
handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes.
|
||||||
|
|
||||||
|
allowedBlocksPerSecond = 32.0
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config to set up the initial sync service.
|
// Config to set up the initial sync service.
|
||||||
@ -39,24 +42,26 @@ type Config struct {
|
|||||||
|
|
||||||
// Service service.
|
// Service service.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
chain blockchainService
|
chain blockchainService
|
||||||
p2p p2p.P2P
|
p2p p2p.P2P
|
||||||
db db.ReadOnlyDatabase
|
db db.ReadOnlyDatabase
|
||||||
synced bool
|
synced bool
|
||||||
chainStarted bool
|
chainStarted bool
|
||||||
stateNotifier statefeed.Notifier
|
stateNotifier statefeed.Notifier
|
||||||
|
blocksRateLimiter *leakybucket.Collector
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInitialSync configures the initial sync service responsible for bringing the node up to the
|
// NewInitialSync configures the initial sync service responsible for bringing the node up to the
|
||||||
// latest head of the blockchain.
|
// latest head of the blockchain.
|
||||||
func NewInitialSync(cfg *Config) *Service {
|
func NewInitialSync(cfg *Config) *Service {
|
||||||
return &Service{
|
return &Service{
|
||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
chain: cfg.Chain,
|
chain: cfg.Chain,
|
||||||
p2p: cfg.P2P,
|
p2p: cfg.P2P,
|
||||||
db: cfg.DB,
|
db: cfg.DB,
|
||||||
stateNotifier: cfg.StateNotifier,
|
stateNotifier: cfg.StateNotifier,
|
||||||
|
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user