From c4eb8c7a166e17275a403ad1980f10f19ed1d348 Mon Sep 17 00:00:00 2001 From: Victor Farazdagi Date: Thu, 7 May 2020 00:29:50 +0300 Subject: [PATCH] Refactors block fetching function to fetcher (#5766) * refactors block fetching function to fetcher * more comments fixed * Merge refs/heads/master into init-sync-upd * Merge refs/heads/master into init-sync-upd * Merge refs/heads/master into init-sync-upd --- .../sync/initial-sync/blocks_fetcher.go | 8 ++- .../sync/initial-sync/blocks_queue.go | 14 +---- beacon-chain/sync/initial-sync/round_robin.go | 63 ++++--------------- beacon-chain/sync/initial-sync/service.go | 30 ++++----- 4 files changed, 33 insertions(+), 82 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index cc08429c9..d3e7de35a 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -30,8 +30,14 @@ import ( const ( // maxPendingRequests limits how many concurrent fetch request one can initiate. maxPendingRequests = 8 + // allowedBlocksPerSecond is number of blocks (per peer) fetcher can request per second. + allowedBlocksPerSecond = 32.0 + // blockBatchSize is a limit on number of blocks fetched per request. + blockBatchSize = 32 // peersPercentagePerRequest caps percentage of peers to be used in a request. peersPercentagePerRequest = 0.75 + // handshakePollingInterval is a polling interval for checking the number of received handshakes. + handshakePollingInterval = 5 * time.Second ) var ( @@ -83,7 +89,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc rateLimiter := leakybucket.NewCollector( allowedBlocksPerSecond, /* rate */ allowedBlocksPerSecond, /* capacity */ - false /* deleteEmptyBuckets */) + false /* deleteEmptyBuckets */) return &blocksFetcher{ ctx: ctx, diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index ff3b307cc..c09adb178 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -31,19 +31,9 @@ var ( errInputNotFetchRequestParams = errors.New("input data is not type *fetchRequestParams") ) -// blocksProvider exposes enough methods for queue to fetch incoming blocks. -type blocksProvider interface { - requestResponses() <-chan *fetchRequestResponse - scheduleRequest(ctx context.Context, start, count uint64) error - nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) - bestFinalizedSlot() uint64 - start() error - stop() -} - // blocksQueueConfig is a config to setup block queue service. type blocksQueueConfig struct { - blocksFetcher blocksProvider + blocksFetcher *blocksFetcher headFetcher blockchain.HeadFetcher startSlot uint64 highestExpectedSlot uint64 @@ -57,7 +47,7 @@ type blocksQueue struct { cancel context.CancelFunc highestExpectedSlot uint64 state *stateMachine - blocksFetcher blocksProvider + blocksFetcher *blocksFetcher headFetcher blockchain.HeadFetcher fetchedBlocks chan *eth.SignedBeaconBlock // output channel for ready blocks quit chan struct{} // termination notifier diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 16cfd2635..57ba83633 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -4,20 +4,15 @@ import ( "context" "encoding/hex" "fmt" - "io" "time" - "github.com/libp2p/go-libp2p-core/peer" "github.com/paulbellamy/ratecounter" - "github.com/pkg/errors" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" - "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" - prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" @@ -25,9 +20,12 @@ import ( "github.com/sirupsen/logrus" ) -const blockBatchSize = 32 -const counterSeconds = 20 -const refreshTime = 6 * time.Second +const ( + // counterSeconds is an interval over which an average rate will be calculated. + counterSeconds = 20 + // refreshTime defines an interval at which suitable peer is checked during 2nd phase of sync. + refreshTime = 6 * time.Second +) // Round Robin sync looks at the latest peer statuses and syncs with the highest // finalized peer. @@ -95,12 +93,11 @@ func (s *Service) roundRobinSync(genesis time.Time) error { Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, allowedBlocksPerSecond), Step: 1, } - - log.WithField("req", req).WithField("peer", best.Pretty()).Debug( - "Sending batch block request", - ) - - resp, err := s.requestBlocks(ctx, req, best) + log.WithFields(logrus.Fields{ + "req": req, + "peer": best.Pretty(), + }).Debug("Sending batch block request") + resp, err := queue.blocksFetcher.requestBlocks(ctx, req, best) if err != nil { log.WithError(err).Error("Failed to receive blocks, exiting init sync") return nil @@ -121,44 +118,6 @@ func (s *Service) roundRobinSync(genesis time.Time) error { return nil } -// requestBlocks by range to a specific peer. -func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRangeRequest, pid peer.ID) ([]*eth.SignedBeaconBlock, error) { - if s.blocksRateLimiter.Remaining(pid.String()) < int64(req.Count) { - log.WithField("peer", pid).Debug("Slowing down for rate limit") - time.Sleep(s.blocksRateLimiter.TillEmpty(pid.String())) - } - s.blocksRateLimiter.Add(pid.String(), int64(req.Count)) - log.WithFields(logrus.Fields{ - "peer": pid, - "start": req.StartSlot, - "count": req.Count, - "step": req.Step, - }).Debug("Requesting blocks") - stream, err := s.p2p.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid) - if err != nil { - return nil, errors.Wrap(err, "failed to send request to peer") - } - defer func() { - if err := stream.Close(); err != nil { - log.WithError(err).Error("Failed to close stream") - } - }() - - resp := make([]*eth.SignedBeaconBlock, 0, req.Count) - for { - blk, err := prysmsync.ReadChunkedBlock(stream, s.p2p) - if err == io.EOF { - break - } - if err != nil { - return nil, errors.Wrap(err, "failed to read chunked block") - } - resp = append(resp, blk) - } - - return resp, nil -} - // highestFinalizedEpoch returns the absolute highest finalized epoch of all connected peers. // Note this can be lower than our finalized epoch if we have no peers or peers that are all behind us. func (s *Service) highestFinalizedEpoch() uint64 { diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 6e9e8a32c..f1b91da31 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -25,6 +25,7 @@ import ( var _ = shared.Service(&Service{}) +// blockchainService defines the interface for interaction with block chain service. type blockchainService interface { blockchain.BlockReceiver blockchain.HeadFetcher @@ -32,12 +33,6 @@ type blockchainService interface { blockchain.FinalizationFetcher } -const ( - handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes. - - allowedBlocksPerSecond = 32.0 -) - // Config to set up the initial sync service. type Config struct { P2P p2p.P2P @@ -66,14 +61,15 @@ type Service struct { func NewInitialSync(cfg *Config) *Service { ctx, cancel := context.WithCancel(context.Background()) return &Service{ - ctx: ctx, - cancel: cancel, - chain: cfg.Chain, - p2p: cfg.P2P, - db: cfg.DB, - stateNotifier: cfg.StateNotifier, - blockNotifier: cfg.BlockNotifier, - blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), + ctx: ctx, + cancel: cancel, + chain: cfg.Chain, + p2p: cfg.P2P, + db: cfg.DB, + stateNotifier: cfg.StateNotifier, + blockNotifier: cfg.BlockNotifier, + blocksRateLimiter: leakybucket.NewCollector( + allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), } } @@ -116,8 +112,7 @@ func (s *Service) Start() { if genesis.After(roughtime.Now()) { log.WithField( - "genesis time", - genesis, + "genesis time", genesis, ).Warn("Genesis time is in the future - waiting to start sync...") time.Sleep(roughtime.Until(genesis)) } @@ -214,7 +209,8 @@ func (s *Service) waitForMinimumPeers() { } log.WithFields(logrus.Fields{ "suitable": len(peers), - "required": required}).Info("Waiting for enough suitable peers before syncing") + "required": required, + }).Info("Waiting for enough suitable peers before syncing") time.Sleep(handshakePollingInterval) } }