mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 12:57:18 +00:00
a069738c20
* update shared/params * update eth2-types deps * update protobufs * update shared/* * fix testutil/state * update beacon-chain/state * update beacon-chain/db * update tests * fix test * update beacon-chain/core * update beacon-chain/blockchain * update beacon-chain/cache * beacon-chain/forkchoice * update beacon-chain/operations * update beacon-chain/p2p * update beacon-chain/rpc * update sync/initial-sync * update deps * update deps * go fmt * update beacon-chain/sync * update endtoend/ * bazel build //beacon-chain - works w/o issues * update slasher code * udpate tools/ * update validator/ * update fastssz * fix build * fix test building * update tests * update ethereumapis deps * fix tests * update state/stategen * fix build * fix test * add FarFutureSlot * go imports * Radek's suggestions * Ivan's suggestions * type conversions * Nishant's suggestions * add more tests to rpc_send_request * fix test * clean up * fix conflicts Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> Co-authored-by: nisdas <nishdas93@gmail.com>
122 lines
3.9 KiB
Go
122 lines
3.9 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/pkg/errors"
|
|
types "github.com/prysmaticlabs/eth2-types"
|
|
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
|
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
|
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
)
|
|
|
|
// ErrInvalidFetchedData is the sentinel error returned when blocks received from a peer
// do not satisfy the request that was sent: more blocks than allowed, a slot outside the
// requested range, or slots that are out of order.
var ErrInvalidFetchedData = errors.New("invalid data returned from peer")
|
|
|
|
// BeaconBlockProcessor defines a block processing function, which allows to start utilizing
|
|
// blocks even before all blocks are ready.
|
|
type BeaconBlockProcessor func(block *ethpb.SignedBeaconBlock) error
|
|
|
|
// SendBeaconBlocksByRangeRequest sends BeaconBlocksByRange and returns fetched blocks, if any.
|
|
func SendBeaconBlocksByRangeRequest(
|
|
ctx context.Context, p2pProvider p2p.P2P, pid peer.ID,
|
|
req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor,
|
|
) ([]*ethpb.SignedBeaconBlock, error) {
|
|
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer closeStream(stream, log)
|
|
|
|
// Augment block processing function, if non-nil block processor is provided.
|
|
blocks := make([]*ethpb.SignedBeaconBlock, 0, req.Count)
|
|
process := func(blk *ethpb.SignedBeaconBlock) error {
|
|
blocks = append(blocks, blk)
|
|
if blockProcessor != nil {
|
|
return blockProcessor(blk)
|
|
}
|
|
return nil
|
|
}
|
|
var prevSlot types.Slot
|
|
for i := uint64(0); ; i++ {
|
|
isFirstChunk := i == 0
|
|
blk, err := ReadChunkedBlock(stream, p2pProvider, isFirstChunk)
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// The response MUST contain no more than `count` blocks, and no more than
|
|
// MAX_REQUEST_BLOCKS blocks.
|
|
if i >= req.Count || i >= params.BeaconNetworkConfig().MaxRequestBlocks {
|
|
return nil, ErrInvalidFetchedData
|
|
}
|
|
// Returned blocks MUST be in the slot range [start_slot, start_slot + count * step).
|
|
if blk.Block.Slot < req.StartSlot || blk.Block.Slot >= req.StartSlot.Add(req.Count*req.Step) {
|
|
return nil, ErrInvalidFetchedData
|
|
}
|
|
// Returned blocks, where they exist, MUST be sent in a consecutive order.
|
|
// Consecutive blocks MUST have values in `step` increments (slots may be skipped in between).
|
|
isSlotOutOfOrder := false
|
|
if prevSlot >= blk.Block.Slot {
|
|
isSlotOutOfOrder = true
|
|
} else if req.Step != 0 && blk.Block.Slot.SubSlot(prevSlot).Mod(req.Step) != 0 {
|
|
isSlotOutOfOrder = true
|
|
}
|
|
if !isFirstChunk && isSlotOutOfOrder {
|
|
return nil, ErrInvalidFetchedData
|
|
}
|
|
prevSlot = blk.Block.Slot
|
|
if err := process(blk); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return blocks, nil
|
|
}
|
|
|
|
// SendBeaconBlocksByRootRequest sends BeaconBlocksByRoot and returns fetched blocks, if any.
|
|
func SendBeaconBlocksByRootRequest(
|
|
ctx context.Context, p2pProvider p2p.P2P, pid peer.ID,
|
|
req *p2ptypes.BeaconBlockByRootsReq, blockProcessor BeaconBlockProcessor,
|
|
) ([]*ethpb.SignedBeaconBlock, error) {
|
|
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRootTopic, pid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer closeStream(stream, log)
|
|
|
|
// Augment block processing function, if non-nil block processor is provided.
|
|
blocks := make([]*ethpb.SignedBeaconBlock, 0, len(*req))
|
|
process := func(block *ethpb.SignedBeaconBlock) error {
|
|
blocks = append(blocks, block)
|
|
if blockProcessor != nil {
|
|
return blockProcessor(block)
|
|
}
|
|
return nil
|
|
}
|
|
for i := 0; i < len(*req); i++ {
|
|
// Exit if peer sends more than max request blocks.
|
|
if uint64(i) >= params.BeaconNetworkConfig().MaxRequestBlocks {
|
|
break
|
|
}
|
|
isFirstChunk := i == 0
|
|
blk, err := ReadChunkedBlock(stream, p2pProvider, isFirstChunk)
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := process(blk); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return blocks, nil
|
|
}
|