Refactors block fetching function to fetcher (#5766)

* refactors block fetching function to fetcher
* more comments fixed
* Merge refs/heads/master into init-sync-upd
* Merge refs/heads/master into init-sync-upd
* Merge refs/heads/master into init-sync-upd
Victor Farazdagi 2020-05-07 00:29:50 +03:00 committed by GitHub
parent 38f2ec6d7d
commit c4eb8c7a16
4 changed files with 33 additions and 82 deletions

beacon-chain/sync/initial-sync/blocks_fetcher.go

@@ -30,8 +30,14 @@ import (
const (
// maxPendingRequests limits how many concurrent fetch requests one can initiate.
maxPendingRequests = 8
// allowedBlocksPerSecond is the number of blocks (per peer) the fetcher can request per second.
allowedBlocksPerSecond = 32.0
// blockBatchSize is a limit on the number of blocks fetched per request.
blockBatchSize = 32
// peersPercentagePerRequest caps the percentage of peers to be used in a request.
peersPercentagePerRequest = 0.75
// handshakePollingInterval is a polling interval for checking the number of received handshakes.
handshakePollingInterval = 5 * time.Second
)
var (
@@ -83,7 +89,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
rateLimiter := leakybucket.NewCollector(
allowedBlocksPerSecond, /* rate */
allowedBlocksPerSecond, /* capacity */
false /* deleteEmptyBuckets */)
false /* deleteEmptyBuckets */)
return &blocksFetcher{
ctx: ctx,

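The collector above is the same per-peer limiter that the removed Service.requestBlocks relied on (see round_robin.go below). A minimal standalone sketch of that pattern, assuming the leakybucket identifier resolves to the kevinms/leakybucket-go package; the peer key and batch size are illustrative only:

package main

import (
	"time"

	"github.com/kevinms/leakybucket-go"
)

func main() {
	// Same shape as the fetcher's limiter: rate == capacity == allowedBlocksPerSecond.
	limiter := leakybucket.NewCollector(
		32.0, /* rate */
		32.0, /* capacity */
		false /* deleteEmptyBuckets */)

	pid := "peer-1"    // bucket key; the real code uses pid.String() of a libp2p peer.ID
	count := int64(32) // one blockBatchSize worth of blocks

	// Wait until the bucket can absorb the request, then record the capacity
	// consumed, mirroring the removed Service.requestBlocks.
	if limiter.Remaining(pid) < count {
		time.Sleep(limiter.TillEmpty(pid))
	}
	limiter.Add(pid, count)
}

Keeping rate equal to capacity means an idle peer can absorb one full second's allowance (a whole 32-block batch) at once before the limiter starts delaying further requests.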
beacon-chain/sync/initial-sync/blocks_queue.go

@@ -31,19 +31,9 @@ var (
errInputNotFetchRequestParams = errors.New("input data is not type *fetchRequestParams")
)
// blocksProvider exposes enough methods for queue to fetch incoming blocks.
type blocksProvider interface {
requestResponses() <-chan *fetchRequestResponse
scheduleRequest(ctx context.Context, start, count uint64) error
nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error)
bestFinalizedSlot() uint64
start() error
stop()
}
// blocksQueueConfig is a config to set up the block queue service.
type blocksQueueConfig struct {
blocksFetcher blocksProvider
blocksFetcher *blocksFetcher
headFetcher blockchain.HeadFetcher
startSlot uint64
highestExpectedSlot uint64
@@ -57,7 +47,7 @@ type blocksQueue struct {
cancel context.CancelFunc
highestExpectedSlot uint64
state *stateMachine
blocksFetcher blocksProvider
blocksFetcher *blocksFetcher
headFetcher blockchain.HeadFetcher
fetchedBlocks chan *eth.SignedBeaconBlock // output channel for ready blocks
quit chan struct{} // termination notifier

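With the blocksProvider interface removed, the queue holds the concrete fetcher, which is what lets round_robin.go call queue.blocksFetcher.requestBlocks directly. A wiring sketch using only the fields visible in this diff; newBlocksQueue and the blocksFetcherConfig fields are assumptions:

// Sketch only: newBlocksQueue and the blocksFetcherConfig fields are assumed,
// by analogy with newBlocksFetcher and blocksQueueConfig shown in the hunks.
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
	headFetcher: chainService, // assumed field; a blockchain.HeadFetcher
	p2p:         p2pService,   // assumed field; the p2p.P2P service
})

queue := newBlocksQueue(ctx, &blocksQueueConfig{
	blocksFetcher:       fetcher, // *blocksFetcher, no longer hidden behind blocksProvider
	headFetcher:         chainService,
	startSlot:           startSlot,
	highestExpectedSlot: highestSlot,
})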
beacon-chain/sync/initial-sync/round_robin.go

@@ -4,20 +4,15 @@ import (
"context"
"encoding/hex"
"fmt"
"io"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
@@ -25,9 +20,12 @@ import (
"github.com/sirupsen/logrus"
)
const blockBatchSize = 32
const counterSeconds = 20
const refreshTime = 6 * time.Second
const (
// counterSeconds is an interval over which an average rate will be calculated.
counterSeconds = 20
// refreshTime defines an interval at which a suitable peer is checked during the 2nd phase of sync.
refreshTime = 6 * time.Second
)
// Round Robin sync looks at the latest peer statuses and syncs with the highest
// finalized peer.
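counterSeconds pairs with the ratecounter import kept above; its call site sits outside this hunk, so the following is only a sketch of how such a sliding-window counter is typically driven, assuming the paulbellamy/ratecounter API:

package main

import (
	"fmt"
	"time"

	"github.com/paulbellamy/ratecounter"
)

func main() {
	// Average processed blocks over a counterSeconds-wide sliding window.
	const counterSeconds = 20
	counter := ratecounter.NewRateCounter(counterSeconds * time.Second)

	// Each processed block bumps the counter ...
	counter.Incr(1)

	// ... and the reported rate is the windowed count divided by the window size.
	fmt.Printf("blocks/s: %.1f\n", float64(counter.Rate())/counterSeconds)
}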
@@ -95,12 +93,11 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, allowedBlocksPerSecond),
Step: 1,
}
log.WithField("req", req).WithField("peer", best.Pretty()).Debug(
"Sending batch block request",
)
resp, err := s.requestBlocks(ctx, req, best)
log.WithFields(logrus.Fields{
"req": req,
"peer": best.Pretty(),
}).Debug("Sending batch block request")
resp, err := queue.blocksFetcher.requestBlocks(ctx, req, best)
if err != nil {
log.WithError(err).Error("Failed to receive blocks, exiting init sync")
return nil
@@ -121,44 +118,6 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
return nil
}
// requestBlocks by range to a specific peer.
func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRangeRequest, pid peer.ID) ([]*eth.SignedBeaconBlock, error) {
if s.blocksRateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
time.Sleep(s.blocksRateLimiter.TillEmpty(pid.String()))
}
s.blocksRateLimiter.Add(pid.String(), int64(req.Count))
log.WithFields(logrus.Fields{
"peer": pid,
"start": req.StartSlot,
"count": req.Count,
"step": req.Step,
}).Debug("Requesting blocks")
stream, err := s.p2p.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid)
if err != nil {
return nil, errors.Wrap(err, "failed to send request to peer")
}
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Error("Failed to close stream")
}
}()
resp := make([]*eth.SignedBeaconBlock, 0, req.Count)
for {
blk, err := prysmsync.ReadChunkedBlock(stream, s.p2p)
if err == io.EOF {
break
}
if err != nil {
return nil, errors.Wrap(err, "failed to read chunked block")
}
resp = append(resp, blk)
}
return resp, nil
}
// highestFinalizedEpoch returns the absolute highest finalized epoch of all connected peers.
// Note this can be lower than our finalized epoch if we have no peers or peers that are all behind us.
func (s *Service) highestFinalizedEpoch() uint64 {

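The method the new call site depends on, queue.blocksFetcher.requestBlocks, is not part of this diff. A hedged sketch of its likely shape, assuming its body mirrors the removed Service.requestBlocks above and that the fetcher carries rateLimiter and p2p fields:

// Sketch: the receiver and the f.rateLimiter / f.p2p field names are assumptions;
// the request and response handling is copied from the removed service method.
func (f *blocksFetcher) requestBlocks(
	ctx context.Context,
	req *p2ppb.BeaconBlocksByRangeRequest,
	pid peer.ID,
) ([]*eth.SignedBeaconBlock, error) {
	if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
		log.WithField("peer", pid).Debug("Slowing down for rate limit")
		time.Sleep(f.rateLimiter.TillEmpty(pid.String()))
	}
	f.rateLimiter.Add(pid.String(), int64(req.Count))
	stream, err := f.p2p.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid)
	if err != nil {
		return nil, errors.Wrap(err, "failed to send request to peer")
	}
	defer func() {
		if err := stream.Close(); err != nil {
			log.WithError(err).Error("Failed to close stream")
		}
	}()
	resp := make([]*eth.SignedBeaconBlock, 0, req.Count)
	for {
		blk, err := prysmsync.ReadChunkedBlock(stream, f.p2p)
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, errors.Wrap(err, "failed to read chunked block")
		}
		resp = append(resp, blk)
	}
	return resp, nil
}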
beacon-chain/sync/initial-sync/service.go

@@ -25,6 +25,7 @@ import (
var _ = shared.Service(&Service{})
// blockchainService defines the interface for interaction with the blockchain service.
type blockchainService interface {
blockchain.BlockReceiver
blockchain.HeadFetcher
@@ -32,12 +33,6 @@ type blockchainService interface {
blockchain.FinalizationFetcher
}
const (
handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes.
allowedBlocksPerSecond = 32.0
)
// Config to set up the initial sync service.
type Config struct {
P2P p2p.P2P
@@ -66,14 +61,15 @@ type Service struct {
func NewInitialSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
return &Service{
ctx: ctx,
cancel: cancel,
chain: cfg.Chain,
p2p: cfg.P2P,
db: cfg.DB,
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */),
ctx: ctx,
cancel: cancel,
chain: cfg.Chain,
p2p: cfg.P2P,
db: cfg.DB,
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
blocksRateLimiter: leakybucket.NewCollector(
allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */),
}
}
@@ -116,8 +112,7 @@ func (s *Service) Start() {
if genesis.After(roughtime.Now()) {
log.WithField(
"genesis time",
genesis,
"genesis time", genesis,
).Warn("Genesis time is in the future - waiting to start sync...")
time.Sleep(roughtime.Until(genesis))
}
@@ -214,7 +209,8 @@ func (s *Service) waitForMinimumPeers() {
}
log.WithFields(logrus.Fields{
"suitable": len(peers),
"required": required}).Info("Waiting for enough suitable peers before syncing")
"required": required,
}).Info("Waiting for enough suitable peers before syncing")
time.Sleep(handshakePollingInterval)
}
}