prysm-pulse/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
james-prysm 83af9a5694
Beacon API: update Deneb endpoints after removing blob signing (#13235)
* making needed changes to beacon API based on removal of blobsidecar from block contents

* fixing tests and reverting some changes to be addressed later

* fixing generated code from protos

* gaz

* fixing get blob handler and adding blob storage to the blob service

* updating unit tests

* WIP

* wip tests

* got tests passing but needs cleanup

* removing gomod and gosum changes

* fixing more tests

* fixing more tests

* fixing more tests

* gaz

* moving some proto types around

* removing unneeded unit test

* fixing proposer paths

* adding more tests

* fixing more tests

* improving more unit tests

* updating one blob only unit test

* changing arguments of buildBlobSidecar

* reverting a change based on feedback

* terence's review items

* fixing test based on new develop changes

* radek's comments

* addressed more comments from radek

* adding in blobs to test data

* fixing casing in test

* removing extra line

* fixing issue from bad merge

* Update beacon-chain/rpc/eth/beacon/handlers_test.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/rpc/eth/beacon/handlers_test.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/rpc/eth/beacon/handlers_test.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/rpc/eth/blob/handlers.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* moving core getblob business logic to blocker based on radek's comment

* fixing mock blocker

* gaz

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2023-12-07 17:37:11 +00:00

387 lines
14 KiB
Go

package validator
import (
"context"
"encoding/hex"
"fmt"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
emptypb "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/builder"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// eth1DataNotification is a latch to stop flooding logs with the same warning.
var eth1DataNotification bool
const (
// CouldNotDecodeBlock means that a signed beacon block couldn't be created from the block present in the request.
CouldNotDecodeBlock = "Could not decode block"
eth1dataTimeout = 2 * time.Second
)
// GetBeaconBlock is called by a proposer during its assigned slot to request a block to sign
// by passing in the slot and the signed randao reveal of the slot.
func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "ProposerServer.GetBeaconBlock")
defer span.End()
span.AddAttributes(trace.Int64Attribute("slot", int64(req.Slot)))
t, err := slots.ToTime(uint64(vs.TimeFetcher.GenesisTime().Unix()), req.Slot)
if err != nil {
log.WithError(err).Error("Could not convert slot to time")
}
log.WithFields(logrus.Fields{
"slot": req.Slot,
"sinceSlotStartTime": time.Since(t),
}).Info("Begin building block")
// A syncing validator should not produce a block.
if vs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
// process attestations and update head in forkchoice
vs.ForkchoiceFetcher.UpdateHead(ctx, vs.TimeFetcher.CurrentSlot())
headRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
parentRoot := vs.ForkchoiceFetcher.GetProposerHead()
if parentRoot != headRoot {
blockchain.LateBlockAttemptedReorgCount.Inc()
}
// An optimistic validator MUST NOT produce a block (i.e., sign across the DOMAIN_BEACON_PROPOSER domain).
if slots.ToEpoch(req.Slot) >= params.BeaconConfig().BellatrixForkEpoch {
if err := vs.optimisticStatus(ctx); err != nil {
return nil, status.Errorf(codes.Unavailable, "Validator is not ready to propose: %v", err)
}
}
sBlk, err := getEmptyBlock(req.Slot)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not prepare block: %v", err)
}
head, err := vs.HeadFetcher.HeadState(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get head state: %v", err)
}
head, err = transition.ProcessSlotsUsingNextSlotCache(ctx, head, parentRoot[:], req.Slot)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not process slots up to %d: %v", req.Slot, err)
}
// Set slot, graffiti, randao reveal, and parent root.
sBlk.SetSlot(req.Slot)
sBlk.SetGraffiti(req.Graffiti)
sBlk.SetRandaoReveal(req.RandaoReveal)
sBlk.SetParentRoot(parentRoot[:])
// Set proposer index.
idx, err := helpers.BeaconProposerIndex(ctx, head)
if err != nil {
return nil, fmt.Errorf("could not calculate proposer index %v", err)
}
sBlk.SetProposerIndex(idx)
if err = vs.BuildBlockParallel(ctx, sBlk, head, false); err != nil {
return nil, errors.Wrap(err, "could not build block in parallel")
}
sr, err := vs.computeStateRoot(ctx, sBlk)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not compute state root: %v", err)
}
sBlk.SetStateRoot(sr)
log.WithFields(logrus.Fields{
"slot": req.Slot,
"sinceSlotStartTime": time.Since(t),
"validator": sBlk.Block().ProposerIndex(),
}).Info("Finished building block")
// Blob cache is updated after BuildBlockParallel
return vs.constructGenericBeaconBlock(sBlk, bundleCache.get(req.Slot))
}
func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.SignedBeaconBlock, head state.BeaconState, skipMevBoost bool) error {
// Build consensus fields in background
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// Set eth1 data.
eth1Data, err := vs.eth1DataMajorityVote(ctx, head)
if err != nil {
eth1Data = &ethpb.Eth1Data{DepositRoot: params.BeaconConfig().ZeroHash[:], BlockHash: params.BeaconConfig().ZeroHash[:]}
log.WithError(err).Error("Could not get eth1data")
}
sBlk.SetEth1Data(eth1Data)
// Set deposit and attestation.
deposits, atts, err := vs.packDepositsAndAttestations(ctx, head, eth1Data) // TODO: split attestations and deposits
if err != nil {
sBlk.SetDeposits([]*ethpb.Deposit{})
sBlk.SetAttestations([]*ethpb.Attestation{})
log.WithError(err).Error("Could not pack deposits and attestations")
} else {
sBlk.SetDeposits(deposits)
sBlk.SetAttestations(atts)
}
// Set slashings.
validProposerSlashings, validAttSlashings := vs.getSlashings(ctx, head)
sBlk.SetProposerSlashings(validProposerSlashings)
sBlk.SetAttesterSlashings(validAttSlashings)
// Set exits.
sBlk.SetVoluntaryExits(vs.getExits(head, sBlk.Block().Slot()))
// Set sync aggregate. New in Altair.
vs.setSyncAggregate(ctx, sBlk)
// Set bls to execution change. New in Capella.
vs.setBlsToExecData(sBlk, head)
}()
localPayload, overrideBuilder, err := vs.getLocalPayload(ctx, sBlk.Block(), head)
if err != nil {
return status.Errorf(codes.Internal, "Could not get local payload: %v", err)
}
// There's no reason to try to get a builder bid if local override is true.
var builderPayload interfaces.ExecutionData
var builderKzgCommitments [][]byte
overrideBuilder = overrideBuilder || skipMevBoost // Skip using mev-boost if requested by the caller.
if !overrideBuilder {
builderPayload, builderKzgCommitments, err = vs.getBuilderPayloadAndBlobs(ctx, sBlk.Block().Slot(), sBlk.Block().ProposerIndex())
if err != nil {
builderGetPayloadMissCount.Inc()
log.WithError(err).Error("Could not get builder payload")
}
}
if err := setExecutionData(ctx, sBlk, localPayload, builderPayload, builderKzgCommitments); err != nil {
return status.Errorf(codes.Internal, "Could not set execution data: %v", err)
}
wg.Wait() // Wait until block is built via consensus and execution fields.
return nil
}
// ProposeBeaconBlock is called by a proposer during its assigned slot to create a block in an attempt
// to get it processed by the beacon node as the canonical head.
func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
defer span.End()
blk, err := blocks.NewSignedBeaconBlock(req.Block)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s: %v", CouldNotDecodeBlock, err)
}
unblinder, err := newUnblinder(blk, vs.BlockBuilder)
if err != nil {
return nil, errors.Wrap(err, "could not create unblinder")
}
blinded := unblinder.b.IsBlinded() //
var scs []*ethpb.BlobSidecar
blk, scs, err = unblinder.unblindBuilderBlock(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not unblind builder block")
}
// Broadcast the new block to the network.
blkPb, err := blk.Proto()
if err != nil {
return nil, errors.Wrap(err, "could not get protobuf block")
}
if err := vs.P2P.Broadcast(ctx, blkPb); err != nil {
return nil, fmt.Errorf("could not broadcast block: %v", err)
}
root, err := blk.Block().HashTreeRoot()
if err != nil {
return nil, fmt.Errorf("could not tree hash block: %v", err)
}
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(root[:]),
}).Debug("Broadcasting block")
if blk.Version() >= version.Deneb {
if !blinded {
dbBlockContents := req.GetDeneb()
if dbBlockContents == nil {
return nil, errors.New("signed beacon block contents is empty")
}
scs, err = buildBlobSidecars(blk, dbBlockContents.Blobs, dbBlockContents.KzgProofs)
if err != nil {
return nil, fmt.Errorf("could not build blob sidecars: %v", err)
}
}
for i, sc := range scs {
if err := vs.P2P.BroadcastBlob(ctx, uint64(i), sc); err != nil {
log.WithError(err).Error("Could not broadcast blob")
}
readOnlySc, err := blocks.NewROBlobWithRoot(sc, root)
if err != nil {
return nil, fmt.Errorf("could not create ROBlob: %v", err)
}
verifiedSc := blocks.NewVerifiedROBlob(readOnlySc)
if err := vs.BlobReceiver.ReceiveBlob(ctx, verifiedSc); err != nil {
log.WithError(err).Error("Could not receive blob")
}
}
}
if err := vs.BlockReceiver.ReceiveBlock(ctx, blk, root); err != nil {
return nil, fmt.Errorf("could not process beacon block: %v", err)
}
log.WithField("slot", blk.Block().Slot()).Debugf(
"Block proposal received via RPC")
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: blk},
})
return &ethpb.ProposeResponse{
BlockRoot: root[:],
}, nil
}
// PrepareBeaconProposer caches and updates the fee recipient for the given proposer.
func (vs *Server) PrepareBeaconProposer(
ctx context.Context, request *ethpb.PrepareBeaconProposerRequest,
) (*emptypb.Empty, error) {
ctx, span := trace.StartSpan(ctx, "validator.PrepareBeaconProposer")
defer span.End()
var feeRecipients []common.Address
var validatorIndices []primitives.ValidatorIndex
newRecipients := make([]*ethpb.PrepareBeaconProposerRequest_FeeRecipientContainer, 0, len(request.Recipients))
for _, r := range request.Recipients {
f, err := vs.BeaconDB.FeeRecipientByValidatorID(ctx, r.ValidatorIndex)
switch {
case errors.Is(err, kv.ErrNotFoundFeeRecipient):
newRecipients = append(newRecipients, r)
case err != nil:
return nil, status.Errorf(codes.Internal, "Could not get fee recipient by validator index: %v", err)
default:
if common.BytesToAddress(r.FeeRecipient) != f {
newRecipients = append(newRecipients, r)
}
}
}
if len(newRecipients) == 0 {
return &emptypb.Empty{}, nil
}
for _, recipientContainer := range newRecipients {
recipient := hexutil.Encode(recipientContainer.FeeRecipient)
if !common.IsHexAddress(recipient) {
return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid fee recipient address: %v", recipient))
}
feeRecipients = append(feeRecipients, common.BytesToAddress(recipientContainer.FeeRecipient))
validatorIndices = append(validatorIndices, recipientContainer.ValidatorIndex)
}
if err := vs.BeaconDB.SaveFeeRecipientsByValidatorIDs(ctx, validatorIndices, feeRecipients); err != nil {
return nil, status.Errorf(codes.Internal, "Could not save fee recipients: %v", err)
}
log.WithFields(logrus.Fields{
"validatorIndices": validatorIndices,
}).Info("Updated fee recipient addresses for validator indices")
return &emptypb.Empty{}, nil
}
// GetFeeRecipientByPubKey returns a fee recipient from the beacon node's settings or db based on a given public key
func (vs *Server) GetFeeRecipientByPubKey(ctx context.Context, request *ethpb.FeeRecipientByPubKeyRequest) (*ethpb.FeeRecipientByPubKeyResponse, error) {
ctx, span := trace.StartSpan(ctx, "validator.GetFeeRecipientByPublicKey")
defer span.End()
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "request was empty")
}
resp, err := vs.ValidatorIndex(ctx, &ethpb.ValidatorIndexRequest{PublicKey: request.PublicKey})
if err != nil {
if strings.Contains(err.Error(), "Could not find validator index") {
return &ethpb.FeeRecipientByPubKeyResponse{
FeeRecipient: params.BeaconConfig().DefaultFeeRecipient.Bytes(),
}, nil
} else {
log.WithError(err).Error("An error occurred while retrieving validator index")
return nil, err
}
}
address, err := vs.BeaconDB.FeeRecipientByValidatorID(ctx, resp.GetIndex())
if err != nil {
if errors.Is(err, kv.ErrNotFoundFeeRecipient) {
return &ethpb.FeeRecipientByPubKeyResponse{
FeeRecipient: params.BeaconConfig().DefaultFeeRecipient.Bytes(),
}, nil
} else {
log.WithError(err).Error("An error occurred while retrieving fee recipient from db")
return nil, status.Errorf(codes.Internal, err.Error())
}
}
return &ethpb.FeeRecipientByPubKeyResponse{
FeeRecipient: address.Bytes(),
}, nil
}
// computeStateRoot computes the state root after a block has been processed through a state transition and
// returns it to the validator client.
func (vs *Server) computeStateRoot(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
beaconState, err := vs.StateGen.StateByRoot(ctx, block.Block().ParentRoot())
if err != nil {
return nil, errors.Wrap(err, "could not retrieve beacon state")
}
root, err := transition.CalculateStateRoot(
ctx,
beaconState,
block,
)
if err != nil {
return nil, errors.Wrapf(err, "could not calculate state root at slot %d", beaconState.Slot())
}
log.WithField("beaconStateRoot", fmt.Sprintf("%#x", root)).Debugf("Computed state root")
return root[:], nil
}
// SubmitValidatorRegistrations submits validator registrations.
func (vs *Server) SubmitValidatorRegistrations(ctx context.Context, reg *ethpb.SignedValidatorRegistrationsV1) (*emptypb.Empty, error) {
if vs.BlockBuilder == nil || !vs.BlockBuilder.Configured() {
return &emptypb.Empty{}, status.Errorf(codes.InvalidArgument, "Could not register block builder: %v", builder.ErrNoBuilder)
}
if err := vs.BlockBuilder.RegisterValidator(ctx, reg.Messages); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Could not register block builder: %v", err)
}
return &emptypb.Empty{}, nil
}