mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-05 10:32:19 +00:00
1d1d988aa7
- Moved fuse related code in a new package, swarm/fuse - Added write support - Create new files - Delete existing files - Append to files (with limitations) - More test coverage
289 lines
6.5 KiB
Go
289 lines
6.5 KiB
Go
// Copyright 2016 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package api
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/swarm/storage"
|
|
)
|
|
|
|
const maxParallelFiles = 5
|
|
|
|
type FileSystem struct {
|
|
api *Api
|
|
}
|
|
|
|
func NewFileSystem(api *Api) *FileSystem {
|
|
return &FileSystem{api}
|
|
}
|
|
|
|
// Upload replicates a local directory as a manifest file and uploads it
|
|
// using dpa store
|
|
// TODO: localpath should point to a manifest
|
|
//
|
|
// DEPRECATED: Use the HTTP API instead
|
|
func (self *FileSystem) Upload(lpath, index string) (string, error) {
|
|
var list []*manifestTrieEntry
|
|
localpath, err := filepath.Abs(filepath.Clean(lpath))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
f, err := os.Open(localpath)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
stat, err := f.Stat()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var start int
|
|
if stat.IsDir() {
|
|
start = len(localpath)
|
|
log.Debug(fmt.Sprintf("uploading '%s'", localpath))
|
|
err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
|
|
if (err == nil) && !info.IsDir() {
|
|
if len(path) <= start {
|
|
return fmt.Errorf("Path is too short")
|
|
}
|
|
if path[:start] != localpath {
|
|
return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
|
|
}
|
|
entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(path)}, nil)
|
|
list = append(list, entry)
|
|
}
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
} else {
|
|
dir := filepath.Dir(localpath)
|
|
start = len(dir)
|
|
if len(localpath) <= start {
|
|
return "", fmt.Errorf("Path is too short")
|
|
}
|
|
if localpath[:start] != dir {
|
|
return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
|
|
}
|
|
entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(localpath)}, nil)
|
|
list = append(list, entry)
|
|
}
|
|
|
|
cnt := len(list)
|
|
errors := make([]error, cnt)
|
|
done := make(chan bool, maxParallelFiles)
|
|
dcnt := 0
|
|
awg := &sync.WaitGroup{}
|
|
|
|
for i, entry := range list {
|
|
if i >= dcnt+maxParallelFiles {
|
|
<-done
|
|
dcnt++
|
|
}
|
|
awg.Add(1)
|
|
go func(i int, entry *manifestTrieEntry, done chan bool) {
|
|
f, err := os.Open(entry.Path)
|
|
if err == nil {
|
|
stat, _ := f.Stat()
|
|
var hash storage.Key
|
|
wg := &sync.WaitGroup{}
|
|
hash, err = self.api.dpa.Store(f, stat.Size(), wg, nil)
|
|
if hash != nil {
|
|
list[i].Hash = hash.String()
|
|
}
|
|
wg.Wait()
|
|
awg.Done()
|
|
if err == nil {
|
|
first512 := make([]byte, 512)
|
|
fread, _ := f.ReadAt(first512, 0)
|
|
if fread > 0 {
|
|
mimeType := http.DetectContentType(first512[:fread])
|
|
if filepath.Ext(entry.Path) == ".css" {
|
|
mimeType = "text/css"
|
|
}
|
|
list[i].ContentType = mimeType
|
|
}
|
|
}
|
|
f.Close()
|
|
}
|
|
errors[i] = err
|
|
done <- true
|
|
}(i, entry, done)
|
|
}
|
|
for dcnt < cnt {
|
|
<-done
|
|
dcnt++
|
|
}
|
|
|
|
trie := &manifestTrie{
|
|
dpa: self.api.dpa,
|
|
}
|
|
quitC := make(chan bool)
|
|
for i, entry := range list {
|
|
if errors[i] != nil {
|
|
return "", errors[i]
|
|
}
|
|
entry.Path = RegularSlashes(entry.Path[start:])
|
|
if entry.Path == index {
|
|
ientry := newManifestTrieEntry(&ManifestEntry{
|
|
ContentType: entry.ContentType,
|
|
}, nil)
|
|
ientry.Hash = entry.Hash
|
|
trie.addEntry(ientry, quitC)
|
|
}
|
|
trie.addEntry(entry, quitC)
|
|
}
|
|
|
|
err2 := trie.recalcAndStore()
|
|
var hs string
|
|
if err2 == nil {
|
|
hs = trie.hash.String()
|
|
}
|
|
awg.Wait()
|
|
return hs, err2
|
|
}
|
|
|
|
// Download replicates the manifest basePath structure on the local filesystem
|
|
// under localpath
|
|
//
|
|
// DEPRECATED: Use the HTTP API instead
|
|
func (self *FileSystem) Download(bzzpath, localpath string) error {
|
|
lpath, err := filepath.Abs(filepath.Clean(localpath))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = os.MkdirAll(lpath, os.ModePerm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
//resolving host and port
|
|
uri, err := Parse(path.Join("bzz:/", bzzpath))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key, err := self.api.Resolve(uri)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
path := uri.Path
|
|
|
|
if len(path) > 0 {
|
|
path += "/"
|
|
}
|
|
|
|
quitC := make(chan bool)
|
|
trie, err := loadManifest(self.api.dpa, key, quitC)
|
|
if err != nil {
|
|
log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
|
|
return err
|
|
}
|
|
|
|
type downloadListEntry struct {
|
|
key storage.Key
|
|
path string
|
|
}
|
|
|
|
var list []*downloadListEntry
|
|
var mde error
|
|
|
|
prevPath := lpath
|
|
err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
|
|
log.Trace(fmt.Sprintf("fs.Download: %#v", entry))
|
|
|
|
key = common.Hex2Bytes(entry.Hash)
|
|
path := lpath + "/" + suffix
|
|
dir := filepath.Dir(path)
|
|
if dir != prevPath {
|
|
mde = os.MkdirAll(dir, os.ModePerm)
|
|
prevPath = dir
|
|
}
|
|
if (mde == nil) && (path != dir+"/") {
|
|
list = append(list, &downloadListEntry{key: key, path: path})
|
|
}
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
errC := make(chan error)
|
|
done := make(chan bool, maxParallelFiles)
|
|
for i, entry := range list {
|
|
select {
|
|
case done <- true:
|
|
wg.Add(1)
|
|
case <-quitC:
|
|
return fmt.Errorf("aborted")
|
|
}
|
|
go func(i int, entry *downloadListEntry) {
|
|
defer wg.Done()
|
|
err := retrieveToFile(quitC, self.api.dpa, entry.key, entry.path)
|
|
if err != nil {
|
|
select {
|
|
case errC <- err:
|
|
case <-quitC:
|
|
}
|
|
return
|
|
}
|
|
<-done
|
|
}(i, entry)
|
|
}
|
|
go func() {
|
|
wg.Wait()
|
|
close(errC)
|
|
}()
|
|
select {
|
|
case err = <-errC:
|
|
return err
|
|
case <-quitC:
|
|
return fmt.Errorf("aborted")
|
|
}
|
|
}
|
|
|
|
func retrieveToFile(quitC chan bool, dpa *storage.DPA, key storage.Key, path string) error {
|
|
f, err := os.Create(path) // TODO: basePath separators
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reader := dpa.Retrieve(key)
|
|
writer := bufio.NewWriter(f)
|
|
size, err := reader.Size(quitC)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err = io.CopyN(writer, reader, size); err != nil {
|
|
return err
|
|
}
|
|
if err := writer.Flush(); err != nil {
|
|
return err
|
|
}
|
|
return f.Close()
|
|
}
|