diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 5ff68addf..d34f36b08 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -24,6 +24,7 @@ var ( errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock") errEmptyBlobWritten = errors.New("zero bytes written to disk when saving blob sidecar") errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice") + errNoBasePath = errors.New("BlobStorage base path not specified in init") ) const ( @@ -36,14 +37,26 @@ const ( // BlobStorageOption is a functional option for configuring a BlobStorage. type BlobStorageOption func(*BlobStorage) error +// WithBasePath is a required option that sets the base path of blob storage. +func WithBasePath(base string) BlobStorageOption { + return func(b *BlobStorage) error { + b.base = base + return nil + } +} + // WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted. func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption { return func(b *BlobStorage) error { - pruner, err := newBlobPruner(b.fs, e) - if err != nil { - return err - } - b.pruner = pruner + b.retentionEpochs = e + return nil + } +} + +// WithSaveFsync is an option that causes Save to call fsync before renaming part files for improved durability. +func WithSaveFsync(fsync bool) BlobStorageOption { + return func(b *BlobStorage) error { + b.fsync = fsync return nil } } @@ -51,30 +64,36 @@ func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption { // NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may // attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be // initialized once per beacon node. -func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error) { - base = path.Clean(base) - if err := file.MkdirAll(base); err != nil { - return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err) - } - fs := afero.NewBasePathFs(afero.NewOsFs(), base) - b := &BlobStorage{ - fs: fs, - } +func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) { + b := &BlobStorage{} for _, o := range opts { if err := o(b); err != nil { - return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err) + return nil, errors.Wrap(err, "failed to create blob storage") } } - if b.pruner == nil { - log.Warn("Initializing blob filesystem storage with pruning disabled") + if b.base == "" { + return nil, errNoBasePath } + b.base = path.Clean(b.base) + if err := file.MkdirAll(b.base); err != nil { + return nil, errors.Wrapf(err, "failed to create blob storage at %s", b.base) + } + b.fs = afero.NewBasePathFs(afero.NewOsFs(), b.base) + pruner, err := newBlobPruner(b.fs, b.retentionEpochs) + if err != nil { + return nil, err + } + b.pruner = pruner return b, nil } // BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars. type BlobStorage struct { - fs afero.Fs - pruner *blobPruner + base string + retentionEpochs primitives.Epoch + fsync bool + fs afero.Fs + pruner *blobPruner } // WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache @@ -151,8 +170,13 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error { } return errors.Wrap(err, "failed to write to partial file") } - err = partialFile.Close() - if err != nil { + if bs.fsync { + if err := partialFile.Sync(); err != nil { + return err + } + } + + if err := partialFile.Close(); err != nil { return err } diff --git a/beacon-chain/db/filesystem/blob_test.go b/beacon-chain/db/filesystem/blob_test.go index 9efe799d8..f70b84c88 100644 --- a/beacon-chain/db/filesystem/blob_test.go +++ b/beacon-chain/db/filesystem/blob_test.go @@ -108,7 +108,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) { // to be empty. This test ensures that several routines can safely save the same blob at the // same time. This isn't ideal behavior from the caller, but should be handled safely anyway. // See https://github.com/prysmaticlabs/prysm/pull/13648 - b, err := NewBlobStorage(t.TempDir()) + b, err := NewBlobStorage(WithBasePath(t.TempDir())) require.NoError(t, err) blob := testSidecars[0] @@ -268,6 +268,8 @@ func BenchmarkPruning(b *testing.B) { } func TestNewBlobStorage(t *testing.T) { - _, err := NewBlobStorage(path.Join(t.TempDir(), "good")) + _, err := NewBlobStorage() + require.ErrorIs(t, err, errNoBasePath) + _, err = NewBlobStorage(WithBasePath(path.Join(t.TempDir(), "good"))) require.NoError(t, err) } diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 91beb5e84..86f84e174 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -118,6 +118,7 @@ type BeaconNode struct { BackfillOpts []backfill.ServiceOption initialSyncComplete chan struct{} BlobStorage *filesystem.BlobStorage + BlobStorageOptions []filesystem.BlobStorageOption blobRetentionEpochs primitives.Epoch verifyInitWaiter *verification.InitializerWaiter syncChecker *initialsync.SyncChecker @@ -209,6 +210,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco return nil, err } + // Allow tests to set it as an opt. + if beacon.BlobStorage == nil { + beacon.BlobStorageOptions = append(beacon.BlobStorageOptions, filesystem.WithSaveFsync(features.Get().BlobSaveFsync)) + blobs, err := filesystem.NewBlobStorage(beacon.BlobStorageOptions...) + if err != nil { + return nil, err + } + beacon.BlobStorage = blobs + } + log.Debugln("Starting DB") if err := beacon.startDB(cliCtx, depositAddress); err != nil { return nil, err diff --git a/beacon-chain/node/options.go b/beacon-chain/node/options.go index 24672ec60..db482bd14 100644 --- a/beacon-chain/node/options.go +++ b/beacon-chain/node/options.go @@ -43,6 +43,15 @@ func WithBlobStorage(bs *filesystem.BlobStorage) Option { } } +// WithBlobStorageOptions appends 1 or more filesystem.BlobStorageOption on the beacon node, +// to be used when initializing blob storage. +func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option { + return func(bn *BeaconNode) error { + bn.BlobStorageOptions = append(bn.BlobStorageOptions, opt...) + return nil + } +} + // WithBlobRetentionEpochs sets the blobRetentionEpochs value, used in kv store initialization. func WithBlobRetentionEpochs(e primitives.Epoch) Option { return func(bn *BeaconNode) error { diff --git a/cmd/beacon-chain/storage/options.go b/cmd/beacon-chain/storage/options.go index a8cfa6d79..3ecd943ef 100644 --- a/cmd/beacon-chain/storage/options.go +++ b/cmd/beacon-chain/storage/options.go @@ -35,11 +35,10 @@ func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) { if err != nil { return nil, err } - bs, err := filesystem.NewBlobStorage(blobStoragePath(c), filesystem.WithBlobRetentionEpochs(e)) - if err != nil { - return nil, err - } - return []node.Option{node.WithBlobStorage(bs), node.WithBlobRetentionEpochs(e)}, nil + opts := []node.Option{node.WithBlobStorageOptions( + filesystem.WithBlobRetentionEpochs(e), filesystem.WithBasePath(blobStoragePath(c)), + )} + return opts, nil } func blobStoragePath(c *cli.Context) string { diff --git a/config/features/config.go b/config/features/config.go index dd6516ac2..209b14c5a 100644 --- a/config/features/config.go +++ b/config/features/config.go @@ -69,7 +69,8 @@ type Flags struct { EnableEIP4881 bool // EnableEIP4881 specifies whether to use the deposit tree from EIP4881 PrepareAllPayloads bool // PrepareAllPayloads informs the engine to prepare a block on every slot. - + // BlobSaveFsync requires blob saving to block on fsync to ensure blobs are durably persisted before passing DA. + BlobSaveFsync bool // KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have // changed on disk. This feature is for advanced use cases only. KeystoreImportDebounceInterval time.Duration @@ -245,6 +246,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error { logEnabled(EnableLightClient) cfg.EnableLightClient = true } + if ctx.IsSet(BlobSaveFsync.Name) { + logEnabled(BlobSaveFsync) + cfg.BlobSaveFsync = true + } + cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value} Init(cfg) return nil diff --git a/config/features/flags.go b/config/features/flags.go index 9060c88bb..7b67beaa2 100644 --- a/config/features/flags.go +++ b/config/features/flags.go @@ -149,12 +149,16 @@ var ( Name: "disable-resource-manager", Usage: "Disables running the libp2p resource manager.", } - // DisableRegistrationCache a flag for disabling the validator registration cache and use db instead. DisableRegistrationCache = &cli.BoolFlag{ Name: "disable-registration-cache", Usage: "Temporary flag for disabling the validator registration cache instead of using the DB. Note: registrations do not clear on restart while using the DB.", } + // BlobSaveFsync enforces durable filesystem writes for use cases where blob availability is critical. + BlobSaveFsync = &cli.BoolFlag{ + Name: "blob-save-fsync", + Usage: "Forces new blob files to be fysnc'd before continuing, ensuring durable blob writes.", + } ) // devModeFlags holds list of flags that are set when development mode is on. @@ -209,6 +213,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c disableResourceManager, DisableRegistrationCache, EnableLightClient, + BlobSaveFsync, }...)...) // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.