erigon-pulse/turbo/services/interfaces.go
Mark Holt 79ed8cad35
E2 snapshot uploading (#9056)
This change introduces additional processes to manage snapshot uploading
for E2 snapshots:

## erigon snapshots upload

The `snapshots uploader` command starts a version of erigon customized
for uploading snapshot files to
a remote location.  

It breaks the stage execution process after the senders stage and then
uses the snapshot stage to send
uploaded headers, bodies and (in the case of polygon) bor spans and
events to snapshot files. Because
this process avoids execution in run signifigantly faster than a
standard erigon configuration.

The uploader uses rclone to send seedable (100K or 500K blocks) to a
remote storage location specified
in the rclone config file.

The **uploader** is configured to minimize disk usage by doing the
following:

* It removes snapshots once they are loaded
* It aggressively prunes the database once entities are transferred to
snapshots

in addition to this it has the following performance related features:

* maximizes the workers allocated to snapshot processing to improve
throughput
* Can be started from scratch by downloading the latest snapshots from
the remote location to seed processing

## snapshots command

Is a stand alone command for managing remote snapshots it has the
following sub commands

* **cmp** - compare snapshots
* **copy** - copy snapshots
* **verify** - verify snapshots
* **manifest** - manage the manifest file in the root of remote snapshot
locations
* **torrent** - manage snapshot torrent files
2023-12-27 22:05:09 +00:00

141 lines
4.8 KiB
Go

package services
import (
"context"
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/log/v3"
)
type All struct {
BlockReader FullBlockReader
}
type BlockReader interface {
BlockByNumber(ctx context.Context, db kv.Tx, number uint64) (*types.Block, error)
BlockByHash(ctx context.Context, db kv.Tx, hash common.Hash) (*types.Block, error)
CurrentBlock(db kv.Tx) (*types.Block, error)
BlockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockNum uint64) (block *types.Block, senders []common.Address, err error)
}
type HeaderReader interface {
Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockNum uint64) (*types.Header, error)
HeaderByNumber(ctx context.Context, tx kv.Getter, blockNum uint64) (*types.Header, error)
HeaderByHash(ctx context.Context, tx kv.Getter, hash common.Hash) (*types.Header, error)
ReadAncestor(db kv.Getter, hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64)
//TODO: change it to `iter`
HeadersRange(ctx context.Context, walker func(header *types.Header) error) error
}
type BorEventReader interface {
EventLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error)
EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockNum uint64) ([]rlp.RawValue, error)
}
type BorSpanReader interface {
Span(ctx context.Context, tx kv.Getter, spanNum uint64) ([]byte, error)
}
type CanonicalReader interface {
CanonicalHash(ctx context.Context, tx kv.Getter, blockNum uint64) (common.Hash, error)
BadHeaderNumber(ctx context.Context, tx kv.Getter, hash common.Hash) (blockHeight *uint64, err error)
}
type BodyReader interface {
BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockNum uint64) (body *types.Body, err error)
BodyRlp(ctx context.Context, tx kv.Getter, hash common.Hash, blockNum uint64) (bodyRlp rlp.RawValue, err error)
Body(ctx context.Context, tx kv.Getter, hash common.Hash, blockNum uint64) (body *types.Body, txAmount uint32, err error)
HasSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockNum uint64) (bool, error)
}
type TxnReader interface {
TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error)
TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error)
RawTransactions(ctx context.Context, tx kv.Getter, fromBlock, toBlock uint64) (txs [][]byte, err error)
}
type HeaderAndCanonicalReader interface {
HeaderReader
CanonicalReader
}
type BlockAndTxnReader interface {
BlockReader
//HeaderReader
TxnReader
}
type FullBlockReader interface {
BlockReader
BodyReader
HeaderReader
BorEventReader
BorSpanReader
TxnReader
CanonicalReader
FrozenBlocks() uint64
FrozenBorBlocks() uint64
FrozenFiles() (list []string)
FreezingCfg() ethconfig.BlocksFreezing
CanPruneTo(currentBlockInDB uint64) (canPruneBlocksTo uint64)
Snapshots() BlockSnapshots
BorSnapshots() BlockSnapshots
}
type BlockSnapshots interface {
LogStat(label string)
ReopenFolder() error
SegmentsMax() uint64
SegmentsMin() uint64
}
// BlockRetire - freezing blocks: moving old data from DB to snapshot files
type BlockRetire interface {
PruneAncientBlocks(tx kv.RwTx, limit int) error
RetireBlocksInBackground(ctx context.Context, miBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []DownloadRequest) error, onDelete func(l []string) error)
HasNewFrozenFiles() bool
BuildMissedIndicesIfNeed(ctx context.Context, logPrefix string, notifier DBEventNotifier, cc *chain.Config) error
SetWorkers(workers int)
}
/*
type HeaderWriter interface {
WriteHeader(tx kv.RwTx, header *types.Header) error
WriteHeaderRaw(tx kv.StatelessRwTx, number uint64, hash libcommon.Hash, headerRlp []byte, skipIndexing bool) error
WriteCanonicalHash(tx kv.RwTx, hash libcommon.Hash, number uint64) error
WriteTd(db kv.Putter, hash libcommon.Hash, number uint64, td *big.Int) error
// [from,to)
FillHeaderNumberIndex(logPrefix string, tx kv.RwTx, tmpDir string, from, to uint64, ctx context.Context, logger log.Logger) error
}
type BlockWriter interface {
HeaderWriter
WriteRawBodyIfNotExists(tx kv.RwTx, hash libcommon.Hash, number uint64, body *types.RawBody) (ok bool, lastTxnNum uint64, err error)
WriteBody(tx kv.RwTx, hash libcommon.Hash, number uint64, body *types.Body) error
}
*/
type DBEventNotifier interface {
OnNewSnapshot()
}
type DownloadRequest struct {
Version uint8
Path string
TorrentHash string
}
func NewDownloadRequest(path string, torrentHash string) DownloadRequest {
return DownloadRequest{Path: path, TorrentHash: torrentHash}
}
type Range struct {
From, To uint64
}