mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 03:30:35 +00:00
Blob filesystem: Save Blobs (#13129)
* Add Save blob and tests * Remove locks * Remove test cleanup * Fix go mod * Cleanup * Add checksum * Add file hashing to fileutil * Move test * Check data when exists * Add one more test * Rename * Gaz * Add packaged level comment * Save full sidecar + reviews * Use path builder in test * Use other BlobSidecar * Cleanup * Fix gosec --------- Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
parent
d1dd8471a3
commit
0f65e51d1e
28
beacon-chain/db/filesystem/BUILD.bazel
Normal file
28
beacon-chain/db/filesystem/BUILD.bazel
Normal file
@ -0,0 +1,28 @@
|
||||
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["save_blob.go"],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem",
|
||||
visibility = ["//visibility:private"],
|
||||
deps = [
|
||||
"//io/file:go_default_library",
|
||||
"//proto/eth/v2:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prysmaticlabs_fastssz//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["save_blob_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//io/file:go_default_library",
|
||||
"//proto/eth/v2:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"@com_github_prysmaticlabs_fastssz//:go_default_library",
|
||||
],
|
||||
)
|
98
beacon-chain/db/filesystem/save_blob.go
Normal file
98
beacon-chain/db/filesystem/save_blob.go
Normal file
@ -0,0 +1,98 @@
|
||||
package filesystem
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/prysmaticlabs/prysm/v4/io/file"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/eth/v2"
|
||||
)
|
||||
|
||||
type BlobStorage struct {
|
||||
baseDir string
|
||||
}
|
||||
|
||||
// SaveBlobData saves blobs given a list of sidecars.
|
||||
func (bs *BlobStorage) SaveBlobData(sidecars []*eth.BlobSidecar) error {
|
||||
if len(sidecars) == 0 {
|
||||
return errors.New("no blob data to save")
|
||||
}
|
||||
for _, sidecar := range sidecars {
|
||||
blobPath := bs.sidecarFileKey(sidecar)
|
||||
exists := file.Exists(blobPath)
|
||||
if exists {
|
||||
if err := checkDataIntegrity(sidecar, blobPath); err != nil {
|
||||
// This error should never happen, if it does then the
|
||||
// file has most likely been tampered with.
|
||||
return errors.Wrapf(err, "failed to save blob sidecar, tried to overwrite blob (%s) with different content", blobPath)
|
||||
}
|
||||
continue // Blob already exists, move to the next one
|
||||
}
|
||||
|
||||
// Serialize the ethpb.BlobSidecar to binary data using SSZ.
|
||||
sidecarData, err := ssz.MarshalSSZ(sidecar)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to serialize sidecar data")
|
||||
}
|
||||
|
||||
// Create a partial file and write the serialized data to it.
|
||||
partialFilePath := blobPath + ".partial"
|
||||
partialFile, err := os.Create(filepath.Clean(partialFilePath))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create partial file")
|
||||
}
|
||||
|
||||
_, err = partialFile.Write(sidecarData)
|
||||
if err != nil {
|
||||
closeErr := partialFile.Close()
|
||||
if closeErr != nil {
|
||||
return closeErr
|
||||
}
|
||||
return errors.Wrap(err, "failed to write to partial file")
|
||||
}
|
||||
err = partialFile.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Atomically rename the partial file to its final name.
|
||||
err = os.Rename(partialFilePath, blobPath)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to rename partial file to final name")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bs *BlobStorage) sidecarFileKey(sidecar *eth.BlobSidecar) string {
|
||||
return path.Join(bs.baseDir, fmt.Sprintf(
|
||||
"%d_%x_%d_%x.blob",
|
||||
sidecar.Slot,
|
||||
sidecar.BlockRoot,
|
||||
sidecar.Index,
|
||||
sidecar.KzgCommitment,
|
||||
))
|
||||
}
|
||||
|
||||
// checkDataIntegrity checks the data integrity by comparing the original ethpb.BlobSidecar.
|
||||
func checkDataIntegrity(sidecar *eth.BlobSidecar, filePath string) error {
|
||||
sidecarData, err := ssz.MarshalSSZ(sidecar)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to serialize sidecar data")
|
||||
}
|
||||
originalChecksum := sha256.Sum256(sidecarData)
|
||||
savedFileChecksum, err := file.HashFile(filePath)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to calculate saved file checksum")
|
||||
}
|
||||
if hex.EncodeToString(originalChecksum[:]) != hex.EncodeToString(savedFileChecksum) {
|
||||
return errors.New("data integrity check failed")
|
||||
}
|
||||
return nil
|
||||
}
|
191
beacon-chain/db/filesystem/save_blob_test.go
Normal file
191
beacon-chain/db/filesystem/save_blob_test.go
Normal file
@ -0,0 +1,191 @@
|
||||
package filesystem
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/io/file"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/eth/v2"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
||||
)
|
||||
|
||||
func TestBlobStorage_SaveBlobData(t *testing.T) {
|
||||
testSidecars := generateBlobSidecars(t, fieldparams.MaxBlobsPerBlock)
|
||||
t.Run("NoBlobData", func(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
bs := &BlobStorage{baseDir: tempDir}
|
||||
err := bs.SaveBlobData([]*eth.BlobSidecar{})
|
||||
require.ErrorContains(t, "no blob data to save", err)
|
||||
})
|
||||
|
||||
t.Run("BlobExists", func(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
bs := &BlobStorage{baseDir: tempDir}
|
||||
existingSidecar := testSidecars[0]
|
||||
|
||||
blobPath := bs.sidecarFileKey(existingSidecar)
|
||||
// Serialize the existing BlobSidecar to binary data.
|
||||
existingSidecarData, err := ssz.MarshalSSZ(existingSidecar)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = os.MkdirAll(path.Dir(blobPath), os.ModePerm)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write the serialized data to the blob file.
|
||||
err = os.WriteFile(blobPath, existingSidecarData, os.ModePerm)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = bs.SaveBlobData([]*eth.BlobSidecar{existingSidecar})
|
||||
require.NoError(t, err)
|
||||
|
||||
content, err := os.ReadFile(blobPath)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Deserialize the BlobSidecar from the saved file data.
|
||||
var savedSidecar ssz.Unmarshaler
|
||||
savedSidecar = ð.BlobSidecar{}
|
||||
err = savedSidecar.UnmarshalSSZ(content)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Compare the original Sidecar and the saved Sidecar.
|
||||
require.DeepSSZEqual(t, existingSidecar, savedSidecar)
|
||||
})
|
||||
|
||||
t.Run("SaveBlobDataNoErrors", func(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
bs := &BlobStorage{baseDir: tempDir}
|
||||
err := bs.SaveBlobData(testSidecars)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check the number of files in the directory.
|
||||
files, err := os.ReadDir(tempDir)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(testSidecars), len(files))
|
||||
|
||||
for _, f := range files {
|
||||
content, err := os.ReadFile(path.Join(tempDir, f.Name()))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Deserialize the BlobSidecar from the saved file data.
|
||||
var savedSidecar ssz.Unmarshaler
|
||||
savedSidecar = ð.BlobSidecar{}
|
||||
err = savedSidecar.UnmarshalSSZ(content)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Find the corresponding test sidecar based on the file name.
|
||||
sidecar := findTestSidecarsByFileName(t, testSidecars, f.Name())
|
||||
require.NotNil(t, sidecar)
|
||||
// Compare the original Sidecar and the saved Sidecar.
|
||||
require.DeepSSZEqual(t, sidecar, savedSidecar)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("OverwriteBlobWithDifferentContent", func(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
bs := &BlobStorage{baseDir: tempDir}
|
||||
originalSidecar := []*eth.BlobSidecar{testSidecars[0]}
|
||||
// Save the original sidecar
|
||||
err := bs.SaveBlobData(originalSidecar)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Modify the blob data
|
||||
modifiedSidecar := originalSidecar
|
||||
modifiedSidecar[0].Blob = []byte("Modified Blob Data")
|
||||
|
||||
err = bs.SaveBlobData(modifiedSidecar)
|
||||
require.ErrorContains(t, "failed to save blob sidecar, tried to overwrite blob", err)
|
||||
})
|
||||
}
|
||||
|
||||
func findTestSidecarsByFileName(t *testing.T, testSidecars []*eth.BlobSidecar, fileName string) *eth.BlobSidecar {
|
||||
parts := strings.SplitN(fileName, ".", 2)
|
||||
require.Equal(t, 2, len(parts))
|
||||
// parts[0] contains the substring before the first period
|
||||
components := strings.Split(parts[0], "_")
|
||||
if len(components) == 4 {
|
||||
blockRoot, err := hex.DecodeString(components[1])
|
||||
require.NoError(t, err)
|
||||
kzgCommitment, err := hex.DecodeString(components[3])
|
||||
require.NoError(t, err)
|
||||
for _, sidecar := range testSidecars {
|
||||
if bytes.Equal(sidecar.BlockRoot, blockRoot) && bytes.Equal(sidecar.KzgCommitment, kzgCommitment) {
|
||||
return sidecar
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestCheckDataIntegrity(t *testing.T) {
|
||||
testSidecars := generateBlobSidecars(t, fieldparams.MaxBlobsPerBlock)
|
||||
originalData, err := ssz.MarshalSSZ(testSidecars[0])
|
||||
require.NoError(t, err)
|
||||
originalChecksum := sha256.Sum256(originalData)
|
||||
|
||||
tempDir := t.TempDir()
|
||||
tempfile, err := os.CreateTemp(tempDir, "testfile")
|
||||
require.NoError(t, err)
|
||||
_, err = tempfile.Write(originalData)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = checkDataIntegrity(testSidecars[0], tempfile.Name())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Modify the data in the file to simulate data corruption
|
||||
corruptedData := []byte("corrupted data")
|
||||
err = os.WriteFile(tempfile.Name(), corruptedData, os.ModePerm)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Test data integrity check with corrupted data
|
||||
err = checkDataIntegrity(testSidecars[0], tempfile.Name())
|
||||
require.ErrorContains(t, "data integrity check failed", err)
|
||||
|
||||
// Modify the calculated checksum to be incorrect
|
||||
wrongChecksum := hex.EncodeToString(originalChecksum[:]) + "12345"
|
||||
err = os.WriteFile(tempfile.Name(), []byte(wrongChecksum), os.ModePerm)
|
||||
require.NoError(t, err)
|
||||
|
||||
checksum, err := file.HashFile(tempfile.Name())
|
||||
require.NoError(t, err)
|
||||
require.NotEqual(t, wrongChecksum, hex.EncodeToString(checksum))
|
||||
}
|
||||
|
||||
func generateBlobSidecars(t *testing.T, n uint64) []*eth.BlobSidecar {
|
||||
blobSidecars := make([]*eth.BlobSidecar, n)
|
||||
for i := uint64(0); i < n; i++ {
|
||||
blobSidecars[i] = generateBlobSidecar(t, i)
|
||||
}
|
||||
return blobSidecars
|
||||
}
|
||||
|
||||
func generateBlobSidecar(t *testing.T, index uint64) *eth.BlobSidecar {
|
||||
blob := make([]byte, 131072)
|
||||
_, err := rand.Read(blob)
|
||||
require.NoError(t, err)
|
||||
kzgCommitment := make([]byte, 48)
|
||||
_, err = rand.Read(kzgCommitment)
|
||||
require.NoError(t, err)
|
||||
kzgProof := make([]byte, 48)
|
||||
_, err = rand.Read(kzgProof)
|
||||
require.NoError(t, err)
|
||||
return ð.BlobSidecar{
|
||||
BlockRoot: bytesutil.PadTo([]byte{'a'}, 32),
|
||||
Index: index,
|
||||
Slot: 100,
|
||||
BlockParentRoot: bytesutil.PadTo([]byte{'b'}, 32),
|
||||
ProposerIndex: 101,
|
||||
Blob: blob,
|
||||
KzgCommitment: kzgCommitment,
|
||||
KzgProof: kzgProof,
|
||||
}
|
||||
}
|
@ -276,25 +276,34 @@ func HashDir(dir string) (string, error) {
|
||||
files = append([]string(nil), files...)
|
||||
sort.Strings(files)
|
||||
for _, file := range files {
|
||||
fd, err := os.Open(filepath.Join(dir, file)) // #nosec G304
|
||||
hf, err := HashFile(filepath.Join(dir, file))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
hf := sha256.New()
|
||||
_, err = io.Copy(hf, fd)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err := fd.Close(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if _, err := fmt.Fprintf(h, "%x %s\n", hf.Sum(nil), file); err != nil {
|
||||
if _, err := fmt.Fprintf(h, "%x %s\n", hf, file); err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
return "hashdir:" + base64.StdEncoding.EncodeToString(h.Sum(nil)), nil
|
||||
}
|
||||
|
||||
// HashFile calculates and returns the hash of a file.
|
||||
func HashFile(filePath string) ([]byte, error) {
|
||||
f, err := os.Open(filepath.Clean(filePath))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hf := sha256.New()
|
||||
if _, err := io.Copy(hf, f); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = f.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return hf.Sum(nil), nil
|
||||
}
|
||||
|
||||
// DirFiles returns list of files found within a given directory and its sub-directories.
|
||||
// Directory prefix will not be included as a part of returned file string i.e. for a file located
|
||||
// in "dir/foo/bar" only "foo/bar" part will be returned.
|
||||
|
@ -18,6 +18,8 @@ package file_test
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"os/user"
|
||||
"path/filepath"
|
||||
@ -244,6 +246,26 @@ func TestHashDir(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestHashFile(t *testing.T) {
|
||||
originalData := []byte("test data")
|
||||
originalChecksum := sha256.Sum256(originalData)
|
||||
|
||||
tempDir := t.TempDir()
|
||||
tempfile, err := os.CreateTemp(tempDir, "testfile")
|
||||
require.NoError(t, err)
|
||||
_, err = tempfile.Write(originalData)
|
||||
require.NoError(t, err)
|
||||
err = tempfile.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Calculate the checksum of the temporary file
|
||||
checksum, err := file.HashFile(tempfile.Name())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Ensure the calculated checksum matches the original checksum
|
||||
require.Equal(t, hex.EncodeToString(originalChecksum[:]), hex.EncodeToString(checksum))
|
||||
}
|
||||
|
||||
func TestDirFiles(t *testing.T) {
|
||||
tmpDir, tmpDirFnames := tmpDirWithContents(t)
|
||||
tests := []struct {
|
||||
|
Loading…
Reference in New Issue
Block a user