prysm-pulse/beacon-chain/sync/initial-sync/blocks_queue.go
Victor Farazdagi f18bada8c9
Init sync blocks queue (#5064)
* fixes data race, by isolating critical sections

* minor refactoring: resolves blocking calls

* implements init-sync queue

* udpate fetch/send buffers in blocks fetcher

* blockState enum-like type alias

* refactors common code into releaseTicket()

* better gc

* linter

* Update beacon-chain/sync/initial-sync/blocks_queue.go

Co-Authored-By: shayzluf <thezluf@gmail.com>

* Update beacon-chain/sync/initial-sync/blocks_queue_test.go

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Co-authored-by: shayzluf <thezluf@gmail.com>
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
2020-03-16 18:21:36 +03:00

414 lines
12 KiB
Go

package initialsync
import (
"context"
"errors"
"sync"
"time"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared/mathutil"
)
const (
// queueMaxPendingRequests limits how many concurrent fetch request queue can initiate.
queueMaxPendingRequests = 8
// queueFetchRequestTimeout caps maximum amount of time before fetch requests is cancelled.
queueFetchRequestTimeout = 60 * time.Second
// queueMaxCachedBlocks hard limit on how many queue items to cache before forced dequeue.
queueMaxCachedBlocks = 8 * queueMaxPendingRequests * blockBatchSize
// queueStopCallTimeout is time allowed for queue to release resources when quitting.
queueStopCallTimeout = 1 * time.Second
)
var (
errQueueCtxIsDone = errors.New("queue's context is done, reinitialize")
errQueueTakesTooLongToStop = errors.New("queue takes too long to stop")
)
// blocksProvider exposes enough methods for queue to fetch incoming blocks.
type blocksProvider interface {
requestResponses() <-chan *fetchRequestResponse
scheduleRequest(ctx context.Context, start, count uint64) error
start() error
stop()
}
// blocksQueueConfig is a config to setup block queue service.
type blocksQueueConfig struct {
blocksFetcher blocksProvider
headFetcher blockchain.HeadFetcher
startSlot uint64
highestExpectedSlot uint64
p2p p2p.P2P
}
// blocksQueueState holds internal queue state (for easier management of state transitions).
type blocksQueueState struct {
scheduler *schedulerState
sender *senderState
cachedBlocks map[uint64]*cachedBlock
}
// blockState enums possible queue block states.
type blockState uint8
const (
// pendingBlock is a default block status when just added to queue.
pendingBlock = iota
// validBlock represents block that can be processed.
validBlock
// skippedBlock is a block for a slot that is not found on any available peers.
skippedBlock
// failedBlock represents block that can not be processed at the moment.
failedBlock
// blockStateLen is a sentinel to know number of possible block states.
blockStateLen
)
// schedulerState a state of scheduling process.
type schedulerState struct {
sync.Mutex
currentSlot uint64
blockBatchSize uint64
requestedBlocks map[blockState]uint64
}
// senderState is a state of block sending process.
type senderState struct {
sync.Mutex
}
// cachedBlock is a container for signed beacon block.
type cachedBlock struct {
*eth.SignedBeaconBlock
}
// blocksQueue is a priority queue that serves as a intermediary between block fetchers (producers)
// and block processing goroutine (consumer). Consumer can rely on order of incoming blocks.
type blocksQueue struct {
ctx context.Context
cancel context.CancelFunc
highestExpectedSlot uint64
state *blocksQueueState
blocksFetcher blocksProvider
headFetcher blockchain.HeadFetcher
fetchedBlocks chan *eth.SignedBeaconBlock // output channel for ready blocks
pendingFetchRequests chan struct{} // pending requests semaphore
pendingFetchedBlocks chan struct{} // notifier, pings block sending handler
quit chan struct{} // termination notifier
}
// newBlocksQueue creates initialized priority queue.
func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
ctx, cancel := context.WithCancel(ctx)
blocksFetcher := cfg.blocksFetcher
if blocksFetcher == nil {
blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: cfg.headFetcher,
p2p: cfg.p2p,
})
}
return &blocksQueue{
ctx: ctx,
cancel: cancel,
highestExpectedSlot: cfg.highestExpectedSlot,
state: &blocksQueueState{
scheduler: &schedulerState{
currentSlot: cfg.startSlot,
blockBatchSize: blockBatchSize,
requestedBlocks: make(map[blockState]uint64, blockStateLen),
},
sender: &senderState{},
cachedBlocks: make(map[uint64]*cachedBlock, queueMaxCachedBlocks),
},
blocksFetcher: blocksFetcher,
headFetcher: cfg.headFetcher,
fetchedBlocks: make(chan *eth.SignedBeaconBlock, blockBatchSize),
pendingFetchRequests: make(chan struct{}, queueMaxPendingRequests),
pendingFetchedBlocks: make(chan struct{}, queueMaxPendingRequests),
quit: make(chan struct{}),
}
}
// start boots up the queue processing.
func (q *blocksQueue) start() error {
select {
case <-q.ctx.Done():
return errQueueCtxIsDone
default:
go q.loop()
return nil
}
}
// stop terminates all queue operations.
func (q *blocksQueue) stop() error {
q.cancel()
select {
case <-q.quit:
return nil
case <-time.After(queueStopCallTimeout):
return errQueueTakesTooLongToStop
}
}
// loop is a main queue loop.
func (q *blocksQueue) loop() {
defer close(q.quit)
// Wait for all goroutines to wrap up (forced by cancelled context), and do a cleanup.
wg := &sync.WaitGroup{}
defer func() {
wg.Wait()
q.blocksFetcher.stop()
close(q.fetchedBlocks)
}()
if err := q.blocksFetcher.start(); err != nil {
log.WithError(err).Debug("Can not start blocks provider")
}
// Reads from semaphore channel, thus allowing next goroutine to grab it and schedule next request.
releaseTicket := func() {
select {
case <-q.ctx.Done():
case <-q.pendingFetchRequests:
}
}
for {
if q.headFetcher.HeadSlot() >= q.highestExpectedSlot {
log.Debug("Highest expected slot reached")
q.cancel()
}
select {
case <-q.ctx.Done():
log.Debug("Context closed, exiting goroutine (blocks queue)")
return
case q.pendingFetchRequests <- struct{}{}:
wg.Add(1)
go func() {
defer wg.Done()
// Schedule request.
if err := q.scheduleFetchRequests(q.ctx); err != nil {
q.state.scheduler.incrementCounter(failedBlock, blockBatchSize)
releaseTicket()
}
}()
case response, ok := <-q.blocksFetcher.requestResponses():
if !ok {
log.Debug("Fetcher closed output channel")
q.cancel()
return
}
// Release semaphore ticket.
go releaseTicket()
// Process incoming response into blocks.
wg.Add(1)
go func() {
defer func() {
select {
case <-q.ctx.Done():
case q.pendingFetchedBlocks <- struct{}{}: // notify sender of data availability
}
wg.Done()
}()
skippedBlocks, err := q.parseFetchResponse(q.ctx, response)
if err != nil {
q.state.scheduler.incrementCounter(failedBlock, response.count)
return
}
q.state.scheduler.incrementCounter(skippedBlock, skippedBlocks)
}()
case <-q.pendingFetchedBlocks:
wg.Add(1)
go func() {
defer wg.Done()
if err := q.sendFetchedBlocks(q.ctx); err != nil {
log.WithError(err).Debug("Error sending received blocks")
}
}()
}
}
}
// scheduleFetchRequests enqueues block fetch requests to block fetcher.
func (q *blocksQueue) scheduleFetchRequests(ctx context.Context) error {
q.state.scheduler.Lock()
defer q.state.scheduler.Unlock()
if ctx.Err() != nil {
return ctx.Err()
}
s := q.state.scheduler
blocks := q.state.scheduler.requestedBlocks
func() {
resetStateCounters := func() {
for i := 0; i < blockStateLen; i++ {
blocks[blockState(i)] = 0
}
s.currentSlot = q.headFetcher.HeadSlot()
}
// Update state's current slot pointer.
count := blocks[pendingBlock] + blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock]
if count == 0 {
s.currentSlot = q.headFetcher.HeadSlot()
return
}
// Too many failures (blocks that can't be processed at this time).
if blocks[failedBlock] >= s.blockBatchSize/2 {
s.blockBatchSize *= 2
resetStateCounters()
return
}
// Given enough valid blocks, we can set back block batch size.
if blocks[validBlock] >= blockBatchSize && s.blockBatchSize != blockBatchSize {
blocks[skippedBlock], blocks[validBlock] = blocks[skippedBlock]+blocks[validBlock], 0
s.blockBatchSize = blockBatchSize
}
// Too many items in scheduler, time to update current slot to point to current head's slot.
count = blocks[pendingBlock] + blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock]
if count >= queueMaxCachedBlocks {
s.blockBatchSize = blockBatchSize
resetStateCounters()
return
}
// All blocks processed, no pending blocks.
count = blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock]
if count > 0 && blocks[pendingBlock] == 0 {
s.blockBatchSize = blockBatchSize
resetStateCounters()
return
}
}()
offset := blocks[pendingBlock] + blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock]
start := q.state.scheduler.currentSlot + offset + 1
count := mathutil.Min(q.state.scheduler.blockBatchSize, q.highestExpectedSlot-start+1)
if count <= 0 {
return errStartSlotIsTooHigh
}
ctx, _ = context.WithTimeout(ctx, queueFetchRequestTimeout)
if err := q.blocksFetcher.scheduleRequest(ctx, start, count); err != nil {
return err
}
q.state.scheduler.requestedBlocks[pendingBlock] += count
return nil
}
// parseFetchResponse processes incoming responses.
func (q *blocksQueue) parseFetchResponse(ctx context.Context, response *fetchRequestResponse) (uint64, error) {
q.state.sender.Lock()
defer q.state.sender.Unlock()
if ctx.Err() != nil {
return 0, ctx.Err()
}
if response.err != nil {
return 0, response.err
}
// Extract beacon blocks.
responseBlocks := make(map[uint64]*eth.SignedBeaconBlock, len(response.blocks))
for _, blk := range response.blocks {
responseBlocks[blk.Block.Slot] = blk
}
// Cache blocks in [start, start + count) range, include skipped blocks.
var skippedBlocks uint64
end := response.start + mathutil.Max(response.count, uint64(len(response.blocks)))
for slot := response.start; slot < end; slot++ {
if block, ok := responseBlocks[slot]; ok {
q.state.cachedBlocks[slot] = &cachedBlock{
SignedBeaconBlock: block,
}
delete(responseBlocks, slot)
continue
}
q.state.cachedBlocks[slot] = &cachedBlock{}
skippedBlocks++
}
// If there are any items left in incoming response, cache them too.
for slot, block := range responseBlocks {
q.state.cachedBlocks[slot] = &cachedBlock{
SignedBeaconBlock: block,
}
}
return skippedBlocks, nil
}
// sendFetchedBlocks analyses available blocks, and sends them downstream in a correct slot order.
// Blocks are checked starting from the current head slot, and up until next consecutive block is available.
func (q *blocksQueue) sendFetchedBlocks(ctx context.Context) error {
q.state.sender.Lock()
defer q.state.sender.Unlock()
startSlot := q.headFetcher.HeadSlot() + 1
nonSkippedSlot := uint64(0)
for slot := startSlot; slot <= q.highestExpectedSlot; slot++ {
if ctx.Err() != nil {
return ctx.Err()
}
blockData, ok := q.state.cachedBlocks[slot]
if !ok {
break
}
if blockData.SignedBeaconBlock != nil && blockData.Block != nil {
select {
case <-ctx.Done():
return ctx.Err()
case q.fetchedBlocks <- blockData.SignedBeaconBlock:
}
nonSkippedSlot = slot
}
}
// Remove processed blocks.
if nonSkippedSlot > 0 {
for slot := range q.state.cachedBlocks {
if slot <= nonSkippedSlot {
delete(q.state.cachedBlocks, slot)
}
}
}
return nil
}
// incrementCounter increments particular scheduler counter.
func (s *schedulerState) incrementCounter(counter blockState, n uint64) {
s.Lock()
defer s.Unlock()
// Assert that counter is within acceptable boundaries.
if counter < 1 || counter >= blockStateLen {
return
}
n = mathutil.Min(s.requestedBlocks[pendingBlock], n)
s.requestedBlocks[counter] += n
s.requestedBlocks[pendingBlock] -= n
}