Vendor Leaky Bucket Implementation (#11560)

* add changes

* fix tests

* change to minute

* remove dep

* remove

* fix tests

* add test for period

* improve

* linter

* build files

* ci

* make it stricter

* fix tests

* fix

* Update beacon-chain/sync/rate_limiter.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

Co-authored-by: terencechain <terence@prysmaticlabs.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
This commit is contained in:
Nishant Das 2022-10-21 05:40:13 +08:00 committed by GitHub
parent 4bd4d6392d
commit 661cbc45ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1063 additions and 104 deletions

View File

@ -57,6 +57,7 @@ go_library(
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",
"//container/leaky-bucket:go_default_library",
"//crypto/ecdsa:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
@ -74,7 +75,6 @@ go_library(
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_kr_pretty//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//config:go_default_library",
@ -150,6 +150,7 @@ go_test(
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",
"//container/leaky-bucket:go_default_library",
"//crypto/ecdsa:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
@ -168,7 +169,6 @@ go_test(
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_golang_snappy//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",

View File

@ -4,8 +4,8 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/scorers"
mockp2p "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/testing"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/eth/v1"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
@ -26,7 +27,7 @@ func TestPeer_AtMaxLimit(t *testing.T) {
listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
require.NoError(t, err, "Failed to p2p listen")
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
}
s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 0,
@ -70,7 +71,7 @@ func TestPeer_AtMaxLimit(t *testing.T) {
func TestService_InterceptBannedIP(t *testing.T) {
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 20,
ScorerParams: &scorers.Config{},
@ -98,7 +99,7 @@ func TestService_InterceptBannedIP(t *testing.T) {
func TestService_RejectInboundPeersBeyondLimit(t *testing.T) {
limit := 20
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: limit,
ScorerParams: &scorers.Config{},
@ -140,7 +141,7 @@ func TestPeer_BelowMaxLimit(t *testing.T) {
listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
require.NoError(t, err, "Failed to p2p listen")
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
}
s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 1,
@ -191,7 +192,7 @@ func TestPeerAllowList(t *testing.T) {
listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
require.NoError(t, err, "Failed to p2p listen")
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
@ -237,7 +238,7 @@ func TestPeerDenyList(t *testing.T) {
listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
require.NoError(t, err, "Failed to p2p listen")
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
@ -272,7 +273,7 @@ func TestPeerDenyList(t *testing.T) {
func TestService_InterceptAddrDial_Allow(t *testing.T) {
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
@ -292,7 +293,7 @@ func TestService_InterceptAddrDial_Allow(t *testing.T) {
func TestService_InterceptAddrDial_Public(t *testing.T) {
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
@ -340,7 +341,7 @@ func TestService_InterceptAddrDial_Public(t *testing.T) {
func TestService_InterceptAddrDial_Private(t *testing.T) {
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
@ -369,7 +370,7 @@ func TestService_InterceptAddrDial_Private(t *testing.T) {
func TestService_InterceptAddrDial_AllowPrivate(t *testing.T) {
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
@ -398,7 +399,7 @@ func TestService_InterceptAddrDial_AllowPrivate(t *testing.T) {
func TestService_InterceptAddrDial_DenyPublic(t *testing.T) {
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
@ -427,7 +428,7 @@ func TestService_InterceptAddrDial_DenyPublic(t *testing.T) {
func TestService_InterceptAddrDial_AllowConflict(t *testing.T) {
s := &Service{
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),

View File

@ -16,7 +16,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@ -31,6 +30,7 @@ import (
testp2p "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
prysmNetwork "github.com/prysmaticlabs/prysm/v3/network"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
@ -246,7 +246,7 @@ func TestInboundPeerLimit(t *testing.T) {
fakePeer := testp2p.NewTestP2P(t)
s := &Service{
cfg: &Config{MaxPeers: 30},
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{},

View File

@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
@ -29,6 +28,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v3/config/params"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
prysmnetwork "github.com/prysmaticlabs/prysm/v3/network"
"github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/v3/runtime"
@ -123,7 +123,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
log.WithError(err).Error("Failed to create address filter")
return nil, err
}
s.ipLimiter = leakybucket.NewCollector(ipLimit, ipBurst, true /* deleteEmptyBuckets */)
s.ipLimiter = leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */)
opts := s.buildOptions(ipAddr, s.privKey)
h, err := libp2p.New(opts...)

View File

@ -88,6 +88,7 @@ go_library(
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",
"//container/leaky-bucket:go_default_library",
"//container/slice:go_default_library",
"//crypto/bls:go_default_library",
"//crypto/rand:go_default_library",
@ -105,7 +106,6 @@ go_library(
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
@ -204,6 +204,7 @@ go_test(
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",
"//container/leaky-bucket:go_default_library",
"//crypto/bls:go_default_library",
"//crypto/rand:go_default_library",
"//encoding/bytesutil:go_default_library",
@ -223,7 +224,6 @@ go_test(
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_golang_snappy//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",

View File

@ -31,13 +31,13 @@ go_library(
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//container/leaky-bucket:go_default_library",
"//crypto/rand:go_default_library",
"//math:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_paulbellamy_ratecounter//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
@ -83,7 +83,6 @@ go_test(
"//testing/util:go_default_library",
"//time:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
@ -126,6 +125,7 @@ go_test(
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//container/leaky-bucket:go_default_library",
"//container/slice:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
@ -136,7 +136,6 @@ go_test(
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",

View File

@ -6,7 +6,6 @@ import (
"sync"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/db"
@ -17,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/crypto/rand"
p2ppb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
@ -54,6 +54,9 @@ var (
errNoPeersWithAltBlocks = errors.New("no peers with alternative blocks found")
)
// Period to calculate expected limit for a single peer.
var blockLimiterPeriod = 30 * time.Second
// blocksFetcherConfig is a config to setup the block fetcher.
type blocksFetcherConfig struct {
chain blockchainService
@ -114,7 +117,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
// Allow fetcher to go almost to the full burst capacity (less a single batch).
rateLimiter := leakybucket.NewCollector(
float64(blocksPerSecond), int64(allowedBlocksBurst-blocksPerSecond),
false /* deleteEmptyBuckets */)
blockLimiterPeriod, false /* deleteEmptyBuckets */)
capacityWeight := cfg.peerFilterCapacityWeight
if capacityWeight >= 1 {

View File

@ -8,11 +8,11 @@ import (
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/v3/cmd/beacon-chain/flags"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
prysmTime "github.com/prysmaticlabs/prysm/v3/time"
@ -237,7 +237,7 @@ func TestBlocksFetcher_filterPeers(t *testing.T) {
peerFilterCapacityWeight: tt.args.capacityWeight,
})
// Non-leaking bucket, with initial capacity of 10000.
fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 10000, false)
fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 10000, 1*time.Second, false)
peerIDs := make([]peer.ID, 0)
for _, pid := range tt.args.peers {
peerIDs = append(peerIDs, pid.ID)

View File

@ -8,7 +8,6 @@ import (
"testing"
"time"
"github.com/kevinms/leakybucket-go"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
@ -21,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/container/slice"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
@ -547,7 +547,7 @@ func TestBlocksFetcher_RequestBlocksRateLimitingLocks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1})
fetcher.rateLimiter = leakybucket.NewCollector(float64(req.Count), int64(req.Count*burstFactor), false)
fetcher.rateLimiter = leakybucket.NewCollector(float64(req.Count), int64(req.Count*burstFactor), 1*time.Second, false)
fetcher.chain = &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
hook := logTest.NewGlobal()
wg := new(sync.WaitGroup)
@ -842,7 +842,7 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1, chain: &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}})
fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 640, false)
fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 640, 1*time.Second, false)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

View File

@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
@ -18,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
@ -59,7 +59,7 @@ func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) {
p2p: p2p,
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false)
seekSlots := map[types.Slot]types.Slot{
0: 1,
10: 11,
@ -191,7 +191,7 @@ func TestBlocksFetcher_findFork(t *testing.T) {
db: beaconDB,
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false)
// Consume all chain1 blocks from many peers (alternative fork will be featured by a single peer,
// and should still be enough to explore alternative paths).
@ -339,7 +339,7 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) {
db: beaconDB,
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false)
for _, blk := range knownBlocks {
util.SaveBlock(t, ctx, beaconDB, blk)
@ -455,7 +455,7 @@ func TestBlocksFetcher_findAncestor(t *testing.T) {
db: beaconDB,
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false)
pcl := fmt.Sprintf("%s/ssz_snappy", p2pm.RPCBlocksByRootTopicV1)
t.Run("error on request", func(t *testing.T) {

View File

@ -6,7 +6,6 @@ import (
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/peer"
mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
dbtest "github.com/prysmaticlabs/prysm/v3/beacon-chain/db/testing"
@ -17,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/container/slice"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
@ -134,6 +134,11 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
}
func TestBlocksQueue_Loop(t *testing.T) {
currentPeriod := blockLimiterPeriod
blockLimiterPeriod = 1 * time.Second
defer func() {
blockLimiterPeriod = currentPeriod
}()
tests := []struct {
name string
highestExpectedSlot types.Slot
@ -1075,7 +1080,7 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
db: beaconDB,
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false)
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
@ -1295,7 +1300,7 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
db: beaconDB,
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false)
// Connect peer that has all the blocks available.
allBlocksPeer := connectPeerHavingBlocks(t, p2p, chain, finalizedSlot, p2p.Peers())

View File

@ -22,6 +22,11 @@ import (
)
func TestService_roundRobinSync(t *testing.T) {
currentPeriod := blockLimiterPeriod
blockLimiterPeriod = 1 * time.Second
defer func() {
blockLimiterPeriod = currentPeriod
}()
tests := []struct {
name string
currentSlot types.Slot
@ -488,6 +493,11 @@ func TestService_processBlockBatch(t *testing.T) {
}
func TestService_blockProviderScoring(t *testing.T) {
currentPeriod := blockLimiterPeriod
blockLimiterPeriod = 1 * time.Second
defer func() {
blockLimiterPeriod = currentPeriod
}()
cache.initializeRootCache(makeSequence(1, 640), t)
p := p2pt.NewTestP2P(t)

View File

@ -3,19 +3,25 @@ package sync
import (
"reflect"
"sync"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v3/cmd/beacon-chain/flags"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
)
const defaultBurstLimit = 5
const leakyBucketPeriod = 1 * time.Second
// Only allow in 2 batches per minute.
const blockBucketPeriod = 30 * time.Second
// Dummy topic to validate all incoming rpc requests.
const rpcLimiterTopic = "rpc-limiter-topic"
@ -39,19 +45,19 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
// Set topic map for all rpc topics.
topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings))
// Goodbye Message
topicMap[addEncoding(p2p.RPCGoodByeTopicV1)] = leakybucket.NewCollector(1, 1, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCGoodByeTopicV1)] = leakybucket.NewCollector(1, 1, leakyBucketPeriod, false /* deleteEmptyBuckets */)
// MetadataV0 Message
topicMap[addEncoding(p2p.RPCMetaDataTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCMetaDataTopicV2)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCMetaDataTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCMetaDataTopicV2)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
// Ping Message
topicMap[addEncoding(p2p.RPCPingTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCPingTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
// Status Message
topicMap[addEncoding(p2p.RPCStatusTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCStatusTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
// Use a single collector for block requests
blockCollector := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */)
blockCollector := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, blockBucketPeriod, false /* deleteEmptyBuckets */)
// Collector for V2
blockCollectorV2 := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */)
blockCollectorV2 := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, blockBucketPeriod, false /* deleteEmptyBuckets */)
// BlocksByRoots requests
topicMap[addEncoding(p2p.RPCBlocksByRootTopicV1)] = blockCollector
@ -62,7 +68,7 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
topicMap[addEncoding(p2p.RPCBlocksByRangeTopicV2)] = blockCollectorV2
// General topic for all rpc requests.
topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, false /* deleteEmptyBuckets */)
topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, leakyBucketPeriod, false /* deleteEmptyBuckets */)
return &limiter{limiterMap: topicMap, p2p: p2pProvider}
}

View File

@ -10,7 +10,6 @@ import (
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
chainMock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
@ -28,6 +27,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
@ -67,7 +67,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) {
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -129,7 +129,7 @@ func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) {
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -246,7 +246,7 @@ func TestRPCBeaconBlocksByRange_ReconstructsPayloads(t *testing.T) {
}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -314,7 +314,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) {
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -379,7 +379,7 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) {
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -471,7 +471,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 5,
@ -497,7 +497,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 100,
@ -526,7 +526,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 100,
@ -723,7 +723,7 @@ func TestRPCBeaconBlocksByRange_EnforceResponseInvariants(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false)
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 448,
Step: 1,
@ -891,7 +891,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false)
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Step: 1,
@ -922,7 +922,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false)
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Step: 1,
@ -957,7 +957,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false)
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Step: 1,
@ -992,7 +992,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false)
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Step: 1,
@ -1032,7 +1032,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, false)
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Step: 1,

View File

@ -9,7 +9,6 @@ import (
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
gcache "github.com/patrickmn/go-cache"
@ -24,6 +23,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
@ -54,7 +54,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
r.cfg.chain = &mock.ChainService{ValidatorsRoot: [32]byte{}}
pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -152,7 +152,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks_ReconstructsPayload(t *testi
r.cfg.chain = &mock.ChainService{ValidatorsRoot: [32]byte{}}
pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -224,7 +224,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -261,7 +261,7 @@ func TestRecentBeaconBlocksRPCHandler_HandleZeroBlocks(t *testing.T) {
r := &Service{cfg: &config{p2p: p1, beaconDB: d}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)

View File

@ -6,7 +6,6 @@ import (
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
@ -15,6 +14,7 @@ import (
p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v3/config/params"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
@ -44,7 +44,7 @@ func TestGoodByeRPCHandler_Disconnects_With_Peer(t *testing.T) {
// Setup streams
pcl := protocol.ID("/testing")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -89,7 +89,7 @@ func TestGoodByeRPCHandler_BackOffPeer(t *testing.T) {
// Setup streams
pcl := protocol.ID("/testing")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -166,7 +166,7 @@ func TestSendGoodbye_SendsMessage(t *testing.T) {
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -211,7 +211,7 @@ func TestSendGoodbye_DisconnectWithPeer(t *testing.T) {
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {

View File

@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain"
@ -18,6 +17,7 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/encoding/ssz/equality"
pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata"
@ -53,7 +53,7 @@ func TestMetaDataRPCHandler_ReceivesMetadata(t *testing.T) {
// Setup streams
pcl := protocol.ID(p2p.RPCMetaDataTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -112,8 +112,8 @@ func TestMetadataRPCHandler_SendsMetadata(t *testing.T) {
// Setup streams
pcl := protocol.ID(p2p.RPCMetaDataTopicV1 + r.cfg.p2p.Encoding().ProtocolSuffix())
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -179,8 +179,8 @@ func TestMetadataRPCHandler_SendsMetadataAltair(t *testing.T) {
// Setup streams
pcl := protocol.ID(p2p.RPCMetaDataTopicV2 + r.cfg.p2p.Encoding().ProtocolSuffix())
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, false)
r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false)
r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
@ -17,6 +16,7 @@ import (
p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
@ -54,7 +54,7 @@ func TestPingRPCHandler_ReceivesPing(t *testing.T) {
// Setup streams
pcl := protocol.ID(p2p.RPCPingTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -123,7 +123,7 @@ func TestPingRPCHandler_SendsPing(t *testing.T) {
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/ping/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -183,7 +183,7 @@ func TestPingRPCHandler_BadSequenceNumber(t *testing.T) {
// Setup streams
pcl := protocol.ID("/testing")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing"
@ -25,6 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
@ -62,7 +62,7 @@ func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
}
pcl := protocol.ID(p2p.RPCStatusTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -77,7 +77,7 @@ func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
pcl2 := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
topic = string(pcl2)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg2 sync.WaitGroup
wg2.Add(1)
p2.BHost.SetStreamHandler(pcl2, func(stream network.Stream) {
@ -130,7 +130,7 @@ func TestStatusRPCHandler_ConnectsOnGenesis(t *testing.T) {
}
pcl := protocol.ID(p2p.RPCStatusTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
@ -214,7 +214,7 @@ func TestStatusRPCHandler_ReturnsHelloMessage(t *testing.T) {
// Setup streams
pcl := protocol.ID(p2p.RPCStatusTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -317,7 +317,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) {
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -339,7 +339,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) {
pcl = "/eth2/beacon_chain/req/ping/1/ssz_snappy"
topic = string(pcl)
r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg2 sync.WaitGroup
wg2.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -434,7 +434,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -542,7 +542,7 @@ func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) {
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -726,7 +726,7 @@ func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) {
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
@ -795,7 +795,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {

View File

@ -0,0 +1,22 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"collector.go",
"heap.go",
"leakybucket.go",
],
importpath = "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket",
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = [
"collector_test.go",
"heap_test.go",
"leakybucket_test.go",
],
embed = [":go_default_library"],
)

View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2019 kevinms
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,202 @@
package leakybucket
import (
"container/heap"
"sync"
"time"
)
//TODO: Finer grained locking.
type bucketMap map[string]*LeakyBucket
// A Collector can keep track of multiple LeakyBucket's. The caller does not
// directly interact with the buckets, but instead addresses them by a string
// key (e.g. IP address, hostname, hash, etc.) that is passed to most Collector
// methods.
//
// All Collector methods are goroutine safe.
type Collector struct {
buckets bucketMap
heap priorityQueue
rate float64
capacity int64
period time.Duration
lock sync.Mutex
quit chan bool
}
// NewCollector creates a new Collector. When new buckets are created within
// the Collector, they will be assigned the capacity and rate of the Collector.
// A Collector does not provide a way to change the rate or capacity of
// bucket's within it. If different rates or capacities are required, either
// use multiple Collector's or manage your own LeakyBucket's.
//
// If deleteEmptyBuckets is true, a concurrent goroutine will be run that
// watches for bucket's that become empty and automatically removes them,
// freeing up memory resources.
func NewCollector(rate float64, capacity int64, period time.Duration, deleteEmptyBuckets bool) *Collector {
c := &Collector{
buckets: make(bucketMap),
heap: make(priorityQueue, 0, 4096),
rate: rate,
capacity: capacity,
period: period,
quit: make(chan bool),
}
if deleteEmptyBuckets {
c.PeriodicPrune()
}
return c
}
// Free releases the collector's resources. If the collector was created with
// deleteEmptyBuckets = true, then the goroutine looking for empty buckets,
// will be stopped.
func (c *Collector) Free() {
c.Reset()
close(c.quit)
}
// Reset removes all internal buckets and resets the collector back to as if it
// was just created.
func (c *Collector) Reset() {
c.lock.Lock()
defer c.lock.Unlock()
// Let the garbage collector do all the work.
c.buckets = make(bucketMap)
c.heap = make(priorityQueue, 0, 4096)
}
// Capacity returns the collector's capacity.
func (c *Collector) Capacity() int64 {
return c.capacity
}
// Rate returns the collector's rate.
func (c *Collector) Rate() float64 {
return c.rate
}
// Remaining returns the remaining capacity of the internal bucket associated
// with key. If key is not associated with a bucket internally, it is treated
// as being empty.
func (c *Collector) Remaining(key string) int64 {
return c.capacity - c.Count(key)
}
// Count returns the count of the internal bucket associated with key. If key
// is not associated with a bucket internally, it is treated as being empty.
func (c *Collector) Count(key string) int64 {
c.lock.Lock()
defer c.lock.Unlock()
b, ok := c.buckets[key]
if !ok || b == nil {
return 0
}
return b.Count()
}
// TillEmpty returns how much time must pass until the internal bucket
// associated with key is empty. If key is not associated with a bucket
// internally, it is treated as being empty.
func (c *Collector) TillEmpty(key string) time.Duration {
c.lock.Lock()
defer c.lock.Unlock()
b, ok := c.buckets[key]
if !ok || b == nil {
return 0
}
return b.TillEmpty()
}
// Remove deletes the internal bucket associated with key. If key is not
// associated with a bucket internally, nothing is done.
func (c *Collector) Remove(key string) {
c.lock.Lock()
defer c.lock.Unlock()
b, ok := c.buckets[key]
if !ok || b == nil {
return
}
delete(c.buckets, b.key)
heap.Remove(&c.heap, b.index)
}
// Add 'amount' to the internal bucket associated with key, up to it's
// capacity. Returns how much was added to the bucket. If the return is less
// than 'amount', then the bucket's capacity was reached.
//
// If key is not associated with a bucket internally, a new bucket is created
// and amount is added to it.
func (c *Collector) Add(key string, amount int64) int64 {
c.lock.Lock()
defer c.lock.Unlock()
b, ok := c.buckets[key]
if !ok || b == nil {
// Create a new bucket.
b = &LeakyBucket{
key: key,
capacity: c.capacity,
rate: c.rate,
period: c.period,
p: now(),
}
c.heap.Push(b)
c.buckets[key] = b
}
n := b.Add(amount)
if n > 0 {
heap.Fix(&c.heap, b.index)
}
return n
}
// Prune removes all empty buckets in the collector.
func (c *Collector) Prune() {
c.lock.Lock()
for c.heap.Peak() != nil {
b := c.heap.Peak()
if now().Before(b.p) {
// The bucket isn't empty.
break
}
// The bucket should be empty.
delete(c.buckets, b.key)
heap.Remove(&c.heap, b.index)
}
c.lock.Unlock()
}
// PeriodicPrune runs a concurrent goroutine that calls Prune() at the given
// time interval.
func (c *Collector) PeriodicPrune() {
go func() {
ticker := time.NewTicker(c.period)
for {
select {
case <-ticker.C:
c.Prune()
case <-c.quit:
ticker.Stop()
return
}
}
}()
}

View File

@ -0,0 +1,210 @@
package leakybucket
import (
"fmt"
"testing"
"time"
)
func TestNewCollector(t *testing.T) {
rate := 1.0
capacity := int64(2)
c := NewCollector(rate, capacity, time.Second, true)
if c.buckets == nil {
t.Fatal("Didn't initialize priority?!")
}
if c.heap == nil {
t.Fatal("Didn't initialize priority?!")
}
if c.rate != rate || c.Rate() != rate {
t.Fatal("Wrong rate?!")
}
if c.capacity != capacity || c.Capacity() != capacity {
t.Fatal("Wrong capacity?!")
}
c.Free()
}
func TestNewCollector_LargerPeriod(t *testing.T) {
testNow := now
now = time.Now
defer func() {
now = testNow
}()
rate := 10.0
capacity := int64(20)
c := NewCollector(rate, capacity, 5*time.Second, true)
c.Add("test", 10)
c.Add("test", 10)
if c.Remaining("test") != 0 {
t.Errorf("Excess capacity exists of: %d", c.Remaining("test"))
}
time.Sleep(1 * time.Second)
if c.Remaining("test") >= 20 {
t.Errorf("Excess capacity exists in: %d", c.Remaining("test"))
}
time.Sleep(4 * time.Second)
if c.Add("test", 10) != 10 {
t.Errorf("Internal counter not refreshed: %d", c.Count("test"))
}
c.Free()
}
var collectorSimple = testSet{
capacity: int64(5),
rate: 1.0,
set: []actionSet{
{},
{1, "add", 1},
{1, "time-set", time.Nanosecond},
{1, "till", time.Second - time.Nanosecond},
{1, "time-set", time.Second - time.Nanosecond},
{1, "till", time.Nanosecond},
{0, "time-set", time.Second},
{0, "till", time.Duration(0)},
{1, "add", 1},
{1, "time-add", time.Second / 2},
{1, "till", time.Second / 2},
{2, "add", 1},
{2, "time-add", time.Second/2 - time.Nanosecond},
{0, "time-add", time.Second * time.Duration(5)},
{1, "add", 1},
{2, "add", 1},
{3, "add", 1},
{4, "add", 1},
{5, "add", 1},
{5, "add", 1},
{5, "till", time.Second * 5},
},
}
var collectorVaried = testSet{
capacity: 1000,
rate: 60.0,
set: []actionSet{
{},
{100, "add", 100},
{100, "time-set", time.Nanosecond},
{1000, "add", 1000},
{1000, "add", 1},
{940, "time-set", time.Second},
},
}
func runKey(t *testing.T, c *Collector, key string, test *testSet) {
setElapsed(0)
capacity := c.Capacity()
for i, v := range test.set {
switch v.action {
case "add":
count := c.Count(key)
remaining := test.capacity - count
amount := int64(v.value.(int))
n := c.Add(key, amount)
if n < amount {
// The bucket should be full now.
if n < remaining {
t.Fatalf("Test %d: Bucket should have been filled by this Add()?!", i)
}
}
case "time-set":
setElapsed(v.value.(time.Duration))
case "time-add":
addToElapsed(v.value.(time.Duration))
case "till":
dt := c.TillEmpty(key)
if dt != v.value.(time.Duration) {
t.Fatalf("%s -> Test %d: Bad TillEmpty(). Expected %v, got %v", key, i, v.value, dt)
}
}
count := c.Count(key)
if count != v.count {
t.Fatalf("%s -> Test %d: Bad count. Expected %d, got %d", key, i, v.count, count)
}
if count > capacity {
t.Fatalf("%s -> Test %d: Went over capacity?!", key, i)
}
if c.Remaining(key) != test.capacity-v.count {
t.Fatalf("Test %d: Expected remaining value %d, got %d",
i, test.capacity-v.count, c.Remaining(key))
}
}
}
func TestCollector(t *testing.T) {
setElapsed(0)
tests := []testSet{
collectorSimple,
collectorSimple,
collectorVaried,
}
for i, test := range tests {
fmt.Println("Running testSet:", i)
key := "127.0.0.1"
c := NewCollector(test.rate, test.capacity, time.Second, false)
// Run and test Remove()
runKey(t, c, key, &test)
c.Remove(key)
if c.Count(key) > 0 {
t.Fatal("Key still has a count after removal?!")
}
// Run again and test Prune()
runKey(t, c, "127.0.0.1", &test)
c.Prune()
setElapsed(time.Hour)
c.Prune()
// Run again and test Reset().
runKey(t, c, "127.0.0.1", &test)
c.Reset()
if c.Count(key) != 0 {
t.Fatal("Key still has a count after removal?!")
}
if c.TillEmpty(key) != 0 {
t.Fatal("Key still has time till empty?!")
}
// Try to remove a non-exist bucket.
c.Remove("fake")
if c.Count("fake") != 0 {
t.Fatal("Key still has a count after removal?!")
}
}
}
func TestPeriodicPrune(t *testing.T) {
setElapsed(0)
key := "localhost"
c := NewCollector(1e7, 8, time.Second, false)
c.PeriodicPrune()
n := c.Add(key, 100)
if n != 8 {
t.Fatal("Didn't fill bucket?!")
}
fmt.Printf("TillEmpty(): %v\n", c.TillEmpty(key))
// Wait for the periodic prune.
wait := time.Millisecond
time.Sleep(wait)
setElapsed(wait)
count := c.Count(key)
if count != 0 {
t.Fatalf("Key's bucket is not empty: %d?!", count)
}
c.Free()
}

View File

@ -0,0 +1,47 @@
package leakybucket
import "fmt"
// Based on the example implementation of priority queue found in the
// container/heap package docs: https://golang.org/pkg/container/heap/
type priorityQueue []*LeakyBucket
func (pq priorityQueue) Len() int {
return len(pq)
}
func (pq priorityQueue) Peak() *LeakyBucket {
if len(pq) <= 0 {
return nil
}
return pq[0]
}
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].p.Before(pq[j].p)
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
b, ok := x.(*LeakyBucket)
if !ok {
panic(fmt.Sprintf("%T", x))
}
b.index = n
*pq = append(*pq, b)
}
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
b := old[n-1]
b.index = -1 // for safety
*pq = old[0 : n-1]
return b
}

View File

@ -0,0 +1,107 @@
package leakybucket
import (
"testing"
"time"
)
func TestLen(t *testing.T) {
q := make(priorityQueue, 0, 4096)
if q.Len() != 0 {
t.Fatal("Queue should be empty?!")
}
for i := 1; i <= 5; i++ {
b := NewLeakyBucket(1.0, 5, time.Second)
q.Push(b)
l := q.Len()
if l != i {
t.Fatalf("Expected length %d, got %d", i, l)
}
}
for i := 4; i >= 0; i-- {
q.Pop()
l := q.Len()
if l != i {
t.Fatalf("Expected length %d, got %d", i, l)
}
}
}
func TestPeak(t *testing.T) {
q := make(priorityQueue, 0, 4096)
for i := 0; i < 5; i++ {
b := NewLeakyBucket(1.0, 5, time.Second)
q.Push(b)
}
}
func TestLess(t *testing.T) {
q := make(priorityQueue, 0, 4096)
for i := 0; i < 5; i++ {
b := NewLeakyBucket(1.0, 5, time.Second)
b.p = now().Add(time.Duration(i))
q.Push(b)
}
for i, j := 0, 4; i < 5; i, j = i+1, j-1 {
if i < j && !q.Less(i, j) {
t.Fatal("Less is more?!")
}
}
}
func TestSwap(t *testing.T) {
q := make(priorityQueue, 0, 4096)
for i := 0; i < 5; i++ {
b := NewLeakyBucket(1.0, 5, time.Second)
q.Push(b)
}
i := 2
j := 4
bi := q[i]
bj := q[j]
q.Swap(i, j)
if bi != q[j] || bj != q[i] {
t.Fatal("Element weren't swapped?!")
}
}
func TestPush(t *testing.T) {
q := make(priorityQueue, 0, 4096)
for i := 0; i < 5; i++ {
b := NewLeakyBucket(1.0, 5, time.Second)
q.Push(b)
if b != q[len(q)-1] {
t.Fatal("Push should append to queue.")
}
}
}
func TestPop(t *testing.T) {
q := make(priorityQueue, 0, 4096)
for i := 1; i <= 5; i++ {
b := NewLeakyBucket(1.0, 5, time.Second)
q.Push(b)
}
for i := 1; i <= 5; i++ {
b := q[len(q)-1]
if b != q.Pop() {
t.Fatal("Pop should remove from end of queue.")
}
}
}

View File

@ -0,0 +1,155 @@
/*
Package leakybucket implements a scalable leaky bucket algorithm.
There are at least two different definitions of the leaky bucket algorithm.
This package implements the leaky bucket as a meter. For more details see:
https://en.wikipedia.org/wiki/Leaky_bucket#As_a_meter
This means it is the exact mirror of a token bucket.
// New LeakyBucket that leaks at the rate of 0.5/sec and a total capacity of 10.
b := NewLeakyBucket(0.5, 10)
b.Add(5)
b.Add(5)
// Bucket is now full!
n := b.Add(1)
// n == 0
A Collector is a convenient way to keep track of multiple LeakyBucket's.
Buckets are associated with string keys for fast lookup. It can dynamically
add new buckets and automatically remove them as they become empty, freeing
up resources.
// New Collector that leaks at 1 MiB/sec, a total capacity of 10 MiB and
// automatic removal of bucket's when they become empty.
const megabyte = 1<<20
c := NewCollector(megabyte, megabyte*10, true)
// Attempt to add 100 MiB to a bucket associated with an IP.
n := c.Add("192.168.0.42", megabyte*100)
// 100 MiB is over the capacity, so only 10 MiB is actually added.
// n equals 10 MiB.
*/
package leakybucket
import (
"math"
"time"
)
// Makes it easy to test time based things.
var now = time.Now
// LeakyBucket represents a bucket that leaks at a constant rate.
type LeakyBucket struct {
// The identifying key, used for map lookups.
key string
// How large the bucket is.
capacity int64
// Amount the bucket leaks per time duration.
rate float64
// The priority of the bucket in a min-heap priority queue, where p is the
// exact time the bucket will have leaked enough to be empty. Buckets that
// are empty or will be the soonest are at the top of the heap. This allows
// for quick pruning of empty buckets that scales very well. p is adjusted
// any time an amount is added to the Queue().
p time.Time
// The time duration through which the leaky bucket is
// assessed.
period time.Duration
// The index is maintained by the heap.Interface methods.
index int
}
// NewLeakyBucket creates a new LeakyBucket with the give rate and capacity.
func NewLeakyBucket(rate float64, capacity int64, period time.Duration) *LeakyBucket {
return &LeakyBucket{
rate: rate,
capacity: capacity,
period: period,
p: now(),
}
}
// Count returns the bucket's current count.
func (b *LeakyBucket) Count() int64 {
if !now().Before(b.p) {
return 0
}
nsRemaining := float64(b.p.Sub(now()))
nsPerDrip := float64(b.period) / b.rate
count := int64(math.Ceil(nsRemaining / nsPerDrip))
return count
}
// Rate returns the amount the bucket leaks per second.
func (b *LeakyBucket) Rate() float64 {
return b.rate
}
// Capacity returns the bucket's capacity.
func (b *LeakyBucket) Capacity() int64 {
return b.capacity
}
// Remaining returns the bucket's remaining capacity.
func (b *LeakyBucket) Remaining() int64 {
return b.capacity - b.Count()
}
// ChangeCapacity changes the bucket's capacity.
//
// If the bucket's current count is greater than the new capacity, the count
// will be decreased to match the new capacity.
func (b *LeakyBucket) ChangeCapacity(capacity int64) {
diff := float64(capacity - b.capacity)
if diff < 0 && b.Count() > capacity {
// We are shrinking the capacity and the new bucket size can't hold all
// the current contents. Dump the extra and adjust the time till empty.
nsPerDrip := float64(b.period) / b.rate
b.p = now().Add(time.Duration(nsPerDrip * float64(capacity)))
}
b.capacity = capacity
}
// TillEmpty returns how much time must pass until the bucket is empty.
func (b *LeakyBucket) TillEmpty() time.Duration {
return b.p.Sub(now())
}
// Add 'amount' to the bucket's count, up to it's capacity. Returns how much
// was added to the bucket. If the return is less than 'amount', then the
// bucket's capacity was reached.
func (b *LeakyBucket) Add(amount int64) int64 {
count := b.Count()
if count >= b.capacity {
// The bucket is full.
return 0
}
if !now().Before(b.p) {
// The bucket needs to be reset.
b.p = now()
}
remaining := b.capacity - count
if amount > remaining {
amount = remaining
}
t := time.Duration(float64(b.period) * (float64(amount) / b.rate))
b.p = b.p.Add(t)
return amount
}

View File

@ -0,0 +1,181 @@
package leakybucket
import (
"fmt"
"os"
"sync/atomic"
"testing"
"time"
)
// Arbitrary start time.
var start = time.Date(1990, 1, 2, 0, 0, 0, 0, time.UTC).Round(0)
var elapsed int64
// We provide atomic access to elapsed to avoid data races between multiple
// concurrent goroutines during the tests.
func getElapsed() time.Duration {
return time.Duration(atomic.LoadInt64(&elapsed))
}
func setElapsed(v time.Duration) {
atomic.StoreInt64(&elapsed, int64(v))
}
func addToElapsed(v time.Duration) {
atomic.AddInt64(&elapsed, int64(v))
}
func reset(t *testing.T, c *Collector) {
c.Reset()
setElapsed(0)
}
func TestNewLeakyBucket(t *testing.T) {
rate := 1.0
capacity := int64(5)
b := NewLeakyBucket(rate, capacity, time.Second)
if b.p != now() {
t.Fatal("Didn't initialize priority?!")
}
if b.rate != rate || b.Rate() != rate {
t.Fatal("Wrong rate?!")
}
if b.capacity != capacity || b.Capacity() != capacity {
t.Fatal("Wrong capacity?!")
}
}
type actionSet struct {
count int64
action string
value interface{}
}
type testSet struct {
capacity int64
rate float64
set []actionSet
}
var oneAtaTime = testSet{
capacity: 5,
rate: 1.0,
set: []actionSet{
{},
{1, "add", 1},
{1, "time-set", time.Nanosecond},
{1, "till", time.Second - time.Nanosecond},
{1, "time-set", time.Second - time.Nanosecond},
{1, "till", time.Nanosecond},
{0, "time-set", time.Second},
{0, "till", time.Duration(0)},
// Add a couple.
{1, "add", 1},
{1, "time-add", time.Second / 2},
{1, "till", time.Second / 2},
{2, "add", 1},
{2, "time-add", time.Second/2 - time.Nanosecond},
// Monkey with the capacity and make sure Count()/TillEmpty() are
// adjusted as needed.
{2, "cap", 5 + 1},
{2, "till", time.Second + time.Nanosecond},
{2, "cap", 5 - 1},
{2, "till", time.Second + time.Nanosecond},
{1, "cap", 1},
{1, "till", time.Second},
{1, "cap", 4},
{1, "till", time.Second},
// Test the full cases.
{0, "time-add", time.Second * time.Duration(5)},
{1, "add", 1},
{2, "add", 1},
{3, "add", 1},
{4, "add", 1},
{4, "add", 1},
{4, "till", time.Second * 4},
},
}
var varied = testSet{
capacity: 1000,
rate: 60.0,
set: []actionSet{
{},
{100, "add", 100},
{100, "time-set", time.Nanosecond},
{1000, "add", 1000},
{1000, "add", 1},
{940, "time-set", time.Second},
},
}
func runTest(t *testing.T, test *testSet) {
setElapsed(0)
b := NewLeakyBucket(test.rate, test.capacity, time.Second)
for i, v := range test.set {
switch v.action {
case "add":
count := b.Count()
remaining := test.capacity - count
amount := int64(v.value.(int))
n := b.Add(amount)
if n < amount {
// The bucket should be full now.
if n < remaining {
t.Fatalf("Test %d: Bucket should have been filled by this Add()?!", i)
}
}
case "time-set":
setElapsed(v.value.(time.Duration))
case "cap":
b.ChangeCapacity(int64(v.value.(int)))
test.capacity = b.Capacity()
case "time-add":
addToElapsed(v.value.(time.Duration))
case "till":
dt := b.TillEmpty()
if dt != v.value.(time.Duration) {
t.Fatalf("Test %d: Bad TillEmpty(). Expected %v, got %v", i, v.value, dt)
}
case "debug":
fmt.Println("elapsed:", getElapsed())
fmt.Println("tillEmpty:", b.TillEmpty())
fmt.Println("count:", b.Count())
}
c := b.Count()
if c != v.count {
t.Fatalf("Test %d: Bad count. Expected %d, got %d", i, v.count, c)
}
if c > test.capacity {
t.Fatalf("Test %d: Went over capacity?!", i)
}
if b.Remaining() != test.capacity-v.count {
t.Fatalf("Test %d: Expected remaining value %d, got %d",
i, test.capacity-v.count, b.Remaining())
}
}
}
func TestLeakyBucket(t *testing.T) {
tests := []testSet{
oneAtaTime,
varied,
}
for i, test := range tests {
fmt.Println("Running testSet:", i)
runTest(t, &test)
}
}
func TestMain(m *testing.M) {
// Override what now() function the leakybucket library uses.
// This greatly increases testability!
now = func() time.Time { return start.Add(getElapsed()) }
os.Exit(m.Run())
}

View File

@ -1883,13 +1883,6 @@ def prysm_deps():
version = "v0.0.2",
)
go_repository(
name = "com_github_kevinms_leakybucket_go",
importpath = "github.com/kevinms/leakybucket-go",
sum = "h1:qNtd6alRqd3qOdPrKXMZImV192ngQ0WSh1briEO33Tk=",
version = "v0.0.0-20200115003610-082473db97ca",
)
go_repository(
name = "com_github_kisielk_errcheck",
importpath = "github.com/kisielk/errcheck",

1
go.mod
View File

@ -39,7 +39,6 @@ require (
github.com/joonix/log v0.0.0-20200409080653-9c1d2ceb5f1d
github.com/json-iterator/go v1.1.12
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213
github.com/kevinms/leakybucket-go v0.0.0-20200115003610-082473db97ca
github.com/kr/pretty v0.3.0
github.com/libp2p/go-libp2p v0.22.0
github.com/libp2p/go-libp2p-pubsub v0.8.0

2
go.sum
View File

@ -609,8 +609,6 @@ github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 h1:qGQQKEcAR99REcM
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/karalabe/usb v0.0.2 h1:M6QQBNxF+CQ8OFvxrT90BA0qBOXymndZnk5q235mFc4=
github.com/karalabe/usb v0.0.2/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/kevinms/leakybucket-go v0.0.0-20200115003610-082473db97ca h1:qNtd6alRqd3qOdPrKXMZImV192ngQ0WSh1briEO33Tk=
github.com/kevinms/leakybucket-go v0.0.0-20200115003610-082473db97ca/go.mod h1:ph+C5vpnCcQvKBwJwKLTK3JLNGnBXYlG7m7JjoC/zYA=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=