prysm-pulse/beacon-chain/sync/pending_blocks_queue.go
Nishant Das 03356fc7b5
Add Ability to Resync Node (#4279)
* add resyncing functionality

* add more validation to status message

* lint and build

* jim's review

* preston's review

* clean up

* remove log

* remove no sync

* change again

* change back

* remove spaces

* Update shared/slotutil/slottime.go

Co-Authored-By: Raul Jordan <raul@prysmaticlabs.com>

* Apply suggestions from code review

Co-Authored-By: Raul Jordan <raul@prysmaticlabs.com>

* fix refs

* raul's review

* goimports

* goimports

* add counter

* removed condition

* change back

* gaz

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2020-01-02 16:09:28 +08:00

157 lines
4.5 KiB
Go

package sync
import (
"context"
"encoding/hex"
"sort"
"time"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"golang.org/x/exp/rand"
)
// processPendingBlocksPeriod is the interval at which the pending blocks queue
// is processed: the configured SecondsPerSlot divided by 3, in seconds. The
// division happens before the conversion to time.Duration, so any remainder
// is dropped.
var processPendingBlocksPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/3) * time.Second
// processPendingBlocksQueue registers processPendingBlocks with
// runutil.RunEvery so that it is invoked once per processPendingBlocksPeriod.
// The schedule is tied to the service context r.ctx, while each run itself is
// given a fresh background context.
func (r *Service) processPendingBlocksQueue() {
	ctx := context.Background()
	runutil.RunEvery(r.ctx, processPendingBlocksPeriod, func() {
		// A failed run must not go unnoticed: report it and let the next
		// tick retry rather than dropping the error on the floor.
		if err := r.processPendingBlocks(ctx); err != nil {
			log.Errorf("Could not process pending blocks: %v", err)
		}
	})
}
// processPendingBlocks walks the pending blocks queue in ascending slot order.
// For every queued block it does one of the following:
//
//   - if the parent is neither queued nor in the DB and at least one peer is
//     connected, requests the parent from a randomly chosen connected peer;
//   - if the parent is still not in the DB, leaves the block queued;
//   - otherwise passes the block to r.chain.ReceiveBlockNoPubsub and removes
//     it from the queue (the block is removed even when that call fails).
//
// It returns an error when the pending slots cannot be validated or when the
// signing root of a processed block cannot be computed.
func (r *Service) processPendingBlocks(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "processPendingBlocks")
	defer span.End()

	pids := r.p2p.Peers().Connected()
	if err := r.validatePendingSlots(); err != nil {
		return errors.Wrap(err, "could not validate pending slots")
	}
	slots := r.sortedPendingSlots()

	span.AddAttributes(
		trace.Int64Attribute("numSlots", int64(len(slots))),
		trace.Int64Attribute("numPeers", int64(len(pids))),
	)

	for _, s := range slots {
		// Every path out of this iteration must end the per-slot span.
		ctx, span := trace.StartSpan(ctx, "processPendingBlocks.InnerLoop")
		span.AddAttributes(trace.Int64Attribute("slot", int64(s)))

		r.pendingQueueLock.RLock()
		b := r.slotToPendingBlocks[uint64(s)]
		// slots is a snapshot taken before this loop; the entry may have been
		// removed from the queue in the meantime (e.g. by clearPendingSlots).
		if b == nil {
			r.pendingQueueLock.RUnlock()
			span.End()
			continue
		}
		inPendingQueue := r.seenPendingBlocks[bytesutil.ToBytes32(b.ParentRoot)]
		r.pendingQueueLock.RUnlock()

		inDB := r.db.HasBlock(ctx, bytesutil.ToBytes32(b.ParentRoot))
		hasPeer := len(pids) != 0

		// Only request for missing parent block if it's not in DB, not in pending cache
		// and has peer in the peer list.
		if !inPendingQueue && !inDB && hasPeer {
			log.WithFields(logrus.Fields{
				"currentSlot": b.Slot,
				"parentRoot":  hex.EncodeToString(b.ParentRoot),
			}).Info("Requesting parent block")
			req := [][32]byte{bytesutil.ToBytes32(b.ParentRoot)}
			if err := r.sendRecentBeaconBlocksRequest(ctx, req, pids[rand.Int()%len(pids)]); err != nil {
				traceutil.AnnotateError(span, err)
				log.Errorf("Could not send recent block request: %v", err)
			}
			span.End()
			continue
		}

		// The parent has not been stored yet; keep the block queued for a
		// later run.
		if !inDB {
			span.End()
			continue
		}

		if err := r.chain.ReceiveBlockNoPubsub(ctx, b); err != nil {
			log.Errorf("Could not process block from slot %d: %v", b.Slot, err)
			traceutil.AnnotateError(span, err)
		}

		// Compute the root outside the critical section so that a hashing
		// failure can never return while the write lock is still held.
		blkRoot, rootErr := ssz.SigningRoot(b)

		r.pendingQueueLock.Lock()
		delete(r.slotToPendingBlocks, uint64(s))
		if rootErr == nil {
			delete(r.seenPendingBlocks, blkRoot)
		}
		r.pendingQueueLock.Unlock()

		if rootErr != nil {
			traceutil.AnnotateError(span, rootErr)
			span.End()
			return rootErr
		}

		log.Infof("Processed ancestor block with slot %d and cleared pending block cache", s)
		span.End()
	}
	return nil
}
// sortedPendingSlots returns the slots that currently have a block in the
// pending queue, converted to int and sorted in ascending order. The queue is
// read under the read lock.
func (r *Service) sortedPendingSlots() []int {
	r.pendingQueueLock.RLock()
	pending := make([]int, 0, len(r.slotToPendingBlocks))
	for slot := range r.slotToPendingBlocks {
		pending = append(pending, int(slot))
	}
	r.pendingQueueLock.RUnlock()

	// pending is a private copy, so it can be ordered after the lock is released.
	sort.Sort(sort.IntSlice(pending))
	return pending
}
// validatePendingSlots prunes the pending blocks queue. A block is removed
// from both slotToPendingBlocks and seenPendingBlocks when either:
//
//   - the finalized epoch is greater than 0 and the epoch of the block's slot
//     is at or before it, or
//   - the block's parent was itself removed earlier in this pass.
//
// It returns an error if the signing root of a block to be removed cannot be
// computed; blocks removed before the failure stay removed.
func (r *Service) validatePendingSlots() error {
	// Both maps are mutated below, so the exclusive lock is required; a read
	// lock would allow these deletes to race with concurrent readers.
	r.pendingQueueLock.Lock()
	defer r.pendingQueueLock.Unlock()

	finalizedEpoch := r.chain.FinalizedCheckpt().Epoch

	// Visit slots in ascending order instead of Go's randomized map order, so
	// that a removed block at a lower slot is always recorded before blocks at
	// higher slots (where its descendants live) are checked against it.
	slots := make([]uint64, 0, len(r.slotToPendingBlocks))
	for s := range r.slotToPendingBlocks {
		slots = append(slots, s)
	}
	sort.Slice(slots, func(i, j int) bool { return slots[i] < slots[j] })

	// Roots of the blocks removed so far in this pass.
	oldBlockRoots := make(map[[32]byte]bool)
	for _, s := range slots {
		b := r.slotToPendingBlocks[s]

		// remove all descendant blocks of old blocks
		hasOldParent := oldBlockRoots[bytesutil.ToBytes32(b.ParentRoot)]
		// don't process old blocks
		isOld := finalizedEpoch > 0 && helpers.SlotToEpoch(s) <= finalizedEpoch
		if !hasOldParent && !isOld {
			continue
		}

		root, err := ssz.SigningRoot(b)
		if err != nil {
			return err
		}
		oldBlockRoots[root] = true
		delete(r.slotToPendingBlocks, s)
		delete(r.seenPendingBlocks, root)
	}
	return nil
}
// clearPendingSlots empties the pending blocks queue by replacing both the
// slot-to-block map and the seen-roots map with fresh, empty maps while
// holding the write lock.
func (r *Service) clearPendingSlots() {
	emptyBlocks := make(map[uint64]*ethpb.BeaconBlock)
	emptySeen := make(map[[32]byte]bool)

	r.pendingQueueLock.Lock()
	r.slotToPendingBlocks, r.seenPendingBlocks = emptyBlocks, emptySeen
	r.pendingQueueLock.Unlock()
}