diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 5b1a7df2c..5449d8a20 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -57,6 +57,7 @@ go_library( "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", "//consensus-types/wrapper:go_default_library", + "//container/leaky-bucket:go_default_library", "//crypto/ecdsa:go_default_library", "//crypto/hash:go_default_library", "//encoding/bytesutil:go_default_library", @@ -74,7 +75,6 @@ go_library( "@com_github_ethereum_go_ethereum//p2p/discover:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", - "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_kr_pretty//:go_default_library", "@com_github_libp2p_go_libp2p//:go_default_library", "@com_github_libp2p_go_libp2p//config:go_default_library", @@ -150,6 +150,7 @@ go_test( "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", "//consensus-types/wrapper:go_default_library", + "//container/leaky-bucket:go_default_library", "//crypto/ecdsa:go_default_library", "//crypto/hash:go_default_library", "//encoding/bytesutil:go_default_library", @@ -168,7 +169,6 @@ go_test( "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", "@com_github_golang_snappy//:go_default_library", - "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p//:go_default_library", "@com_github_libp2p_go_libp2p//core/crypto:go_default_library", "@com_github_libp2p_go_libp2p//core/host:go_default_library", diff --git a/beacon-chain/p2p/connection_gater_test.go b/beacon-chain/p2p/connection_gater_test.go index 6c490c956..1169b490f 100644 --- a/beacon-chain/p2p/connection_gater_test.go +++ b/beacon-chain/p2p/connection_gater_test.go @@ -4,8 +4,8 @@ import ( "context" "fmt" "testing" + "time" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" @@ -13,6 +13,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/peerdata" "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/scorers" mockp2p "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/testing" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" ethpb "github.com/prysmaticlabs/prysm/v3/proto/eth/v1" "github.com/prysmaticlabs/prysm/v3/testing/assert" "github.com/prysmaticlabs/prysm/v3/testing/require" @@ -26,7 +27,7 @@ func TestPeer_AtMaxLimit(t *testing.T) { listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000)) require.NoError(t, err, "Failed to p2p listen") s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), } s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{ PeerLimit: 0, @@ -70,7 +71,7 @@ func TestPeer_AtMaxLimit(t *testing.T) { func TestService_InterceptBannedIP(t *testing.T) { s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ PeerLimit: 20, ScorerParams: &scorers.Config{}, @@ -98,7 +99,7 @@ func TestService_InterceptBannedIP(t *testing.T) { func TestService_RejectInboundPeersBeyondLimit(t *testing.T) { limit := 20 s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ PeerLimit: limit, ScorerParams: &scorers.Config{}, @@ -140,7 +141,7 @@ func TestPeer_BelowMaxLimit(t *testing.T) { listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000)) require.NoError(t, err, "Failed to p2p listen") s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), } s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{ PeerLimit: 1, @@ -191,7 +192,7 @@ func TestPeerAllowList(t *testing.T) { listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000)) require.NoError(t, err, "Failed to p2p listen") s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }), @@ -237,7 +238,7 @@ func TestPeerDenyList(t *testing.T) { listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000)) require.NoError(t, err, "Failed to p2p listen") s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }), @@ -272,7 +273,7 @@ func TestPeerDenyList(t *testing.T) { func TestService_InterceptAddrDial_Allow(t *testing.T) { s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }), @@ -292,7 +293,7 @@ func TestService_InterceptAddrDial_Allow(t *testing.T) { func TestService_InterceptAddrDial_Public(t *testing.T) { s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }), @@ -340,7 +341,7 @@ func TestService_InterceptAddrDial_Public(t *testing.T) { func TestService_InterceptAddrDial_Private(t *testing.T) { s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }), @@ -369,7 +370,7 @@ func TestService_InterceptAddrDial_Private(t *testing.T) { func TestService_InterceptAddrDial_AllowPrivate(t *testing.T) { s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }), @@ -398,7 +399,7 @@ func TestService_InterceptAddrDial_AllowPrivate(t *testing.T) { func TestService_InterceptAddrDial_DenyPublic(t *testing.T) { s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }), @@ -427,7 +428,7 @@ func TestService_InterceptAddrDial_DenyPublic(t *testing.T) { func TestService_InterceptAddrDial_AllowConflict(t *testing.T) { s := &Service{ - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }), diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index c231ef8a0..ee07c7fb5 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -16,7 +16,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -31,6 +30,7 @@ import ( testp2p "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v3/config/params" "github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/encoding/bytesutil" prysmNetwork "github.com/prysmaticlabs/prysm/v3/network" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" @@ -246,7 +246,7 @@ func TestInboundPeerLimit(t *testing.T) { fakePeer := testp2p.NewTestP2P(t) s := &Service{ cfg: &Config{MaxPeers: 30}, - ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ PeerLimit: 30, ScorerParams: &scorers.Config{}, diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index aebf0aa80..e45e1e17b 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" @@ -29,6 +28,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/scorers" "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v3/config/params" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" prysmnetwork "github.com/prysmaticlabs/prysm/v3/network" "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata" "github.com/prysmaticlabs/prysm/v3/runtime" @@ -123,7 +123,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { log.WithError(err).Error("Failed to create address filter") return nil, err } - s.ipLimiter = leakybucket.NewCollector(ipLimit, ipBurst, true /* deleteEmptyBuckets */) + s.ipLimiter = leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */) opts := s.buildOptions(ipAddr, s.privKey) h, err := libp2p.New(opts...) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 51b7c1cdf..aa97b11ec 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -88,6 +88,7 @@ go_library( "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", "//consensus-types/wrapper:go_default_library", + "//container/leaky-bucket:go_default_library", "//container/slice:go_default_library", "//crypto/bls:go_default_library", "//crypto/rand:go_default_library", @@ -105,7 +106,6 @@ go_library( "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_hashicorp_golang_lru//:go_default_library", - "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", "@com_github_libp2p_go_libp2p//core/host:go_default_library", "@com_github_libp2p_go_libp2p//core/network:go_default_library", @@ -204,6 +204,7 @@ go_test( "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", "//consensus-types/wrapper:go_default_library", + "//container/leaky-bucket:go_default_library", "//crypto/bls:go_default_library", "//crypto/rand:go_default_library", "//encoding/bytesutil:go_default_library", @@ -223,7 +224,6 @@ go_test( "@com_github_ethereum_go_ethereum//core/types:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", "@com_github_golang_snappy//:go_default_library", - "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", "@com_github_libp2p_go_libp2p//core/network:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index bd938733b..26c2b486e 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -31,13 +31,13 @@ go_library( "//config/params:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", + "//container/leaky-bucket:go_default_library", "//crypto/rand:go_default_library", "//math:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//runtime:go_default_library", "//time:go_default_library", "//time/slots:go_default_library", - "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", "@com_github_paulbellamy_ratecounter//:go_default_library", "@com_github_pkg_errors//:go_default_library", @@ -83,7 +83,6 @@ go_test( "//testing/util:go_default_library", "//time:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", - "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", "@com_github_libp2p_go_libp2p//core/network:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", @@ -126,6 +125,7 @@ go_test( "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", + "//container/leaky-bucket:go_default_library", "//container/slice:go_default_library", "//crypto/hash:go_default_library", "//encoding/bytesutil:go_default_library", @@ -136,7 +136,6 @@ go_test( "//time:go_default_library", "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", - "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", "@com_github_libp2p_go_libp2p//core/network:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 7c5d270ff..2431ab238 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v3/beacon-chain/db" @@ -17,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/config/params" "github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/crypto/rand" p2ppb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/sirupsen/logrus" @@ -54,6 +54,9 @@ var ( errNoPeersWithAltBlocks = errors.New("no peers with alternative blocks found") ) +// Period to calculate expected limit for a single peer. +var blockLimiterPeriod = 30 * time.Second + // blocksFetcherConfig is a config to setup the block fetcher. type blocksFetcherConfig struct { chain blockchainService @@ -114,7 +117,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc // Allow fetcher to go almost to the full burst capacity (less a single batch). rateLimiter := leakybucket.NewCollector( float64(blocksPerSecond), int64(allowedBlocksBurst-blocksPerSecond), - false /* deleteEmptyBuckets */) + blockLimiterPeriod, false /* deleteEmptyBuckets */) capacityWeight := cfg.peerFilterCapacityWeight if capacityWeight >= 1 { diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_peers_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_peers_test.go index 50a3d208e..7dfab5de4 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_peers_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_peers_test.go @@ -8,11 +8,11 @@ import ( "testing" "time" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/peer" "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/scorers" "github.com/prysmaticlabs/prysm/v3/cmd/beacon-chain/flags" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/testing/assert" "github.com/prysmaticlabs/prysm/v3/testing/require" prysmTime "github.com/prysmaticlabs/prysm/v3/time" @@ -237,7 +237,7 @@ func TestBlocksFetcher_filterPeers(t *testing.T) { peerFilterCapacityWeight: tt.args.capacityWeight, }) // Non-leaking bucket, with initial capacity of 10000. - fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 10000, false) + fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 10000, 1*time.Second, false) peerIDs := make([]peer.ID, 0) for _, pid := range tt.args.peers { peerIDs = append(peerIDs, pid.ID) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 0fd877250..2a42df026 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/kevinms/leakybucket-go" libp2pcore "github.com/libp2p/go-libp2p/core" "github.com/libp2p/go-libp2p/core/network" mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing" @@ -21,6 +20,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/container/slice" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v3/testing/assert" @@ -547,7 +547,7 @@ func TestBlocksFetcher_RequestBlocksRateLimitingLocks(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1}) - fetcher.rateLimiter = leakybucket.NewCollector(float64(req.Count), int64(req.Count*burstFactor), false) + fetcher.rateLimiter = leakybucket.NewCollector(float64(req.Count), int64(req.Count*burstFactor), 1*time.Second, false) fetcher.chain = &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} hook := logTest.NewGlobal() wg := new(sync.WaitGroup) @@ -842,7 +842,7 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1, chain: &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}}) - fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 640, false) + fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 640, 1*time.Second, false) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go index ffe9b6022..dad86d384 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing" @@ -18,6 +17,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/config/params" "github.com/prysmaticlabs/prysm/v3/consensus-types/blocks" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v3/testing/assert" @@ -59,7 +59,7 @@ func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) { p2p: p2p, }, ) - fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false) + fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false) seekSlots := map[types.Slot]types.Slot{ 0: 1, 10: 11, @@ -191,7 +191,7 @@ func TestBlocksFetcher_findFork(t *testing.T) { db: beaconDB, }, ) - fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false) + fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false) // Consume all chain1 blocks from many peers (alternative fork will be featured by a single peer, // and should still be enough to explore alternative paths). @@ -339,7 +339,7 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) { db: beaconDB, }, ) - fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false) + fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false) for _, blk := range knownBlocks { util.SaveBlock(t, ctx, beaconDB, blk) @@ -455,7 +455,7 @@ func TestBlocksFetcher_findAncestor(t *testing.T) { db: beaconDB, }, ) - fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false) + fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false) pcl := fmt.Sprintf("%s/ssz_snappy", p2pm.RPCBlocksByRootTopicV1) t.Run("error on request", func(t *testing.T) { diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 345b735c3..f0a71445f 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/peer" mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing" dbtest "github.com/prysmaticlabs/prysm/v3/beacon-chain/db/testing" @@ -17,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/container/slice" "github.com/prysmaticlabs/prysm/v3/encoding/bytesutil" eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" @@ -134,6 +134,11 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { } func TestBlocksQueue_Loop(t *testing.T) { + currentPeriod := blockLimiterPeriod + blockLimiterPeriod = 1 * time.Second + defer func() { + blockLimiterPeriod = currentPeriod + }() tests := []struct { name string highestExpectedSlot types.Slot @@ -1075,7 +1080,7 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) { db: beaconDB, }, ) - fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false) + fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, @@ -1295,7 +1300,7 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) { db: beaconDB, }, ) - fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false) + fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false) // Connect peer that has all the blocks available. allBlocksPeer := connectPeerHavingBlocks(t, p2p, chain, finalizedSlot, p2p.Peers()) diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index 33992f530..146d31c98 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -22,6 +22,11 @@ import ( ) func TestService_roundRobinSync(t *testing.T) { + currentPeriod := blockLimiterPeriod + blockLimiterPeriod = 1 * time.Second + defer func() { + blockLimiterPeriod = currentPeriod + }() tests := []struct { name string currentSlot types.Slot @@ -488,6 +493,11 @@ func TestService_processBlockBatch(t *testing.T) { } func TestService_blockProviderScoring(t *testing.T) { + currentPeriod := blockLimiterPeriod + blockLimiterPeriod = 1 * time.Second + defer func() { + blockLimiterPeriod = currentPeriod + }() cache.initializeRootCache(makeSequence(1, 640), t) p := p2pt.NewTestP2P(t) diff --git a/beacon-chain/sync/rate_limiter.go b/beacon-chain/sync/rate_limiter.go index 60d407ef2..5d71f5043 100644 --- a/beacon-chain/sync/rate_limiter.go +++ b/beacon-chain/sync/rate_limiter.go @@ -3,19 +3,25 @@ package sync import ( "reflect" "sync" + "time" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/network" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v3/cmd/beacon-chain/flags" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/sirupsen/logrus" "github.com/trailofbits/go-mutexasserts" ) const defaultBurstLimit = 5 +const leakyBucketPeriod = 1 * time.Second + +// Only allow in 2 batches per minute. +const blockBucketPeriod = 30 * time.Second + // Dummy topic to validate all incoming rpc requests. const rpcLimiterTopic = "rpc-limiter-topic" @@ -39,19 +45,19 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter { // Set topic map for all rpc topics. topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings)) // Goodbye Message - topicMap[addEncoding(p2p.RPCGoodByeTopicV1)] = leakybucket.NewCollector(1, 1, false /* deleteEmptyBuckets */) + topicMap[addEncoding(p2p.RPCGoodByeTopicV1)] = leakybucket.NewCollector(1, 1, leakyBucketPeriod, false /* deleteEmptyBuckets */) // MetadataV0 Message - topicMap[addEncoding(p2p.RPCMetaDataTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */) - topicMap[addEncoding(p2p.RPCMetaDataTopicV2)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */) + topicMap[addEncoding(p2p.RPCMetaDataTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) + topicMap[addEncoding(p2p.RPCMetaDataTopicV2)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) // Ping Message - topicMap[addEncoding(p2p.RPCPingTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */) + topicMap[addEncoding(p2p.RPCPingTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) // Status Message - topicMap[addEncoding(p2p.RPCStatusTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */) + topicMap[addEncoding(p2p.RPCStatusTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) // Use a single collector for block requests - blockCollector := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */) + blockCollector := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, blockBucketPeriod, false /* deleteEmptyBuckets */) // Collector for V2 - blockCollectorV2 := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */) + blockCollectorV2 := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, blockBucketPeriod, false /* deleteEmptyBuckets */) // BlocksByRoots requests topicMap[addEncoding(p2p.RPCBlocksByRootTopicV1)] = blockCollector @@ -62,7 +68,7 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter { topicMap[addEncoding(p2p.RPCBlocksByRangeTopicV2)] = blockCollectorV2 // General topic for all rpc requests. - topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, false /* deleteEmptyBuckets */) + topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, leakyBucketPeriod, false /* deleteEmptyBuckets */) return &limiter{limiterMap: topicMap, p2p: p2pProvider} } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index c5dc731ec..35e2acb67 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -10,7 +10,6 @@ import ( "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" chainMock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing" @@ -28,6 +27,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/encoding/bytesutil" enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" @@ -67,7 +67,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) { r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -129,7 +129,7 @@ func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) { r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -246,7 +246,7 @@ func TestRPCBeaconBlocksByRange_ReconstructsPayloads(t *testing.T) { } pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -314,7 +314,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) { r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -379,7 +379,7 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) { r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -471,7 +471,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 100, Step: 5, @@ -497,7 +497,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 100, @@ -526,7 +526,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 100, @@ -723,7 +723,7 @@ func TestRPCBeaconBlocksByRange_EnforceResponseInvariants(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false) + r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 448, Step: 1, @@ -891,7 +891,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false) + r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, Step: 1, @@ -922,7 +922,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false) + r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, Step: 1, @@ -957,7 +957,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false) + r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, Step: 1, @@ -992,7 +992,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false) + r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, Step: 1, @@ -1032,7 +1032,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false) + r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, Step: 1, diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index 9b4e7e901..f3691c805 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -9,7 +9,6 @@ import ( "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" gcache "github.com/patrickmn/go-cache" @@ -24,6 +23,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/config/params" "github.com/prysmaticlabs/prysm/v3/consensus-types/blocks" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/encoding/bytesutil" enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" @@ -54,7 +54,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) { r.cfg.chain = &mock.ChainService{ValidatorsRoot: [32]byte{}} pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -152,7 +152,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks_ReconstructsPayload(t *testi r.cfg.chain = &mock.ChainService{ValidatorsRoot: [32]byte{}} pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -224,7 +224,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) { // Setup streams pcl := protocol.ID("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -261,7 +261,7 @@ func TestRecentBeaconBlocksRPCHandler_HandleZeroBlocks(t *testing.T) { r := &Service{cfg: &config{p2p: p1, beaconDB: d}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) diff --git a/beacon-chain/sync/rpc_goodbye_test.go b/beacon-chain/sync/rpc_goodbye_test.go index a8f6d2990..29838c645 100644 --- a/beacon-chain/sync/rpc_goodbye_test.go +++ b/beacon-chain/sync/rpc_goodbye_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing" @@ -15,6 +14,7 @@ import ( p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v3/config/params" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/testing/assert" "github.com/prysmaticlabs/prysm/v3/testing/require" "github.com/prysmaticlabs/prysm/v3/testing/util" @@ -44,7 +44,7 @@ func TestGoodByeRPCHandler_Disconnects_With_Peer(t *testing.T) { // Setup streams pcl := protocol.ID("/testing") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -89,7 +89,7 @@ func TestGoodByeRPCHandler_BackOffPeer(t *testing.T) { // Setup streams pcl := protocol.ID("/testing") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -166,7 +166,7 @@ func TestSendGoodbye_SendsMessage(t *testing.T) { // Setup streams pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -211,7 +211,7 @@ func TestSendGoodbye_DisconnectWithPeer(t *testing.T) { // Setup streams pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { diff --git a/beacon-chain/sync/rpc_metadata_test.go b/beacon-chain/sync/rpc_metadata_test.go index 300b09ba6..cebcf1bb9 100644 --- a/beacon-chain/sync/rpc_metadata_test.go +++ b/beacon-chain/sync/rpc_metadata_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain" @@ -18,6 +17,7 @@ import ( p2ptest "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v3/config/params" "github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/encoding/ssz/equality" pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata" @@ -53,7 +53,7 @@ func TestMetaDataRPCHandler_ReceivesMetadata(t *testing.T) { // Setup streams pcl := protocol.ID(p2p.RPCMetaDataTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -112,8 +112,8 @@ func TestMetadataRPCHandler_SendsMetadata(t *testing.T) { // Setup streams pcl := protocol.ID(p2p.RPCMetaDataTopicV1 + r.cfg.p2p.Encoding().ProtocolSuffix()) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) - r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) + r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -179,8 +179,8 @@ func TestMetadataRPCHandler_SendsMetadataAltair(t *testing.T) { // Setup streams pcl := protocol.ID(p2p.RPCMetaDataTopicV2 + r.cfg.p2p.Encoding().ProtocolSuffix()) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, false) - r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false) + r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false) var wg sync.WaitGroup wg.Add(1) diff --git a/beacon-chain/sync/rpc_ping_test.go b/beacon-chain/sync/rpc_ping_test.go index 077207cbd..a604b4573 100644 --- a/beacon-chain/sync/rpc_ping_test.go +++ b/beacon-chain/sync/rpc_ping_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing" @@ -17,6 +16,7 @@ import ( p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v3/testing/assert" "github.com/prysmaticlabs/prysm/v3/testing/require" @@ -54,7 +54,7 @@ func TestPingRPCHandler_ReceivesPing(t *testing.T) { // Setup streams pcl := protocol.ID(p2p.RPCPingTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -123,7 +123,7 @@ func TestPingRPCHandler_SendsPing(t *testing.T) { // Setup streams pcl := protocol.ID("/eth2/beacon_chain/req/ping/1/ssz_snappy") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -183,7 +183,7 @@ func TestPingRPCHandler_BadSequenceNumber(t *testing.T) { // Setup streams pcl := protocol.ID("/testing") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { diff --git a/beacon-chain/sync/rpc_status_test.go b/beacon-chain/sync/rpc_status_test.go index 0dbecce0c..55f62d06a 100644 --- a/beacon-chain/sync/rpc_status_test.go +++ b/beacon-chain/sync/rpc_status_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing" @@ -25,6 +24,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper" + leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v3/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v3/testing/assert" @@ -62,7 +62,7 @@ func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { } pcl := protocol.ID(p2p.RPCStatusTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -77,7 +77,7 @@ func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { pcl2 := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy") topic = string(pcl2) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg2 sync.WaitGroup wg2.Add(1) p2.BHost.SetStreamHandler(pcl2, func(stream network.Stream) { @@ -130,7 +130,7 @@ func TestStatusRPCHandler_ConnectsOnGenesis(t *testing.T) { } pcl := protocol.ID(p2p.RPCStatusTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) @@ -214,7 +214,7 @@ func TestStatusRPCHandler_ReturnsHelloMessage(t *testing.T) { // Setup streams pcl := protocol.ID(p2p.RPCStatusTopicV1) topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -317,7 +317,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) { // Setup streams pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -339,7 +339,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) { pcl = "/eth2/beacon_chain/req/ping/1/ssz_snappy" topic = string(pcl) - r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg2 sync.WaitGroup wg2.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -434,7 +434,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) { // Setup streams pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -542,7 +542,7 @@ func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) { // Setup streams pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -726,7 +726,7 @@ func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) { // Setup streams pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { @@ -795,7 +795,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) { // Setup streams pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy") topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) var wg sync.WaitGroup wg.Add(1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { diff --git a/container/leaky-bucket/BUILD.bazel b/container/leaky-bucket/BUILD.bazel new file mode 100644 index 000000000..b57a9cda4 --- /dev/null +++ b/container/leaky-bucket/BUILD.bazel @@ -0,0 +1,22 @@ +load("@prysm//tools/go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "collector.go", + "heap.go", + "leakybucket.go", + ], + importpath = "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket", + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = [ + "collector_test.go", + "heap_test.go", + "leakybucket_test.go", + ], + embed = [":go_default_library"], +) diff --git a/container/leaky-bucket/LICENSE b/container/leaky-bucket/LICENSE new file mode 100644 index 000000000..d7d01856a --- /dev/null +++ b/container/leaky-bucket/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 kevinms + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/container/leaky-bucket/collector.go b/container/leaky-bucket/collector.go new file mode 100644 index 000000000..197df0797 --- /dev/null +++ b/container/leaky-bucket/collector.go @@ -0,0 +1,202 @@ +package leakybucket + +import ( + "container/heap" + "sync" + "time" +) + +//TODO: Finer grained locking. + +type bucketMap map[string]*LeakyBucket + +// A Collector can keep track of multiple LeakyBucket's. The caller does not +// directly interact with the buckets, but instead addresses them by a string +// key (e.g. IP address, hostname, hash, etc.) that is passed to most Collector +// methods. +// +// All Collector methods are goroutine safe. +type Collector struct { + buckets bucketMap + heap priorityQueue + rate float64 + capacity int64 + period time.Duration + lock sync.Mutex + quit chan bool +} + +// NewCollector creates a new Collector. When new buckets are created within +// the Collector, they will be assigned the capacity and rate of the Collector. +// A Collector does not provide a way to change the rate or capacity of +// bucket's within it. If different rates or capacities are required, either +// use multiple Collector's or manage your own LeakyBucket's. +// +// If deleteEmptyBuckets is true, a concurrent goroutine will be run that +// watches for bucket's that become empty and automatically removes them, +// freeing up memory resources. +func NewCollector(rate float64, capacity int64, period time.Duration, deleteEmptyBuckets bool) *Collector { + c := &Collector{ + buckets: make(bucketMap), + heap: make(priorityQueue, 0, 4096), + rate: rate, + capacity: capacity, + period: period, + quit: make(chan bool), + } + + if deleteEmptyBuckets { + c.PeriodicPrune() + } + + return c +} + +// Free releases the collector's resources. If the collector was created with +// deleteEmptyBuckets = true, then the goroutine looking for empty buckets, +// will be stopped. +func (c *Collector) Free() { + c.Reset() + close(c.quit) +} + +// Reset removes all internal buckets and resets the collector back to as if it +// was just created. +func (c *Collector) Reset() { + c.lock.Lock() + defer c.lock.Unlock() + + // Let the garbage collector do all the work. + c.buckets = make(bucketMap) + c.heap = make(priorityQueue, 0, 4096) +} + +// Capacity returns the collector's capacity. +func (c *Collector) Capacity() int64 { + return c.capacity +} + +// Rate returns the collector's rate. +func (c *Collector) Rate() float64 { + return c.rate +} + +// Remaining returns the remaining capacity of the internal bucket associated +// with key. If key is not associated with a bucket internally, it is treated +// as being empty. +func (c *Collector) Remaining(key string) int64 { + return c.capacity - c.Count(key) +} + +// Count returns the count of the internal bucket associated with key. If key +// is not associated with a bucket internally, it is treated as being empty. +func (c *Collector) Count(key string) int64 { + c.lock.Lock() + defer c.lock.Unlock() + + b, ok := c.buckets[key] + if !ok || b == nil { + return 0 + } + + return b.Count() +} + +// TillEmpty returns how much time must pass until the internal bucket +// associated with key is empty. If key is not associated with a bucket +// internally, it is treated as being empty. +func (c *Collector) TillEmpty(key string) time.Duration { + c.lock.Lock() + defer c.lock.Unlock() + + b, ok := c.buckets[key] + if !ok || b == nil { + return 0 + } + + return b.TillEmpty() +} + +// Remove deletes the internal bucket associated with key. If key is not +// associated with a bucket internally, nothing is done. +func (c *Collector) Remove(key string) { + c.lock.Lock() + defer c.lock.Unlock() + + b, ok := c.buckets[key] + if !ok || b == nil { + return + } + + delete(c.buckets, b.key) + heap.Remove(&c.heap, b.index) +} + +// Add 'amount' to the internal bucket associated with key, up to it's +// capacity. Returns how much was added to the bucket. If the return is less +// than 'amount', then the bucket's capacity was reached. +// +// If key is not associated with a bucket internally, a new bucket is created +// and amount is added to it. +func (c *Collector) Add(key string, amount int64) int64 { + c.lock.Lock() + defer c.lock.Unlock() + + b, ok := c.buckets[key] + + if !ok || b == nil { + // Create a new bucket. + b = &LeakyBucket{ + key: key, + capacity: c.capacity, + rate: c.rate, + period: c.period, + p: now(), + } + c.heap.Push(b) + c.buckets[key] = b + } + + n := b.Add(amount) + + if n > 0 { + heap.Fix(&c.heap, b.index) + } + + return n +} + +// Prune removes all empty buckets in the collector. +func (c *Collector) Prune() { + c.lock.Lock() + for c.heap.Peak() != nil { + b := c.heap.Peak() + + if now().Before(b.p) { + // The bucket isn't empty. + break + } + + // The bucket should be empty. + delete(c.buckets, b.key) + heap.Remove(&c.heap, b.index) + } + c.lock.Unlock() +} + +// PeriodicPrune runs a concurrent goroutine that calls Prune() at the given +// time interval. +func (c *Collector) PeriodicPrune() { + go func() { + ticker := time.NewTicker(c.period) + for { + select { + case <-ticker.C: + c.Prune() + case <-c.quit: + ticker.Stop() + return + } + } + }() +} diff --git a/container/leaky-bucket/collector_test.go b/container/leaky-bucket/collector_test.go new file mode 100644 index 000000000..5fa508b86 --- /dev/null +++ b/container/leaky-bucket/collector_test.go @@ -0,0 +1,210 @@ +package leakybucket + +import ( + "fmt" + "testing" + "time" +) + +func TestNewCollector(t *testing.T) { + rate := 1.0 + capacity := int64(2) + c := NewCollector(rate, capacity, time.Second, true) + + if c.buckets == nil { + t.Fatal("Didn't initialize priority?!") + } + if c.heap == nil { + t.Fatal("Didn't initialize priority?!") + } + if c.rate != rate || c.Rate() != rate { + t.Fatal("Wrong rate?!") + } + if c.capacity != capacity || c.Capacity() != capacity { + t.Fatal("Wrong capacity?!") + } + + c.Free() +} + +func TestNewCollector_LargerPeriod(t *testing.T) { + testNow := now + now = time.Now + defer func() { + now = testNow + }() + rate := 10.0 + capacity := int64(20) + c := NewCollector(rate, capacity, 5*time.Second, true) + + c.Add("test", 10) + c.Add("test", 10) + + if c.Remaining("test") != 0 { + t.Errorf("Excess capacity exists of: %d", c.Remaining("test")) + } + time.Sleep(1 * time.Second) + if c.Remaining("test") >= 20 { + t.Errorf("Excess capacity exists in: %d", c.Remaining("test")) + } + time.Sleep(4 * time.Second) + + if c.Add("test", 10) != 10 { + t.Errorf("Internal counter not refreshed: %d", c.Count("test")) + } + c.Free() +} + +var collectorSimple = testSet{ + capacity: int64(5), + rate: 1.0, + set: []actionSet{ + {}, + {1, "add", 1}, + {1, "time-set", time.Nanosecond}, + {1, "till", time.Second - time.Nanosecond}, + {1, "time-set", time.Second - time.Nanosecond}, + {1, "till", time.Nanosecond}, + {0, "time-set", time.Second}, + {0, "till", time.Duration(0)}, + {1, "add", 1}, + {1, "time-add", time.Second / 2}, + {1, "till", time.Second / 2}, + {2, "add", 1}, + {2, "time-add", time.Second/2 - time.Nanosecond}, + {0, "time-add", time.Second * time.Duration(5)}, + {1, "add", 1}, + {2, "add", 1}, + {3, "add", 1}, + {4, "add", 1}, + {5, "add", 1}, + {5, "add", 1}, + {5, "till", time.Second * 5}, + }, +} + +var collectorVaried = testSet{ + capacity: 1000, + rate: 60.0, + set: []actionSet{ + {}, + {100, "add", 100}, + {100, "time-set", time.Nanosecond}, + {1000, "add", 1000}, + {1000, "add", 1}, + {940, "time-set", time.Second}, + }, +} + +func runKey(t *testing.T, c *Collector, key string, test *testSet) { + setElapsed(0) + capacity := c.Capacity() + + for i, v := range test.set { + switch v.action { + case "add": + count := c.Count(key) + remaining := test.capacity - count + amount := int64(v.value.(int)) + n := c.Add(key, amount) + if n < amount { + // The bucket should be full now. + if n < remaining { + t.Fatalf("Test %d: Bucket should have been filled by this Add()?!", i) + } + } + case "time-set": + setElapsed(v.value.(time.Duration)) + case "time-add": + addToElapsed(v.value.(time.Duration)) + case "till": + dt := c.TillEmpty(key) + if dt != v.value.(time.Duration) { + t.Fatalf("%s -> Test %d: Bad TillEmpty(). Expected %v, got %v", key, i, v.value, dt) + } + } + count := c.Count(key) + if count != v.count { + t.Fatalf("%s -> Test %d: Bad count. Expected %d, got %d", key, i, v.count, count) + } + if count > capacity { + t.Fatalf("%s -> Test %d: Went over capacity?!", key, i) + } + if c.Remaining(key) != test.capacity-v.count { + t.Fatalf("Test %d: Expected remaining value %d, got %d", + i, test.capacity-v.count, c.Remaining(key)) + } + } +} + +func TestCollector(t *testing.T) { + setElapsed(0) + + tests := []testSet{ + collectorSimple, + collectorSimple, + collectorVaried, + } + + for i, test := range tests { + fmt.Println("Running testSet:", i) + + key := "127.0.0.1" + + c := NewCollector(test.rate, test.capacity, time.Second, false) + + // Run and test Remove() + runKey(t, c, key, &test) + c.Remove(key) + if c.Count(key) > 0 { + t.Fatal("Key still has a count after removal?!") + } + + // Run again and test Prune() + runKey(t, c, "127.0.0.1", &test) + c.Prune() + setElapsed(time.Hour) + c.Prune() + + // Run again and test Reset(). + runKey(t, c, "127.0.0.1", &test) + c.Reset() + if c.Count(key) != 0 { + t.Fatal("Key still has a count after removal?!") + } + if c.TillEmpty(key) != 0 { + t.Fatal("Key still has time till empty?!") + } + + // Try to remove a non-exist bucket. + c.Remove("fake") + if c.Count("fake") != 0 { + t.Fatal("Key still has a count after removal?!") + } + } +} + +func TestPeriodicPrune(t *testing.T) { + setElapsed(0) + key := "localhost" + c := NewCollector(1e7, 8, time.Second, false) + c.PeriodicPrune() + n := c.Add(key, 100) + if n != 8 { + t.Fatal("Didn't fill bucket?!") + } + + fmt.Printf("TillEmpty(): %v\n", c.TillEmpty(key)) + + // Wait for the periodic prune. + wait := time.Millisecond + time.Sleep(wait) + setElapsed(wait) + + count := c.Count(key) + if count != 0 { + t.Fatalf("Key's bucket is not empty: %d?!", count) + } + + c.Free() +} diff --git a/container/leaky-bucket/heap.go b/container/leaky-bucket/heap.go new file mode 100644 index 000000000..cd3a64485 --- /dev/null +++ b/container/leaky-bucket/heap.go @@ -0,0 +1,47 @@ +package leakybucket + +import "fmt" + +// Based on the example implementation of priority queue found in the +// container/heap package docs: https://golang.org/pkg/container/heap/ +type priorityQueue []*LeakyBucket + +func (pq priorityQueue) Len() int { + return len(pq) +} + +func (pq priorityQueue) Peak() *LeakyBucket { + if len(pq) <= 0 { + return nil + } + return pq[0] +} + +func (pq priorityQueue) Less(i, j int) bool { + return pq[i].p.Before(pq[j].p) +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + b, ok := x.(*LeakyBucket) + if !ok { + panic(fmt.Sprintf("%T", x)) + } + b.index = n + *pq = append(*pq, b) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + b := old[n-1] + b.index = -1 // for safety + *pq = old[0 : n-1] + return b +} diff --git a/container/leaky-bucket/heap_test.go b/container/leaky-bucket/heap_test.go new file mode 100644 index 000000000..f153d6d59 --- /dev/null +++ b/container/leaky-bucket/heap_test.go @@ -0,0 +1,107 @@ +package leakybucket + +import ( + "testing" + "time" +) + +func TestLen(t *testing.T) { + q := make(priorityQueue, 0, 4096) + + if q.Len() != 0 { + t.Fatal("Queue should be empty?!") + } + + for i := 1; i <= 5; i++ { + b := NewLeakyBucket(1.0, 5, time.Second) + q.Push(b) + + l := q.Len() + if l != i { + t.Fatalf("Expected length %d, got %d", i, l) + } + } + for i := 4; i >= 0; i-- { + q.Pop() + + l := q.Len() + if l != i { + t.Fatalf("Expected length %d, got %d", i, l) + } + } +} + +func TestPeak(t *testing.T) { + q := make(priorityQueue, 0, 4096) + + for i := 0; i < 5; i++ { + b := NewLeakyBucket(1.0, 5, time.Second) + q.Push(b) + } +} + +func TestLess(t *testing.T) { + q := make(priorityQueue, 0, 4096) + + for i := 0; i < 5; i++ { + b := NewLeakyBucket(1.0, 5, time.Second) + b.p = now().Add(time.Duration(i)) + q.Push(b) + } + + for i, j := 0, 4; i < 5; i, j = i+1, j-1 { + if i < j && !q.Less(i, j) { + t.Fatal("Less is more?!") + } + } +} + +func TestSwap(t *testing.T) { + q := make(priorityQueue, 0, 4096) + + for i := 0; i < 5; i++ { + b := NewLeakyBucket(1.0, 5, time.Second) + q.Push(b) + } + + i := 2 + j := 4 + + bi := q[i] + bj := q[j] + + q.Swap(i, j) + + if bi != q[j] || bj != q[i] { + t.Fatal("Element weren't swapped?!") + } +} + +func TestPush(t *testing.T) { + q := make(priorityQueue, 0, 4096) + + for i := 0; i < 5; i++ { + b := NewLeakyBucket(1.0, 5, time.Second) + q.Push(b) + + if b != q[len(q)-1] { + t.Fatal("Push should append to queue.") + } + } +} + +func TestPop(t *testing.T) { + q := make(priorityQueue, 0, 4096) + + for i := 1; i <= 5; i++ { + b := NewLeakyBucket(1.0, 5, time.Second) + q.Push(b) + } + + for i := 1; i <= 5; i++ { + b := q[len(q)-1] + if b != q.Pop() { + t.Fatal("Pop should remove from end of queue.") + } + } +} diff --git a/container/leaky-bucket/leakybucket.go b/container/leaky-bucket/leakybucket.go new file mode 100644 index 000000000..d32982181 --- /dev/null +++ b/container/leaky-bucket/leakybucket.go @@ -0,0 +1,155 @@ +/* +Package leakybucket implements a scalable leaky bucket algorithm. + +There are at least two different definitions of the leaky bucket algorithm. +This package implements the leaky bucket as a meter. For more details see: + +https://en.wikipedia.org/wiki/Leaky_bucket#As_a_meter + +This means it is the exact mirror of a token bucket. + + // New LeakyBucket that leaks at the rate of 0.5/sec and a total capacity of 10. + b := NewLeakyBucket(0.5, 10) + + b.Add(5) + b.Add(5) + // Bucket is now full! + + n := b.Add(1) + // n == 0 + + +A Collector is a convenient way to keep track of multiple LeakyBucket's. +Buckets are associated with string keys for fast lookup. It can dynamically +add new buckets and automatically remove them as they become empty, freeing +up resources. + + // New Collector that leaks at 1 MiB/sec, a total capacity of 10 MiB and + // automatic removal of bucket's when they become empty. + const megabyte = 1<<20 + c := NewCollector(megabyte, megabyte*10, true) + + // Attempt to add 100 MiB to a bucket associated with an IP. + n := c.Add("192.168.0.42", megabyte*100) + + // 100 MiB is over the capacity, so only 10 MiB is actually added. + // n equals 10 MiB. +*/ +package leakybucket + +import ( + "math" + "time" +) + +// Makes it easy to test time based things. +var now = time.Now + +// LeakyBucket represents a bucket that leaks at a constant rate. +type LeakyBucket struct { + // The identifying key, used for map lookups. + key string + + // How large the bucket is. + capacity int64 + + // Amount the bucket leaks per time duration. + rate float64 + + // The priority of the bucket in a min-heap priority queue, where p is the + // exact time the bucket will have leaked enough to be empty. Buckets that + // are empty or will be the soonest are at the top of the heap. This allows + // for quick pruning of empty buckets that scales very well. p is adjusted + // any time an amount is added to the Queue(). + p time.Time + + // The time duration through which the leaky bucket is + // assessed. + period time.Duration + + // The index is maintained by the heap.Interface methods. + index int +} + +// NewLeakyBucket creates a new LeakyBucket with the give rate and capacity. +func NewLeakyBucket(rate float64, capacity int64, period time.Duration) *LeakyBucket { + return &LeakyBucket{ + rate: rate, + capacity: capacity, + period: period, + p: now(), + } +} + +// Count returns the bucket's current count. +func (b *LeakyBucket) Count() int64 { + if !now().Before(b.p) { + return 0 + } + + nsRemaining := float64(b.p.Sub(now())) + nsPerDrip := float64(b.period) / b.rate + count := int64(math.Ceil(nsRemaining / nsPerDrip)) + + return count +} + +// Rate returns the amount the bucket leaks per second. +func (b *LeakyBucket) Rate() float64 { + return b.rate +} + +// Capacity returns the bucket's capacity. +func (b *LeakyBucket) Capacity() int64 { + return b.capacity +} + +// Remaining returns the bucket's remaining capacity. +func (b *LeakyBucket) Remaining() int64 { + return b.capacity - b.Count() +} + +// ChangeCapacity changes the bucket's capacity. +// +// If the bucket's current count is greater than the new capacity, the count +// will be decreased to match the new capacity. +func (b *LeakyBucket) ChangeCapacity(capacity int64) { + diff := float64(capacity - b.capacity) + + if diff < 0 && b.Count() > capacity { + // We are shrinking the capacity and the new bucket size can't hold all + // the current contents. Dump the extra and adjust the time till empty. + nsPerDrip := float64(b.period) / b.rate + b.p = now().Add(time.Duration(nsPerDrip * float64(capacity))) + } + b.capacity = capacity +} + +// TillEmpty returns how much time must pass until the bucket is empty. +func (b *LeakyBucket) TillEmpty() time.Duration { + return b.p.Sub(now()) +} + +// Add 'amount' to the bucket's count, up to it's capacity. Returns how much +// was added to the bucket. If the return is less than 'amount', then the +// bucket's capacity was reached. +func (b *LeakyBucket) Add(amount int64) int64 { + count := b.Count() + if count >= b.capacity { + // The bucket is full. + return 0 + } + + if !now().Before(b.p) { + // The bucket needs to be reset. + b.p = now() + } + remaining := b.capacity - count + if amount > remaining { + amount = remaining + } + t := time.Duration(float64(b.period) * (float64(amount) / b.rate)) + b.p = b.p.Add(t) + + return amount +} diff --git a/container/leaky-bucket/leakybucket_test.go b/container/leaky-bucket/leakybucket_test.go new file mode 100644 index 000000000..4c3fce51a --- /dev/null +++ b/container/leaky-bucket/leakybucket_test.go @@ -0,0 +1,181 @@ +package leakybucket + +import ( + "fmt" + "os" + "sync/atomic" + "testing" + "time" +) + +// Arbitrary start time. +var start = time.Date(1990, 1, 2, 0, 0, 0, 0, time.UTC).Round(0) +var elapsed int64 + +// We provide atomic access to elapsed to avoid data races between multiple +// concurrent goroutines during the tests. +func getElapsed() time.Duration { + return time.Duration(atomic.LoadInt64(&elapsed)) +} +func setElapsed(v time.Duration) { + atomic.StoreInt64(&elapsed, int64(v)) +} +func addToElapsed(v time.Duration) { + atomic.AddInt64(&elapsed, int64(v)) +} + +func reset(t *testing.T, c *Collector) { + c.Reset() + setElapsed(0) +} + +func TestNewLeakyBucket(t *testing.T) { + rate := 1.0 + capacity := int64(5) + b := NewLeakyBucket(rate, capacity, time.Second) + + if b.p != now() { + t.Fatal("Didn't initialize priority?!") + } + if b.rate != rate || b.Rate() != rate { + t.Fatal("Wrong rate?!") + } + if b.capacity != capacity || b.Capacity() != capacity { + t.Fatal("Wrong capacity?!") + } +} + +type actionSet struct { + count int64 + action string + value interface{} +} + +type testSet struct { + capacity int64 + rate float64 + set []actionSet +} + +var oneAtaTime = testSet{ + capacity: 5, + rate: 1.0, + set: []actionSet{ + {}, + {1, "add", 1}, + {1, "time-set", time.Nanosecond}, + {1, "till", time.Second - time.Nanosecond}, + {1, "time-set", time.Second - time.Nanosecond}, + {1, "till", time.Nanosecond}, + {0, "time-set", time.Second}, + {0, "till", time.Duration(0)}, + + // Add a couple. + {1, "add", 1}, + {1, "time-add", time.Second / 2}, + {1, "till", time.Second / 2}, + {2, "add", 1}, + {2, "time-add", time.Second/2 - time.Nanosecond}, + + // Monkey with the capacity and make sure Count()/TillEmpty() are + // adjusted as needed. + {2, "cap", 5 + 1}, + {2, "till", time.Second + time.Nanosecond}, + {2, "cap", 5 - 1}, + {2, "till", time.Second + time.Nanosecond}, + {1, "cap", 1}, + {1, "till", time.Second}, + {1, "cap", 4}, + {1, "till", time.Second}, + + // Test the full cases. + {0, "time-add", time.Second * time.Duration(5)}, + {1, "add", 1}, + {2, "add", 1}, + {3, "add", 1}, + {4, "add", 1}, + {4, "add", 1}, + {4, "till", time.Second * 4}, + }, +} + +var varied = testSet{ + capacity: 1000, + rate: 60.0, + set: []actionSet{ + {}, + {100, "add", 100}, + {100, "time-set", time.Nanosecond}, + {1000, "add", 1000}, + {1000, "add", 1}, + {940, "time-set", time.Second}, + }, +} + +func runTest(t *testing.T, test *testSet) { + setElapsed(0) + b := NewLeakyBucket(test.rate, test.capacity, time.Second) + + for i, v := range test.set { + switch v.action { + case "add": + count := b.Count() + remaining := test.capacity - count + amount := int64(v.value.(int)) + n := b.Add(amount) + if n < amount { + // The bucket should be full now. + if n < remaining { + t.Fatalf("Test %d: Bucket should have been filled by this Add()?!", i) + } + } + case "time-set": + setElapsed(v.value.(time.Duration)) + case "cap": + b.ChangeCapacity(int64(v.value.(int))) + test.capacity = b.Capacity() + case "time-add": + addToElapsed(v.value.(time.Duration)) + case "till": + dt := b.TillEmpty() + if dt != v.value.(time.Duration) { + t.Fatalf("Test %d: Bad TillEmpty(). Expected %v, got %v", i, v.value, dt) + } + case "debug": + fmt.Println("elapsed:", getElapsed()) + fmt.Println("tillEmpty:", b.TillEmpty()) + fmt.Println("count:", b.Count()) + } + c := b.Count() + if c != v.count { + t.Fatalf("Test %d: Bad count. Expected %d, got %d", i, v.count, c) + } + if c > test.capacity { + t.Fatalf("Test %d: Went over capacity?!", i) + } + if b.Remaining() != test.capacity-v.count { + t.Fatalf("Test %d: Expected remaining value %d, got %d", + i, test.capacity-v.count, b.Remaining()) + } + } +} + +func TestLeakyBucket(t *testing.T) { + tests := []testSet{ + oneAtaTime, + varied, + } + + for i, test := range tests { + fmt.Println("Running testSet:", i) + runTest(t, &test) + } +} + +func TestMain(m *testing.M) { + // Override what now() function the leakybucket library uses. + // This greatly increases testability! + now = func() time.Time { return start.Add(getElapsed()) } + + os.Exit(m.Run()) +} diff --git a/deps.bzl b/deps.bzl index 7f8377c74..8002d624d 100644 --- a/deps.bzl +++ b/deps.bzl @@ -1883,13 +1883,6 @@ def prysm_deps(): version = "v0.0.2", ) - go_repository( - name = "com_github_kevinms_leakybucket_go", - importpath = "github.com/kevinms/leakybucket-go", - sum = "h1:qNtd6alRqd3qOdPrKXMZImV192ngQ0WSh1briEO33Tk=", - version = "v0.0.0-20200115003610-082473db97ca", - ) - go_repository( name = "com_github_kisielk_errcheck", importpath = "github.com/kisielk/errcheck", diff --git a/go.mod b/go.mod index ada72a91b..f9fab3bae 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,6 @@ require ( github.com/joonix/log v0.0.0-20200409080653-9c1d2ceb5f1d github.com/json-iterator/go v1.1.12 github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 - github.com/kevinms/leakybucket-go v0.0.0-20200115003610-082473db97ca github.com/kr/pretty v0.3.0 github.com/libp2p/go-libp2p v0.22.0 github.com/libp2p/go-libp2p-pubsub v0.8.0 diff --git a/go.sum b/go.sum index e9fcf26ce..db0070579 100644 --- a/go.sum +++ b/go.sum @@ -609,8 +609,6 @@ github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 h1:qGQQKEcAR99REcM github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= github.com/karalabe/usb v0.0.2 h1:M6QQBNxF+CQ8OFvxrT90BA0qBOXymndZnk5q235mFc4= github.com/karalabe/usb v0.0.2/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU= -github.com/kevinms/leakybucket-go v0.0.0-20200115003610-082473db97ca h1:qNtd6alRqd3qOdPrKXMZImV192ngQ0WSh1briEO33Tk= -github.com/kevinms/leakybucket-go v0.0.0-20200115003610-082473db97ca/go.mod h1:ph+C5vpnCcQvKBwJwKLTK3JLNGnBXYlG7m7JjoC/zYA= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=