mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-19 08:14:15 +00:00
f40b8583f7
* Add Save blob and tests * Remove locks * Remove test cleanup * Fix go mod * Cleanup * Add checksum * Add file hashing to fileutil * Move test * Check data when exists * Add one more test * Rename * Gaz * Add packaged level comment * Fix block proposals in the REST validator client (#13116) * Fix block proposals in the REST validator client * fix graffiti test * return empty graffiti * fallback to old endpoints * logs * handle 404 * everything passes * review from James * log undecoded value * test fixes and additions --------- Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> * fix head slot in log (#13139) * zig: Update zig to recent main branch commit (#13142) Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> * Fix Pending Queue Deadline Bug (#13145) * rearrange deadline * naming * Add pruning * Gaz * Gaz * Update pruning * Cleanup * Making a mess * Benchmarking * Forgot to add the file + fixes * Fixes * Pruning from DB fixed * Add prune by file data * Fix pruning * Prune fixes * Cleanup db blockRoot filter * Handle file close error * Fix deletion * Change read at + remove retentionEpich from bs * Gaz * Seperate logic + add detailed comments * Add tests * Add retention slot when creating blobStorage * Fix tests * Gaz * Fix testonly import * Fix linter errors * Fix retentionSlot calculation * Move + use MaxEpochsToPersistBlobs * Remove unused ctx * Prestons suggestion Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com> * Rename --------- Co-authored-by: Radosław Kapka <rkapka@wp.pl> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> Co-authored-by: Potuz <potuz@prysmaticlabs.com> Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com> Co-authored-by: Nishant Das <nishdas93@gmail.com>
251 lines
7.1 KiB
Go
251 lines
7.1 KiB
Go
package filesystem
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
|
"github.com/pkg/errors"
|
|
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
|
|
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
|
"github.com/prysmaticlabs/prysm/v4/io/file"
|
|
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/v4/runtime/logging"
|
|
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/spf13/afero"
|
|
)
|
|
|
|
var (
	// errIndexOutOfBounds is returned by Indices when a blob file name encodes an index
	// that does not fit in the [fieldparams.MaxBlobsPerBlock]bool mask, i.e. when
	// index >= MaxBlobsPerBlock.
	errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
)
|
|
|
|
const (
	// sszExt is the extension of a fully written blob sidecar file.
	sszExt = "ssz"
	// partExt is the extension of the temporary file Save writes before renaming it to sszExt.
	partExt = "part"

	// bufferEpochs is added on top of the retention period in Prune so blobs are kept
	// slightly longer than strictly required.
	bufferEpochs = 2
	// directoryPermissions is the mode Save uses when creating per-block-root directories.
	directoryPermissions = 0700
)
|
|
|
|
// NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may
|
|
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
|
|
// initialized once per beacon node.
|
|
func NewBlobStorage(base string) (*BlobStorage, error) {
|
|
base = path.Clean(base)
|
|
if err := file.MkdirAll(base); err != nil {
|
|
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
|
|
}
|
|
fs := afero.NewBasePathFs(afero.NewOsFs(), base)
|
|
return &BlobStorage{fs: fs, retentionEpochs: MaxEpochsToPersistBlobs}, nil
|
|
}
|
|
|
|
// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
|
|
type BlobStorage struct {
|
|
fs afero.Fs
|
|
retentionEpochs primitives.Epoch
|
|
}
|
|
|
|
// Save saves blobs given a list of sidecars.
|
|
func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
|
|
fname := namerForSidecar(sidecar)
|
|
sszPath := fname.path()
|
|
exists, err := afero.Exists(bs.fs, sszPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if exists {
|
|
log.WithFields(logging.BlobFields(sidecar.ROBlob)).Debug("ignoring a duplicate blob sidecar Save attempt")
|
|
return nil
|
|
}
|
|
|
|
// Serialize the ethpb.BlobSidecar to binary data using SSZ.
|
|
sidecarData, err := sidecar.MarshalSSZ()
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to serialize sidecar data")
|
|
}
|
|
if err := bs.fs.MkdirAll(fname.dir(), directoryPermissions); err != nil {
|
|
return err
|
|
}
|
|
partPath := fname.partPath()
|
|
// Create a partial file and write the serialized data to it.
|
|
partialFile, err := bs.fs.Create(partPath)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to create partial file")
|
|
}
|
|
|
|
_, err = partialFile.Write(sidecarData)
|
|
if err != nil {
|
|
closeErr := partialFile.Close()
|
|
if closeErr != nil {
|
|
return closeErr
|
|
}
|
|
return errors.Wrap(err, "failed to write to partial file")
|
|
}
|
|
err = partialFile.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Atomically rename the partial file to its final name.
|
|
err = bs.fs.Rename(partPath, sszPath)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to rename partial file to final name")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Get retrieves a single BlobSidecar by its root and index.
|
|
// Since BlobStorage only writes blobs that have undergone full verification, the return
|
|
// value is always a VerifiedROBlob.
|
|
func (bs *BlobStorage) Get(root [32]byte, idx uint64) (blocks.VerifiedROBlob, error) {
|
|
expected := blobNamer{root: root, index: idx}
|
|
encoded, err := afero.ReadFile(bs.fs, expected.path())
|
|
var v blocks.VerifiedROBlob
|
|
if err != nil {
|
|
return v, err
|
|
}
|
|
s := ðpb.BlobSidecar{}
|
|
if err := s.UnmarshalSSZ(encoded); err != nil {
|
|
return v, err
|
|
}
|
|
ro, err := blocks.NewROBlobWithRoot(s, root)
|
|
if err != nil {
|
|
return blocks.VerifiedROBlob{}, err
|
|
}
|
|
return verification.BlobSidecarNoop(ro)
|
|
}
|
|
|
|
// Indices generates a bitmap representing which BlobSidecar.Index values are present on disk for a given root.
|
|
// This value can be compared to the commitments observed in a block to determine which indices need to be found
|
|
// on the network to confirm data availability.
|
|
func (bs *BlobStorage) Indices(root [32]byte) ([fieldparams.MaxBlobsPerBlock]bool, error) {
|
|
var mask [fieldparams.MaxBlobsPerBlock]bool
|
|
rootDir := blobNamer{root: root}.dir()
|
|
entries, err := afero.ReadDir(bs.fs, rootDir)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return mask, nil
|
|
}
|
|
return mask, err
|
|
}
|
|
for i := range entries {
|
|
if entries[i].IsDir() {
|
|
continue
|
|
}
|
|
name := entries[i].Name()
|
|
if !strings.HasSuffix(name, sszExt) {
|
|
continue
|
|
}
|
|
parts := strings.Split(name, ".")
|
|
if len(parts) != 2 {
|
|
continue
|
|
}
|
|
u, err := strconv.ParseUint(parts[0], 10, 64)
|
|
if err != nil {
|
|
return mask, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
|
|
}
|
|
if u >= fieldparams.MaxBlobsPerBlock {
|
|
return mask, errIndexOutOfBounds
|
|
}
|
|
mask[u] = true
|
|
}
|
|
return mask, nil
|
|
}
|
|
|
|
// blobNamer derives the on-disk directory and file names for a blob identified by its
// block root and index.
type blobNamer struct {
	// root is the block root; its hex encoding is the directory name.
	root [32]byte
	// index is the blob index; its decimal form is the file's base name.
	index uint64
}
|
|
|
|
func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer {
|
|
return blobNamer{root: sc.BlockRoot(), index: sc.Index}
|
|
}
|
|
|
|
func (p blobNamer) dir() string {
|
|
return fmt.Sprintf("%#x", p.root)
|
|
}
|
|
|
|
func (p blobNamer) fname(ext string) string {
|
|
return path.Join(p.dir(), fmt.Sprintf("%d.%s", p.index, ext))
|
|
}
|
|
|
|
func (p blobNamer) partPath() string {
|
|
return p.fname(partExt)
|
|
}
|
|
|
|
func (p blobNamer) path() string {
|
|
return p.fname(sszExt)
|
|
}
|
|
|
|
// Prune prunes blobs in the base directory based on the retention epoch.
|
|
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
|
|
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
|
|
func (bs *BlobStorage) Prune(currentSlot primitives.Slot) error {
|
|
retentionSlots, err := slots.EpochStart(bs.retentionEpochs + bufferEpochs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if currentSlot < retentionSlots {
|
|
return nil // Overflow would occur
|
|
}
|
|
|
|
folders, err := afero.ReadDir(bs.fs, ".")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, folder := range folders {
|
|
if folder.IsDir() {
|
|
f, err := bs.fs.Open(folder.Name() + "/0." + sszExt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func(f afero.File) {
|
|
err := f.Close()
|
|
if err != nil {
|
|
log.WithError(err).Errorf("Could not close blob file")
|
|
}
|
|
}(f)
|
|
|
|
slot, err := slotFromBlob(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if slot < (currentSlot - retentionSlots) {
|
|
if err = bs.fs.RemoveAll(folder.Name()); err != nil {
|
|
return errors.Wrapf(err, "failed to delete blob %s", f.Name())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes),
|
|
// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields
|
|
// preceding the slot information within SignedBeaconBlockHeader.
|
|
func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
|
|
b := make([]byte, 8)
|
|
_, err := at.ReadAt(b, 131176)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
rawSlot := binary.LittleEndian.Uint64(b)
|
|
return primitives.Slot(rawSlot), nil
|
|
}
|
|
|
|
// Delete removes the directory matching the provided block root and all the blobs it contains.
|
|
func (bs *BlobStorage) Delete(root [32]byte) error {
|
|
if err := bs.fs.RemoveAll(hexutil.Encode(root[:])); err != nil {
|
|
return fmt.Errorf("failed to delete blobs for root %#x: %w", root, err)
|
|
}
|
|
return nil
|
|
}
|