From 0f65e51d1e1f256ee880babd58b0b85e47be4868 Mon Sep 17 00:00:00 2001 From: Sammy Rosso <15244892+saolyn@users.noreply.github.com> Date: Fri, 3 Nov 2023 17:24:30 +0100 Subject: [PATCH] Blob filesystem: Save Blobs (#13129) * Add Save blob and tests * Remove locks * Remove test cleanup * Fix go mod * Cleanup * Add checksum * Add file hashing to fileutil * Move test * Check data when exists * Add one more test * Rename * Gaz * Add packaged level comment * Save full sidecar + reviews * Use path builder in test * Use other BlobSidecar * Cleanup * Fix gosec --------- Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/db/filesystem/BUILD.bazel | 28 +++ beacon-chain/db/filesystem/save_blob.go | 98 ++++++++++ beacon-chain/db/filesystem/save_blob_test.go | 191 +++++++++++++++++++ io/file/fileutil.go | 29 ++- io/file/fileutil_test.go | 22 +++ 5 files changed, 358 insertions(+), 10 deletions(-) create mode 100644 beacon-chain/db/filesystem/BUILD.bazel create mode 100644 beacon-chain/db/filesystem/save_blob.go create mode 100644 beacon-chain/db/filesystem/save_blob_test.go diff --git a/beacon-chain/db/filesystem/BUILD.bazel b/beacon-chain/db/filesystem/BUILD.bazel new file mode 100644 index 000000000..c1b5c4e67 --- /dev/null +++ b/beacon-chain/db/filesystem/BUILD.bazel @@ -0,0 +1,28 @@ +load("@prysm//tools/go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["save_blob.go"], + importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem", + visibility = ["//visibility:private"], + deps = [ + "//io/file:go_default_library", + "//proto/eth/v2:go_default_library", + "@com_github_pkg_errors//:go_default_library", + "@com_github_prysmaticlabs_fastssz//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["save_blob_test.go"], + embed = [":go_default_library"], + deps = [ + "//config/fieldparams:go_default_library", + "//encoding/bytesutil:go_default_library", + "//io/file:go_default_library", + "//proto/eth/v2:go_default_library", + "//testing/require:go_default_library", + "@com_github_prysmaticlabs_fastssz//:go_default_library", + ], +) diff --git a/beacon-chain/db/filesystem/save_blob.go b/beacon-chain/db/filesystem/save_blob.go new file mode 100644 index 000000000..b974b9b2f --- /dev/null +++ b/beacon-chain/db/filesystem/save_blob.go @@ -0,0 +1,98 @@ +package filesystem + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "os" + "path" + "path/filepath" + + "github.com/pkg/errors" + ssz "github.com/prysmaticlabs/fastssz" + "github.com/prysmaticlabs/prysm/v4/io/file" + "github.com/prysmaticlabs/prysm/v4/proto/eth/v2" +) + +type BlobStorage struct { + baseDir string +} + +// SaveBlobData saves blobs given a list of sidecars. +func (bs *BlobStorage) SaveBlobData(sidecars []*eth.BlobSidecar) error { + if len(sidecars) == 0 { + return errors.New("no blob data to save") + } + for _, sidecar := range sidecars { + blobPath := bs.sidecarFileKey(sidecar) + exists := file.Exists(blobPath) + if exists { + if err := checkDataIntegrity(sidecar, blobPath); err != nil { + // This error should never happen, if it does then the + // file has most likely been tampered with. + return errors.Wrapf(err, "failed to save blob sidecar, tried to overwrite blob (%s) with different content", blobPath) + } + continue // Blob already exists, move to the next one + } + + // Serialize the ethpb.BlobSidecar to binary data using SSZ. + sidecarData, err := ssz.MarshalSSZ(sidecar) + if err != nil { + return errors.Wrap(err, "failed to serialize sidecar data") + } + + // Create a partial file and write the serialized data to it. + partialFilePath := blobPath + ".partial" + partialFile, err := os.Create(filepath.Clean(partialFilePath)) + if err != nil { + return errors.Wrap(err, "failed to create partial file") + } + + _, err = partialFile.Write(sidecarData) + if err != nil { + closeErr := partialFile.Close() + if closeErr != nil { + return closeErr + } + return errors.Wrap(err, "failed to write to partial file") + } + err = partialFile.Close() + if err != nil { + return err + } + + // Atomically rename the partial file to its final name. + err = os.Rename(partialFilePath, blobPath) + if err != nil { + return errors.Wrap(err, "failed to rename partial file to final name") + } + } + return nil +} + +func (bs *BlobStorage) sidecarFileKey(sidecar *eth.BlobSidecar) string { + return path.Join(bs.baseDir, fmt.Sprintf( + "%d_%x_%d_%x.blob", + sidecar.Slot, + sidecar.BlockRoot, + sidecar.Index, + sidecar.KzgCommitment, + )) +} + +// checkDataIntegrity checks the data integrity by comparing the original ethpb.BlobSidecar. +func checkDataIntegrity(sidecar *eth.BlobSidecar, filePath string) error { + sidecarData, err := ssz.MarshalSSZ(sidecar) + if err != nil { + return errors.Wrap(err, "failed to serialize sidecar data") + } + originalChecksum := sha256.Sum256(sidecarData) + savedFileChecksum, err := file.HashFile(filePath) + if err != nil { + return errors.Wrap(err, "failed to calculate saved file checksum") + } + if hex.EncodeToString(originalChecksum[:]) != hex.EncodeToString(savedFileChecksum) { + return errors.New("data integrity check failed") + } + return nil +} diff --git a/beacon-chain/db/filesystem/save_blob_test.go b/beacon-chain/db/filesystem/save_blob_test.go new file mode 100644 index 000000000..6dc491d0c --- /dev/null +++ b/beacon-chain/db/filesystem/save_blob_test.go @@ -0,0 +1,191 @@ +package filesystem + +import ( + "bytes" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "os" + "path" + "strings" + "testing" + + ssz "github.com/prysmaticlabs/fastssz" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v4/io/file" + "github.com/prysmaticlabs/prysm/v4/proto/eth/v2" + + "github.com/prysmaticlabs/prysm/v4/testing/require" +) + +func TestBlobStorage_SaveBlobData(t *testing.T) { + testSidecars := generateBlobSidecars(t, fieldparams.MaxBlobsPerBlock) + t.Run("NoBlobData", func(t *testing.T) { + tempDir := t.TempDir() + bs := &BlobStorage{baseDir: tempDir} + err := bs.SaveBlobData([]*eth.BlobSidecar{}) + require.ErrorContains(t, "no blob data to save", err) + }) + + t.Run("BlobExists", func(t *testing.T) { + tempDir := t.TempDir() + bs := &BlobStorage{baseDir: tempDir} + existingSidecar := testSidecars[0] + + blobPath := bs.sidecarFileKey(existingSidecar) + // Serialize the existing BlobSidecar to binary data. + existingSidecarData, err := ssz.MarshalSSZ(existingSidecar) + require.NoError(t, err) + + err = os.MkdirAll(path.Dir(blobPath), os.ModePerm) + require.NoError(t, err) + + // Write the serialized data to the blob file. + err = os.WriteFile(blobPath, existingSidecarData, os.ModePerm) + require.NoError(t, err) + + err = bs.SaveBlobData([]*eth.BlobSidecar{existingSidecar}) + require.NoError(t, err) + + content, err := os.ReadFile(blobPath) + require.NoError(t, err) + + // Deserialize the BlobSidecar from the saved file data. + var savedSidecar ssz.Unmarshaler + savedSidecar = ð.BlobSidecar{} + err = savedSidecar.UnmarshalSSZ(content) + require.NoError(t, err) + + // Compare the original Sidecar and the saved Sidecar. + require.DeepSSZEqual(t, existingSidecar, savedSidecar) + }) + + t.Run("SaveBlobDataNoErrors", func(t *testing.T) { + tempDir := t.TempDir() + bs := &BlobStorage{baseDir: tempDir} + err := bs.SaveBlobData(testSidecars) + require.NoError(t, err) + + // Check the number of files in the directory. + files, err := os.ReadDir(tempDir) + require.NoError(t, err) + require.Equal(t, len(testSidecars), len(files)) + + for _, f := range files { + content, err := os.ReadFile(path.Join(tempDir, f.Name())) + require.NoError(t, err) + + // Deserialize the BlobSidecar from the saved file data. + var savedSidecar ssz.Unmarshaler + savedSidecar = ð.BlobSidecar{} + err = savedSidecar.UnmarshalSSZ(content) + require.NoError(t, err) + + // Find the corresponding test sidecar based on the file name. + sidecar := findTestSidecarsByFileName(t, testSidecars, f.Name()) + require.NotNil(t, sidecar) + // Compare the original Sidecar and the saved Sidecar. + require.DeepSSZEqual(t, sidecar, savedSidecar) + } + }) + + t.Run("OverwriteBlobWithDifferentContent", func(t *testing.T) { + tempDir := t.TempDir() + bs := &BlobStorage{baseDir: tempDir} + originalSidecar := []*eth.BlobSidecar{testSidecars[0]} + // Save the original sidecar + err := bs.SaveBlobData(originalSidecar) + require.NoError(t, err) + + // Modify the blob data + modifiedSidecar := originalSidecar + modifiedSidecar[0].Blob = []byte("Modified Blob Data") + + err = bs.SaveBlobData(modifiedSidecar) + require.ErrorContains(t, "failed to save blob sidecar, tried to overwrite blob", err) + }) +} + +func findTestSidecarsByFileName(t *testing.T, testSidecars []*eth.BlobSidecar, fileName string) *eth.BlobSidecar { + parts := strings.SplitN(fileName, ".", 2) + require.Equal(t, 2, len(parts)) + // parts[0] contains the substring before the first period + components := strings.Split(parts[0], "_") + if len(components) == 4 { + blockRoot, err := hex.DecodeString(components[1]) + require.NoError(t, err) + kzgCommitment, err := hex.DecodeString(components[3]) + require.NoError(t, err) + for _, sidecar := range testSidecars { + if bytes.Equal(sidecar.BlockRoot, blockRoot) && bytes.Equal(sidecar.KzgCommitment, kzgCommitment) { + return sidecar + } + } + } + return nil +} + +func TestCheckDataIntegrity(t *testing.T) { + testSidecars := generateBlobSidecars(t, fieldparams.MaxBlobsPerBlock) + originalData, err := ssz.MarshalSSZ(testSidecars[0]) + require.NoError(t, err) + originalChecksum := sha256.Sum256(originalData) + + tempDir := t.TempDir() + tempfile, err := os.CreateTemp(tempDir, "testfile") + require.NoError(t, err) + _, err = tempfile.Write(originalData) + require.NoError(t, err) + + err = checkDataIntegrity(testSidecars[0], tempfile.Name()) + require.NoError(t, err) + + // Modify the data in the file to simulate data corruption + corruptedData := []byte("corrupted data") + err = os.WriteFile(tempfile.Name(), corruptedData, os.ModePerm) + require.NoError(t, err) + + // Test data integrity check with corrupted data + err = checkDataIntegrity(testSidecars[0], tempfile.Name()) + require.ErrorContains(t, "data integrity check failed", err) + + // Modify the calculated checksum to be incorrect + wrongChecksum := hex.EncodeToString(originalChecksum[:]) + "12345" + err = os.WriteFile(tempfile.Name(), []byte(wrongChecksum), os.ModePerm) + require.NoError(t, err) + + checksum, err := file.HashFile(tempfile.Name()) + require.NoError(t, err) + require.NotEqual(t, wrongChecksum, hex.EncodeToString(checksum)) +} + +func generateBlobSidecars(t *testing.T, n uint64) []*eth.BlobSidecar { + blobSidecars := make([]*eth.BlobSidecar, n) + for i := uint64(0); i < n; i++ { + blobSidecars[i] = generateBlobSidecar(t, i) + } + return blobSidecars +} + +func generateBlobSidecar(t *testing.T, index uint64) *eth.BlobSidecar { + blob := make([]byte, 131072) + _, err := rand.Read(blob) + require.NoError(t, err) + kzgCommitment := make([]byte, 48) + _, err = rand.Read(kzgCommitment) + require.NoError(t, err) + kzgProof := make([]byte, 48) + _, err = rand.Read(kzgProof) + require.NoError(t, err) + return ð.BlobSidecar{ + BlockRoot: bytesutil.PadTo([]byte{'a'}, 32), + Index: index, + Slot: 100, + BlockParentRoot: bytesutil.PadTo([]byte{'b'}, 32), + ProposerIndex: 101, + Blob: blob, + KzgCommitment: kzgCommitment, + KzgProof: kzgProof, + } +} diff --git a/io/file/fileutil.go b/io/file/fileutil.go index 902fa4814..fc98d4ddb 100644 --- a/io/file/fileutil.go +++ b/io/file/fileutil.go @@ -276,25 +276,34 @@ func HashDir(dir string) (string, error) { files = append([]string(nil), files...) sort.Strings(files) for _, file := range files { - fd, err := os.Open(filepath.Join(dir, file)) // #nosec G304 + hf, err := HashFile(filepath.Join(dir, file)) if err != nil { return "", err } - hf := sha256.New() - _, err = io.Copy(hf, fd) - if err != nil { - return "", err - } - if err := fd.Close(); err != nil { - return "", err - } - if _, err := fmt.Fprintf(h, "%x %s\n", hf.Sum(nil), file); err != nil { + if _, err := fmt.Fprintf(h, "%x %s\n", hf, file); err != nil { return "", err } } return "hashdir:" + base64.StdEncoding.EncodeToString(h.Sum(nil)), nil } +// HashFile calculates and returns the hash of a file. +func HashFile(filePath string) ([]byte, error) { + f, err := os.Open(filepath.Clean(filePath)) + if err != nil { + return nil, err + } + hf := sha256.New() + if _, err := io.Copy(hf, f); err != nil { + return nil, err + } + err = f.Close() + if err != nil { + return nil, err + } + return hf.Sum(nil), nil +} + // DirFiles returns list of files found within a given directory and its sub-directories. // Directory prefix will not be included as a part of returned file string i.e. for a file located // in "dir/foo/bar" only "foo/bar" part will be returned. diff --git a/io/file/fileutil_test.go b/io/file/fileutil_test.go index 7f519ff68..d1a7e7c48 100644 --- a/io/file/fileutil_test.go +++ b/io/file/fileutil_test.go @@ -18,6 +18,8 @@ package file_test import ( "bufio" "bytes" + "crypto/sha256" + "encoding/hex" "os" "os/user" "path/filepath" @@ -244,6 +246,26 @@ func TestHashDir(t *testing.T) { }) } +func TestHashFile(t *testing.T) { + originalData := []byte("test data") + originalChecksum := sha256.Sum256(originalData) + + tempDir := t.TempDir() + tempfile, err := os.CreateTemp(tempDir, "testfile") + require.NoError(t, err) + _, err = tempfile.Write(originalData) + require.NoError(t, err) + err = tempfile.Close() + require.NoError(t, err) + + // Calculate the checksum of the temporary file + checksum, err := file.HashFile(tempfile.Name()) + require.NoError(t, err) + + // Ensure the calculated checksum matches the original checksum + require.Equal(t, hex.EncodeToString(originalChecksum[:]), hex.EncodeToString(checksum)) +} + func TestDirFiles(t *testing.T) { tmpDir, tmpDirFnames := tmpDirWithContents(t) tests := []struct {