package sync import ( "context" libp2pcore "github.com/libp2p/go-libp2p-core" "github.com/libp2p/go-libp2p-core/peer" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block" "github.com/prysmaticlabs/prysm/shared/params" ) // sendRecentBeaconBlocksRequest sends a recent beacon blocks request to a peer to get // those corresponding blocks from that peer. func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots *types.BeaconBlockByRootsReq, id peer.ID) error { ctx, cancel := context.WithTimeout(ctx, respTimeout) defer cancel() _, err := SendBeaconBlocksByRootRequest(ctx, s.cfg.Chain, s.cfg.P2P, id, blockRoots, func(blk block.SignedBeaconBlock) error { blkRoot, err := blk.Block().HashTreeRoot() if err != nil { return err } s.pendingQueueLock.Lock() if err := s.insertBlockToPendingQueue(blk.Block().Slot(), blk, blkRoot); err != nil { return err } s.pendingQueueLock.Unlock() return nil }) return err } // beaconBlocksRootRPCHandler looks up the request blocks from the database from the given block roots. func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) defer cancel() SetRPCStreamDeadlines(stream) log := log.WithField("handler", "beacon_blocks_by_root") rawMsg, ok := msg.(*types.BeaconBlockByRootsReq) if !ok { return errors.New("message is not type BeaconBlockByRootsReq") } blockRoots := *rawMsg if err := s.rateLimiter.validateRequest(stream, uint64(len(blockRoots))); err != nil { return err } if len(blockRoots) == 0 { // Add to rate limiter in the event no // roots are requested. s.rateLimiter.add(stream, 1) s.writeErrorResponseToStream(responseCodeInvalidRequest, "no block roots provided in request", stream) return errors.New("no block roots provided") } if uint64(len(blockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks { s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream) return errors.New("requested more than the max block limit") } s.rateLimiter.add(stream, int64(len(blockRoots))) for _, root := range blockRoots { blk, err := s.cfg.DB.Block(ctx, root) if err != nil { log.WithError(err).Debug("Could not fetch block") s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) return err } if blk == nil || blk.IsNil() { continue } if err := s.chunkWriter(stream, blk.Proto()); err != nil { return err } } closeStream(stream, log) return nil }