Backfill Blobs (#13595)

* retrieve and save blobs during backfill

* Update beacon-chain/sync/backfill/batch.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* Update beacon-chain/sync/backfill/blobs.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* Update beacon-chain/sync/backfill/metrics.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* make blobSync initialization path a little safer

* use bytes.Equal to avoid extra allocation

* stop using log.Fatal and other little cleanups

* wrap up post blob sync actions in batch mutator

* unit test coverage on verifier

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
This commit is contained in:
kasey 2024-02-14 14:58:51 -06:00 committed by GitHub
parent 5de22d22bc
commit bb66201c2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 704 additions and 160 deletions

View File

@ -229,14 +229,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
if err != nil {
return nil, errors.Wrap(err, "backfill status initialization error")
}
pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer)
bf, err := backfill.NewService(ctx, bfs, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...)
if err != nil {
return nil, errors.Wrap(err, "error initializing backfill service")
}
if err := beacon.services.RegisterService(bf); err != nil {
return nil, errors.Wrap(err, "error registering backfill service")
}
log.Debugln("Starting State Gen")
if err := beacon.startStateGen(ctx, bfs, beacon.forkChoicer); err != nil {
@ -251,6 +243,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
beacon.verifyInitWaiter = verification.NewInitializerWaiter(
beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen)
pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer)
beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter))
bf, err := backfill.NewService(ctx, bfs, beacon.BlobStorage, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...)
if err != nil {
return nil, errors.Wrap(err, "error initializing backfill service")
}
if err := beacon.services.RegisterService(bf); err != nil {
return nil, errors.Wrap(err, "error registering backfill service")
}
log.Debugln("Registering POW Chain Service")
if err := beacon.registerPOWChainService(); err != nil {
return nil, err

View File

@ -5,6 +5,7 @@ go_library(
srcs = [
"batch.go",
"batcher.go",
"blobs.go",
"metrics.go",
"pool.go",
"service.go",
@ -17,12 +18,15 @@ go_library(
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
@ -34,6 +38,7 @@ go_library(
"//proto/dbval:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_pkg_errors//:go_default_library",
@ -46,7 +51,9 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"batch_test.go",
"batcher_test.go",
"blobs_test.go",
"pool_test.go",
"service_test.go",
"status_test.go",
@ -56,10 +63,14 @@ go_test(
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@ -2,10 +2,14 @@ package backfill
import (
"fmt"
"sort"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
log "github.com/sirupsen/logrus"
@ -33,6 +37,8 @@ func (s batchState) String() string {
return "import_complete"
case batchEndSequence:
return "end_sequence"
case batchBlobSync:
return "blob_sync"
default:
return "unknown"
}
@ -43,6 +49,7 @@ const (
batchInit
batchSequenced
batchErrRetryable
batchBlobSync
batchImportable
batchImportComplete
batchEndSequence
@ -57,10 +64,13 @@ type batch struct {
retries int
begin primitives.Slot
end primitives.Slot // half-open interval, [begin, end), ie >= start, < end.
results VerifiedROBlocks
results verifiedROBlocks
err error
state batchState
pid peer.ID
busy peer.ID
blockPid peer.ID
blobPid peer.ID
bs *blobSync
}
func (b batch) logFields() log.Fields {
@ -72,7 +82,9 @@ func (b batch) logFields() log.Fields {
"retries": b.retries,
"begin": b.begin,
"end": b.end,
"pid": b.pid,
"busyPid": b.busy,
"blockPid": b.blockPid,
"blobPid": b.blobPid,
}
}
@ -101,7 +113,7 @@ func (b batch) ensureParent(expected [32]byte) error {
return nil
}
func (b batch) request() *eth.BeaconBlocksByRangeRequest {
func (b batch) blockRequest() *eth.BeaconBlocksByRangeRequest {
return &eth.BeaconBlocksByRangeRequest{
StartSlot: b.begin,
Count: uint64(b.end - b.begin),
@ -109,6 +121,32 @@ func (b batch) request() *eth.BeaconBlocksByRangeRequest {
}
}
func (b batch) blobRequest() *eth.BlobSidecarsByRangeRequest {
return &eth.BlobSidecarsByRangeRequest{
StartSlot: b.begin,
Count: uint64(b.end - b.begin),
}
}
func (b batch) withResults(results verifiedROBlocks, bs *blobSync) batch {
b.results = results
b.bs = bs
if bs.blobsNeeded() > 0 {
return b.withState(batchBlobSync)
}
return b.withState(batchImportable)
}
func (b batch) postBlobSync() batch {
if b.blobsNeeded() > 0 {
log.WithFields(b.logFields()).WithField("blobs_missing", b.blobsNeeded()).Error("batch still missing blobs after downloading from peer")
b.bs = nil
b.results = []blocks.ROBlock{}
return b.withState(batchErrRetryable)
}
return b.withState(batchImportable)
}
func (b batch) withState(s batchState) batch {
if s == batchSequenced {
b.scheduled = time.Now()
@ -130,7 +168,7 @@ func (b batch) withState(s batchState) batch {
}
func (b batch) withPeer(p peer.ID) batch {
b.pid = p
b.blockPid = p
backfillBatchTimeWaiting.Observe(float64(time.Since(b.scheduled).Milliseconds()))
return b
}
@ -139,3 +177,21 @@ func (b batch) withRetryableError(err error) batch {
b.err = err
return b.withState(batchErrRetryable)
}
func (b batch) blobsNeeded() int {
return b.bs.blobsNeeded()
}
func (b batch) blobResponseValidator() sync.BlobResponseValidation {
return b.bs.validateNext
}
func (b batch) availabilityStore() das.AvailabilityStore {
return b.bs.store
}
func sortBatchDesc(bb []batch) {
sort.Slice(bb, func(i, j int) bool {
return bb[j].end < bb[i].end
})
}

View File

@ -0,0 +1,21 @@
package backfill
import (
"testing"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/testing/require"
)
func TestSortBatchDesc(t *testing.T) {
orderIn := []primitives.Slot{100, 10000, 1}
orderOut := []primitives.Slot{10000, 100, 1}
batches := make([]batch, len(orderIn))
for i := range orderIn {
batches[i] = batch{end: orderIn[i]}
}
sortBatchDesc(batches)
for i := range orderOut {
require.Equal(t, orderOut[i], batches[i].end)
}
}

View File

@ -0,0 +1,141 @@
package backfill
import (
"bytes"
"context"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
)
var (
errUnexpectedResponseSize = errors.New("received more blobs than expected for the requested range")
errUnexpectedCommitment = errors.New("BlobSidecar commitment does not match block")
errUnexpectedResponseContent = errors.New("BlobSidecar response does not include expected values in expected order")
errBatchVerifierMismatch = errors.New("the list of blocks passed to the availability check does not match what was verified")
)
type blobSummary struct {
blockRoot [32]byte
index uint64
commitment [48]byte
signature [fieldparams.BLSSignatureLength]byte
}
type blobSyncConfig struct {
retentionStart primitives.Slot
nbv verification.NewBlobVerifier
store *filesystem.BlobStorage
}
func newBlobSync(current primitives.Slot, vbs verifiedROBlocks, cfg *blobSyncConfig) (*blobSync, error) {
expected, err := vbs.blobIdents(cfg.retentionStart)
if err != nil {
return nil, err
}
bbv := newBlobBatchVerifier(cfg.nbv)
as := das.NewLazilyPersistentStore(cfg.store, bbv)
return &blobSync{current: current, expected: expected, bbv: bbv, store: as}, nil
}
type blobVerifierMap map[[32]byte][fieldparams.MaxBlobsPerBlock]verification.BlobVerifier
type blobSync struct {
store das.AvailabilityStore
expected []blobSummary
next int
bbv *blobBatchVerifier
current primitives.Slot
}
func (bs *blobSync) blobsNeeded() int {
return len(bs.expected) - bs.next
}
func (bs *blobSync) validateNext(rb blocks.ROBlob) error {
if bs.next >= len(bs.expected) {
return errUnexpectedResponseSize
}
next := bs.expected[bs.next]
bs.next += 1
// Get the super cheap verifications out of the way before we init a verifier.
if next.blockRoot != rb.BlockRoot() {
return errors.Wrapf(errUnexpectedResponseContent, "next expected root=%#x, saw=%#x", next.blockRoot, rb.BlockRoot())
}
if next.index != rb.Index {
return errors.Wrapf(errUnexpectedResponseContent, "next expected root=%#x, saw=%#x for root=%#x", next.index, rb.Index, next.blockRoot)
}
if next.commitment != bytesutil.ToBytes48(rb.KzgCommitment) {
return errors.Wrapf(errUnexpectedResponseContent, "next expected commitment=%#x, saw=%#x for root=%#x", next.commitment, rb.KzgCommitment, rb.BlockRoot())
}
if bytesutil.ToBytes96(rb.SignedBlockHeader.Signature) != next.signature {
return verification.ErrInvalidProposerSignature
}
v := bs.bbv.newVerifier(rb)
if err := v.BlobIndexInBounds(); err != nil {
return err
}
v.SatisfyRequirement(verification.RequireValidProposerSignature)
if err := v.SidecarInclusionProven(); err != nil {
return err
}
if err := v.SidecarKzgProofVerified(); err != nil {
return err
}
if err := bs.store.Persist(bs.current, rb); err != nil {
return err
}
return nil
}
func newBlobBatchVerifier(nbv verification.NewBlobVerifier) *blobBatchVerifier {
return &blobBatchVerifier{newBlobVerifier: nbv, verifiers: make(blobVerifierMap)}
}
type blobBatchVerifier struct {
newBlobVerifier verification.NewBlobVerifier
verifiers blobVerifierMap
}
func (bbv *blobBatchVerifier) newVerifier(rb blocks.ROBlob) verification.BlobVerifier {
m := bbv.verifiers[rb.BlockRoot()]
m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillSidecarRequirements)
bbv.verifiers[rb.BlockRoot()] = m
return m[rb.Index]
}
func (bbv blobBatchVerifier) VerifiedROBlobs(_ context.Context, blk blocks.ROBlock, _ []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) {
m, ok := bbv.verifiers[blk.Root()]
if !ok {
return nil, errors.Wrapf(verification.ErrMissingVerification, "no record of verifiers for root %#x", blk.Root())
}
c, err := blk.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, errors.Wrapf(errUnexpectedCommitment, "error reading commitments from block root %#x", blk.Root())
}
vbs := make([]blocks.VerifiedROBlob, len(c))
for i := range c {
if m[i] == nil {
return nil, errors.Wrapf(errBatchVerifierMismatch, "do not have verifier for block root %#x idx %d", blk.Root(), i)
}
vb, err := m[i].VerifiedROBlob()
if err != nil {
return nil, err
}
if !bytes.Equal(vb.KzgCommitment, c[i]) {
return nil, errors.Wrapf(errBatchVerifierMismatch, "commitments do not match, verified=%#x da check=%#x for root %#x", vb.KzgCommitment, c[i], vb.BlockRoot())
}
vbs[i] = vb
}
return vbs, nil
}
var _ das.BlobBatchVerifier = &blobBatchVerifier{}

View File

@ -0,0 +1,128 @@
package backfill
import (
"testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
)
func testBlobGen(t *testing.T, start primitives.Slot, n int) ([]blocks.ROBlock, [][]blocks.ROBlob) {
blks := make([]blocks.ROBlock, n)
blobs := make([][]blocks.ROBlob, n)
for i := 0; i < n; i++ {
bk, bl := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, start+primitives.Slot(i), 3)
blks[i] = bk
blobs[i] = bl
}
return blks, blobs
}
func TestValidateNext_happy(t *testing.T) {
current := primitives.Slot(128)
blks, blobs := testBlobGen(t, 63, 4)
cfg := &blobSyncConfig{
retentionStart: 0,
nbv: testNewBlobVerifier(),
store: filesystem.NewEphemeralBlobStorage(t),
}
bsync, err := newBlobSync(current, blks, cfg)
require.NoError(t, err)
nb := 0
for i := range blobs {
bs := blobs[i]
for ib := range bs {
require.NoError(t, bsync.validateNext(bs[ib]))
nb += 1
}
}
require.Equal(t, nb, bsync.next)
// we should get an error if we read another blob.
require.ErrorIs(t, bsync.validateNext(blobs[0][0]), errUnexpectedResponseSize)
}
func TestValidateNext_cheapErrors(t *testing.T) {
current := primitives.Slot(128)
blks, blobs := testBlobGen(t, 63, 2)
cfg := &blobSyncConfig{
retentionStart: 0,
nbv: testNewBlobVerifier(),
store: filesystem.NewEphemeralBlobStorage(t),
}
bsync, err := newBlobSync(current, blks, cfg)
require.NoError(t, err)
require.ErrorIs(t, bsync.validateNext(blobs[len(blobs)-1][0]), errUnexpectedResponseContent)
}
func TestValidateNext_sigMatch(t *testing.T) {
current := primitives.Slot(128)
blks, blobs := testBlobGen(t, 63, 1)
cfg := &blobSyncConfig{
retentionStart: 0,
nbv: testNewBlobVerifier(),
store: filesystem.NewEphemeralBlobStorage(t),
}
bsync, err := newBlobSync(current, blks, cfg)
require.NoError(t, err)
blobs[0][0].SignedBlockHeader.Signature = bytesutil.PadTo([]byte("derp"), 48)
require.ErrorIs(t, bsync.validateNext(blobs[0][0]), verification.ErrInvalidProposerSignature)
}
func TestValidateNext_errorsFromVerifier(t *testing.T) {
current := primitives.Slot(128)
blks, blobs := testBlobGen(t, 63, 1)
cases := []struct {
name string
err error
cb func(*verification.MockBlobVerifier)
}{
{
name: "index oob",
err: verification.ErrBlobIndexInvalid,
cb: func(v *verification.MockBlobVerifier) {
v.ErrBlobIndexInBounds = verification.ErrBlobIndexInvalid
},
},
{
name: "not inclusion proven",
err: verification.ErrSidecarInclusionProofInvalid,
cb: func(v *verification.MockBlobVerifier) {
v.ErrSidecarInclusionProven = verification.ErrSidecarInclusionProofInvalid
},
},
{
name: "not kzg proof valid",
err: verification.ErrSidecarKzgProofInvalid,
cb: func(v *verification.MockBlobVerifier) {
v.ErrSidecarKzgProofVerified = verification.ErrSidecarKzgProofInvalid
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
cfg := &blobSyncConfig{
retentionStart: 0,
nbv: testNewBlobVerifier(c.cb),
store: filesystem.NewEphemeralBlobStorage(t),
}
bsync, err := newBlobSync(current, blks, cfg)
require.NoError(t, err)
require.ErrorIs(t, bsync.validateNext(blobs[0][0]), c.err)
})
}
}
func testNewBlobVerifier(opts ...func(*verification.MockBlobVerifier)) verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
v := &verification.MockBlobVerifier{}
for i := range opts {
opts[i](v)
}
return v
}
}

View File

@ -3,6 +3,9 @@ package backfill
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
)
var (
@ -30,10 +33,28 @@ var (
Help: "Number of backfill batches downloaded and imported.",
},
)
backfillBatchApproximateBytes = promauto.NewCounter(
backfillBlocksApproximateBytes = promauto.NewCounter(
prometheus.CounterOpts{
Name: "backfill_batch_bytes_downloaded",
Help: "Count of bytes downloaded from peers",
Name: "backfill_blocks_bytes_downloaded",
Help: "BeaconBlock bytes downloaded from peers for backfill.",
},
)
backfillBlobsApproximateBytes = promauto.NewCounter(
prometheus.CounterOpts{
Name: "backfill_blobs_bytes_downloaded",
Help: "BlobSidecar bytes downloaded from peers for backfill.",
},
)
backfillBlobsDownloadCount = promauto.NewCounter(
prometheus.CounterOpts{
Name: "backfill_blobs_download_count",
Help: "Number of BlobSidecar values downloaded from peers for backfill.",
},
)
backfillBlocksDownloadCount = promauto.NewCounter(
prometheus.CounterOpts{
Name: "backfill_blocks_download_count",
Help: "Number of BeaconBlock values downloaded from peers for backfill.",
},
)
backfillBatchTimeRoundtrip = promauto.NewHistogram(
@ -50,10 +71,17 @@ var (
Buckets: []float64{50, 100, 300, 1000, 2000},
},
)
backfillBatchTimeDownloading = promauto.NewHistogram(
backfillBatchTimeDownloadingBlocks = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "backfill_batch_time_download",
Help: "Time batch spent downloading blocks from peer.",
Name: "backfill_batch_blocks_time_download",
Help: "Time, in milliseconds, batch spent downloading blocks from peer.",
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
},
)
backfillBatchTimeDownloadingBlobs = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "backfill_batch_blobs_time_download",
Help: "Time, in milliseconds, batch spent downloading blobs from peer.",
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
},
)
@ -65,3 +93,16 @@ var (
},
)
)
func blobValidationMetrics(_ blocks.ROBlob) error {
backfillBlobsDownloadCount.Inc()
return nil
}
func blockValidationMetrics(interfaces.ReadOnlySignedBeaconBlock) error {
backfillBlocksDownloadCount.Inc()
return nil
}
var _ sync.BlobResponseValidation = blobValidationMetrics
var _ sync.BeaconBlockProcessor = blockValidationMetrics

View File

@ -7,15 +7,18 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
log "github.com/sirupsen/logrus"
)
type batchWorkerPool interface {
spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier)
spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, blobVerifier verification.NewBlobVerifier, bfs *filesystem.BlobStorage)
todo(b batch)
complete() (batch, error)
}
@ -24,11 +27,11 @@ type worker interface {
run(context.Context)
}
type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker
type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker
func defaultNewWorker(p p2p.P2P) newWorker {
return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker {
return newP2pWorker(id, p, in, out, c, v)
return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker {
return newP2pWorker(id, p, in, out, c, v, cm, nbv, bfs)
}
}
@ -60,11 +63,11 @@ func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool {
}
}
func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier) {
func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) {
p.ctx, p.cancel = context.WithCancel(ctx)
go p.batchRouter(a)
for i := 0; i < n; i++ {
go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, c, v).run(p.ctx)
go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, c, v, cm, nbv, bfs).run(p.ctx)
}
}
@ -106,14 +109,21 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
select {
case b := <-p.toRouter:
todo = append(todo, b)
// sort batches in descending order so that we'll always process the dependent batches first
sortBatchDesc(todo)
case <-rt.C:
// Worker assignments can fail if assignBatch can't find a suitable peer.
// This ticker exists to periodically break out of the channel select
// to retry failed assignments.
case b := <-p.fromWorkers:
pid := b.pid
pid := b.busy
busy[pid] = false
p.fromRouter <- b
if b.state == batchBlobSync {
todo = append(todo, b)
sortBatchDesc(todo)
} else {
p.fromRouter <- b
}
case <-p.ctx.Done():
log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down")
p.shutdown(p.ctx.Err())
@ -135,7 +145,7 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
}
for _, pid := range assigned {
busy[pid] = true
todo[0].pid = pid
todo[0].busy = pid
p.toWorkers <- todo[0].withPeer(pid)
if todo[0].begin < earliest {
earliest = todo[0].begin

View File

@ -6,8 +6,13 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
)
@ -28,6 +33,10 @@ func (m mockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
var _ PeerAssigner = &mockAssigner{}
func mockNewBlobVerifier(_ blocks.ROBlob, _ []verification.Requirement) verification.BlobVerifier {
return &verification.MockBlobVerifier{}
}
func TestPoolDetectAllEnded(t *testing.T) {
nw := 5
p2p := p2ptest.NewTestP2P(t)
@ -40,7 +49,11 @@ func TestPoolDetectAllEnded(t *testing.T) {
require.NoError(t, err)
v, err := newBackfillVerifier(st.GenesisValidatorsRoot(), keys)
require.NoError(t, err)
pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v)
ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(st.GenesisValidatorsRoot()))
require.NoError(t, err)
bfs := filesystem.NewEphemeralBlobStorage(t)
pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v, ctxMap, mockNewBlobVerifier, bfs)
br := batcher{min: 10, size: 10}
endSeq := br.before(0)
require.Equal(t, batchEndSequence, endSeq.state)
@ -59,7 +72,7 @@ type mockPool struct {
todoChan chan batch
}
func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier) {
func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier, _ sync.ContextByteVersions, _ verification.NewBlobVerifier, _ *filesystem.BlobStorage) {
}
func (m *mockPool) todo(b batch) {

View File

@ -6,8 +6,12 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
@ -17,78 +21,39 @@ import (
)
type Service struct {
ctx context.Context
store *Store
ms minimumSlotter
cw startup.ClockWaiter
enabled bool // service is disabled by default while feature is experimental
nWorkers int
batchSeq *batchSequencer
batchSize uint64
pool batchWorkerPool
verifier *verifier
p2p p2p.P2P
pa PeerAssigner
batchImporter batchImporter
ctx context.Context
enabled bool // service is disabled by default while feature is experimental
clock *startup.Clock
store *Store
ms minimumSlotter
cw startup.ClockWaiter
verifierWaiter InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
nWorkers int
batchSeq *batchSequencer
batchSize uint64
pool batchWorkerPool
verifier *verifier
ctxMap sync.ContextByteVersions
p2p p2p.P2P
pa PeerAssigner
batchImporter batchImporter
blobStore *filesystem.BlobStorage
}
var _ runtime.Service = (*Service)(nil)
type ServiceOption func(*Service) error
func WithEnableBackfill(enabled bool) ServiceOption {
return func(s *Service) error {
s.enabled = enabled
return nil
}
// PeerAssigner describes a type that provides an Assign method, which can assign the best peer
// to service an RPC blockRequest. The Assign method takes a map of peers that should be excluded,
// allowing the caller to avoid making multiple concurrent requests to the same peer.
type PeerAssigner interface {
Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
}
func WithWorkerCount(n int) ServiceOption {
return func(s *Service) error {
s.nWorkers = n
return nil
}
}
type minimumSlotter func(primitives.Slot) primitives.Slot
type batchImporter func(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error)
func WithBatchSize(n uint64) ServiceOption {
return func(s *Service) error {
s.batchSize = n
return nil
}
}
type minimumSlotter interface {
minimumSlot() primitives.Slot
setClock(*startup.Clock)
}
type defaultMinimumSlotter struct {
clock *startup.Clock
cw startup.ClockWaiter
ctx context.Context
}
func (d defaultMinimumSlotter) minimumSlot() primitives.Slot {
if d.clock == nil {
var err error
d.clock, err = d.cw.WaitForClock(d.ctx)
if err != nil {
log.WithError(err).Fatal("failed to obtain system/genesis clock, unable to start backfill service")
}
}
return minimumBackfillSlot(d.clock.CurrentSlot())
}
func (d defaultMinimumSlotter) setClock(c *startup.Clock) {
//nolint:all
d.clock = c
}
var _ minimumSlotter = &defaultMinimumSlotter{}
type batchImporter func(ctx context.Context, b batch, su *Store) (*dbval.BackfillStatus, error)
func defaultBatchImporter(ctx context.Context, b batch, su *Store) (*dbval.BackfillStatus, error) {
func defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) {
status := su.status()
if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil {
return status, err
@ -96,28 +61,63 @@ func defaultBatchImporter(ctx context.Context, b batch, su *Store) (*dbval.Backf
// Import blocks to db and update db state to reflect the newly imported blocks.
// Other parts of the beacon node may use the same StatusUpdater instance
// via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled.
status, err := su.fillBack(ctx, b.results)
if err != nil {
log.WithError(err).Fatal("Non-recoverable db error in backfill service, quitting.")
}
return status, nil
return su.fillBack(ctx, current, b.results, b.availabilityStore())
}
// PeerAssigner describes a type that provides an Assign method, which can assign the best peer
// to service an RPC request. The Assign method takes a map of peers that should be excluded,
// allowing the caller to avoid making multiple concurrent requests to the same peer.
type PeerAssigner interface {
Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
// ServiceOption represents a functional option for the backfill service constructor.
type ServiceOption func(*Service) error
// WithEnableBackfill toggles the entire backfill service on or off, intended to be used by a feature flag.
func WithEnableBackfill(enabled bool) ServiceOption {
return func(s *Service) error {
s.enabled = enabled
return nil
}
}
// WithWorkerCount sets the number of goroutines in the batch processing pool that can concurrently
// make p2p requests to download data for batches.
func WithWorkerCount(n int) ServiceOption {
return func(s *Service) error {
s.nWorkers = n
return nil
}
}
// WithBatchSize configures the size of backfill batches, similar to the initial-sync block-batch-limit flag.
// It should usually be left at the default value.
func WithBatchSize(n uint64) ServiceOption {
return func(s *Service) error {
s.batchSize = n
return nil
}
}
// InitializerWaiter is an interface that is satisfied by verification.InitializerWaiter.
// Using this interface enables node init to satisfy this requirement for the backfill service
// while also allowing backfill to mock it in tests.
type InitializerWaiter interface {
WaitForInitializer(ctx context.Context) (*verification.Initializer, error)
}
// WithVerifierWaiter sets the verification.InitializerWaiter
// for the backfill Service.
func WithVerifierWaiter(viw InitializerWaiter) ServiceOption {
return func(s *Service) error {
s.verifierWaiter = viw
return nil
}
}
// NewService initializes the backfill Service. Like all implementations of the Service interface,
// the service won't begin its runloop until Start() is called.
func NewService(ctx context.Context, su *Store, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
s := &Service{
ctx: ctx,
store: su,
blobStore: bStore,
cw: cw,
ms: &defaultMinimumSlotter{cw: cw, ctx: ctx},
ms: minimumBackfillSlot,
p2p: p,
pa: pa,
batchImporter: defaultBatchImporter,
@ -132,26 +132,33 @@ func NewService(ctx context.Context, su *Store, cw startup.ClockWaiter, p p2p.P2
return s, nil
}
func (s *Service) initVerifier(ctx context.Context) (*verifier, error) {
func (s *Service) initVerifier(ctx context.Context) (*verifier, sync.ContextByteVersions, error) {
cps, err := s.store.originState(ctx)
if err != nil {
return nil, err
return nil, nil, err
}
keys, err := cps.PublicKeys()
if err != nil {
return nil, errors.Wrap(err, "Unable to retrieve public keys for all validators in the origin state")
return nil, nil, errors.Wrap(err, "Unable to retrieve public keys for all validators in the origin state")
}
return newBackfillVerifier(cps.GenesisValidatorsRoot(), keys)
vr := cps.GenesisValidatorsRoot()
ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr))
if err != nil {
return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr)
}
v, err := newBackfillVerifier(vr, keys)
return v, ctxMap, err
}
func (s *Service) updateComplete() bool {
b, err := s.pool.complete()
if err != nil {
if errors.Is(err, errEndSequence) {
log.WithField("backfill_slot", b.begin).Info("Backfill is complete")
log.WithField("backfill_slot", b.begin).Info("Backfill is complete.")
return true
}
log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
log.WithError(err).Error("Backfill service received unhandled error from worker pool.")
return true
}
s.batchSeq.update(b)
return false
@ -166,12 +173,13 @@ func (s *Service) importBatches(ctx context.Context) {
}
backfillBatchesImported.Add(float64(imported))
}()
current := s.clock.CurrentSlot()
for i := range importable {
ib := importable[i]
if len(ib.results) == 0 {
log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer.")
}
_, err := s.batchImporter(ctx, ib, s.store)
_, err := s.batchImporter(ctx, current, ib, s.store)
if err != nil {
log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import.")
s.downscore(ib)
@ -214,40 +222,51 @@ func (s *Service) scheduleTodos() {
// Start begins the runloop of backfill.Service in the current goroutine.
func (s *Service) Start() {
if !s.enabled {
log.Info("Exiting backfill service; not enabled.")
log.Info("Backfill service not enabled.")
return
}
ctx, cancel := context.WithCancel(s.ctx)
defer func() {
log.Info("Backfill service is shutting down.")
cancel()
}()
clock, err := s.cw.WaitForClock(ctx)
if err != nil {
log.WithError(err).Fatal("Backfill service failed to start while waiting for genesis data.")
log.WithError(err).Error("Backfill service failed to start while waiting for genesis data.")
return
}
s.clock = clock
v, err := s.verifierWaiter.WaitForInitializer(ctx)
s.newBlobVerifier = newBlobVerifierFromInitializer(v)
if err != nil {
log.WithError(err).Error("Could not initialize blob verifier in backfill service.")
return
}
s.ms.setClock(clock)
if s.store.isGenesisSync() {
log.Info("Exiting backfill service as the node has been initialized with a genesis state or the backfill status is missing")
log.Info("Backfill short-circuit; node synced from genesis.")
return
}
status := s.store.status()
// Exit early if there aren't going to be any batches to backfill.
if primitives.Slot(status.LowSlot) <= s.ms.minimumSlot() {
log.WithField("minimum_required_slot", s.ms.minimumSlot()).
if primitives.Slot(status.LowSlot) <= s.ms(s.clock.CurrentSlot()) {
log.WithField("minimum_required_slot", s.ms(s.clock.CurrentSlot())).
WithField("backfill_lowest_slot", status.LowSlot).
Info("Exiting backfill service; minimum block retention slot > lowest backfilled block.")
return
}
s.verifier, err = s.initVerifier(ctx)
s.verifier, s.ctxMap, err = s.initVerifier(ctx)
if err != nil {
log.WithError(err).Fatal("Unable to initialize backfill verifier, quitting.")
log.WithError(err).Error("Unable to initialize backfill verifier.")
return
}
s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier)
s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore)
s.batchSeq = newBatchSequencer(s.nWorkers, s.ms.minimumSlot(), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
if err = s.initBatches(); err != nil {
log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
log.WithError(err).Error("Non-recoverable error in backfill service.")
return
}
for {
@ -259,8 +278,8 @@ func (s *Service) Start() {
}
s.importBatches(ctx)
batchesWaiting.Set(float64(s.batchSeq.countWithState(batchImportable)))
if err := s.batchSeq.moveMinimum(s.ms.minimumSlot()); err != nil {
log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
if err := s.batchSeq.moveMinimum(s.ms(s.clock.CurrentSlot())); err != nil {
log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot.")
}
s.scheduleTodos()
}
@ -278,7 +297,7 @@ func (s *Service) initBatches() error {
}
func (s *Service) downscore(b batch) {
s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.pid)
s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.blockPid)
}
func (s *Service) Stop() error {
@ -304,3 +323,9 @@ func minimumBackfillSlot(current primitives.Slot) primitives.Slot {
}
return current - offset
}
func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
return ini.NewBlobVerifier(b, reqs)
}
}

View File

@ -6,9 +6,11 @@ import (
"time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
@ -20,13 +22,15 @@ type mockMinimumSlotter struct {
min primitives.Slot
}
var _ minimumSlotter = &mockMinimumSlotter{}
func (m mockMinimumSlotter) minimumSlot() primitives.Slot {
func (m mockMinimumSlotter) minimumSlot(_ primitives.Slot) primitives.Slot {
return m.min
}
func (m mockMinimumSlotter) setClock(*startup.Clock) {
type mockInitalizerWaiter struct {
}
func (mi *mockInitalizerWaiter) WaitForInitializer(ctx context.Context) (*verification.Initializer, error) {
return &verification.Initializer{}, nil
}
func TestServiceInit(t *testing.T) {
@ -52,11 +56,13 @@ func TestServiceInit(t *testing.T) {
require.NoError(t, cw.SetClock(startup.NewClock(time.Now(), [32]byte{})))
pool := &mockPool{todoChan: make(chan batch, nWorkers), finishedChan: make(chan batch, nWorkers)}
p2pt := p2ptest.NewTestP2P(t)
srv, err := NewService(ctx, su, cw, p2pt, &mockAssigner{}, WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true))
bfs := filesystem.NewEphemeralBlobStorage(t)
srv, err := NewService(ctx, su, bfs, cw, p2pt, &mockAssigner{},
WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true), WithVerifierWaiter(&mockInitalizerWaiter{}))
require.NoError(t, err)
srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}
srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}.minimumSlot
srv.pool = pool
srv.batchImporter = func(context.Context, batch, *Store) (*dbval.BackfillStatus, error) {
srv.batchImporter = func(context.Context, primitives.Slot, batch, *Store) (*dbval.BackfillStatus, error) {
return &dbval.BackfillStatus{}, nil
}
go srv.Start()

View File

@ -5,6 +5,7 @@ import (
"sync"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
@ -73,7 +74,7 @@ func (s *Store) status() *dbval.BackfillStatus {
// fillBack saves the slice of blocks and updates the BackfillStatus LowSlot/Root/ParentRoot tracker to the values
// from the first block in the slice. This method assumes that the block slice has been fully validated and
// sorted in slot order by the calling function.
func (s *Store) fillBack(ctx context.Context, blocks []blocks.ROBlock) (*dbval.BackfillStatus, error) {
func (s *Store) fillBack(ctx context.Context, current primitives.Slot, blocks []blocks.ROBlock, store das.AvailabilityStore) (*dbval.BackfillStatus, error) {
status := s.status()
if len(blocks) == 0 {
return status, nil
@ -87,6 +88,12 @@ func (s *Store) fillBack(ctx context.Context, blocks []blocks.ROBlock) (*dbval.B
status.LowParentRoot, highest.Root(), status.LowSlot, highest.Block().Slot())
}
for i := range blocks {
if err := store.IsDataAvailable(ctx, current, blocks[i]); err != nil {
return nil, err
}
}
if err := s.store.SaveROBlocks(ctx, blocks, false); err != nil {
return nil, errors.Wrapf(err, "error saving backfill blocks")
}

View File

@ -5,6 +5,7 @@ import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
@ -138,7 +139,7 @@ func TestStatusUpdater_FillBack(t *testing.T) {
require.NoError(t, err)
s := &Store{bs: &dbval.BackfillStatus{LowSlot: 100, LowParentRoot: rob.RootSlice()}, store: mdb}
require.Equal(t, false, s.AvailableBlock(95))
_, err = s.fillBack(ctx, []blocks.ROBlock{rob})
_, err = s.fillBack(ctx, 0, []blocks.ROBlock{rob}, &das.MockAvailabilityStore{})
require.NoError(t, err)
require.Equal(t, true, s.AvailableBlock(95))
}

View File

@ -9,7 +9,9 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/crypto/bls"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/network/forks"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
)
@ -17,8 +19,37 @@ var errInvalidBatchChain = errors.New("parent_root of block does not match the p
var errProposerIndexTooHigh = errors.New("proposer index not present in origin state")
var errUnknownDomain = errors.New("runtime error looking up signing domain for fork")
// VerifiedROBlocks represents a slice of blocks that have passed signature verification.
type VerifiedROBlocks []blocks.ROBlock
// verifiedROBlocks represents a slice of blocks that have passed signature verification.
type verifiedROBlocks []blocks.ROBlock
func (v verifiedROBlocks) blobIdents(retentionStart primitives.Slot) ([]blobSummary, error) {
// early return if the newest block is outside the retention window
if len(v) > 0 && v[len(v)-1].Block().Slot() < retentionStart {
return nil, nil
}
bs := make([]blobSummary, 0)
for i := range v {
if v[i].Block().Slot() < retentionStart {
continue
}
if v[i].Block().Version() < version.Deneb {
continue
}
c, err := v[i].Block().Body().BlobKzgCommitments()
if err != nil {
return nil, errors.Wrapf(err, "unexpected error checking commitments for block root %#x", v[i].Root())
}
if len(c) == 0 {
continue
}
for ci := range c {
bs = append(bs, blobSummary{
blockRoot: v[i].Root(), signature: v[i].Signature(),
index: uint64(ci), commitment: bytesutil.ToBytes48(c[ci])})
}
}
return bs, nil
}
type verifier struct {
keys [][fieldparams.BLSPubkeyLength]byte
@ -27,7 +58,7 @@ type verifier struct {
}
// TODO: rewrite this to use ROBlock.
func (vr verifier) verify(blks []interfaces.ReadOnlySignedBeaconBlock) (VerifiedROBlocks, error) {
func (vr verifier) verify(blks []interfaces.ReadOnlySignedBeaconBlock) (verifiedROBlocks, error) {
var err error
result := make([]blocks.ROBlock, len(blks))
sigSet := bls.NewSet()

View File

@ -4,9 +4,12 @@ import (
"context"
"time"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
log "github.com/sirupsen/logrus"
)
@ -19,6 +22,9 @@ type p2pWorker struct {
p2p p2p.P2P
v *verifier
c *startup.Clock
cm sync.ContextByteVersions
nbv verification.NewBlobVerifier
bfs *filesystem.BlobStorage
}
func (w *p2pWorker) run(ctx context.Context) {
@ -26,7 +32,11 @@ func (w *p2pWorker) run(ctx context.Context) {
select {
case b := <-w.todo:
log.WithFields(b.logFields()).WithField("backfill_worker", w.id).Debug("Backfill worker received batch.")
w.done <- w.handle(ctx, b)
if b.state == batchBlobSync {
w.done <- w.handleBlobs(ctx, b)
} else {
w.done <- w.handleBlocks(ctx, b)
}
case <-ctx.Done():
log.WithField("backfill_worker", w.id).Info("Backfill worker exiting after context canceled.")
return
@ -34,11 +44,17 @@ func (w *p2pWorker) run(ctx context.Context) {
}
}
func (w *p2pWorker) handle(ctx context.Context, b batch) batch {
func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch {
cs := w.c.CurrentSlot()
blobRetentionStart, err := sync.BlobsByRangeMinStartSlot(cs)
if err != nil {
return b.withRetryableError(errors.Wrap(err, "configuration issue, could not compute minimum blob retention slot"))
}
b.blockPid = b.busy
start := time.Now()
results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.c, w.p2p, b.pid, b.request(), nil)
results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.c, w.p2p, b.blockPid, b.blockRequest(), blockValidationMetrics)
dlt := time.Now()
backfillBatchTimeDownloading.Observe(float64(dlt.Sub(start).Milliseconds()))
backfillBatchTimeDownloadingBlocks.Observe(float64(dlt.Sub(start).Milliseconds()))
if err != nil {
log.WithError(err).WithFields(b.logFields()).Debug("Batch requesting failed")
return b.withRetryableError(err)
@ -56,13 +72,37 @@ func (w *p2pWorker) handle(ctx context.Context, b batch) batch {
for i := range vb {
bdl += vb[i].SizeSSZ()
}
backfillBatchApproximateBytes.Add(float64(bdl))
log.WithField("dlbytes", bdl).Debug("backfill batch bytes downloaded")
b.results = vb
return b.withState(batchImportable)
backfillBlocksApproximateBytes.Add(float64(bdl))
log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("backfill batch block bytes downloaded")
bs, err := newBlobSync(cs, vb, &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.nbv, store: w.bfs})
if err != nil {
return b.withRetryableError(err)
}
return b.withResults(vb, bs)
}
func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Clock, v *verifier) *p2pWorker {
func (w *p2pWorker) handleBlobs(ctx context.Context, b batch) batch {
b.blobPid = b.busy
start := time.Now()
// we don't need to use the response for anything other than metrics, because blobResponseValidation
// adds each of them to a batch AvailabilityStore once it is checked.
blobs, err := sync.SendBlobsByRangeRequest(ctx, w.c, w.p2p, b.blobPid, w.cm, b.blobRequest(), b.blobResponseValidator(), blobValidationMetrics)
if err != nil {
b.bs = nil
return b.withRetryableError(err)
}
dlt := time.Now()
backfillBatchTimeDownloadingBlobs.Observe(float64(dlt.Sub(start).Milliseconds()))
if len(blobs) > 0 {
// All blobs are the same size, so we can compute 1 and use it for all in the batch.
sz := blobs[0].SizeSSZ() * len(blobs)
backfillBlobsApproximateBytes.Add(float64(sz))
log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("backfill batch blob bytes downloaded")
}
return b.postBlobSync()
}
func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) *p2pWorker {
return &p2pWorker{
id: id,
todo: todo,
@ -70,5 +110,8 @@ func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Cloc
p2p: p,
v: v,
c: c,
cm: cm,
nbv: nbv,
bfs: bfs,
}
}

View File

@ -155,7 +155,7 @@ func SendBeaconBlocksByRootRequest(
return blocks, nil
}
func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]blocks.ROBlob, error) {
func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest, bvs ...BlobResponseValidation) ([]blocks.ROBlob, error) {
topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(tor.CurrentSlot()))
if err != nil {
return nil, err
@ -175,8 +175,11 @@ func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle,
if max > req.Count*fieldparams.MaxBlobsPerBlock {
max = req.Count * fieldparams.MaxBlobsPerBlock
}
blobVal := composeBlobValidations(blobValidatorFromRangeReq(req), newSequentialBlobValidator())
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobVal, max)
vfuncs := []BlobResponseValidation{blobValidatorFromRangeReq(req), newSequentialBlobValidator()}
if len(bvs) > 0 {
vfuncs = append(vfuncs, bvs...)
}
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, composeBlobValidations(vfuncs...), max)
}
func SendBlobSidecarByRoot(
@ -205,9 +208,11 @@ func SendBlobSidecarByRoot(
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), max)
}
type blobResponseValidation func(blocks.ROBlob) error
// BlobResponseValidation represents a function that can validate aspects of a single unmarshaled blob
// that was received from a peer in response to an rpc request.
type BlobResponseValidation func(blocks.ROBlob) error
func composeBlobValidations(vf ...blobResponseValidation) blobResponseValidation {
func composeBlobValidations(vf ...BlobResponseValidation) BlobResponseValidation {
return func(blob blocks.ROBlob) error {
for i := range vf {
if err := vf[i](blob); err != nil {
@ -264,14 +269,14 @@ func (sbv *seqBlobValid) nextValid(blob blocks.ROBlob) error {
return nil
}
func newSequentialBlobValidator() blobResponseValidation {
func newSequentialBlobValidator() BlobResponseValidation {
sbv := &seqBlobValid{}
return func(blob blocks.ROBlob) error {
return sbv.nextValid(blob)
}
}
func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation {
func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) BlobResponseValidation {
blobIds := make(map[[32]byte]map[uint64]bool)
for _, sc := range *req {
blockRoot := bytesutil.ToBytes32(sc.BlockRoot)
@ -293,7 +298,7 @@ func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseV
}
}
func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseValidation {
func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) BlobResponseValidation {
end := req.StartSlot + primitives.Slot(req.Count)
return func(sc blocks.ROBlob) error {
if sc.Slot() < req.StartSlot || sc.Slot() >= end {
@ -303,7 +308,7 @@ func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseV
}
}
func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation, max uint64) ([]blocks.ROBlob, error) {
func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf BlobResponseValidation, max uint64) ([]blocks.ROBlob, error) {
sidecars := make([]blocks.ROBlob, 0)
// Attempt an extra read beyond max to check if the peer is violating the spec by
// sending more than MAX_REQUEST_BLOB_SIDECARS, or more blobs than requested.
@ -327,7 +332,7 @@ func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncodi
return sidecars, nil
}
func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) (blocks.ROBlob, error) {
func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf BlobResponseValidation) (blocks.ROBlob, error) {
var b blocks.ROBlob
pb := &ethpb.BlobSidecar{}
decode := encoding.DecodeWithMaxLength

View File

@ -63,6 +63,9 @@ var InitsyncSidecarRequirements = []Requirement{
RequireSidecarInclusionProven,
}
// BackfillSidecarRequirements is the same as InitsyncSidecarRequirements
var BackfillSidecarRequirements = InitsyncSidecarRequirements
var (
ErrBlobInvalid = errors.New("blob failed verification")
// ErrBlobIndexInvalid means RequireBlobIndexInBounds failed.

View File

@ -40,7 +40,7 @@ type sharedResources struct {
// Initializer is used to create different Verifiers.
// Verifiers require access to stateful data structures, like caches,
// and it is Initializer's job to provides access to those.
// and it is Initializer's job to provide access to those.
type Initializer struct {
shared *sharedResources
}