diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index 1eaa46163..c684942f3 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -11,6 +11,8 @@ import ( "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/runtime/version" ) // sendRecentBeaconBlocksRequest sends a recent beacon blocks request to a peer to get @@ -19,7 +21,7 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots ctx, cancel := context.WithTimeout(ctx, respTimeout) defer cancel() - _, err := SendBeaconBlocksByRootRequest(ctx, s.cfg.clock, s.cfg.p2p, id, blockRoots, func(blk interfaces.ReadOnlySignedBeaconBlock) error { + blks, err := SendBeaconBlocksByRootRequest(ctx, s.cfg.clock, s.cfg.p2p, id, blockRoots, func(blk interfaces.ReadOnlySignedBeaconBlock) error { blkRoot, err := blk.Block().HashTreeRoot() if err != nil { return err @@ -31,6 +33,21 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots } return nil }) + + for _, blk := range blks { + // Skip blocks before deneb because they have no blob. + if blk.Version() < version.Deneb { + continue + } + blkRoot, err := blk.Block().HashTreeRoot() + if err != nil { + return err + } + if err := s.requestPendingBlobs(ctx, blk.Block(), blkRoot[:], id); err != nil { + return err + } + } + return err } @@ -96,3 +113,37 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ closeStream(stream, log) return nil } + +func (s *Service) requestPendingBlobs(ctx context.Context, b interfaces.ReadOnlyBeaconBlock, br []byte, id peer.ID) error { + // Block before deneb has no blob. + if b.Version() < version.Deneb { + return nil + } + c, err := b.Body().BlobKzgCommitments() + if err != nil { + return err + } + // No op if the block has no blob commitments. + if len(c) == 0 { + return nil + } + + // Build request for blob sidecars. + blobId := make([]*eth.BlobIdentifier, len(c)) + for i := range c { + blobId[i] = ð.BlobIdentifier{Index: uint64(i), BlockRoot: br} + } + + ctxByte, err := ContextByteVersionsForValRoot(s.cfg.chain.GenesisValidatorsRoot()) + if err != nil { + return err + } + req := types.BlobSidecarsByRootReq(blobId) + + // Send request to a random peer. + blobSidecars, err := SendBlobSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, id, ctxByte, &req) + if err != nil { + return err + } + return s.cfg.beaconDB.SaveBlobSidecar(ctx, blobSidecars) +} diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index 263f44cfa..026e5b9e0 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" gcache "github.com/patrickmn/go-cache" @@ -17,6 +18,7 @@ import ( db "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" mockExecution "github.com/prysmaticlabs/prysm/v4/beacon-chain/execution/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" @@ -288,3 +290,46 @@ func TestRecentBeaconBlocksRPCHandler_HandleZeroBlocks(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, int(lter.Count(stream1.Conn().RemotePeer().String()))) } + +func TestRequestPendingBlobs(t *testing.T) { + s := &Service{} + t.Run("old block should not fail", func(t *testing.T) { + b, err := blocks.NewBeaconBlock(util.NewBeaconBlock().Block) + require.NoError(t, err) + require.NoError(t, s.requestPendingBlobs(context.Background(), b, []byte{}, "test")) + }) + t.Run("empty commitment block should not fail", func(t *testing.T) { + b, err := blocks.NewBeaconBlock(util.NewBeaconBlockDeneb().Block) + require.NoError(t, err) + require.NoError(t, s.requestPendingBlobs(context.Background(), b, []byte{}, "test")) + }) + t.Run("unsupported protocol", func(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + require.Equal(t, 1, len(p1.BHost.Network().Peers())) + chain := &mock.ChainService{ + FinalizedCheckPoint: ðpb.Checkpoint{ + Epoch: 1, + Root: make([]byte, 32), + }, + ValidatorsRoot: [32]byte{}, + Genesis: time.Now(), + } + p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound) + p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected) + p1.Peers().SetChainState(p2.PeerID(), ðpb.Status{FinalizedEpoch: 1}) + s := &Service{ + cfg: &config{ + p2p: p1, + chain: chain, + clock: startup.NewClock(time.Unix(0, 0), [32]byte{}), + }, + } + b := util.NewBeaconBlockDeneb() + b.Block.Body.BlobKzgCommitments = make([][]byte, 1) + b1, err := blocks.NewBeaconBlock(b.Block) + require.NoError(t, err) + require.ErrorContains(t, "protocols not supported", s.requestPendingBlobs(context.Background(), b1, []byte{}, p2.PeerID())) + }) +}