diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 2e5be1e92..8c600dc2b 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -229,14 +229,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco if err != nil { return nil, errors.Wrap(err, "backfill status initialization error") } - pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer) - bf, err := backfill.NewService(ctx, bfs, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...) - if err != nil { - return nil, errors.Wrap(err, "error initializing backfill service") - } - if err := beacon.services.RegisterService(bf); err != nil { - return nil, errors.Wrap(err, "error registering backfill service") - } log.Debugln("Starting State Gen") if err := beacon.startStateGen(ctx, bfs, beacon.forkChoicer); err != nil { @@ -251,6 +243,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco beacon.verifyInitWaiter = verification.NewInitializerWaiter( beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen) + pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer) + beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter)) + bf, err := backfill.NewService(ctx, bfs, beacon.BlobStorage, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...) + if err != nil { + return nil, errors.Wrap(err, "error initializing backfill service") + } + if err := beacon.services.RegisterService(bf); err != nil { + return nil, errors.Wrap(err, "error registering backfill service") + } + log.Debugln("Registering POW Chain Service") if err := beacon.registerPOWChainService(); err != nil { return nil, err diff --git a/beacon-chain/sync/backfill/BUILD.bazel b/beacon-chain/sync/backfill/BUILD.bazel index 6abde4fa5..9ddc0e7b2 100644 --- a/beacon-chain/sync/backfill/BUILD.bazel +++ b/beacon-chain/sync/backfill/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "batch.go", "batcher.go", + "blobs.go", "metrics.go", "pool.go", "service.go", @@ -17,12 +18,15 @@ go_library( deps = [ "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/signing:go_default_library", + "//beacon-chain/das:go_default_library", "//beacon-chain/db:go_default_library", + "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/p2p:go_default_library", "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/startup:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/sync:go_default_library", + "//beacon-chain/verification:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", @@ -34,6 +38,7 @@ go_library( "//proto/dbval:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//runtime:go_default_library", + "//runtime/version:go_default_library", "//time/slots:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", "@com_github_pkg_errors//:go_default_library", @@ -46,7 +51,9 @@ go_library( go_test( name = "go_default_test", srcs = [ + "batch_test.go", "batcher_test.go", + "blobs_test.go", "pool_test.go", "service_test.go", "status_test.go", @@ -56,10 +63,14 @@ go_test( deps = [ "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/signing:go_default_library", + "//beacon-chain/das:go_default_library", "//beacon-chain/db:go_default_library", + 
"//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/startup:go_default_library", "//beacon-chain/state:go_default_library", + "//beacon-chain/sync:go_default_library", + "//beacon-chain/verification:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", diff --git a/beacon-chain/sync/backfill/batch.go b/beacon-chain/sync/backfill/batch.go index f6c04a2c7..b7f8bb05b 100644 --- a/beacon-chain/sync/backfill/batch.go +++ b/beacon-chain/sync/backfill/batch.go @@ -2,10 +2,14 @@ package backfill import ( "fmt" + "sort" "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" log "github.com/sirupsen/logrus" @@ -33,6 +37,8 @@ func (s batchState) String() string { return "import_complete" case batchEndSequence: return "end_sequence" + case batchBlobSync: + return "blob_sync" default: return "unknown" } @@ -43,6 +49,7 @@ const ( batchInit batchSequenced batchErrRetryable + batchBlobSync batchImportable batchImportComplete batchEndSequence @@ -57,10 +64,13 @@ type batch struct { retries int begin primitives.Slot end primitives.Slot // half-open interval, [begin, end), ie >= start, < end. - results VerifiedROBlocks + results verifiedROBlocks err error state batchState - pid peer.ID + busy peer.ID + blockPid peer.ID + blobPid peer.ID + bs *blobSync } func (b batch) logFields() log.Fields { @@ -72,7 +82,9 @@ func (b batch) logFields() log.Fields { "retries": b.retries, "begin": b.begin, "end": b.end, - "pid": b.pid, + "busyPid": b.busy, + "blockPid": b.blockPid, + "blobPid": b.blobPid, } } @@ -101,7 +113,7 @@ func (b batch) ensureParent(expected [32]byte) error { return nil } -func (b batch) request() *eth.BeaconBlocksByRangeRequest { +func (b batch) blockRequest() *eth.BeaconBlocksByRangeRequest { return ð.BeaconBlocksByRangeRequest{ StartSlot: b.begin, Count: uint64(b.end - b.begin), @@ -109,6 +121,32 @@ func (b batch) request() *eth.BeaconBlocksByRangeRequest { } } +func (b batch) blobRequest() *eth.BlobSidecarsByRangeRequest { + return ð.BlobSidecarsByRangeRequest{ + StartSlot: b.begin, + Count: uint64(b.end - b.begin), + } +} + +func (b batch) withResults(results verifiedROBlocks, bs *blobSync) batch { + b.results = results + b.bs = bs + if bs.blobsNeeded() > 0 { + return b.withState(batchBlobSync) + } + return b.withState(batchImportable) +} + +func (b batch) postBlobSync() batch { + if b.blobsNeeded() > 0 { + log.WithFields(b.logFields()).WithField("blobs_missing", b.blobsNeeded()).Error("batch still missing blobs after downloading from peer") + b.bs = nil + b.results = []blocks.ROBlock{} + return b.withState(batchErrRetryable) + } + return b.withState(batchImportable) +} + func (b batch) withState(s batchState) batch { if s == batchSequenced { b.scheduled = time.Now() @@ -130,7 +168,7 @@ func (b batch) withState(s batchState) batch { } func (b batch) withPeer(p peer.ID) batch { - b.pid = p + b.blockPid = p backfillBatchTimeWaiting.Observe(float64(time.Since(b.scheduled).Milliseconds())) return b } @@ -139,3 +177,21 @@ func (b batch) withRetryableError(err error) batch { b.err = err return b.withState(batchErrRetryable) } + +func (b 
batch) blobsNeeded() int { + return b.bs.blobsNeeded() +} + +func (b batch) blobResponseValidator() sync.BlobResponseValidation { + return b.bs.validateNext +} + +func (b batch) availabilityStore() das.AvailabilityStore { + return b.bs.store +} + +func sortBatchDesc(bb []batch) { + sort.Slice(bb, func(i, j int) bool { + return bb[j].end < bb[i].end + }) +} diff --git a/beacon-chain/sync/backfill/batch_test.go b/beacon-chain/sync/backfill/batch_test.go new file mode 100644 index 000000000..ecfa94f95 --- /dev/null +++ b/beacon-chain/sync/backfill/batch_test.go @@ -0,0 +1,21 @@ +package backfill + +import ( + "testing" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/testing/require" +) + +func TestSortBatchDesc(t *testing.T) { + orderIn := []primitives.Slot{100, 10000, 1} + orderOut := []primitives.Slot{10000, 100, 1} + batches := make([]batch, len(orderIn)) + for i := range orderIn { + batches[i] = batch{end: orderIn[i]} + } + sortBatchDesc(batches) + for i := range orderOut { + require.Equal(t, orderOut[i], batches[i].end) + } +} diff --git a/beacon-chain/sync/backfill/blobs.go b/beacon-chain/sync/backfill/blobs.go new file mode 100644 index 000000000..18a7b1ada --- /dev/null +++ b/beacon-chain/sync/backfill/blobs.go @@ -0,0 +1,141 @@ +package backfill + +import ( + "bytes" + "context" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" +) + +var ( + errUnexpectedResponseSize = errors.New("received more blobs than expected for the requested range") + errUnexpectedCommitment = errors.New("BlobSidecar commitment does not match block") + errUnexpectedResponseContent = errors.New("BlobSidecar response does not include expected values in expected order") + errBatchVerifierMismatch = errors.New("the list of blocks passed to the availability check does not match what was verified") +) + +type blobSummary struct { + blockRoot [32]byte + index uint64 + commitment [48]byte + signature [fieldparams.BLSSignatureLength]byte +} + +type blobSyncConfig struct { + retentionStart primitives.Slot + nbv verification.NewBlobVerifier + store *filesystem.BlobStorage +} + +func newBlobSync(current primitives.Slot, vbs verifiedROBlocks, cfg *blobSyncConfig) (*blobSync, error) { + expected, err := vbs.blobIdents(cfg.retentionStart) + if err != nil { + return nil, err + } + bbv := newBlobBatchVerifier(cfg.nbv) + as := das.NewLazilyPersistentStore(cfg.store, bbv) + return &blobSync{current: current, expected: expected, bbv: bbv, store: as}, nil +} + +type blobVerifierMap map[[32]byte][fieldparams.MaxBlobsPerBlock]verification.BlobVerifier + +type blobSync struct { + store das.AvailabilityStore + expected []blobSummary + next int + bbv *blobBatchVerifier + current primitives.Slot +} + +func (bs *blobSync) blobsNeeded() int { + return len(bs.expected) - bs.next +} + +func (bs *blobSync) validateNext(rb blocks.ROBlob) error { + if bs.next >= len(bs.expected) { + return errUnexpectedResponseSize + } + next := bs.expected[bs.next] + bs.next += 1 + // Get the super cheap verifications out of the way before we init a verifier. 
+ if next.blockRoot != rb.BlockRoot() { + return errors.Wrapf(errUnexpectedResponseContent, "next expected root=%#x, saw=%#x", next.blockRoot, rb.BlockRoot()) + } + if next.index != rb.Index { + return errors.Wrapf(errUnexpectedResponseContent, "next expected root=%#x, saw=%#x for root=%#x", next.index, rb.Index, next.blockRoot) + } + if next.commitment != bytesutil.ToBytes48(rb.KzgCommitment) { + return errors.Wrapf(errUnexpectedResponseContent, "next expected commitment=%#x, saw=%#x for root=%#x", next.commitment, rb.KzgCommitment, rb.BlockRoot()) + } + + if bytesutil.ToBytes96(rb.SignedBlockHeader.Signature) != next.signature { + return verification.ErrInvalidProposerSignature + } + v := bs.bbv.newVerifier(rb) + if err := v.BlobIndexInBounds(); err != nil { + return err + } + v.SatisfyRequirement(verification.RequireValidProposerSignature) + if err := v.SidecarInclusionProven(); err != nil { + return err + } + if err := v.SidecarKzgProofVerified(); err != nil { + return err + } + if err := bs.store.Persist(bs.current, rb); err != nil { + return err + } + + return nil +} + +func newBlobBatchVerifier(nbv verification.NewBlobVerifier) *blobBatchVerifier { + return &blobBatchVerifier{newBlobVerifier: nbv, verifiers: make(blobVerifierMap)} +} + +type blobBatchVerifier struct { + newBlobVerifier verification.NewBlobVerifier + verifiers blobVerifierMap +} + +func (bbv *blobBatchVerifier) newVerifier(rb blocks.ROBlob) verification.BlobVerifier { + m := bbv.verifiers[rb.BlockRoot()] + m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillSidecarRequirements) + bbv.verifiers[rb.BlockRoot()] = m + return m[rb.Index] +} + +func (bbv blobBatchVerifier) VerifiedROBlobs(_ context.Context, blk blocks.ROBlock, _ []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) { + m, ok := bbv.verifiers[blk.Root()] + if !ok { + return nil, errors.Wrapf(verification.ErrMissingVerification, "no record of verifiers for root %#x", blk.Root()) + } + c, err := blk.Block().Body().BlobKzgCommitments() + if err != nil { + return nil, errors.Wrapf(errUnexpectedCommitment, "error reading commitments from block root %#x", blk.Root()) + } + vbs := make([]blocks.VerifiedROBlob, len(c)) + for i := range c { + if m[i] == nil { + return nil, errors.Wrapf(errBatchVerifierMismatch, "do not have verifier for block root %#x idx %d", blk.Root(), i) + } + vb, err := m[i].VerifiedROBlob() + if err != nil { + return nil, err + } + if !bytes.Equal(vb.KzgCommitment, c[i]) { + return nil, errors.Wrapf(errBatchVerifierMismatch, "commitments do not match, verified=%#x da check=%#x for root %#x", vb.KzgCommitment, c[i], vb.BlockRoot()) + } + vbs[i] = vb + } + return vbs, nil +} + +var _ das.BlobBatchVerifier = &blobBatchVerifier{} diff --git a/beacon-chain/sync/backfill/blobs_test.go b/beacon-chain/sync/backfill/blobs_test.go new file mode 100644 index 000000000..90967efee --- /dev/null +++ b/beacon-chain/sync/backfill/blobs_test.go @@ -0,0 +1,128 @@ +package backfill + +import ( + "testing" + + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v4/testing/require" + "github.com/prysmaticlabs/prysm/v4/testing/util" +) + +func testBlobGen(t *testing.T, start primitives.Slot, n int) ([]blocks.ROBlock, [][]blocks.ROBlob) { + blks := make([]blocks.ROBlock, n) + 
blobs := make([][]blocks.ROBlob, n) + for i := 0; i < n; i++ { + bk, bl := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, start+primitives.Slot(i), 3) + blks[i] = bk + blobs[i] = bl + } + return blks, blobs +} + +func TestValidateNext_happy(t *testing.T) { + current := primitives.Slot(128) + blks, blobs := testBlobGen(t, 63, 4) + cfg := &blobSyncConfig{ + retentionStart: 0, + nbv: testNewBlobVerifier(), + store: filesystem.NewEphemeralBlobStorage(t), + } + bsync, err := newBlobSync(current, blks, cfg) + require.NoError(t, err) + nb := 0 + for i := range blobs { + bs := blobs[i] + for ib := range bs { + require.NoError(t, bsync.validateNext(bs[ib])) + nb += 1 + } + } + require.Equal(t, nb, bsync.next) + // we should get an error if we read another blob. + require.ErrorIs(t, bsync.validateNext(blobs[0][0]), errUnexpectedResponseSize) +} + +func TestValidateNext_cheapErrors(t *testing.T) { + current := primitives.Slot(128) + blks, blobs := testBlobGen(t, 63, 2) + cfg := &blobSyncConfig{ + retentionStart: 0, + nbv: testNewBlobVerifier(), + store: filesystem.NewEphemeralBlobStorage(t), + } + bsync, err := newBlobSync(current, blks, cfg) + require.NoError(t, err) + require.ErrorIs(t, bsync.validateNext(blobs[len(blobs)-1][0]), errUnexpectedResponseContent) +} + +func TestValidateNext_sigMatch(t *testing.T) { + current := primitives.Slot(128) + blks, blobs := testBlobGen(t, 63, 1) + cfg := &blobSyncConfig{ + retentionStart: 0, + nbv: testNewBlobVerifier(), + store: filesystem.NewEphemeralBlobStorage(t), + } + bsync, err := newBlobSync(current, blks, cfg) + require.NoError(t, err) + blobs[0][0].SignedBlockHeader.Signature = bytesutil.PadTo([]byte("derp"), 48) + require.ErrorIs(t, bsync.validateNext(blobs[0][0]), verification.ErrInvalidProposerSignature) +} + +func TestValidateNext_errorsFromVerifier(t *testing.T) { + current := primitives.Slot(128) + blks, blobs := testBlobGen(t, 63, 1) + cases := []struct { + name string + err error + cb func(*verification.MockBlobVerifier) + }{ + { + name: "index oob", + err: verification.ErrBlobIndexInvalid, + cb: func(v *verification.MockBlobVerifier) { + v.ErrBlobIndexInBounds = verification.ErrBlobIndexInvalid + }, + }, + { + name: "not inclusion proven", + err: verification.ErrSidecarInclusionProofInvalid, + cb: func(v *verification.MockBlobVerifier) { + v.ErrSidecarInclusionProven = verification.ErrSidecarInclusionProofInvalid + }, + }, + { + name: "not kzg proof valid", + err: verification.ErrSidecarKzgProofInvalid, + cb: func(v *verification.MockBlobVerifier) { + v.ErrSidecarKzgProofVerified = verification.ErrSidecarKzgProofInvalid + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + cfg := &blobSyncConfig{ + retentionStart: 0, + nbv: testNewBlobVerifier(c.cb), + store: filesystem.NewEphemeralBlobStorage(t), + } + bsync, err := newBlobSync(current, blks, cfg) + require.NoError(t, err) + require.ErrorIs(t, bsync.validateNext(blobs[0][0]), c.err) + }) + } +} + +func testNewBlobVerifier(opts ...func(*verification.MockBlobVerifier)) verification.NewBlobVerifier { + return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + v := &verification.MockBlobVerifier{} + for i := range opts { + opts[i](v) + } + return v + } +} diff --git a/beacon-chain/sync/backfill/metrics.go b/beacon-chain/sync/backfill/metrics.go index 97ecf4b2e..bbbb5a0bf 100644 --- a/beacon-chain/sync/backfill/metrics.go +++ b/beacon-chain/sync/backfill/metrics.go @@ -3,6 +3,9 @@ package backfill import ( 
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" ) var ( @@ -30,10 +33,28 @@ var ( Help: "Number of backfill batches downloaded and imported.", }, ) - backfillBatchApproximateBytes = promauto.NewCounter( + backfillBlocksApproximateBytes = promauto.NewCounter( prometheus.CounterOpts{ - Name: "backfill_batch_bytes_downloaded", - Help: "Count of bytes downloaded from peers", + Name: "backfill_blocks_bytes_downloaded", + Help: "BeaconBlock bytes downloaded from peers for backfill.", + }, + ) + backfillBlobsApproximateBytes = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "backfill_blobs_bytes_downloaded", + Help: "BlobSidecar bytes downloaded from peers for backfill.", + }, + ) + backfillBlobsDownloadCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "backfill_blobs_download_count", + Help: "Number of BlobSidecar values downloaded from peers for backfill.", + }, + ) + backfillBlocksDownloadCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "backfill_blocks_download_count", + Help: "Number of BeaconBlock values downloaded from peers for backfill.", }, ) backfillBatchTimeRoundtrip = promauto.NewHistogram( @@ -50,10 +71,17 @@ var ( Buckets: []float64{50, 100, 300, 1000, 2000}, }, ) - backfillBatchTimeDownloading = promauto.NewHistogram( + backfillBatchTimeDownloadingBlocks = promauto.NewHistogram( prometheus.HistogramOpts{ - Name: "backfill_batch_time_download", - Help: "Time batch spent downloading blocks from peer.", + Name: "backfill_batch_blocks_time_download", + Help: "Time, in milliseconds, batch spent downloading blocks from peer.", + Buckets: []float64{100, 300, 1000, 2000, 4000, 8000}, + }, + ) + backfillBatchTimeDownloadingBlobs = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "backfill_batch_blobs_time_download", + Help: "Time, in milliseconds, batch spent downloading blobs from peer.", Buckets: []float64{100, 300, 1000, 2000, 4000, 8000}, }, ) @@ -65,3 +93,16 @@ var ( }, ) ) + +func blobValidationMetrics(_ blocks.ROBlob) error { + backfillBlobsDownloadCount.Inc() + return nil +} + +func blockValidationMetrics(interfaces.ReadOnlySignedBeaconBlock) error { + backfillBlocksDownloadCount.Inc() + return nil +} + +var _ sync.BlobResponseValidation = blobValidationMetrics +var _ sync.BeaconBlockProcessor = blockValidationMetrics diff --git a/beacon-chain/sync/backfill/pool.go b/beacon-chain/sync/backfill/pool.go index 1024895e7..e4121458d 100644 --- a/beacon-chain/sync/backfill/pool.go +++ b/beacon-chain/sync/backfill/pool.go @@ -7,15 +7,18 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" log "github.com/sirupsen/logrus" ) type batchWorkerPool interface { - spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier) + spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, blobVerifier 
verification.NewBlobVerifier, bfs *filesystem.BlobStorage) todo(b batch) complete() (batch, error) } @@ -24,11 +27,11 @@ type worker interface { run(context.Context) } -type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker +type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker func defaultNewWorker(p p2p.P2P) newWorker { - return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker { - return newP2pWorker(id, p, in, out, c, v) + return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker { + return newP2pWorker(id, p, in, out, c, v, cm, nbv, bfs) } } @@ -60,11 +63,11 @@ func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool { } } -func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier) { +func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) { p.ctx, p.cancel = context.WithCancel(ctx) go p.batchRouter(a) for i := 0; i < n; i++ { - go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, c, v).run(p.ctx) + go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, c, v, cm, nbv, bfs).run(p.ctx) } } @@ -106,14 +109,21 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) { select { case b := <-p.toRouter: todo = append(todo, b) + // sort batches in descending order so that we'll always process the dependent batches first + sortBatchDesc(todo) case <-rt.C: // Worker assignments can fail if assignBatch can't find a suitable peer. // This ticker exists to periodically break out of the channel select // to retry failed assignments. 
case b := <-p.fromWorkers: - pid := b.pid + pid := b.busy busy[pid] = false - p.fromRouter <- b + if b.state == batchBlobSync { + todo = append(todo, b) + sortBatchDesc(todo) + } else { + p.fromRouter <- b + } case <-p.ctx.Done(): log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down") p.shutdown(p.ctx.Err()) @@ -135,7 +145,7 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) { } for _, pid := range assigned { busy[pid] = true - todo[0].pid = pid + todo[0].busy = pid p.toWorkers <- todo[0].withPeer(pid) if todo[0].begin < earliest { earliest = todo[0].begin diff --git a/beacon-chain/sync/backfill/pool_test.go b/beacon-chain/sync/backfill/pool_test.go index efcfef2bc..bd20b5701 100644 --- a/beacon-chain/sync/backfill/pool_test.go +++ b/beacon-chain/sync/backfill/pool_test.go @@ -6,8 +6,13 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/testing/require" "github.com/prysmaticlabs/prysm/v4/testing/util" ) @@ -28,6 +33,10 @@ func (m mockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) { var _ PeerAssigner = &mockAssigner{} +func mockNewBlobVerifier(_ blocks.ROBlob, _ []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{} +} + func TestPoolDetectAllEnded(t *testing.T) { nw := 5 p2p := p2ptest.NewTestP2P(t) @@ -40,7 +49,11 @@ func TestPoolDetectAllEnded(t *testing.T) { require.NoError(t, err) v, err := newBackfillVerifier(st.GenesisValidatorsRoot(), keys) require.NoError(t, err) - pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v) + + ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(st.GenesisValidatorsRoot())) + require.NoError(t, err) + bfs := filesystem.NewEphemeralBlobStorage(t) + pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v, ctxMap, mockNewBlobVerifier, bfs) br := batcher{min: 10, size: 10} endSeq := br.before(0) require.Equal(t, batchEndSequence, endSeq.state) @@ -59,7 +72,7 @@ type mockPool struct { todoChan chan batch } -func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier) { +func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier, _ sync.ContextByteVersions, _ verification.NewBlobVerifier, _ *filesystem.BlobStorage) { } func (m *mockPool) todo(b batch) { diff --git a/beacon-chain/sync/backfill/service.go b/beacon-chain/sync/backfill/service.go index 41ed8a1a8..e1de7f200 100644 --- a/beacon-chain/sync/backfill/service.go +++ b/beacon-chain/sync/backfill/service.go @@ -6,8 +6,12 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" 
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/proto/dbval" @@ -17,78 +21,39 @@ import ( ) type Service struct { - ctx context.Context - store *Store - ms minimumSlotter - cw startup.ClockWaiter - enabled bool // service is disabled by default while feature is experimental - nWorkers int - batchSeq *batchSequencer - batchSize uint64 - pool batchWorkerPool - verifier *verifier - p2p p2p.P2P - pa PeerAssigner - batchImporter batchImporter + ctx context.Context + enabled bool // service is disabled by default while feature is experimental + clock *startup.Clock + store *Store + ms minimumSlotter + cw startup.ClockWaiter + verifierWaiter InitializerWaiter + newBlobVerifier verification.NewBlobVerifier + nWorkers int + batchSeq *batchSequencer + batchSize uint64 + pool batchWorkerPool + verifier *verifier + ctxMap sync.ContextByteVersions + p2p p2p.P2P + pa PeerAssigner + batchImporter batchImporter + blobStore *filesystem.BlobStorage } var _ runtime.Service = (*Service)(nil) -type ServiceOption func(*Service) error - -func WithEnableBackfill(enabled bool) ServiceOption { - return func(s *Service) error { - s.enabled = enabled - return nil - } +// PeerAssigner describes a type that provides an Assign method, which can assign the best peer +// to service an RPC blockRequest. The Assign method takes a map of peers that should be excluded, +// allowing the caller to avoid making multiple concurrent requests to the same peer. +type PeerAssigner interface { + Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) } -func WithWorkerCount(n int) ServiceOption { - return func(s *Service) error { - s.nWorkers = n - return nil - } -} +type minimumSlotter func(primitives.Slot) primitives.Slot +type batchImporter func(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) -func WithBatchSize(n uint64) ServiceOption { - return func(s *Service) error { - s.batchSize = n - return nil - } -} - -type minimumSlotter interface { - minimumSlot() primitives.Slot - setClock(*startup.Clock) -} - -type defaultMinimumSlotter struct { - clock *startup.Clock - cw startup.ClockWaiter - ctx context.Context -} - -func (d defaultMinimumSlotter) minimumSlot() primitives.Slot { - if d.clock == nil { - var err error - d.clock, err = d.cw.WaitForClock(d.ctx) - if err != nil { - log.WithError(err).Fatal("failed to obtain system/genesis clock, unable to start backfill service") - } - } - return minimumBackfillSlot(d.clock.CurrentSlot()) -} - -func (d defaultMinimumSlotter) setClock(c *startup.Clock) { - //nolint:all - d.clock = c -} - -var _ minimumSlotter = &defaultMinimumSlotter{} - -type batchImporter func(ctx context.Context, b batch, su *Store) (*dbval.BackfillStatus, error) - -func defaultBatchImporter(ctx context.Context, b batch, su *Store) (*dbval.BackfillStatus, error) { +func defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) { status := su.status() if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil { return status, err @@ -96,28 +61,63 @@ func defaultBatchImporter(ctx context.Context, b batch, su *Store) (*dbval.Backf // Import blocks to db and update db state to reflect the newly imported blocks. // Other parts of the beacon node may use the same StatusUpdater instance // via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled. 
- status, err := su.fillBack(ctx, b.results) - if err != nil { - log.WithError(err).Fatal("Non-recoverable db error in backfill service, quitting.") - } - return status, nil + return su.fillBack(ctx, current, b.results, b.availabilityStore()) } -// PeerAssigner describes a type that provides an Assign method, which can assign the best peer -// to service an RPC request. The Assign method takes a map of peers that should be excluded, -// allowing the caller to avoid making multiple concurrent requests to the same peer. -type PeerAssigner interface { - Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) +// ServiceOption represents a functional option for the backfill service constructor. +type ServiceOption func(*Service) error + +// WithEnableBackfill toggles the entire backfill service on or off, intended to be used by a feature flag. +func WithEnableBackfill(enabled bool) ServiceOption { + return func(s *Service) error { + s.enabled = enabled + return nil + } +} + +// WithWorkerCount sets the number of goroutines in the batch processing pool that can concurrently +// make p2p requests to download data for batches. +func WithWorkerCount(n int) ServiceOption { + return func(s *Service) error { + s.nWorkers = n + return nil + } +} + +// WithBatchSize configures the size of backfill batches, similar to the initial-sync block-batch-limit flag. +// It should usually be left at the default value. +func WithBatchSize(n uint64) ServiceOption { + return func(s *Service) error { + s.batchSize = n + return nil + } +} + +// InitializerWaiter is an interface that is satisfied by verification.InitializerWaiter. +// Using this interface enables node init to satisfy this requirement for the backfill service +// while also allowing backfill to mock it in tests. +type InitializerWaiter interface { + WaitForInitializer(ctx context.Context) (*verification.Initializer, error) +} + +// WithVerifierWaiter sets the verification.InitializerWaiter +// for the backfill Service. +func WithVerifierWaiter(viw InitializerWaiter) ServiceOption { + return func(s *Service) error { + s.verifierWaiter = viw + return nil + } } // NewService initializes the backfill Service. Like all implementations of the Service interface, // the service won't begin its runloop until Start() is called. 
-func NewService(ctx context.Context, su *Store, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) { +func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) { s := &Service{ ctx: ctx, store: su, + blobStore: bStore, cw: cw, - ms: &defaultMinimumSlotter{cw: cw, ctx: ctx}, + ms: minimumBackfillSlot, p2p: p, pa: pa, batchImporter: defaultBatchImporter, @@ -132,26 +132,33 @@ func NewService(ctx context.Context, su *Store, cw startup.ClockWaiter, p p2p.P2 return s, nil } -func (s *Service) initVerifier(ctx context.Context) (*verifier, error) { +func (s *Service) initVerifier(ctx context.Context) (*verifier, sync.ContextByteVersions, error) { cps, err := s.store.originState(ctx) if err != nil { - return nil, err + return nil, nil, err } keys, err := cps.PublicKeys() if err != nil { - return nil, errors.Wrap(err, "Unable to retrieve public keys for all validators in the origin state") + return nil, nil, errors.Wrap(err, "Unable to retrieve public keys for all validators in the origin state") } - return newBackfillVerifier(cps.GenesisValidatorsRoot(), keys) + vr := cps.GenesisValidatorsRoot() + ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr)) + if err != nil { + return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr) + } + v, err := newBackfillVerifier(vr, keys) + return v, ctxMap, err } func (s *Service) updateComplete() bool { b, err := s.pool.complete() if err != nil { if errors.Is(err, errEndSequence) { - log.WithField("backfill_slot", b.begin).Info("Backfill is complete") + log.WithField("backfill_slot", b.begin).Info("Backfill is complete.") return true } - log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.") + log.WithError(err).Error("Backfill service received unhandled error from worker pool.") + return true } s.batchSeq.update(b) return false @@ -166,12 +173,13 @@ func (s *Service) importBatches(ctx context.Context) { } backfillBatchesImported.Add(float64(imported)) }() + current := s.clock.CurrentSlot() for i := range importable { ib := importable[i] if len(ib.results) == 0 { log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer.") } - _, err := s.batchImporter(ctx, ib, s.store) + _, err := s.batchImporter(ctx, current, ib, s.store) if err != nil { log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import.") s.downscore(ib) @@ -214,40 +222,51 @@ func (s *Service) scheduleTodos() { // Start begins the runloop of backfill.Service in the current goroutine. 
func (s *Service) Start() { if !s.enabled { - log.Info("Exiting backfill service; not enabled.") + log.Info("Backfill service not enabled.") return } ctx, cancel := context.WithCancel(s.ctx) defer func() { + log.Info("Backfill service is shutting down.") cancel() }() clock, err := s.cw.WaitForClock(ctx) if err != nil { - log.WithError(err).Fatal("Backfill service failed to start while waiting for genesis data.") + log.WithError(err).Error("Backfill service failed to start while waiting for genesis data.") + return + } + s.clock = clock + v, err := s.verifierWaiter.WaitForInitializer(ctx) + s.newBlobVerifier = newBlobVerifierFromInitializer(v) + + if err != nil { + log.WithError(err).Error("Could not initialize blob verifier in backfill service.") + return } - s.ms.setClock(clock) if s.store.isGenesisSync() { - log.Info("Exiting backfill service as the node has been initialized with a genesis state or the backfill status is missing") + log.Info("Backfill short-circuit; node synced from genesis.") return } status := s.store.status() // Exit early if there aren't going to be any batches to backfill. - if primitives.Slot(status.LowSlot) <= s.ms.minimumSlot() { - log.WithField("minimum_required_slot", s.ms.minimumSlot()). + if primitives.Slot(status.LowSlot) <= s.ms(s.clock.CurrentSlot()) { + log.WithField("minimum_required_slot", s.ms(s.clock.CurrentSlot())). WithField("backfill_lowest_slot", status.LowSlot). Info("Exiting backfill service; minimum block retention slot > lowest backfilled block.") return } - s.verifier, err = s.initVerifier(ctx) + s.verifier, s.ctxMap, err = s.initVerifier(ctx) if err != nil { - log.WithError(err).Fatal("Unable to initialize backfill verifier, quitting.") + log.WithError(err).Error("Unable to initialize backfill verifier.") + return } - s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier) + s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore) - s.batchSeq = newBatchSequencer(s.nWorkers, s.ms.minimumSlot(), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize)) + s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize)) if err = s.initBatches(); err != nil { - log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.") + log.WithError(err).Error("Non-recoverable error in backfill service.") + return } for { @@ -259,8 +278,8 @@ func (s *Service) Start() { } s.importBatches(ctx) batchesWaiting.Set(float64(s.batchSeq.countWithState(batchImportable))) - if err := s.batchSeq.moveMinimum(s.ms.minimumSlot()); err != nil { - log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.") + if err := s.batchSeq.moveMinimum(s.ms(s.clock.CurrentSlot())); err != nil { + log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot.") } s.scheduleTodos() } @@ -278,7 +297,7 @@ func (s *Service) initBatches() error { } func (s *Service) downscore(b batch) { - s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.pid) + s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.blockPid) } func (s *Service) Stop() error { @@ -304,3 +323,9 @@ func minimumBackfillSlot(current primitives.Slot) primitives.Slot { } return current - offset } + +func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier { + return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return ini.NewBlobVerifier(b, reqs) + } +} diff --git 
a/beacon-chain/sync/backfill/service_test.go b/beacon-chain/sync/backfill/service_test.go index c723f28e1..5e9c76db7 100644 --- a/beacon-chain/sync/backfill/service_test.go +++ b/beacon-chain/sync/backfill/service_test.go @@ -6,9 +6,11 @@ import ( "time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/proto/dbval" @@ -20,13 +22,15 @@ type mockMinimumSlotter struct { min primitives.Slot } -var _ minimumSlotter = &mockMinimumSlotter{} - -func (m mockMinimumSlotter) minimumSlot() primitives.Slot { +func (m mockMinimumSlotter) minimumSlot(_ primitives.Slot) primitives.Slot { return m.min } -func (m mockMinimumSlotter) setClock(*startup.Clock) { +type mockInitalizerWaiter struct { +} + +func (mi *mockInitalizerWaiter) WaitForInitializer(ctx context.Context) (*verification.Initializer, error) { + return &verification.Initializer{}, nil } func TestServiceInit(t *testing.T) { @@ -52,11 +56,13 @@ func TestServiceInit(t *testing.T) { require.NoError(t, cw.SetClock(startup.NewClock(time.Now(), [32]byte{}))) pool := &mockPool{todoChan: make(chan batch, nWorkers), finishedChan: make(chan batch, nWorkers)} p2pt := p2ptest.NewTestP2P(t) - srv, err := NewService(ctx, su, cw, p2pt, &mockAssigner{}, WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true)) + bfs := filesystem.NewEphemeralBlobStorage(t) + srv, err := NewService(ctx, su, bfs, cw, p2pt, &mockAssigner{}, + WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true), WithVerifierWaiter(&mockInitalizerWaiter{})) require.NoError(t, err) - srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))} + srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}.minimumSlot srv.pool = pool - srv.batchImporter = func(context.Context, batch, *Store) (*dbval.BackfillStatus, error) { + srv.batchImporter = func(context.Context, primitives.Slot, batch, *Store) (*dbval.BackfillStatus, error) { return &dbval.BackfillStatus{}, nil } go srv.Start() diff --git a/beacon-chain/sync/backfill/status.go b/beacon-chain/sync/backfill/status.go index 2ec809caa..71460c41a 100644 --- a/beacon-chain/sync/backfill/status.go +++ b/beacon-chain/sync/backfill/status.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" @@ -73,7 +74,7 @@ func (s *Store) status() *dbval.BackfillStatus { // fillBack saves the slice of blocks and updates the BackfillStatus LowSlot/Root/ParentRoot tracker to the values // from the first block in the slice. This method assumes that the block slice has been fully validated and // sorted in slot order by the calling function. 
-func (s *Store) fillBack(ctx context.Context, blocks []blocks.ROBlock) (*dbval.BackfillStatus, error) { +func (s *Store) fillBack(ctx context.Context, current primitives.Slot, blocks []blocks.ROBlock, store das.AvailabilityStore) (*dbval.BackfillStatus, error) { status := s.status() if len(blocks) == 0 { return status, nil @@ -87,6 +88,12 @@ func (s *Store) fillBack(ctx context.Context, blocks []blocks.ROBlock) (*dbval.B status.LowParentRoot, highest.Root(), status.LowSlot, highest.Block().Slot()) } + for i := range blocks { + if err := store.IsDataAvailable(ctx, current, blocks[i]); err != nil { + return nil, err + } + } + if err := s.store.SaveROBlocks(ctx, blocks, false); err != nil { return nil, errors.Wrapf(err, "error saving backfill blocks") } diff --git a/beacon-chain/sync/backfill/status_test.go b/beacon-chain/sync/backfill/status_test.go index 0ee020827..7e7df6444 100644 --- a/beacon-chain/sync/backfill/status_test.go +++ b/beacon-chain/sync/backfill/status_test.go @@ -5,6 +5,7 @@ import ( "context" "testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" @@ -138,7 +139,7 @@ func TestStatusUpdater_FillBack(t *testing.T) { require.NoError(t, err) s := &Store{bs: &dbval.BackfillStatus{LowSlot: 100, LowParentRoot: rob.RootSlice()}, store: mdb} require.Equal(t, false, s.AvailableBlock(95)) - _, err = s.fillBack(ctx, []blocks.ROBlock{rob}) + _, err = s.fillBack(ctx, 0, []blocks.ROBlock{rob}, &das.MockAvailabilityStore{}) require.NoError(t, err) require.Equal(t, true, s.AvailableBlock(95)) } diff --git a/beacon-chain/sync/backfill/verify.go b/beacon-chain/sync/backfill/verify.go index 1d3f78ef8..2e7bc42a9 100644 --- a/beacon-chain/sync/backfill/verify.go +++ b/beacon-chain/sync/backfill/verify.go @@ -9,7 +9,9 @@ import ( "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/crypto/bls" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/network/forks" + "github.com/prysmaticlabs/prysm/v4/runtime/version" "github.com/prysmaticlabs/prysm/v4/time/slots" ) @@ -17,8 +19,37 @@ var errInvalidBatchChain = errors.New("parent_root of block does not match the p var errProposerIndexTooHigh = errors.New("proposer index not present in origin state") var errUnknownDomain = errors.New("runtime error looking up signing domain for fork") -// VerifiedROBlocks represents a slice of blocks that have passed signature verification. -type VerifiedROBlocks []blocks.ROBlock +// verifiedROBlocks represents a slice of blocks that have passed signature verification. 
+type verifiedROBlocks []blocks.ROBlock + +func (v verifiedROBlocks) blobIdents(retentionStart primitives.Slot) ([]blobSummary, error) { + // early return if the newest block is outside the retention window + if len(v) > 0 && v[len(v)-1].Block().Slot() < retentionStart { + return nil, nil + } + bs := make([]blobSummary, 0) + for i := range v { + if v[i].Block().Slot() < retentionStart { + continue + } + if v[i].Block().Version() < version.Deneb { + continue + } + c, err := v[i].Block().Body().BlobKzgCommitments() + if err != nil { + return nil, errors.Wrapf(err, "unexpected error checking commitments for block root %#x", v[i].Root()) + } + if len(c) == 0 { + continue + } + for ci := range c { + bs = append(bs, blobSummary{ + blockRoot: v[i].Root(), signature: v[i].Signature(), + index: uint64(ci), commitment: bytesutil.ToBytes48(c[ci])}) + } + } + return bs, nil +} type verifier struct { keys [][fieldparams.BLSPubkeyLength]byte @@ -27,7 +58,7 @@ type verifier struct { } // TODO: rewrite this to use ROBlock. -func (vr verifier) verify(blks []interfaces.ReadOnlySignedBeaconBlock) (VerifiedROBlocks, error) { +func (vr verifier) verify(blks []interfaces.ReadOnlySignedBeaconBlock) (verifiedROBlocks, error) { var err error result := make([]blocks.ROBlock, len(blks)) sigSet := bls.NewSet() diff --git a/beacon-chain/sync/backfill/worker.go b/beacon-chain/sync/backfill/worker.go index 954575c37..98c22f963 100644 --- a/beacon-chain/sync/backfill/worker.go +++ b/beacon-chain/sync/backfill/worker.go @@ -4,9 +4,12 @@ import ( "context" "time" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" log "github.com/sirupsen/logrus" ) @@ -19,6 +22,9 @@ type p2pWorker struct { p2p p2p.P2P v *verifier c *startup.Clock + cm sync.ContextByteVersions + nbv verification.NewBlobVerifier + bfs *filesystem.BlobStorage } func (w *p2pWorker) run(ctx context.Context) { @@ -26,7 +32,11 @@ func (w *p2pWorker) run(ctx context.Context) { select { case b := <-w.todo: log.WithFields(b.logFields()).WithField("backfill_worker", w.id).Debug("Backfill worker received batch.") - w.done <- w.handle(ctx, b) + if b.state == batchBlobSync { + w.done <- w.handleBlobs(ctx, b) + } else { + w.done <- w.handleBlocks(ctx, b) + } case <-ctx.Done(): log.WithField("backfill_worker", w.id).Info("Backfill worker exiting after context canceled.") return @@ -34,11 +44,17 @@ func (w *p2pWorker) run(ctx context.Context) { } } -func (w *p2pWorker) handle(ctx context.Context, b batch) batch { +func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch { + cs := w.c.CurrentSlot() + blobRetentionStart, err := sync.BlobsByRangeMinStartSlot(cs) + if err != nil { + return b.withRetryableError(errors.Wrap(err, "configuration issue, could not compute minimum blob retention slot")) + } + b.blockPid = b.busy start := time.Now() - results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.c, w.p2p, b.pid, b.request(), nil) + results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.c, w.p2p, b.blockPid, b.blockRequest(), blockValidationMetrics) dlt := time.Now() - backfillBatchTimeDownloading.Observe(float64(dlt.Sub(start).Milliseconds())) + backfillBatchTimeDownloadingBlocks.Observe(float64(dlt.Sub(start).Milliseconds())) if err != nil { 
log.WithError(err).WithFields(b.logFields()).Debug("Batch requesting failed") return b.withRetryableError(err) @@ -56,13 +72,37 @@ func (w *p2pWorker) handle(ctx context.Context, b batch) batch { for i := range vb { bdl += vb[i].SizeSSZ() } - backfillBatchApproximateBytes.Add(float64(bdl)) - log.WithField("dlbytes", bdl).Debug("backfill batch bytes downloaded") - b.results = vb - return b.withState(batchImportable) + backfillBlocksApproximateBytes.Add(float64(bdl)) + log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("backfill batch block bytes downloaded") + bs, err := newBlobSync(cs, vb, &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.nbv, store: w.bfs}) + if err != nil { + return b.withRetryableError(err) + } + return b.withResults(vb, bs) } -func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Clock, v *verifier) *p2pWorker { +func (w *p2pWorker) handleBlobs(ctx context.Context, b batch) batch { + b.blobPid = b.busy + start := time.Now() + // we don't need to use the response for anything other than metrics, because blobResponseValidation + // adds each of them to a batch AvailabilityStore once it is checked. + blobs, err := sync.SendBlobsByRangeRequest(ctx, w.c, w.p2p, b.blobPid, w.cm, b.blobRequest(), b.blobResponseValidator(), blobValidationMetrics) + if err != nil { + b.bs = nil + return b.withRetryableError(err) + } + dlt := time.Now() + backfillBatchTimeDownloadingBlobs.Observe(float64(dlt.Sub(start).Milliseconds())) + if len(blobs) > 0 { + // All blobs are the same size, so we can compute 1 and use it for all in the batch. + sz := blobs[0].SizeSSZ() * len(blobs) + backfillBlobsApproximateBytes.Add(float64(sz)) + log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("backfill batch blob bytes downloaded") + } + return b.postBlobSync() +} + +func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) *p2pWorker { return &p2pWorker{ id: id, todo: todo, @@ -70,5 +110,8 @@ func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Cloc p2p: p, v: v, c: c, + cm: cm, + nbv: nbv, + bfs: bfs, } } diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index 9d1e8e33c..37e15964e 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -155,7 +155,7 @@ func SendBeaconBlocksByRootRequest( return blocks, nil } -func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]blocks.ROBlob, error) { +func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest, bvs ...BlobResponseValidation) ([]blocks.ROBlob, error) { topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(tor.CurrentSlot())) if err != nil { return nil, err @@ -175,8 +175,11 @@ func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, if max > req.Count*fieldparams.MaxBlobsPerBlock { max = req.Count * fieldparams.MaxBlobsPerBlock } - blobVal := composeBlobValidations(blobValidatorFromRangeReq(req), newSequentialBlobValidator()) - return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobVal, max) + vfuncs := []BlobResponseValidation{blobValidatorFromRangeReq(req), 
newSequentialBlobValidator()} + if len(bvs) > 0 { + vfuncs = append(vfuncs, bvs...) + } + return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, composeBlobValidations(vfuncs...), max) } func SendBlobSidecarByRoot( @@ -205,9 +208,11 @@ func SendBlobSidecarByRoot( return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), max) } -type blobResponseValidation func(blocks.ROBlob) error +// BlobResponseValidation represents a function that can validate aspects of a single unmarshaled blob +// that was received from a peer in response to an rpc request. +type BlobResponseValidation func(blocks.ROBlob) error -func composeBlobValidations(vf ...blobResponseValidation) blobResponseValidation { +func composeBlobValidations(vf ...BlobResponseValidation) BlobResponseValidation { return func(blob blocks.ROBlob) error { for i := range vf { if err := vf[i](blob); err != nil { @@ -264,14 +269,14 @@ func (sbv *seqBlobValid) nextValid(blob blocks.ROBlob) error { return nil } -func newSequentialBlobValidator() blobResponseValidation { +func newSequentialBlobValidator() BlobResponseValidation { sbv := &seqBlobValid{} return func(blob blocks.ROBlob) error { return sbv.nextValid(blob) } } -func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation { +func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) BlobResponseValidation { blobIds := make(map[[32]byte]map[uint64]bool) for _, sc := range *req { blockRoot := bytesutil.ToBytes32(sc.BlockRoot) @@ -293,7 +298,7 @@ func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseV } } -func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseValidation { +func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) BlobResponseValidation { end := req.StartSlot + primitives.Slot(req.Count) return func(sc blocks.ROBlob) error { if sc.Slot() < req.StartSlot || sc.Slot() >= end { @@ -303,7 +308,7 @@ func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseV } } -func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation, max uint64) ([]blocks.ROBlob, error) { +func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf BlobResponseValidation, max uint64) ([]blocks.ROBlob, error) { sidecars := make([]blocks.ROBlob, 0) // Attempt an extra read beyond max to check if the peer is violating the spec by // sending more than MAX_REQUEST_BLOB_SIDECARS, or more blobs than requested. 
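To illustrate the new variadic parameter on SendBlobsByRangeRequest: extra BlobResponseValidation callbacks are composed after the built-in slot-range and sequential-ordering validators, and the first error aborts the stream read, which is how the backfill worker chains its validateNext check with the metrics hook. A minimal caller sketch under those assumptions follows; the example package, the fetchBlobRange helper, and the counting callback are illustrative only and not part of this change.

package example

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
	log "github.com/sirupsen/logrus"
)

// fetchBlobRange is a hypothetical caller of the updated SendBlobsByRangeRequest.
// The extra callback runs once per decoded sidecar, after the built-in slot-range
// and sequential-ordering validators; returning an error aborts the read.
func fetchBlobRange(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder,
	pid peer.ID, cm sync.ContextByteVersions, req *eth.BlobSidecarsByRangeRequest) ([]blocks.ROBlob, error) {
	seen := 0
	count := func(blocks.ROBlob) error {
		seen++
		return nil
	}
	sidecars, err := sync.SendBlobsByRangeRequest(ctx, tor, p2pApi, pid, cm, req, count)
	if err != nil {
		return nil, err
	}
	log.WithField("sidecars", seen).Debug("blob range request complete")
	return sidecars, nil
}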
@@ -327,7 +332,7 @@ func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncodi return sidecars, nil } -func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) (blocks.ROBlob, error) { +func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf BlobResponseValidation) (blocks.ROBlob, error) { var b blocks.ROBlob pb := ðpb.BlobSidecar{} decode := encoding.DecodeWithMaxLength diff --git a/beacon-chain/verification/blob.go b/beacon-chain/verification/blob.go index dfd8f5acf..bd7a503ea 100644 --- a/beacon-chain/verification/blob.go +++ b/beacon-chain/verification/blob.go @@ -63,6 +63,9 @@ var InitsyncSidecarRequirements = []Requirement{ RequireSidecarInclusionProven, } +// BackfillSidecarRequirements is the same as InitsyncSidecarRequirements +var BackfillSidecarRequirements = InitsyncSidecarRequirements + var ( ErrBlobInvalid = errors.New("blob failed verification") // ErrBlobIndexInvalid means RequireBlobIndexInBounds failed. diff --git a/beacon-chain/verification/initializer.go b/beacon-chain/verification/initializer.go index 7abf842c8..2e91c9e3d 100644 --- a/beacon-chain/verification/initializer.go +++ b/beacon-chain/verification/initializer.go @@ -40,7 +40,7 @@ type sharedResources struct { // Initializer is used to create different Verifiers. // Verifiers require access to stateful data structures, like caches, -// and it is Initializer's job to provides access to those. +// and it is Initializer's job to provide access to those. type Initializer struct { shared *sharedResources }
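For reviewers, the net effect of the worker changes is easiest to see as one condensed flow: blocks are downloaded and signature-verified, a blobSync is built from the verified blocks' commitments, the same batch is then re-queued to fetch blob sidecars, and only a batch whose availability store is satisfied becomes importable. The sketch below is a simplified recombination of handleBlocks and handleBlobs for orientation, not code from this change: the processBatchSketch name is made up, it assumes the backfill package context for the unexported types, it uses a single peer for both phases, and it elides metrics, peer scoring, and the pool router's re-queueing between phases.

// processBatchSketch is an illustrative condensation of p2pWorker.handleBlocks and
// handleBlobs; the real flow returns the batch to the pool router between the two
// phases so a different peer can serve the blob request.
func (w *p2pWorker) processBatchSketch(ctx context.Context, b batch) batch {
	cs := w.c.CurrentSlot()
	retentionStart, err := sync.BlobsByRangeMinStartSlot(cs)
	if err != nil {
		return b.withRetryableError(err)
	}
	// Phase 1: download blocks, verify signatures, and derive the expected blob set.
	blks, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.c, w.p2p, b.busy, b.blockRequest(), nil)
	if err != nil {
		return b.withRetryableError(err)
	}
	vb, err := w.v.verify(blks)
	if err != nil {
		return b.withRetryableError(err)
	}
	bs, err := newBlobSync(cs, vb, &blobSyncConfig{retentionStart: retentionStart, nbv: w.nbv, store: w.bfs})
	if err != nil {
		return b.withRetryableError(err)
	}
	// withResults selects batchBlobSync when commitments are expected, batchImportable otherwise.
	b = b.withResults(vb, bs)
	if b.state != batchBlobSync {
		return b
	}
	// Phase 2: download blobs; validateNext checks each sidecar against the expected
	// (root, index, commitment) sequence and persists it to the availability store.
	if _, err := sync.SendBlobsByRangeRequest(ctx, w.c, w.p2p, b.busy, w.cm, b.blobRequest(), b.blobResponseValidator()); err != nil {
		return b.withRetryableError(err)
	}
	// The batch becomes importable only if every expected sidecar arrived; otherwise it is retried.
	return b.postBlobSync()
}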