prysm-pulse/beacon-chain/sync/rpc_blob_sidecars_by_root.go
2023-11-01 10:03:49 -07:00

133 lines
5.0 KiB
Go

package sync
import (
"context"
"sort"
"time"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"go.opencensus.io/trace"
)
func blobMinReqEpoch(finalized, current primitives.Epoch) primitives.Epoch {
// max(finalized_epoch, current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH)
denebFork := params.BeaconConfig().DenebForkEpoch
var reqWindow primitives.Epoch
if current > params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest {
reqWindow = current - params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest
}
if finalized >= reqWindow && finalized > denebFork {
return finalized
}
if reqWindow >= finalized && reqWindow > denebFork {
return reqWindow
}
return denebFork
}
// blobSidecarByRootRPCHandler handles the /eth2/beacon_chain/req/blob_sidecars_by_root/1/ RPC request.
// spec: https://github.com/ethereum/consensus-specs/blob/a7e45db9ac2b60a33e144444969ad3ac0aae3d4c/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1
func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.blobSidecarByRootRPCHandler")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
log := log.WithField("handler", p2p.BlobSidecarsByRootName[1:]) // slice the leading slash off the name var
ref, ok := msg.(*types.BlobSidecarsByRootReq)
if !ok {
return errors.New("message is not type BlobSidecarsByRootReq")
}
blobIdents := *ref
if err := validateBlobByRootRequest(blobIdents); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return err
}
// Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups.
sort.Sort(blobIdents)
batchSize := flags.Get().BlobBatchLimit
var ticker *time.Ticker
if len(blobIdents) > batchSize {
ticker = time.NewTicker(time.Second)
}
minReqEpoch := blobMinReqEpoch(s.cfg.chain.FinalizedCheckpt().Epoch, slots.ToEpoch(s.cfg.clock.CurrentSlot()))
buff := struct {
root [32]byte
scs []*eth.DeprecatedBlobSidecar
}{}
for i := range blobIdents {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
return err
}
// Throttle request processing to no more than batchSize/sec.
if i != 0 && i%batchSize == 0 && ticker != nil {
<-ticker.C
}
s.rateLimiter.add(stream, 1)
root, idx := bytesutil.ToBytes32(blobIdents[i].BlockRoot), blobIdents[i].Index
if root != buff.root {
scs, err := s.cfg.beaconDB.BlobSidecarsByRoot(ctx, root)
buff.root, buff.scs = root, scs
if err != nil {
if errors.Is(err, db.ErrNotFound) {
// In case db error path gave us a non-nil value, make sure that other indices for the problem root
// are not processed when we reenter the outer loop.
buff.scs = nil
log.WithError(err).Debugf("BlobSidecar not found in db, root=%x, index=%d", root, idx)
continue
}
log.WithError(err).Errorf("unexpected db error retrieving BlobSidecar, root=%x, index=%d", root, idx)
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
}
if idx >= uint64(len(buff.scs)) {
continue
}
sc := buff.scs[idx]
// If any root in the request content references a block earlier than minimum_request_epoch,
// peers MAY respond with error code 3: ResourceUnavailable or not include the blob in the response.
if slots.ToEpoch(sc.Slot) < minReqEpoch {
s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrBlobLTMinRequest.Error(), stream)
log.WithError(types.ErrBlobLTMinRequest).
Debugf("requested blob for block %#x before minimum_request_epoch", blobIdents[i].BlockRoot)
return types.ErrBlobLTMinRequest
}
SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteBlobSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
return chunkErr
}
}
closeStream(stream, log)
return nil
}
func validateBlobByRootRequest(blobIdents types.BlobSidecarsByRootReq) error {
if uint64(len(blobIdents)) > params.BeaconNetworkConfig().MaxRequestBlobSidecars {
return types.ErrMaxBlobReqExceeded
}
return nil
}