prysm-pulse/beacon-chain/sync/pending_blocks_queue.go
terence tsao d54cefbe42
Part 1 of blockchain pkg clean up (#6537)
* Fix span names

* Clean up on block func

* Add updateFinalized helper

* Deprecate ReceiveBlockNoPubsub for ReceiveBlock

* Proposer to broadcast block

* Update migrate interface

* Replace ReceiveBlock for all

* Go fmt
2020-07-09 18:50:48 -05:00

200 lines
5.8 KiB
Go

package sync
import (
"context"
"encoding/hex"
"sort"
"sync"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// processPendingBlocksPeriod is the interval at which the pending blocks queue
// is processed. Per the inline note below, the intent is three runs per slot.
// NOTE(review): presumably DivideSlotBy returns slot duration / 3 — confirm in slotutil.
var processPendingBlocksPeriod = slotutil.DivideSlotBy(3 /* times per slot */)
// processPendingBlocksQueue registers a periodic job that processes the
// pending blocks queue on every processPendingBlocksPeriod.
//
// The job is scheduled with the service context (s.ctx) and each run is
// executed with that same context, so DB lookups and network requests made
// while processing are tied to the service lifetime rather than to a detached
// background context.
func (s *Service) processPendingBlocksQueue() {
	// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
	var mu sync.Mutex
	runutil.RunEvery(s.ctx, processPendingBlocksPeriod, func() {
		mu.Lock()
		// Deferred so the mutex is released even if processing panics.
		defer mu.Unlock()
		if err := s.processPendingBlocks(s.ctx); err != nil {
			log.WithError(err).Error("Failed to process pending blocks")
		}
	})
}
// processPendingBlocks makes one pass over the pending blocks queue.
//
// It first prunes the queue via validatePendingSlots, then visits the
// remaining slots in ascending order. For each queued block:
//   - if the parent is neither in the DB nor in the pending queue and at least
//     one peer is connected, the parent is requested from a peer;
//   - if the parent is still not in the DB, the block stays queued;
//   - otherwise the block is handed to the chain service, re-broadcast if it
//     was processed successfully, and removed from the queue.
//
// It returns an error if pruning fails, if a peer's chain state cannot be
// read, or if a block root cannot be computed. Failures to request a parent,
// to process a block, or to broadcast it are logged and do not abort the pass.
func (s *Service) processPendingBlocks(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "processPendingBlocks")
	defer span.End()

	pids := s.p2p.Peers().Connected()
	if err := s.validatePendingSlots(); err != nil {
		return errors.Wrap(err, "could not validate pending slots")
	}
	slots := s.sortedPendingSlots()

	span.AddAttributes(
		trace.Int64Attribute("numSlots", int64(len(slots))),
		trace.Int64Attribute("numPeers", int64(len(pids))),
	)

	randGen := rand.NewGenerator()

	// processSlot handles a single queued slot. It is a closure so that the
	// per-slot span is closed by defer on every exit path, including errors.
	processSlot := func(slot uint64) error {
		ctx, span := trace.StartSpan(ctx, "processPendingBlocks.InnerLoop")
		defer span.End()
		span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))

		// Both maps are guarded by pendingQueueLock, so read them together
		// while the read lock is held.
		s.pendingQueueLock.RLock()
		b := s.slotToPendingBlocks[slot]
		// Skip if block does not exist.
		if b == nil || b.Block == nil {
			s.pendingQueueLock.RUnlock()
			return nil
		}
		parentRoot := bytesutil.ToBytes32(b.Block.ParentRoot)
		inPendingQueue := s.seenPendingBlocks[parentRoot]
		s.pendingQueueLock.RUnlock()

		inDB := s.db.HasBlock(ctx, parentRoot)
		hasPeer := len(pids) != 0

		// Only request for missing parent block if it's not in DB, not in pending cache
		// and has peer in the peer list.
		if !inPendingQueue && !inDB && hasPeer {
			log.WithFields(logrus.Fields{
				"currentSlot": b.Block.Slot,
				"parentRoot":  hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)),
			}).Info("Requesting parent block")
			req := [][32]byte{parentRoot}

			// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
			// have a head slot newer than the block slot we are requesting.
			pid := pids[randGen.Int()%len(pids)]
			for _, p := range pids {
				cs, err := s.p2p.Peers().ChainState(p)
				if err != nil {
					traceutil.AnnotateError(span, err)
					return errors.Wrap(err, "failed to read chain state for peer")
				}
				if cs != nil && cs.HeadSlot >= slot {
					pid = p
					break
				}
			}

			if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
				if err = s.sendRecentBeaconBlocksRequestFallback(ctx, req, pid); err != nil {
					traceutil.AnnotateError(span, err)
					log.Errorf("Could not send recent block request: %v", err)
				}
			}
			return nil
		}

		// Parent is not available yet; leave the block queued for a later pass.
		if !inDB {
			return nil
		}

		blkRoot, err := stateutil.BlockRoot(b.Block)
		if err != nil {
			traceutil.AnnotateError(span, err)
			return err
		}

		if err := s.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
			log.Errorf("Could not process block from slot %d: %v", b.Block.Slot, err)
			traceutil.AnnotateError(span, err)
			// A block that failed processing is not re-gossiped to peers; it is
			// still dropped from the queue below so it is not retried forever.
		} else if err := s.p2p.Broadcast(ctx, b); err != nil {
			// Broadcasting the block again once a node is able to process it.
			log.WithError(err).Error("Failed to broadcast block")
		}

		s.pendingQueueLock.Lock()
		delete(s.slotToPendingBlocks, slot)
		delete(s.seenPendingBlocks, blkRoot)
		s.pendingQueueLock.Unlock()

		log.WithFields(logrus.Fields{
			"slot":      slot,
			"blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])),
		}).Debug("Processed pending block and cleared it in cache")
		return nil
	}

	for _, slot := range slots {
		if err := processSlot(slot); err != nil {
			return err
		}
	}
	return nil
}
// sortedPendingSlots returns the slots that currently have a queued block,
// ordered from lowest to highest. The queue is read under the read lock and
// the returned slice is a fresh copy owned by the caller.
func (s *Service) sortedPendingSlots() []uint64 {
	s.pendingQueueLock.RLock()
	defer s.pendingQueueLock.RUnlock()

	ordered := make([]uint64, len(s.slotToPendingBlocks))
	idx := 0
	for pendingSlot := range s.slotToPendingBlocks {
		ordered[idx] = pendingSlot
		idx++
	}

	ascending := func(a, b int) bool { return ordered[a] < ordered[b] }
	sort.Slice(ordered, ascending)
	return ordered
}
// validatePendingSlots prunes the pending blocks queue. A queued block is
// removed (from both slotToPendingBlocks and seenPendingBlocks) when either:
//   - its slot falls in an epoch at or before the current finalized epoch
//     (only checked once the finalized epoch is greater than zero), or
//   - its parent root belongs to a block removed earlier in the same pass.
//
// Slots are visited in ascending order so that a removed block is always
// recorded before any higher-slot block that names it as parent is examined;
// iterating the map directly would make descendant pruning depend on Go's
// randomized map order. Entries holding a nil block are skipped.
//
// It returns an error if a block root cannot be computed. The write lock is
// held for the whole pass.
func (s *Service) validatePendingSlots() error {
	s.pendingQueueLock.Lock()
	defer s.pendingQueueLock.Unlock()

	// Collected inline instead of via sortedPendingSlots, which takes the read
	// lock on the same mutex that is already write-locked here.
	slots := make([]uint64, 0, len(s.slotToPendingBlocks))
	for slot := range s.slotToPendingBlocks {
		slots = append(slots, slot)
	}
	sort.Slice(slots, func(i, j int) bool {
		return slots[i] < slots[j]
	})

	oldBlockRoots := make(map[[32]byte]bool)
	finalizedEpoch := s.chain.FinalizedCheckpt().Epoch
	for _, slot := range slots {
		b := s.slotToPendingBlocks[slot]
		if b == nil || b.Block == nil {
			continue
		}

		// remove all descendant blocks of old blocks
		hasOldParent := oldBlockRoots[bytesutil.ToBytes32(b.Block.ParentRoot)]
		// don't process old blocks
		isOld := finalizedEpoch > 0 && helpers.SlotToEpoch(slot) <= finalizedEpoch
		if !hasOldParent && !isOld {
			continue
		}

		root, err := stateutil.BlockRoot(b.Block)
		if err != nil {
			return err
		}
		oldBlockRoots[root] = true
		delete(s.slotToPendingBlocks, slot)
		delete(s.seenPendingBlocks, root)
	}
	return nil
}
// clearPendingSlots empties the pending blocks queue by replacing both the
// slot-to-block map and the seen-roots map with fresh, empty maps while
// holding the write lock.
func (s *Service) clearPendingSlots() {
	freshBlocks := make(map[uint64]*ethpb.SignedBeaconBlock)
	freshSeen := make(map[[32]byte]bool)

	s.pendingQueueLock.Lock()
	s.slotToPendingBlocks, s.seenPendingBlocks = freshBlocks, freshSeen
	s.pendingQueueLock.Unlock()
}