mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-10 04:51:20 +00:00
3d22a46c94
* swarm/storage: fix HashExplore concurrency bug ethersphere#1211 * swarm/storage: lock as value not pointer * swarm/storage: wait for to complete * swarm/storage: fix linter problems * swarm/storage: append to nil slice
147 lines
5.3 KiB
Go
147 lines
5.3 KiB
Go
// Copyright 2016 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"sort"
|
|
"sync"
|
|
)
|
|
|
|
/*
|
|
FileStore provides the client API entrypoints Store and Retrieve to store and retrieve
|
|
It can store anything that has a byte slice representation, so files or serialised objects etc.
|
|
|
|
Storage: FileStore calls the Chunker to segment the input datastream of any size to a merkle hashed tree of chunks. The key of the root block is returned to the client.
|
|
|
|
Retrieval: given the key of the root block, the FileStore retrieves the block chunks and reconstructs the original data and passes it back as a lazy reader. A lazy reader is a reader with on-demand delayed processing, i.e. the chunks needed to reconstruct a large file are only fetched and processed if that particular part of the document is actually read.
|
|
|
|
As the chunker produces chunks, FileStore dispatches them to its own chunk store
|
|
implementation for storage or retrieval.
|
|
*/
|
|
|
|
const (
|
|
defaultLDBCapacity = 5000000 // capacity for LevelDB, by default 5*10^6*4096 bytes == 20GB
|
|
defaultCacheCapacity = 10000 // capacity for in-memory chunks' cache
|
|
defaultChunkRequestsCacheCapacity = 5000000 // capacity for container holding outgoing requests for chunks. should be set to LevelDB capacity
|
|
)
|
|
|
|
type FileStore struct {
|
|
ChunkStore
|
|
hashFunc SwarmHasher
|
|
}
|
|
|
|
type FileStoreParams struct {
|
|
Hash string
|
|
}
|
|
|
|
func NewFileStoreParams() *FileStoreParams {
|
|
return &FileStoreParams{
|
|
Hash: DefaultHash,
|
|
}
|
|
}
|
|
|
|
// for testing locally
|
|
func NewLocalFileStore(datadir string, basekey []byte) (*FileStore, error) {
|
|
params := NewDefaultLocalStoreParams()
|
|
params.Init(datadir)
|
|
localStore, err := NewLocalStore(params, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
localStore.Validators = append(localStore.Validators, NewContentAddressValidator(MakeHashFunc(DefaultHash)))
|
|
return NewFileStore(localStore, NewFileStoreParams()), nil
|
|
}
|
|
|
|
func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore {
|
|
hashFunc := MakeHashFunc(params.Hash)
|
|
return &FileStore{
|
|
ChunkStore: store,
|
|
hashFunc: hashFunc,
|
|
}
|
|
}
|
|
|
|
// Retrieve is a public API. Main entry point for document retrieval directly. Used by the
|
|
// FS-aware API and httpaccess
|
|
// Chunk retrieval blocks on netStore requests with a timeout so reader will
|
|
// report error if retrieval of chunks within requested range time out.
|
|
// It returns a reader with the chunk data and whether the content was encrypted
|
|
func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChunkReader, isEncrypted bool) {
|
|
isEncrypted = len(addr) > f.hashFunc().Size()
|
|
getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted)
|
|
reader = TreeJoin(ctx, addr, getter, 0)
|
|
return
|
|
}
|
|
|
|
// Store is a public API. Main entry point for document storage directly. Used by the
|
|
// FS-aware API and httpaccess
|
|
func (f *FileStore) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(context.Context) error, err error) {
|
|
putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt)
|
|
return PyramidSplit(ctx, data, putter, putter)
|
|
}
|
|
|
|
func (f *FileStore) HashSize() int {
|
|
return f.hashFunc().Size()
|
|
}
|
|
|
|
// GetAllReferences is a public API. This endpoint returns all chunk hashes (only) for a given file
|
|
func (f *FileStore) GetAllReferences(ctx context.Context, data io.Reader, toEncrypt bool) (addrs AddressCollection, err error) {
|
|
// create a special kind of putter, which only will store the references
|
|
putter := &hashExplorer{
|
|
hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt),
|
|
}
|
|
// do the actual splitting anyway, no way around it
|
|
_, wait, err := PyramidSplit(ctx, data, putter, putter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// wait for splitting to be complete and all chunks processed
|
|
err = wait(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// collect all references
|
|
addrs = NewAddressCollection(0)
|
|
for _, ref := range putter.references {
|
|
addrs = append(addrs, Address(ref))
|
|
}
|
|
sort.Sort(addrs)
|
|
return addrs, nil
|
|
}
|
|
|
|
// hashExplorer is a special kind of putter which will only store chunk references
|
|
type hashExplorer struct {
|
|
*hasherStore
|
|
references []Reference
|
|
lock sync.Mutex
|
|
}
|
|
|
|
// HashExplorer's Put will add just the chunk hashes to its `References`
|
|
func (he *hashExplorer) Put(ctx context.Context, chunkData ChunkData) (Reference, error) {
|
|
// Need to do the actual Put, which returns the references
|
|
ref, err := he.hasherStore.Put(ctx, chunkData)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// internally store the reference
|
|
he.lock.Lock()
|
|
he.references = append(he.references, ref)
|
|
he.lock.Unlock()
|
|
return ref, nil
|
|
}
|