mirror of
synced 2025-01-04 18:14:28 +00:00
This change introduces additional processes to manage snapshot uploading for E2 snapshots: ## erigon snapshots upload The `snapshots uploader` command starts a version of erigon customized for uploading snapshot files to a remote location. It breaks the stage execution process after the senders stage and then uses the snapshot stage to send uploaded headers, bodies and (in the case of polygon) bor spans and events to snapshot files. Because this process avoids execution, it runs significantly faster than a standard erigon configuration. The uploader uses rclone to send seedable snapshots (100K or 500K blocks) to a remote storage location specified in the rclone config file. The **uploader** is configured to minimize disk usage by doing the following: * It removes snapshots once they are loaded * It aggressively prunes the database once entities are transferred to snapshots. In addition to this, it has the following performance-related features: * It maximizes the workers allocated to snapshot processing to improve throughput * It can be started from scratch by downloading the latest snapshots from the remote location to seed processing ## snapshots command This is a standalone command for managing remote snapshots. It has the following subcommands: * **cmp** - compare snapshots * **copy** - copy snapshots * **verify** - verify snapshots * **manifest** - manage the manifest file in the root of remote snapshot locations * **torrent** - manage snapshot torrent files
784 lines
16 KiB
784 lines
16 KiB
package downloader
import (
// rcloneInfo is the per-file record kept in a session's files map. It ties a
// file name to its parsed snapshot metadata and to what is known about the
// remote and local copies of that file.
// NOTE(review): the closing brace (and possibly further fields) is not visible
// in this view of the file.
type rcloneInfo struct {
// file is the file name, relative to the session's local and remote roots.
file string
// snapInfo is set when the name parses as a snapshot file name; nil otherwise.
snapInfo *snaptype.FileInfo
// remoteInfo is the entry from the remote listing, or a copy of the local
// file's details recorded after a successful sync.
remoteInfo remoteInfo
// localInfo is the os.Stat result for the local copy; nil if the stat failed.
localInfo fs.FileInfo
func (i *rcloneInfo) Version() uint8 {
if i.snapInfo != nil {
return i.snapInfo.Version
return 0
func (i *rcloneInfo) From() uint64 {
if i.snapInfo != nil {
return i.snapInfo.From
return 0
func (i *rcloneInfo) To() uint64 {
if i.snapInfo != nil {
return i.snapInfo.To
return 0
func (i *rcloneInfo) Type() snaptype.Type {
if i.snapInfo != nil {
return i.snapInfo.T
return snaptype.Unknown
// RCloneClient wraps a locally started `rclone rcd` process and the HTTP
// client used to send commands to it.
// NOTE(review): the closing brace is not visible in this view of the file.
type RCloneClient struct {
// rclone is the command handle of the spawned rclone daemon.
rclone *exec.Cmd
// rcloneUrl is the base URL ("http://" + listen address) of the daemon.
rcloneUrl string
// rcloneSession is the HTTP client used for all calls to the daemon.
rcloneSession *http.Client
// logger receives the client's diagnostics; it is set by start.
logger log.Logger
func (c *RCloneClient) start(logger log.Logger) error {
c.logger = logger
rclone, _ := exec.LookPath("rclone")
if len(rclone) == 0 {
logger.Warn("[rclone] Uploading disabled: rclone not found in PATH")
return fmt.Errorf("rclone not found in PATH")
if p, err := freePort(); err == nil {
ctx, cancel := context.WithCancel(context.Background())
addr := fmt.Sprintf("", p)
c.rclone = exec.CommandContext(ctx, rclone, "rcd", "--rc-addr", addr, "--rc-no-auth")
c.rcloneUrl = "http://" + addr
c.rcloneSession = &http.Client{} // no timeout - we're doing sync calls
if err := c.rclone.Start(); err != nil {
logger.Warn("[rclone] Uploading disabled: rclone didn't start", "err", err)
return fmt.Errorf("rclone didn't start: %w", err)
} else {
logger.Info("[rclone] rclone started", "addr", addr)
go func() {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGTERM, syscall.SIGINT)
switch s := <-signalCh; s {
case syscall.SIGTERM, syscall.SIGINT:
return nil
func (c *RCloneClient) ListRemotes(ctx context.Context) ([]string, error) {
result, err := c.cmd(ctx, "config/listremotes", nil)
if err != nil {
return nil, err
remotes := struct {
Remotes []string `json:"remotes"`
err = json.Unmarshal(result, &remotes)
if err != nil {
return nil, err
return remotes.Remotes, nil
func (u *RCloneClient) sync(ctx context.Context, request *rcloneRequest) error {
_, err := u.cmd(ctx, "sync/sync", request)
return err
return retryConnects(ctx, func(ctx context.Context) error {
return client.CallContext(ctx, result, string(method), args...)
// isConnectionError reports whether err is, or wraps, a *net.OpError raised
// while dialing, i.e. the connection could not be established at all.
func isConnectionError(err error) bool {
	var opErr *net.OpError
	return errors.As(err, &opErr) && opErr.Op == "dial"
}
// connectionTimeout is the deadline cmd puts on the context it hands to retry,
// i.e. how long it keeps re-attempting a request that fails with a connection
// error.
const connectionTimeout = time.Second * 5
// retry calls op until it succeeds, fails with an error that
// isRecoverableError rejects, or ctx is done, waiting delay between attempts.
//
// lastErr seeds the "previous failure": if op fails with
// context.DeadlineExceeded and a previous failure is known, that previous
// failure is returned because it describes the real problem better than the
// timeout. When ctx expires while waiting, the most recent op error is
// returned; when ctx is cancelled for any other reason, ctx.Err() is returned.
func retry(ctx context.Context, op func(context.Context) error, isRecoverableError func(error) bool, delay time.Duration, lastErr error) error {
	for {
		err := op(ctx)
		if err == nil {
			return nil
		}
		if errors.Is(err, context.DeadlineExceeded) && lastErr != nil {
			return lastErr
		}
		if !isRecoverableError(err) {
			return err
		}

		delayTimer := time.NewTimer(delay)
		select {
		case <-delayTimer.C:
			// Remember this failure and try again.
			lastErr = err
		case <-ctx.Done():
			// Release the timer now rather than leaving it pending for delay.
			delayTimer.Stop()
			if errors.Is(ctx.Err(), context.DeadlineExceeded) {
				return err
			}
			return ctx.Err()
		}
	}
}
func (u *RCloneClient) cmd(ctx context.Context, path string, args interface{}) ([]byte, error) {
requestBody, err := json.Marshal(args)
if err != nil {
return nil, err
request, err := http.NewRequestWithContext(ctx, http.MethodPost,
u.rcloneUrl+"/"+path, bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
request.Header.Set("Content-Type", "application/json")
ctx, cancel := context.WithTimeout(ctx, connectionTimeout)
defer cancel()
var response *http.Response
err = retry(ctx, func(ctx context.Context) error {
response, err = u.rcloneSession.Do(request) //nolint:bodyclose
return err
}, isConnectionError, time.Millisecond*200, nil)
if err != nil {
return nil, err
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
responseBody := struct {
Error string `json:"error"`
if err := json.NewDecoder(response.Body).Decode(&responseBody); err == nil && len(responseBody.Error) > 0 {
u.logger.Warn("[rclone] cmd failed", "path", path, "status", response.Status, "err", responseBody.Error)
return nil, fmt.Errorf("cmd: %s failed: %s: %s", path, response.Status, responseBody.Error)
} else {
u.logger.Warn("[rclone] cmd failed", "path", path, "status", response.Status)
return nil, fmt.Errorf("cmd: %s failed: %s", path, response.Status)
return io.ReadAll(response.Body)
// RCloneSession tracks the files shared between one local directory and one
// remote location and queues sync requests between them.
// NOTE(review): NewSession initializes an `RCloneClient` field and methods use
// c.rcloneUrl / c.logger, so a client is presumably embedded here; that line
// and the closing brace are not visible in this view of the file.
type RCloneSession struct {
// files maps a file name to everything known about that file.
files map[string]*rcloneInfo
// oplock is released via defer in ReadRemoteDir.
oplock sync.Mutex
// remoteFs is the remote root passed to rclone.
remoteFs string
// localFs is the local directory being synced.
localFs string
// syncQueue carries pending upload/download requests to syncFiles.
syncQueue chan syncRequest
// syncScheduled is flipped by syncFiles with a compare-and-swap so that only
// one caller gets past its entry check.
syncScheduled atomic.Bool
// activeSyncCount is reported in the sync progress log lines.
activeSyncCount atomic.Int32
// cancel cancels the context created for this session in NewSession.
cancel context.CancelFunc
var rcClient RCloneClient
var rcClientStart sync.Once
func NewRCloneClient(logger log.Logger) (*RCloneClient, error) {
var err error
rcClientStart.Do(func() {
err = rcClient.start(logger)
if err != nil {
return nil, err
return &rcClient, nil
// freePort asks the operating system for a currently unused TCP port by
// listening on port 0, reading back the port that was assigned, and closing
// the listener again. The port is only known to have been free at that
// moment; another process may claim it before the caller does.
func freePort() (port int, err error) {
	addr, err := net.ResolveTCPAddr("tcp", "")
	if err != nil {
		return 0, err
	}

	listener, err := net.ListenTCP("tcp", addr)
	if err != nil {
		return 0, err
	}
	defer listener.Close()

	return listener.Addr().(*net.TCPAddr).Port, nil
}
// NewSession creates a session that syncs between localFs and remoteFs using
// this client. The session gets its own cancellable context derived from ctx,
// an empty file table and a sync queue buffered for 100 requests. The remote
// directory is read in a background goroutine, so the session is returned
// without waiting for that listing; the returned error is always nil in the
// code visible here.
// NOTE(review): what the goroutine does after a successful ReadRemoteDir is
// not visible in this view (presumably it starts syncFiles — confirm), nor are
// the closing braces.
func (c *RCloneClient) NewSession(ctx context.Context, localFs string, remoteFs string) (*RCloneSession, error) {
ctx, cancel := context.WithCancel(ctx)
session := &RCloneSession{
RCloneClient: c,
files: map[string]*rcloneInfo{},
remoteFs: remoteFs,
localFs: localFs,
cancel: cancel,
syncQueue: make(chan syncRequest, 100),
go func() {
// refresh=true: always fetch the listing from the remote.
if _, err := session.ReadRemoteDir(ctx, true); err == nil {
return session, nil
func (c *RCloneSession) RemoteFsRoot() string {
return c.remoteFs
func (c *RCloneSession) LocalFsRoot() string {
return c.localFs
// Stop
// NOTE(review): the body of this method is not visible in this view of the
// file. Presumably it invokes the session's cancel function — confirm against
// the full source before relying on this.
func (c *RCloneSession) Stop() {
// syncRequest is one unit of work on a session's sync queue.
// NOTE(review): the closing brace is not visible in this view of the file.
type syncRequest struct {
// ctx is the caller's context; the request is abandoned once it is done.
ctx context.Context
// info holds the file records whose local/remote details are refreshed after
// a successful sync.
info map[string]*rcloneInfo
// cerr receives the final outcome (nil on success) for the waiting caller.
cerr chan error
// request is the rclone sync/sync payload.
request *rcloneRequest
// retryTime is the current retry back-off; zero means not retried yet.
retryTime time.Duration
// Upload queues a sync of the named files from the local directory to the
// remote and blocks until the queued request reports its result.
//
// Each file that is unknown to the session, or known without local details,
// is stat'ed first; the call fails if the stat fails or if the file is not a
// non-empty regular file.
// NOTE(review): several lines are missing from this view (closing braces and
// the head of the request literal that carries Group/SrcFs/DstFs/Filter).
func (c *RCloneSession) Upload(ctx context.Context, files ...string) error {
reqInfo := map[string]*rcloneInfo{}
for _, file := range files {
info, ok := c.files[file]
if !ok || info.localInfo == nil {
localInfo, err := os.Stat(filepath.Join(c.localFs, file))
if err != nil {
return fmt.Errorf("can't upload: %s: %w", file, err)
if !localInfo.Mode().IsRegular() || localInfo.Size() == 0 {
return fmt.Errorf("can't upload: %s: %s", file, "file is not uploadable")
if ok {
info.localInfo = localInfo
} else {
// NOTE(review): this `info :=` declares a new variable that shadows the one
// from the map lookup above; in the lines visible here, records handled in
// this stat branch are stored in c.files but not added to reqInfo — confirm
// this is intended.
info := &rcloneInfo{
file: file,
localInfo: localInfo,
if snapInfo, ok := snaptype.ParseFileName(c.localFs, file); ok {
info.snapInfo = &snapInfo
c.files[file] = info
} else {
reqInfo[file] = info
// Buffered so the worker can deliver the result without blocking.
cerr := make(chan error, 1)
c.syncQueue <- syncRequest{ctx, reqInfo, cerr,
Group: c.Label(),
SrcFs: c.localFs,
DstFs: c.remoteFs,
Filter: rcloneFilter{
IncludeRule: files,
}}, 0}
return <-cerr
// Download queues a sync of the named files from the remote to the local
// directory and blocks until the queued request reports its result.
//
// If the session has no file records yet, the remote directory is read first.
// A file that is unknown to the session or has a zero remote size fails the
// call with an error wrapping os.ErrNotExist.
// NOTE(review): several lines are missing from this view (closing braces and
// the head of the request literal that carries SrcFs/DstFs/Filter).
func (c *RCloneSession) Download(ctx context.Context, files ...string) error {
if len(c.files) == 0 {
_, err := c.ReadRemoteDir(ctx, false)
if err != nil {
return fmt.Errorf("can't download: %s: %w", files, err)
reqInfo := map[string]*rcloneInfo{}
for _, file := range files {
info, ok := c.files[file]
if !ok || info.remoteInfo.Size == 0 {
return fmt.Errorf("can't download: %s: %w", file, os.ErrNotExist)
reqInfo[file] = info
// Buffered so the worker can deliver the result without blocking.
cerr := make(chan error, 1)
c.syncQueue <- syncRequest{ctx, reqInfo, cerr,
SrcFs: c.remoteFs,
DstFs: c.localFs,
Filter: rcloneFilter{
IncludeRule: files,
}}, 0}
return <-cerr
func (c *RCloneSession) Cat(ctx context.Context, file string) (io.Reader, error) {
rclone, err := exec.LookPath("rclone")
if err != nil {
return nil, err
cmd := exec.CommandContext(ctx, rclone, "cat", c.remoteFs+"/"+file)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
if err := cmd.Start(); err != nil {
return nil, err
return stdout, nil
func (c *RCloneSession) ReadLocalDir(ctx context.Context) ([]fs.DirEntry, error) {
return os.ReadDir(c.localFs)
func (c *RCloneSession) Label() string {
return strconv.FormatUint(murmur3.Sum64([]byte(c.localFs+"<->"+c.remoteFs)), 36)
// remoteInfo is one entry of a remote listing; it is decoded from the "list"
// array of the rclone operations/list response.
// NOTE(review): the closing brace is not visible in this view of the file.
type remoteInfo struct {
// Name is the file name; it is used as the key of the session's file table.
Name string
// Size is the file size; a zero size is treated as "not present remotely".
Size uint64
// ModTime is the modification time.
ModTime time.Time
// SnapInfo exposes the snapshot attributes of a file. rcloneInfo provides
// these methods, returning zero values / snaptype.Unknown for files whose
// names do not parse as snapshots.
// NOTE(review): the closing brace is not visible in this view of the file.
type SnapInfo interface {
Version() uint8
From() uint64
To() uint64
Type() snaptype.Type
// fileInfo presents a file record through the fs.FileInfo-style methods
// defined on it below (Name, Size, Mode, ModTime, IsDir, Sys).
// NOTE(review): the struct body is not visible in this view; its methods read
// fi.file, fi.remoteInfo and fi.rcloneInfo, so it presumably embeds
// *rcloneInfo — confirm.
type fileInfo struct {
func (fi *fileInfo) Name() string {
return fi.file
func (fi *fileInfo) Size() int64 {
return int64(fi.remoteInfo.Size)
func (fi *fileInfo) Mode() fs.FileMode {
return fs.ModeIrregular
func (fi *fileInfo) ModTime() time.Time {
return fi.remoteInfo.ModTime
func (fi *fileInfo) IsDir() bool {
return false
func (fi *fileInfo) Sys() any {
return fi.rcloneInfo
// dirEntry wraps a fileInfo and answers the directory-entry style methods
// defined on it (Name, IsDir, Type, Info) by delegating to that fileInfo.
// NOTE(review): the closing brace is not visible in this view of the file.
type dirEntry struct {
// info is the wrapped file description that every method delegates to.
info *fileInfo
func (e dirEntry) Name() string {
return e.info.Name()
func (e dirEntry) IsDir() bool {
return e.info.IsDir()
func (e dirEntry) Type() fs.FileMode {
return e.info.Mode()
func (e dirEntry) Info() (fs.FileInfo, error) {
return e.info, nil
// ErrAccessDenied is wrapped into the error returned by ReadRemoteDir when the
// error text of a failed remote listing contains "AccessDenied". Test for it
// with errors.Is.
var ErrAccessDenied = errors.New("access denied")
// ReadRemoteDir returns the files known to exist on the remote as directory
// entries sorted by name.
//
// When the session has no file records yet, or refresh is true, the listing is
// fetched from the rclone daemon (operations/list) and merged into the
// session's file table; otherwise the table is used as it is. Only records
// with a non-zero remote size are returned.
//
// It fails if no remote is configured, if the listing cannot be requested or
// decoded, or if the daemon answers with a non-200 status; in the latter case
// the error wraps ErrAccessDenied when rclone's error text contains
// "AccessDenied".
// NOTE(review): a number of lines are missing from this view — closing braces,
// the Lock that pairs with the deferred Unlock, and whatever ends the request
// loop on success.
func (c *RCloneSession) ReadRemoteDir(ctx context.Context, refresh bool) ([]fs.DirEntry, error) {
if len(c.remoteFs) == 0 {
return nil, fmt.Errorf("remote fs undefined")
// NOTE(review): the matching c.oplock.Lock() is not visible here — confirm.
defer c.oplock.Unlock()
fileCount := len(c.files)
if fileCount == 0 || refresh {
// List the root of the remote: "remote" is the path below "fs".
listBody, err := json.Marshal(struct {
Fs string `json:"fs"`
Remote string `json:"remote"`
Fs: c.remoteFs,
Remote: "",
if err != nil {
return nil, fmt.Errorf("can't marshal list request: %w", err)
listRequest, err := http.NewRequestWithContext(ctx, http.MethodPost,
c.rcloneUrl+"/operations/list", bytes.NewBuffer(listBody))
if err != nil {
return nil, fmt.Errorf("can't create list request: %w", err)
listRequest.Header.Set("Content-Type", "application/json")
var response *http.Response
// Up to 10 attempts, pausing 2 seconds between them.
// NOTE(review): the statement that leaves the loop on success is not visible
// here (presumably a break) — confirm.
for i := 0; i < 10; i++ {
response, err = c.rcloneSession.Do(listRequest) //nolint:bodyclose
if err == nil {
time.Sleep(2 * time.Second)
if err != nil {
return nil, fmt.Errorf("can't get remote list: %w", err)
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
// Best effort: if the body cannot be read, the error below carries only the
// status.
body, _ := io.ReadAll(response.Body)
e := struct {
Error string `json:"error"`
if err := json.Unmarshal(body, &e); err == nil {
if strings.Contains(e.Error, "AccessDenied") {
return nil, fmt.Errorf("can't get remote list: %w", ErrAccessDenied)
return nil, fmt.Errorf("can't get remote list: %s: %s", response.Status, string(body))
responseBody := struct {
List []remoteInfo `json:"list"`
if err := json.NewDecoder(response.Body).Decode(&responseBody); err != nil {
return nil, fmt.Errorf("can't decode remote list: %w", err)
// Merge the listing into the file table, refreshing known records in place.
for _, fi := range responseBody.List {
// A failed stat leaves localInfo nil, i.e. "no local copy known".
localInfo, _ := os.Stat(filepath.Join(c.localFs, fi.Name))
if rcinfo, ok := c.files[fi.Name]; ok {
rcinfo.localInfo = localInfo
rcinfo.remoteInfo = fi
if snapInfo, ok := snaptype.ParseFileName(c.localFs, fi.Name); ok {
rcinfo.snapInfo = &snapInfo
} else {
rcinfo.snapInfo = nil
} else {
info := &rcloneInfo{
file: fi.Name,
localInfo: localInfo,
remoteInfo: fi,
if snapInfo, ok := snaptype.ParseFileName(c.localFs, fi.Name); ok {
info.snapInfo = &snapInfo
c.files[fi.Name] = info
var entries = make([]fs.DirEntry, 0, len(c.files))
for _, info := range c.files {
// Records without a remote size are local-only and are left out.
if info.remoteInfo.Size > 0 {
entries = append(entries, &dirEntry{&fileInfo{info}})
// Map iteration order is random; sort for a stable result.
slices.SortFunc(entries, func(a, b fs.DirEntry) int {
return strings.Compare(a.Name(), b.Name())
return entries, nil
// rcloneFilter is the "_filter" part of an rclone request.
// NOTE(review): the closing brace is not visible in this view of the file.
type rcloneFilter struct {
// IncludeRule is filled by Upload and Download with the file names to sync.
IncludeRule []string `json:"IncludeRule"`
// rcloneRequest is the JSON payload sent to the rclone daemon for a sync/sync
// command.
// NOTE(review): the closing brace is not visible in this view of the file.
type rcloneRequest struct {
// Async is sent as "_async" and omitted when false.
Async bool `json:"_async,omitempty"`
// Config is sent as "_config" and omitted when empty.
Config map[string]interface{} `json:"_config,omitempty"`
// Group is sent as "group"; Upload sets it to the session label.
Group string `json:"group"`
// SrcFs is the source of the sync.
SrcFs string `json:"srcFs"`
// DstFs is the destination of the sync.
DstFs string `json:"dstFs"`
// Filter is sent as "_filter" and restricts the sync to the listed files.
Filter rcloneFilter `json:"_filter"`
// syncFiles is the worker side of the session's sync queue: it takes requests
// queued by Upload/Download, runs each one through rclone inside an error
// group, reports the outcome on the request's cerr channel and re-queues
// failed requests with a growing back-off. Progress is logged every 20
// seconds.
// NOTE(review): this function is heavily truncated in this view — closing
// braces, loop headers, returns and other short statements are missing — so
// the comments below describe only what the visible lines establish.
func (c *RCloneSession) syncFiles(ctx context.Context) {
// Only the caller that flips syncScheduled from false to true gets past this
// check. NOTE(review): the body of this branch is not visible (presumably a
// return) — confirm.
if !c.syncScheduled.CompareAndSwap(false, true) {
g, gctx := errgroup.WithContext(ctx)
minRetryTime := 30 * time.Second
maxRetryTime := 300 * time.Second
// retry waits out the request's back-off and then puts the request back on
// the queue. The back-off starts at minRetryTime and doubles on each retry.
// NOTE(review): the assignment to maxRetryTime below presumably belongs to a
// default case that is not visible — confirm.
retry := func(request syncRequest) {
switch {
case request.retryTime == 0:
request.retryTime = minRetryTime
case request.retryTime < maxRetryTime:
request.retryTime += request.retryTime
request.retryTime = maxRetryTime
// NOTE(review): a Ticker is used for a single wait and no Stop is visible
// here — confirm it is released.
retryTimer := time.NewTicker(request.retryTime)
select {
case <-request.ctx.Done():
request.cerr <- request.ctx.Err()
case <-retryTimer.C:
// The queue field is set to nil on shutdown (see the logging goroutine), so
// take a snapshot before using it.
syncQueue := c.syncQueue
if syncQueue != nil {
syncQueue <- request
} else {
request.cerr <- fmt.Errorf("no sync queue available")
// Progress logging and shutdown handling.
go func() {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
select {
case <-gctx.Done():
if syncCount := int(c.activeSyncCount.Load()) + len(c.syncQueue); syncCount > 0 {
log.Info("[rclone] Synced files", "processed", fmt.Sprintf("%d/%d", c.activeSyncCount.Load(), syncCount))
// Detach the queue from the session. NOTE(review): what is done with the
// detached queue is not visible here (presumably it is closed) — confirm.
syncQueue := c.syncQueue
c.syncQueue = nil
if syncQueue != nil {
case <-logEvery.C:
if syncCount := int(c.activeSyncCount.Load()) + len(c.syncQueue); syncCount > 0 {
log.Info("[rclone] Syncing files", "progress", fmt.Sprintf("%d/%d", c.activeSyncCount.Load(), syncCount))
// Queue consumer: one error-group task per request.
go func() {
for req := range c.syncQueue {
// Once the group context is done, requests are failed with its error.
if gctx.Err() != nil {
req.cerr <- gctx.Err()
func(req syncRequest) {
g.Go(func() error {
// A panic inside a task is logged and turned into an error for the waiting
// caller.
defer func() {
if r := recover(); r != nil {
log.Error("[rclone] snapshot sync failed", "err", r, "stack", dbg.Stack())
if gctx.Err() != nil {
req.cerr <- gctx.Err()
var err error
var ok bool
if err, ok = r.(error); ok {
req.cerr <- fmt.Errorf("snapshot sync failed: %w", err)
} else {
req.cerr <- fmt.Errorf("snapshot sync failed: %s", r)
// The caller gave up: report its context error; the task itself returns nil.
if req.ctx.Err() != nil {
req.cerr <- req.ctx.Err()
return nil //nolint:nilerr
if err := c.sync(gctx, req.request); err != nil {
if gctx.Err() != nil {
req.cerr <- gctx.Err()
} else {
// A failed sync is retried in the background; the task returns nil either
// way.
go retry(req)
return nil //nolint:nilerr
// Success: re-stat each file and record the local details as the remote
// details too.
// NOTE(review): the Stat error is ignored and localInfo.Size() is called
// unconditionally, which would panic if the file is gone by now — confirm
// this is covered by the recover above.
for _, info := range req.info {
localInfo, _ := os.Stat(filepath.Join(c.localFs, info.file))
info.localInfo = localInfo
info.remoteInfo = remoteInfo{
Name: info.file,
Size: uint64(localInfo.Size()),
ModTime: localInfo.ModTime(),
req.cerr <- nil
return nil
if err := g.Wait(); err != nil {
c.logger.Debug("[rclone] uploading failed", "err", err)