prysm-pulse/beacon-chain/sync/rpc_send_request.go
Steven Allen 2428880058
Update go-libp2p to 0.12.0 (#8015)
* Update go-libp2p to 0.12.0

go-libp2p 0.12.0 made some significant changes to the stream interfaces around
stream closing:

* Close now closes in both directions and frees the stream. However, unlike
FullClose did, it doesn't _wait_ for the remote peer to respond with an EOF.
* To close for writing, call CloseWrite (like one would on a TCP connection, etc.).

This patch:

* Replaces calls to FullClose with Close where appropriate.
* Replaces calls to Close with CloseWrite where appropriate.
* Removes redundant Close calls.
* Calls Reset to where appropriate to indicate that the request/response was
  aborted. Unlike Close, this will not flush and will not cause the remote peer
  to read an EOF. Instead, the remote peer will read an ErrReset error.
* Ensures we always either close or reset streams. Send wasn't closing the
  stream on some error paths.
* Now that stream closing is async, we explicitly wait for a response when
  "hanging up" on a peer (so we don't hang up before they receive our
  response/goodbye message).

* update bazel

* Gazelle

* revert unintentional bazel workspace change

* appease an overzealous linter

* update to latest

* Refactor encoder

* gazelle

* Gazelle

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
Co-authored-by: Nishant Das <nishdas93@gmail.com>
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
2020-12-14 17:22:25 +00:00

115 lines
3.7 KiB
Go

package sync
import (
"context"
"io"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
)
// ErrInvalidFetchedData is thrown if stream fails to provide requested blocks.
var ErrInvalidFetchedData = errors.New("invalid data returned from peer")
// BeaconBlockProcessor defines a block processing function, which allows to start utilizing
// blocks even before all blocks are ready.
type BeaconBlockProcessor func(block *ethpb.SignedBeaconBlock) error
// SendBeaconBlocksByRangeRequest sends BeaconBlocksByRange and returns fetched blocks, if any.
func SendBeaconBlocksByRangeRequest(
ctx context.Context, p2pProvider p2p.P2P, pid peer.ID,
req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor,
) ([]*ethpb.SignedBeaconBlock, error) {
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid)
if err != nil {
return nil, err
}
defer closeStream(stream, log)
// Augment block processing function, if non-nil block processor is provided.
blocks := make([]*ethpb.SignedBeaconBlock, 0, req.Count)
process := func(blk *ethpb.SignedBeaconBlock) error {
blocks = append(blocks, blk)
if blockProcessor != nil {
return blockProcessor(blk)
}
return nil
}
var prevSlot uint64
for i := uint64(0); ; i++ {
isFirstChunk := i == 0
blk, err := ReadChunkedBlock(stream, p2pProvider, isFirstChunk)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}
// The response MUST contain no more than `count` blocks, and no more than
// MAX_REQUEST_BLOCKS blocks.
if i >= req.Count || i >= params.BeaconNetworkConfig().MaxRequestBlocks {
return nil, ErrInvalidFetchedData
}
// Returned blocks MUST be in the slot range [start_slot, start_slot + count * step).
if blk.Block.Slot < req.StartSlot || blk.Block.Slot >= req.StartSlot+req.Count*req.Step {
return nil, ErrInvalidFetchedData
}
// Returned blocks, where they exist, MUST be sent in a consecutive order.
// Consecutive blocks MUST have values in `step` increments (slots may be skipped in between).
if !isFirstChunk && (prevSlot >= blk.Block.Slot || (blk.Block.Slot-prevSlot)%req.Step != 0) {
return nil, ErrInvalidFetchedData
}
prevSlot = blk.Block.Slot
if err := process(blk); err != nil {
return nil, err
}
}
return blocks, nil
}
// SendBeaconBlocksByRootRequest sends BeaconBlocksByRoot and returns fetched blocks, if any.
func SendBeaconBlocksByRootRequest(
ctx context.Context, p2pProvider p2p.P2P, pid peer.ID,
req *types.BeaconBlockByRootsReq, blockProcessor BeaconBlockProcessor,
) ([]*ethpb.SignedBeaconBlock, error) {
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRootTopic, pid)
if err != nil {
return nil, err
}
defer closeStream(stream, log)
// Augment block processing function, if non-nil block processor is provided.
blocks := make([]*ethpb.SignedBeaconBlock, 0, len(*req))
process := func(block *ethpb.SignedBeaconBlock) error {
blocks = append(blocks, block)
if blockProcessor != nil {
return blockProcessor(block)
}
return nil
}
for i := 0; i < len(*req); i++ {
// Exit if peer sends more than max request blocks.
if uint64(i) >= params.BeaconNetworkConfig().MaxRequestBlocks {
break
}
isFirstChunk := i == 0
blk, err := ReadChunkedBlock(stream, p2pProvider, isFirstChunk)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}
if err := process(blk); err != nil {
return nil, err
}
}
return blocks, nil
}