diff --git a/beacon-chain/db/filesystem/BUILD.bazel b/beacon-chain/db/filesystem/BUILD.bazel index 3d1501a5e..dd39ed377 100644 --- a/beacon-chain/db/filesystem/BUILD.bazel +++ b/beacon-chain/db/filesystem/BUILD.bazel @@ -5,30 +5,42 @@ go_library( srcs = [ "blob.go", "ephemeral.go", + "flags.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem", visibility = ["//visibility:public"], deps = [ "//beacon-chain/verification:go_default_library", + "//cmd/beacon-chain/flags:go_default_library", "//config/fieldparams:go_default_library", + "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", + "//consensus-types/primitives:go_default_library", "//io/file:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//runtime/logging:go_default_library", + "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_spf13_afero//:go_default_library", + "@com_github_urfave_cli_v2//:go_default_library", ], ) go_test( name = "go_default_test", - srcs = ["blob_test.go"], + srcs = [ + "blob_test.go", + "flags_test.go", + ], embed = [":go_default_library"], deps = [ "//beacon-chain/verification:go_default_library", + "//cmd/beacon-chain/flags:go_default_library", "//config/fieldparams:go_default_library", + "//config/params:go_default_library", + "//consensus-types/primitives:go_default_library", "//encoding/bytesutil:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//testing/require:go_default_library", @@ -36,5 +48,6 @@ go_test( "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_prysmaticlabs_fastssz//:go_default_library", "@com_github_spf13_afero//:go_default_library", + "@com_github_urfave_cli_v2//:go_default_library", ], ) diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 8cbe6e877..9d6e46ce0 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -1,7 +1,9 @@ package filesystem import ( + "encoding/binary" "fmt" + "io" "os" "path" "strconv" @@ -12,9 +14,11 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/io/file" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/runtime/logging" + "github.com/prysmaticlabs/prysm/v4/time/slots" log "github.com/sirupsen/logrus" "github.com/spf13/afero" ) @@ -27,6 +31,7 @@ const ( sszExt = "ssz" partExt = "part" + bufferEpochs = 2 directoryPermissions = 0700 ) @@ -39,12 +44,13 @@ func NewBlobStorage(base string) (*BlobStorage, error) { return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err) } fs := afero.NewBasePathFs(afero.NewOsFs(), base) - return &BlobStorage{fs: fs}, nil + return &BlobStorage{fs: fs, retentionEpochs: MaxEpochsToPersistBlobs}, nil } // BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars. type BlobStorage struct { - fs afero.Fs + fs afero.Fs + retentionEpochs primitives.Epoch } // Save saves blobs given a list of sidecars. @@ -179,6 +185,62 @@ func (p blobNamer) path() string { return p.fname(sszExt) } +// Prune prunes blobs in the base directory based on the retention epoch. +// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs). +// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs. +func (bs *BlobStorage) Prune(currentSlot primitives.Slot) error { + retentionSlots, err := slots.EpochStart(bs.retentionEpochs + bufferEpochs) + if err != nil { + return err + } + if currentSlot < retentionSlots { + return nil // Overflow would occur + } + + folders, err := afero.ReadDir(bs.fs, ".") + if err != nil { + return err + } + for _, folder := range folders { + if folder.IsDir() { + f, err := bs.fs.Open(folder.Name() + "/0." + sszExt) + if err != nil { + return err + } + defer func(f afero.File) { + err := f.Close() + if err != nil { + log.WithError(err).Errorf("Could not close blob file") + } + }(f) + + slot, err := slotFromBlob(f) + if err != nil { + return err + } + if slot < (currentSlot - retentionSlots) { + if err = bs.fs.RemoveAll(folder.Name()); err != nil { + return errors.Wrapf(err, "failed to delete blob %s", f.Name()) + } + } + } + } + return nil +} + +// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes), +// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields +// preceding the slot information within SignedBeaconBlockHeader. +func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) { + b := make([]byte, 8) + _, err := at.ReadAt(b, 131176) + if err != nil { + return 0, err + } + rawSlot := binary.LittleEndian.Uint64(b) + return primitives.Slot(rawSlot), nil +} + // Delete removes the directory matching the provided block root and all the blobs it contains. func (bs *BlobStorage) Delete(root [32]byte) error { if err := bs.fs.RemoveAll(hexutil.Encode(root[:])); err != nil { diff --git a/beacon-chain/db/filesystem/blob_test.go b/beacon-chain/db/filesystem/blob_test.go index 8ca55f60f..6cc740568 100644 --- a/beacon-chain/db/filesystem/blob_test.go +++ b/beacon-chain/db/filesystem/blob_test.go @@ -10,12 +10,12 @@ import ( ssz "github.com/prysmaticlabs/fastssz" "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" - "github.com/spf13/afero" - "github.com/prysmaticlabs/prysm/v4/testing/require" "github.com/prysmaticlabs/prysm/v4/testing/util" + "github.com/spf13/afero" ) func TestBlobStorage_SaveBlobData(t *testing.T) { @@ -24,7 +24,8 @@ func TestBlobStorage_SaveBlobData(t *testing.T) { require.NoError(t, err) t.Run("no error for duplicate", func(t *testing.T) { - fs, bs := NewEphemeralBlobStorageWithFs(t) + fs, bs, err := NewEphemeralBlobStorageWithFs(t) + require.NoError(t, err) existingSidecar := testSidecars[0] blobPath := namerForSidecar(existingSidecar).path() @@ -74,7 +75,8 @@ func TestBlobStorage_SaveBlobData(t *testing.T) { } func TestBlobIndicesBounds(t *testing.T) { - fs, bs := NewEphemeralBlobStorageWithFs(t) + fs, bs, err := NewEphemeralBlobStorageWithFs(t) + require.NoError(t, err) root := [32]byte{} okIdx := uint64(fieldparams.MaxBlobsPerBlock - 1) @@ -103,8 +105,77 @@ func writeFakeSSZ(t *testing.T, fs afero.Fs, root [32]byte, idx uint64) { require.NoError(t, fh.Close()) } +func TestBlobStoragePrune(t *testing.T) { + currentSlot := primitives.Slot(200000) + fs, bs, err := NewEphemeralBlobStorageWithFs(t) + require.NoError(t, err) + + t.Run("PruneOne", func(t *testing.T) { + _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 300, fieldparams.MaxBlobsPerBlock) + testSidecars, err := verification.BlobSidecarSliceNoop(sidecars) + require.NoError(t, err) + + for _, sidecar := range testSidecars { + require.NoError(t, bs.Save(sidecar)) + } + + require.NoError(t, bs.Prune(currentSlot)) + + remainingFolders, err := afero.ReadDir(fs, ".") + require.NoError(t, err) + require.Equal(t, 0, len(remainingFolders)) + }) + t.Run("PruneMany", func(t *testing.T) { + blockQty := 10 + slot := primitives.Slot(0) + + for j := 0; j <= blockQty; j++ { + root := bytesutil.ToBytes32(bytesutil.ToBytes(uint64(slot), 32)) + _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, root, slot, fieldparams.MaxBlobsPerBlock) + testSidecars, err := verification.BlobSidecarSliceNoop(sidecars) + require.NoError(t, err) + require.NoError(t, bs.Save(testSidecars[0])) + + slot += 10000 + } + + require.NoError(t, bs.Prune(currentSlot)) + + remainingFolders, err := afero.ReadDir(fs, ".") + require.NoError(t, err) + require.Equal(t, 4, len(remainingFolders)) + }) +} + +func BenchmarkPruning(b *testing.B) { + var t *testing.T + _, bs, err := NewEphemeralBlobStorageWithFs(t) + require.NoError(t, err) + + blockQty := 10000 + currentSlot := primitives.Slot(150000) + slot := primitives.Slot(0) + + for j := 0; j <= blockQty; j++ { + root := bytesutil.ToBytes32(bytesutil.ToBytes(uint64(slot), 32)) + _, sidecars := util.GenerateTestDenebBlockWithSidecar(t, root, slot, fieldparams.MaxBlobsPerBlock) + testSidecars, err := verification.BlobSidecarSliceNoop(sidecars) + require.NoError(t, err) + require.NoError(t, bs.Save(testSidecars[0])) + + slot += 100 + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := bs.Prune(currentSlot) + require.NoError(b, err) + } +} + func TestBlobStorageDelete(t *testing.T) { - fs, bs := NewEphemeralBlobStorageWithFs(t) + fs, bs, err := NewEphemeralBlobStorageWithFs(t) + require.NoError(t, err) rawRoot := "0xcf9bb70c98f58092c9d6459227c9765f984d240be9690e85179bc5a6f60366ad" blockRoot, err := hexutil.Decode(rawRoot) require.NoError(t, err) diff --git a/beacon-chain/db/filesystem/ephemeral.go b/beacon-chain/db/filesystem/ephemeral.go index 170dc3619..898b5e0a5 100644 --- a/beacon-chain/db/filesystem/ephemeral.go +++ b/beacon-chain/db/filesystem/ephemeral.go @@ -3,6 +3,7 @@ package filesystem import ( "testing" + "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/spf13/afero" ) @@ -15,9 +16,10 @@ func NewEphemeralBlobStorage(_ testing.TB) *BlobStorage { // NewEphemeralBlobStorageWithFs can be used by tests that want access to the virtual filesystem // in order to interact with it outside the parameters of the BlobStorage api. -func NewEphemeralBlobStorageWithFs(_ testing.TB) (afero.Fs, *BlobStorage) { +func NewEphemeralBlobStorageWithFs(_ testing.TB) (afero.Fs, *BlobStorage, error) { fs := afero.NewMemMapFs() - return fs, &BlobStorage{fs: fs} + retentionEpoch := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest + return fs, &BlobStorage{fs: fs, retentionEpochs: retentionEpoch}, nil } type BlobMocker struct { @@ -44,7 +46,7 @@ func (bm *BlobMocker) CreateFakeIndices(root [32]byte, indices []uint64) error { return nil } -// NewEpehmeralBlobStorageWithMocker returns a *BlobMocker value in addition to the BlobStorage value. +// NewEphemeralBlobStorageWithMocker returns a *BlobMocker value in addition to the BlobStorage value. // BlockMocker encapsulates things blob path construction to avoid leaking implementation details. func NewEphemeralBlobStorageWithMocker(_ testing.TB) (*BlobMocker, *BlobStorage) { fs := afero.NewMemMapFs() diff --git a/beacon-chain/db/kv/flags.go b/beacon-chain/db/filesystem/flags.go similarity index 86% rename from beacon-chain/db/kv/flags.go rename to beacon-chain/db/filesystem/flags.go index 891dab1c0..eaca5a996 100644 --- a/beacon-chain/db/kv/flags.go +++ b/beacon-chain/db/filesystem/flags.go @@ -1,4 +1,4 @@ -package kv +package filesystem import ( "fmt" @@ -9,9 +9,9 @@ import ( "github.com/urfave/cli/v2" ) -var maxEpochsToPersistBlobs = params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest +var MaxEpochsToPersistBlobs = params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest -// ConfigureBlobRetentionEpoch sets the epoch for blob retention based on command-line context. It sets the local config `maxEpochsToPersistBlobs`. +// ConfigureBlobRetentionEpoch sets the epoch for blob retention based on command-line context. It sets the local config `MaxEpochsToPersistBlobs`. // If the flag is not set, the spec default `MinEpochsForBlobsSidecarsRequest` is used. // An error if the input epoch is smaller than the spec default value. func ConfigureBlobRetentionEpoch(cliCtx *cli.Context) error { @@ -26,7 +26,7 @@ func ConfigureBlobRetentionEpoch(cliCtx *cli.Context) error { return fmt.Errorf("%s smaller than spec default, %d < %d", flags.BlobRetentionEpoch.Name, e, params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest) } - maxEpochsToPersistBlobs = e + MaxEpochsToPersistBlobs = e } return nil diff --git a/beacon-chain/db/kv/flags_test.go b/beacon-chain/db/filesystem/flags_test.go similarity index 90% rename from beacon-chain/db/kv/flags_test.go rename to beacon-chain/db/filesystem/flags_test.go index 43e35d6c5..02ef4958c 100644 --- a/beacon-chain/db/kv/flags_test.go +++ b/beacon-chain/db/filesystem/flags_test.go @@ -1,4 +1,4 @@ -package kv +package filesystem import ( "flag" @@ -13,14 +13,14 @@ import ( ) func TestConfigureBlobRetentionEpoch(t *testing.T) { - maxEpochsToPersistBlobs = params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest + MaxEpochsToPersistBlobs = params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest params.SetupTestConfigCleanup(t) app := cli.App{} set := flag.NewFlagSet("test", 0) // Test case: Spec default. require.NoError(t, ConfigureBlobRetentionEpoch(cli.NewContext(&app, set, nil))) - require.Equal(t, params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest, maxEpochsToPersistBlobs) + require.Equal(t, params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest, MaxEpochsToPersistBlobs) set.Uint64(flags.BlobRetentionEpoch.Name, 0, "") minEpochsForSidecarRequest := uint64(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest) @@ -29,7 +29,7 @@ func TestConfigureBlobRetentionEpoch(t *testing.T) { // Test case: Input epoch is greater than or equal to spec value. require.NoError(t, ConfigureBlobRetentionEpoch(cliCtx)) - require.Equal(t, primitives.Epoch(2*minEpochsForSidecarRequest), maxEpochsToPersistBlobs) + require.Equal(t, primitives.Epoch(2*minEpochsForSidecarRequest), MaxEpochsToPersistBlobs) // Test case: Input epoch is less than spec value. require.NoError(t, set.Set(flags.BlobRetentionEpoch.Name, strconv.FormatUint(minEpochsForSidecarRequest-1, 10))) diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index c53653b05..3e0f8dc65 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -13,7 +13,6 @@ go_library( "error.go", "execution_chain.go", "finalized_block_roots.go", - "flags.go", "genesis.go", "key.go", "kv.go", @@ -34,12 +33,12 @@ go_library( visibility = ["//visibility:public"], deps = [ "//beacon-chain/core/blocks:go_default_library", + "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/db/filters:go_default_library", "//beacon-chain/db/iface:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/genesis:go_default_library", "//beacon-chain/state/state-native:go_default_library", - "//cmd/beacon-chain/flags:go_default_library", "//config/features:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", @@ -67,7 +66,6 @@ go_library( "@com_github_prysmaticlabs_prombbolt//:go_default_library", "@com_github_schollz_progressbar_v3//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", - "@com_github_urfave_cli_v2//:go_default_library", "@io_etcd_go_bbolt//:go_default_library", "@io_opencensus_go//trace:go_default_library", "@org_golang_google_protobuf//proto:go_default_library", @@ -86,7 +84,6 @@ go_test( "encoding_test.go", "execution_chain_test.go", "finalized_block_roots_test.go", - "flags_test.go", "genesis_test.go", "init_test.go", "kv_test.go", @@ -102,6 +99,7 @@ go_test( data = glob(["testdata/**"]), embed = [":go_default_library"], deps = [ + "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/db/filters:go_default_library", "//beacon-chain/db/iface:go_default_library", "//beacon-chain/state:go_default_library", diff --git a/beacon-chain/db/kv/blob.go b/beacon-chain/db/kv/blob.go index d22e19617..57288858b 100644 --- a/beacon-chain/db/kv/blob.go +++ b/beacon-chain/db/kv/blob.go @@ -7,6 +7,7 @@ import ( "sort" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" types "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" @@ -290,7 +291,7 @@ func blobSidecarKey(blob *ethpb.DeprecatedBlobSidecar) blobRotatingKey { func slotKey(slot types.Slot) []byte { slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch - maxSlotsToPersistBlobs := types.Slot(maxEpochsToPersistBlobs.Mul(uint64(slotsPerEpoch))) + maxSlotsToPersistBlobs := types.Slot(filesystem.MaxEpochsToPersistBlobs.Mul(uint64(slotsPerEpoch))) return bytesutil.SlotToBytesBigEndian(slot.ModSlot(maxSlotsToPersistBlobs)) } @@ -299,14 +300,14 @@ func checkEpochsForBlobSidecarsRequestBucket(db *bolt.DB) error { b := tx.Bucket(chainMetadataBucket) v := b.Get(blobRetentionEpochsKey) if v == nil { - if err := b.Put(blobRetentionEpochsKey, bytesutil.Uint64ToBytesBigEndian(uint64(maxEpochsToPersistBlobs))); err != nil { + if err := b.Put(blobRetentionEpochsKey, bytesutil.Uint64ToBytesBigEndian(uint64(filesystem.MaxEpochsToPersistBlobs))); err != nil { return err } return nil } e := bytesutil.BytesToUint64BigEndian(v) - if e != uint64(maxEpochsToPersistBlobs) { - return fmt.Errorf("epochs for blobs request value in DB %d does not match config value %d", e, maxEpochsToPersistBlobs) + if e != uint64(filesystem.MaxEpochsToPersistBlobs) { + return fmt.Errorf("epochs for blobs request value in DB %d does not match config value %d", e, filesystem.MaxEpochsToPersistBlobs) } return nil }); err != nil { diff --git a/beacon-chain/db/kv/blob_test.go b/beacon-chain/db/kv/blob_test.go index 85ccc2acb..8e8d48a61 100644 --- a/beacon-chain/db/kv/blob_test.go +++ b/beacon-chain/db/kv/blob_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" @@ -524,7 +525,7 @@ func Test_checkEpochsForBlobSidecarsRequestBucket(t *testing.T) { set.Uint64(flags.BlobRetentionEpoch.Name, 0, "") require.NoError(t, set.Set(flags.BlobRetentionEpoch.Name, strconv.FormatUint(42069, 10))) cliCtx := cli.NewContext(&cli.App{}, set, nil) - require.NoError(t, ConfigureBlobRetentionEpoch(cliCtx)) + require.NoError(t, filesystem.ConfigureBlobRetentionEpoch(cliCtx)) require.ErrorContains(t, "epochs for blobs request value in DB 4096 does not match config value 42069", checkEpochsForBlobSidecarsRequestBucket(dbStore.db)) } diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 1be5d2ef5..58cf73c99 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -155,7 +155,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco if err := configureExecutionSetting(cliCtx); err != nil { return nil, err } - if err := kv.ConfigureBlobRetentionEpoch(cliCtx); err != nil { + if err := filesystem.ConfigureBlobRetentionEpoch(cliCtx); err != nil { return nil, err } configureFastSSZHashingAlgorithm()