mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-03 00:27:38 +00:00
blob save fsync feature flag (#13652)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
parent
e100fb0c08
commit
70e1b11aeb
@ -24,6 +24,7 @@ var (
|
|||||||
errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
|
errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
|
||||||
errEmptyBlobWritten = errors.New("zero bytes written to disk when saving blob sidecar")
|
errEmptyBlobWritten = errors.New("zero bytes written to disk when saving blob sidecar")
|
||||||
errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice")
|
errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice")
|
||||||
|
errNoBasePath = errors.New("BlobStorage base path not specified in init")
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -36,14 +37,26 @@ const (
|
|||||||
// BlobStorageOption is a functional option for configuring a BlobStorage.
|
// BlobStorageOption is a functional option for configuring a BlobStorage.
|
||||||
type BlobStorageOption func(*BlobStorage) error
|
type BlobStorageOption func(*BlobStorage) error
|
||||||
|
|
||||||
|
// WithBasePath is a required option that sets the base path of blob storage.
|
||||||
|
func WithBasePath(base string) BlobStorageOption {
|
||||||
|
return func(b *BlobStorage) error {
|
||||||
|
b.base = base
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
|
// WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
|
||||||
func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
|
func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
|
||||||
return func(b *BlobStorage) error {
|
return func(b *BlobStorage) error {
|
||||||
pruner, err := newBlobPruner(b.fs, e)
|
b.retentionEpochs = e
|
||||||
if err != nil {
|
return nil
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
b.pruner = pruner
|
}
|
||||||
|
|
||||||
|
// WithSaveFsync is an option that causes Save to call fsync before renaming part files for improved durability.
|
||||||
|
func WithSaveFsync(fsync bool) BlobStorageOption {
|
||||||
|
return func(b *BlobStorage) error {
|
||||||
|
b.fsync = fsync
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -51,28 +64,34 @@ func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
|
|||||||
// NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may
|
// NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may
|
||||||
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
|
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
|
||||||
// initialized once per beacon node.
|
// initialized once per beacon node.
|
||||||
func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error) {
|
func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
|
||||||
base = path.Clean(base)
|
b := &BlobStorage{}
|
||||||
if err := file.MkdirAll(base); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
|
|
||||||
}
|
|
||||||
fs := afero.NewBasePathFs(afero.NewOsFs(), base)
|
|
||||||
b := &BlobStorage{
|
|
||||||
fs: fs,
|
|
||||||
}
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
if err := o(b); err != nil {
|
if err := o(b); err != nil {
|
||||||
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
|
return nil, errors.Wrap(err, "failed to create blob storage")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if b.pruner == nil {
|
if b.base == "" {
|
||||||
log.Warn("Initializing blob filesystem storage with pruning disabled")
|
return nil, errNoBasePath
|
||||||
}
|
}
|
||||||
|
b.base = path.Clean(b.base)
|
||||||
|
if err := file.MkdirAll(b.base); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "failed to create blob storage at %s", b.base)
|
||||||
|
}
|
||||||
|
b.fs = afero.NewBasePathFs(afero.NewOsFs(), b.base)
|
||||||
|
pruner, err := newBlobPruner(b.fs, b.retentionEpochs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
b.pruner = pruner
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
|
// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
|
||||||
type BlobStorage struct {
|
type BlobStorage struct {
|
||||||
|
base string
|
||||||
|
retentionEpochs primitives.Epoch
|
||||||
|
fsync bool
|
||||||
fs afero.Fs
|
fs afero.Fs
|
||||||
pruner *blobPruner
|
pruner *blobPruner
|
||||||
}
|
}
|
||||||
@ -151,8 +170,13 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
|
|||||||
}
|
}
|
||||||
return errors.Wrap(err, "failed to write to partial file")
|
return errors.Wrap(err, "failed to write to partial file")
|
||||||
}
|
}
|
||||||
err = partialFile.Close()
|
if bs.fsync {
|
||||||
if err != nil {
|
if err := partialFile.Sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := partialFile.Close(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +108,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
|
|||||||
// to be empty. This test ensures that several routines can safely save the same blob at the
|
// to be empty. This test ensures that several routines can safely save the same blob at the
|
||||||
// same time. This isn't ideal behavior from the caller, but should be handled safely anyway.
|
// same time. This isn't ideal behavior from the caller, but should be handled safely anyway.
|
||||||
// See https://github.com/prysmaticlabs/prysm/pull/13648
|
// See https://github.com/prysmaticlabs/prysm/pull/13648
|
||||||
b, err := NewBlobStorage(t.TempDir())
|
b, err := NewBlobStorage(WithBasePath(t.TempDir()))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
blob := testSidecars[0]
|
blob := testSidecars[0]
|
||||||
|
|
||||||
@ -268,6 +268,8 @@ func BenchmarkPruning(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestNewBlobStorage(t *testing.T) {
|
func TestNewBlobStorage(t *testing.T) {
|
||||||
_, err := NewBlobStorage(path.Join(t.TempDir(), "good"))
|
_, err := NewBlobStorage()
|
||||||
|
require.ErrorIs(t, err, errNoBasePath)
|
||||||
|
_, err = NewBlobStorage(WithBasePath(path.Join(t.TempDir(), "good")))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
@ -118,6 +118,7 @@ type BeaconNode struct {
|
|||||||
BackfillOpts []backfill.ServiceOption
|
BackfillOpts []backfill.ServiceOption
|
||||||
initialSyncComplete chan struct{}
|
initialSyncComplete chan struct{}
|
||||||
BlobStorage *filesystem.BlobStorage
|
BlobStorage *filesystem.BlobStorage
|
||||||
|
BlobStorageOptions []filesystem.BlobStorageOption
|
||||||
blobRetentionEpochs primitives.Epoch
|
blobRetentionEpochs primitives.Epoch
|
||||||
verifyInitWaiter *verification.InitializerWaiter
|
verifyInitWaiter *verification.InitializerWaiter
|
||||||
syncChecker *initialsync.SyncChecker
|
syncChecker *initialsync.SyncChecker
|
||||||
@ -209,6 +210,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Allow tests to set it as an opt.
|
||||||
|
if beacon.BlobStorage == nil {
|
||||||
|
beacon.BlobStorageOptions = append(beacon.BlobStorageOptions, filesystem.WithSaveFsync(features.Get().BlobSaveFsync))
|
||||||
|
blobs, err := filesystem.NewBlobStorage(beacon.BlobStorageOptions...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
beacon.BlobStorage = blobs
|
||||||
|
}
|
||||||
|
|
||||||
log.Debugln("Starting DB")
|
log.Debugln("Starting DB")
|
||||||
if err := beacon.startDB(cliCtx, depositAddress); err != nil {
|
if err := beacon.startDB(cliCtx, depositAddress); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -43,6 +43,15 @@ func WithBlobStorage(bs *filesystem.BlobStorage) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithBlobStorageOptions appends 1 or more filesystem.BlobStorageOption on the beacon node,
|
||||||
|
// to be used when initializing blob storage.
|
||||||
|
func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option {
|
||||||
|
return func(bn *BeaconNode) error {
|
||||||
|
bn.BlobStorageOptions = append(bn.BlobStorageOptions, opt...)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithBlobRetentionEpochs sets the blobRetentionEpochs value, used in kv store initialization.
|
// WithBlobRetentionEpochs sets the blobRetentionEpochs value, used in kv store initialization.
|
||||||
func WithBlobRetentionEpochs(e primitives.Epoch) Option {
|
func WithBlobRetentionEpochs(e primitives.Epoch) Option {
|
||||||
return func(bn *BeaconNode) error {
|
return func(bn *BeaconNode) error {
|
||||||
|
@ -35,11 +35,10 @@ func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
bs, err := filesystem.NewBlobStorage(blobStoragePath(c), filesystem.WithBlobRetentionEpochs(e))
|
opts := []node.Option{node.WithBlobStorageOptions(
|
||||||
if err != nil {
|
filesystem.WithBlobRetentionEpochs(e), filesystem.WithBasePath(blobStoragePath(c)),
|
||||||
return nil, err
|
)}
|
||||||
}
|
return opts, nil
|
||||||
return []node.Option{node.WithBlobStorage(bs), node.WithBlobRetentionEpochs(e)}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func blobStoragePath(c *cli.Context) string {
|
func blobStoragePath(c *cli.Context) string {
|
||||||
|
@ -69,7 +69,8 @@ type Flags struct {
|
|||||||
EnableEIP4881 bool // EnableEIP4881 specifies whether to use the deposit tree from EIP4881
|
EnableEIP4881 bool // EnableEIP4881 specifies whether to use the deposit tree from EIP4881
|
||||||
|
|
||||||
PrepareAllPayloads bool // PrepareAllPayloads informs the engine to prepare a block on every slot.
|
PrepareAllPayloads bool // PrepareAllPayloads informs the engine to prepare a block on every slot.
|
||||||
|
// BlobSaveFsync requires blob saving to block on fsync to ensure blobs are durably persisted before passing DA.
|
||||||
|
BlobSaveFsync bool
|
||||||
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
|
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
|
||||||
// changed on disk. This feature is for advanced use cases only.
|
// changed on disk. This feature is for advanced use cases only.
|
||||||
KeystoreImportDebounceInterval time.Duration
|
KeystoreImportDebounceInterval time.Duration
|
||||||
@ -245,6 +246,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
|
|||||||
logEnabled(EnableLightClient)
|
logEnabled(EnableLightClient)
|
||||||
cfg.EnableLightClient = true
|
cfg.EnableLightClient = true
|
||||||
}
|
}
|
||||||
|
if ctx.IsSet(BlobSaveFsync.Name) {
|
||||||
|
logEnabled(BlobSaveFsync)
|
||||||
|
cfg.BlobSaveFsync = true
|
||||||
|
}
|
||||||
|
|
||||||
cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
|
cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
|
||||||
Init(cfg)
|
Init(cfg)
|
||||||
return nil
|
return nil
|
||||||
|
@ -149,12 +149,16 @@ var (
|
|||||||
Name: "disable-resource-manager",
|
Name: "disable-resource-manager",
|
||||||
Usage: "Disables running the libp2p resource manager.",
|
Usage: "Disables running the libp2p resource manager.",
|
||||||
}
|
}
|
||||||
|
|
||||||
// DisableRegistrationCache a flag for disabling the validator registration cache and use db instead.
|
// DisableRegistrationCache a flag for disabling the validator registration cache and use db instead.
|
||||||
DisableRegistrationCache = &cli.BoolFlag{
|
DisableRegistrationCache = &cli.BoolFlag{
|
||||||
Name: "disable-registration-cache",
|
Name: "disable-registration-cache",
|
||||||
Usage: "Temporary flag for disabling the validator registration cache instead of using the DB. Note: registrations do not clear on restart while using the DB.",
|
Usage: "Temporary flag for disabling the validator registration cache instead of using the DB. Note: registrations do not clear on restart while using the DB.",
|
||||||
}
|
}
|
||||||
|
// BlobSaveFsync enforces durable filesystem writes for use cases where blob availability is critical.
|
||||||
|
BlobSaveFsync = &cli.BoolFlag{
|
||||||
|
Name: "blob-save-fsync",
|
||||||
|
Usage: "Forces new blob files to be fysnc'd before continuing, ensuring durable blob writes.",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// devModeFlags holds list of flags that are set when development mode is on.
|
// devModeFlags holds list of flags that are set when development mode is on.
|
||||||
@ -209,6 +213,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
|
|||||||
disableResourceManager,
|
disableResourceManager,
|
||||||
DisableRegistrationCache,
|
DisableRegistrationCache,
|
||||||
EnableLightClient,
|
EnableLightClient,
|
||||||
|
BlobSaveFsync,
|
||||||
}...)...)
|
}...)...)
|
||||||
|
|
||||||
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
|
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
|
||||||
|
Loading…
Reference in New Issue
Block a user