prysm-pulse/beacon-chain/sync/block_batcher.go

235 lines
7.4 KiB
Go

package sync
import (
"context"
"fmt"
"sort"
"time"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)
// blockRangeBatcher encapsulates the logic for splitting up a block range request into fixed-size batches of
// blocks that are retrieved from the database, ensured to be canonical, sequential and unique.
// If a non-nil value for ticker is set, it will be used to pause between batches lookups, as a rate-limiter.
type blockRangeBatcher struct {
start primitives.Slot
end primitives.Slot
size uint64
db db.NoHeadAccessDatabase
limiter *limiter
ticker *time.Ticker
cf *canonicalFilter
current *blockBatch
}
func newBlockRangeBatcher(rp rangeParams, bdb db.NoHeadAccessDatabase, limiter *limiter, canonical canonicalChecker, ticker *time.Ticker) (*blockRangeBatcher, error) {
if bdb == nil {
return nil, errors.New("nil db param, unable to initialize blockRangeBatcher")
}
if limiter == nil {
return nil, errors.New("nil limiter param, unable to initialize blockRangeBatcher")
}
if canonical == nil {
return nil, errors.New("nil canonicalChecker param, unable to initialize blockRangeBatcher")
}
if ticker == nil {
return nil, errors.New("nil ticker param, unable to initialize blockRangeBatcher")
}
if rp.size == 0 {
return nil, fmt.Errorf("invalid batch size of %d", rp.size)
}
if rp.end < rp.start {
return nil, fmt.Errorf("batch end slot %d is lower than batch start %d", rp.end, rp.start)
}
cf := &canonicalFilter{canonical: canonical}
return &blockRangeBatcher{
start: rp.start,
end: rp.end,
size: rp.size,
db: bdb,
limiter: limiter,
ticker: ticker,
cf: cf,
}, nil
}
func (bb *blockRangeBatcher) next(ctx context.Context, stream libp2pcore.Stream) (blockBatch, bool) {
var nb blockBatch
var more bool
// The result of each call to next() is saved in the `current` field.
// If current is not nil, current.next figures out the next batch based on the previous one.
// If current is nil, newBlockBatch is used to generate the first batch.
if bb.current != nil {
current := *bb.current
nb, more = current.next(bb.end, bb.size)
} else {
nb, more = newBlockBatch(bb.start, bb.end, bb.size)
}
// newBlockBatch and next() both return a boolean to indicate whether calling .next() will yield another batch
// (based on the whether we've gotten to the end slot yet). blockRangeBatcher.next does the same,
// and returns (zero value, false), to signal the end of the iteration.
if !more {
return blockBatch{}, false
}
if err := bb.limiter.validateRequest(stream, bb.size); err != nil {
return blockBatch{err: errors.Wrap(err, "throttled by rate limiter")}, false
}
// Wait for the ticker before doing anything expensive, unless this is the first batch.
if bb.ticker != nil && bb.current != nil {
<-bb.ticker.C
}
filter := filters.NewFilter().SetStartSlot(nb.start).SetEndSlot(nb.end)
blks, roots, err := bb.db.Blocks(ctx, filter)
if err != nil {
return blockBatch{err: errors.Wrap(err, "Could not retrieve blocks")}, false
}
rob := make([]blocks.ROBlock, 0)
if nb.start == 0 {
gb, err := bb.genesisBlock(ctx)
if err != nil {
return blockBatch{err: errors.Wrap(err, "could not retrieve genesis block")}, false
}
rob = append(rob, gb)
}
for i := 0; i < len(blks); i++ {
rb, err := blocks.NewROBlockWithRoot(blks[i], roots[i])
if err != nil {
return blockBatch{err: errors.Wrap(err, "Could not initialize ROBlock")}, false
}
rob = append(rob, rb)
}
// Filter and sort our retrieved blocks, so that we only return valid sets of blocks.
nb.lin, nb.nonlin, nb.err = bb.cf.filter(ctx, rob)
// Decrease allowed blocks capacity by the number of streamed blocks.
bb.limiter.add(stream, int64(1+nb.end.SubSlot(nb.start)))
bb.current = &nb
return *bb.current, true
}
func (bb *blockRangeBatcher) genesisBlock(ctx context.Context) (blocks.ROBlock, error) {
b, err := bb.db.GenesisBlock(ctx)
if err != nil {
return blocks.ROBlock{}, err
}
htr, err := b.Block().HashTreeRoot()
if err != nil {
return blocks.ROBlock{}, err
}
return blocks.NewROBlockWithRoot(b, htr)
}
type blockBatch struct {
start primitives.Slot
end primitives.Slot
lin []blocks.ROBlock // lin is a linear chain of blocks connected through parent_root. broken tails go in nonlin.
nonlin []blocks.ROBlock // if there is a break in the chain of parent->child relationships, the tail is stored here.
err error
}
func newBlockBatch(start, reqEnd primitives.Slot, size uint64) (blockBatch, bool) {
if start > reqEnd {
return blockBatch{}, false
}
if size == 0 {
return blockBatch{}, false
}
nb := blockBatch{start: start, end: start.Add(size - 1)}
if nb.end > reqEnd {
nb.end = reqEnd
}
return nb, true
}
func (bb blockBatch) next(reqEnd primitives.Slot, size uint64) (blockBatch, bool) {
if bb.error() != nil {
return bb, false
}
if bb.nonLinear() {
return blockBatch{}, false
}
return newBlockBatch(bb.end.Add(1), reqEnd, size)
}
// blocks returns the list of linear, canonical blocks read from the db.
func (bb blockBatch) canonical() []blocks.ROBlock {
return bb.lin
}
// nonLinear is used to determine if there was a break in the chain of canonical blocks as read from the db.
// If true, code using the blockBatch should stop serving additional batches of blocks.
func (bb blockBatch) nonLinear() bool {
return len(bb.nonlin) > 0
}
func (bb blockBatch) error() error {
return bb.err
}
type canonicalChecker func(context.Context, [32]byte) (bool, error)
type canonicalFilter struct {
prevRoot [32]byte
canonical canonicalChecker
}
// filters all the provided blocks to ensure they are canonical and strictly linear.
func (cf *canonicalFilter) filter(ctx context.Context, blks []blocks.ROBlock) ([]blocks.ROBlock, []blocks.ROBlock, error) {
blks = sortedUniqueBlocks(blks)
seq := make([]blocks.ROBlock, 0, len(blks))
nseq := make([]blocks.ROBlock, 0)
for i, b := range blks {
cb, err := cf.canonical(ctx, b.Root())
if err != nil {
return nil, nil, err
}
if !cb {
continue
}
// prevRoot will be the zero value until we find the first canonical block in the stream seen by an instance
// of canonicalFilter. filter is called in batches; prevRoot can be the last root from the previous batch.
first := cf.prevRoot == [32]byte{}
// We assume blocks are processed in order, so the previous canonical root should be the parent of the next.
if !first && cf.prevRoot != b.Block().ParentRoot() {
// If the current block isn't descended from the last, something is wrong. Append everything remaining
// to the list of non-linear blocks, and stop building the canonical list.
nseq = append(nseq, blks[i:]...)
break
}
seq = append(seq, blks[i])
// Set the previous root as the
// newly added block's root
cf.prevRoot = b.Root()
}
return seq, nseq, nil
}
// returns a copy of the []ROBlock list in sorted order with duplicates removed
func sortedUniqueBlocks(blks []blocks.ROBlock) []blocks.ROBlock {
// Remove duplicate blocks received
sort.Sort(blocks.ROBlockSlice(blks))
if len(blks) < 2 {
return blks
}
u := 0
for i := 1; i < len(blks); i++ {
if blks[i].Root() != blks[u].Root() {
u += 1
if u != i {
blks[u] = blks[i]
}
}
}
return blks[:u+1]
}