Compress params change (#5631)

as of https://github.com/ledgerwatch/erigon-lib/pull/651
Alex Sharov 2022-10-05 17:54:54 +07:00 committed by GitHub
parent c48d51a582
commit ca9aa4723c
24 changed files with 234 additions and 161 deletions

README.md

@ -49,7 +49,8 @@ System Requirements
===================
* For an Archive node of Ethereum Mainnet we recommend >=3TB storage space: 1.8TB state (as of March 2022),
200GB temp files (can symlink or mount folder `<datadir>/etl-tmp` to another disk). Ethereum Mainnet Full node (see `--prune*` flags): 400Gb (April 2022).
* Goerli Full node (see `--prune*` flags): 189GB on Beta, 114GB on Alpha (April 2022).
@ -62,7 +63,8 @@ Bear in mind that SSD performance deteriorates when close to capacity.
RAM: >=16GB, 64-bit architecture, [Golang version >= 1.18](https://golang.org/doc/install), GCC 10+
<code>🔬 more details on disk storage [here](https://erigon.substack.com/p/disk-footprint-changes-in-new-erigon?s=r) and [here](https://ledgerwatch.github.io/turbo_geth_release.html#Disk-space).</code>
Usage
=====
@ -70,6 +72,7 @@ Usage
### Getting Started
For building the latest alpha release (this will be suitable for most users just wanting to run a node):
```sh
git clone https://github.com/ledgerwatch/erigon.git
cd erigon
@ -77,9 +80,11 @@ git checkout alpha
make erigon
./build/bin/erigon
```
You can check [the list of releases](https://github.com/ledgerwatch/erigon/releases) for release notes.
For building the bleeding edge development branch:
```sh
git clone --recurse-submodules https://github.com/ledgerwatch/erigon.git
cd erigon
@ -88,7 +93,8 @@ make erigon
./build/bin/erigon
```
Default `--snapshots` for `mainnet`, `goerli`, `bsc`. Other networks now have default `--snapshots=false`. Increase download speed with the flag `--torrent.download.rate=20mb`. <code>🔬 See [Downloader docs](./cmd/downloader/readme.md)</code>
Use `--datadir` to choose where to store data.
@ -108,7 +114,8 @@ How to start Erigon's services as separated processes, see in [docker-compose.ym
There is an optional stage that can be enabled through flags:
* `--watch-the-burn`, Enable WatchTheBurn stage which keeps track of ETH issuance and is required to use `erigon_watchTheBurn`.
### Testnets
@ -162,8 +169,9 @@ Windows users may run erigon in 3 possible ways:
following point)
* If you need to build MDBX tools (i.e. `.\wmake.ps1 db-tools`)
then [Chocolatey package manager](https://chocolatey.org/) for Windows must be installed. By Chocolatey you need
to install the following components: `cmake`, `make`, `mingw` by `choco install cmake make mingw`. Make sure the Windows System "Path" variable has:
C:\ProgramData\chocolatey\lib\mingw\tools\install\mingw64\bin
**Important note about Anti-Viruses**
During MinGW's compiler detection phase some temporary executables are generated to test compiler capabilities. It's
@ -189,17 +197,19 @@ C:\ProgramData\chocolatey\lib\mingw\tools\install\mingw64\bin
### Using TOML or YAML Config Files
You can set Erigon flags through a YAML or TOML configuration file with the flag `--config`. The flags set in the configuration file can be overwritten by writing the flags directly on the Erigon command line.
### Example
`./build/bin/erigon --config ./config.yaml --chain=goerli`
Assuming we have `chain : "mainnet"` in our configuration file, adding `--chain=goerli` overrides the flag inside the YAML configuration file and sets the chain to Goerli.
### TOML
Example of setting up TOML config file
@ -213,7 +223,7 @@ http = true
"http.api" = ["eth","debug","net"]
```
### YAML
Example of setting up a YAML config file
@ -227,8 +237,6 @@ private.api.addr : "localhost:9090"
http.api : ["eth","debug","net"]
```
### Beacon Chain (Consensus Layer)
Erigon can be used as an Execution Layer (EL) for Consensus Layer clients (CL). Default configuration is OK.
@ -238,18 +246,23 @@ as well as `--authrpc.vhosts <CL host>`.
[Engine API]: https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md
In order to establish a secure connection between the Consensus Layer and the Execution Layer, a JWT secret key is automatically generated.
The JWT secret key will be present in the datadir by default under the name of `jwt.hex` and its path can be specified with the flag `--authrpc.jwtsecret`.
This piece of info needs to be specified in the Consensus Layer as well in order to establish the connection successfully. More information can be found [here](https://github.com/ethereum/execution-apis/blob/main/src/engine/authentication.md).
Once Erigon is running, you need to point your CL client to `<erigon address>:8551`,
where `<erigon address>` is either `localhost` or the IP address of the device running Erigon, and also point to the JWT secret path created by Erigon.
### Multiple Instances / One Machine
Define 6 flags to avoid conflicts: `--datadir --port --http.port --authrpc.port --torrent.port --private.api.addr`. Example of multiple chains on the same machine:
```
# mainnet
@ -263,6 +276,7 @@ Define 6 flags to avoid conflicts: `--datadir --port --http.port --authrpc.port
Quote your path if it has spaces.
### Dev Chain
<code> 🔬 Detailed explanation is [DEV_CHAIN](/DEV_CHAIN.md).</code>
Key features
@ -317,11 +331,12 @@ Examples of stages are:
### JSON-RPC daemon
Most of Erigon's components (sentry, txpool, snapshots downloader) can work inside Erigon and as independent processes.
To enable built-in RPC server: `--http` and `--ws` (sharing same port with http)
Run RPCDaemon as a separate process: this daemon can use local DB (with running Erigon or on snapshot of a database) or remote DB (run on another server). <code>🔬 See [RPC-Daemon docs](./cmd/rpcdaemon/README.md)</code>
#### **For remote DB**
@ -346,12 +361,16 @@ For a details on the implementation status of each
command, [see this table](./cmd/rpcdaemon/README.md#rpc-implementation-status).
### Run all components by docker-compose
Docker allows for building and running Erigon via containers. This alleviates the need for installing build dependencies onto the host OS.
#### Optional: Setup dedicated user
User UID/GID need to be synchronized between the host OS and container so files are written with the correct permissions.
You may wish to set up a dedicated user/group on the host OS, in which case the following `make` targets are available.
```sh
# create "erigon" user
make user_linux
@ -360,21 +379,28 @@ make user_macos
```
#### Environment Variables
There is a `.env.example` file in the root of the repo.
* `DOCKER_UID` - The UID of the docker user
* `DOCKER_GID` - The GID of the docker user
* `XDG_DATA_HOME` - The data directory which will be mounted to the docker containers
If not specified, the UID/GID will use the current user.
A good choice for `XDG_DATA_HOME` is to use the `~erigon/.ethereum` directory created by helper targets `make user_linux` or `make user_macos`.
#### Check: Permissions
In all cases, `XDG_DATA_HOME` (specified or default) must be writeable by the user UID/GID in docker, which will be determined by the `DOCKER_UID` and `DOCKER_GID` at build time.
If a build or service startup is failing due to permissions, check that all the directories, UID, and GID controlled by these environment variables are correct.
#### Run
Next command starts: Erigon on port 30303, rpcdaemon on port 8545, prometheus on port 9090, and grafana on port 3000.
```sh
@ -457,42 +483,45 @@ Detailed explanation: [./docs/programmers_guide/db_faq.md](./docs/programmers_gu
#### `erigon` ports
| Port  | Protocol  | Purpose                | Expose  |
|:-----:|:---------:|:----------------------:|:-------:|
| 30303 | TCP & UDP | eth/66 or 67 peering   | Public  |
| 9090  | TCP       | gRPC Connections       | Private |
| 42069 | TCP & UDP | Snap sync (Bittorrent) | Public  |
| 6060  | TCP       | Metrics or Pprof       | Private |
| 8551  | TCP       | Engine API (JWT auth)  | Private |
Typically, 30303 is exposed to the internet to allow incoming peering connections. 9090 is exposed only
internally for rpcdaemon or other connections (e.g. rpcdaemon -> erigon).
Port 8551 (JWT authenticated) is exposed only internally for [Engine API] JSON-RPC queries from the Consensus Layer node.
#### `RPC` ports
| Port | Protocol | Purpose           | Expose  |
|:----:|:--------:|:-----------------:|:-------:|
| 8545 | TCP      | HTTP & WebSockets | Private |
Typically, 8545 is exposed only internally for JSON-RPC queries. Both HTTP and WebSocket connections are on the same port.
#### `sentry` ports
| Port  | Protocol  | Purpose          | Expose  |
|:-----:|:---------:|:----------------:|:-------:|
| 30303 | TCP & UDP | Peering          | Public  |
| 9091  | TCP       | gRPC Connections | Private |
Typically, a sentry process will run one eth/xx protocol (e.g. eth/66) and will be exposed to the internet on 30303. Port
9091 is for internal gRPC connections (e.g. erigon -> sentry).
#### Other ports
| Port | Protocol | Purpose | Expose  |
|:----:|:--------:|:-------:|:-------:|
| 6060 | TCP      | pprof   | Private |
| 6060 | TCP      | metrics | Private |
Optional flags can enable pprof or metrics (or both) - however, they both run on 6060 by default, so
you'll have to change one if you want to run both at the same time. Use `--help` with the binary for more info.
@ -508,13 +537,16 @@ Reserved for future use: **gRPC ports**: `9092` consensus engine, `9093` snapsho
run `go tool pprof -inuse_space -png http://127.0.0.1:6060/debug/pprof/heap > mem.png`
### How to run local devnet?
<code> 🔬 Detailed explanation is [here](/DEV_CHAIN.md).</code>
### Docker permissions error
Docker uses user erigon with UID/GID 1000 (for security reasons). You can see this user being created in the Dockerfile.
You can fix this by giving the host's user ownership of the folder, where the host user's UID/GID is the same as the docker user's UID/GID (1000).
More details in [this post](https://www.fullstaq.com/knowledge-hub/blogs/docker-and-the-host-filesystem-owner-matching-problem).
### Run Raspberry Pi
@ -597,8 +629,10 @@ memory.
**Warning:** Multiple instances of Erigon on the same machine will touch the disk concurrently, which impacts performance - one of the
main Erigon optimisations is "reduce Disk random access".
"Blocks Execution stage" still does many random reads - this is the reason why it's the slowest stage. We do not recommend running
multiple genesis syncs on the same disk. If genesis sync passed, then it's fine to run multiple Erigon instances on the same disk.
### Blocks Execution is slow on cloud-network-drives
@ -617,6 +651,7 @@ For example: btrfs's autodefrag option - may increase write IO 100x times
### the --mount option requires BuildKit error
For anyone else that was getting the BuildKit error when trying to start Erigon the old way, you can use the command below:
```
XDG_DATA_HOME=/preferred/data/folder DOCKER_BUILDKIT=1 COMPOSE_DOCKER_CLI_BUILD=1 make docker-compose
```


@ -127,10 +127,10 @@ func createMagnetLinkWithInfoHash(hash *prototypes.H160, torrentClient *torrent.
return false, nil
}
infoHash := Proto2InfoHash(hash)
log.Debug("[downloader] downloading torrent and seg file", "hash", infoHash)
//log.Debug("[downloader] downloading torrent and seg file", "hash", infoHash)
if _, ok := torrentClient.Torrent(infoHash); ok {
log.Debug("[downloader] torrent client related to hash found", "hash", infoHash)
//log.Debug("[downloader] torrent client related to hash found", "hash", infoHash)
return true, nil
}
@ -151,6 +151,6 @@ func createMagnetLinkWithInfoHash(hash *prototypes.H160, torrentClient *torrent.
return
}
}(magnet.String())
log.Debug("[downloader] downloaded both seg and torrent files", "hash", infoHash)
//log.Debug("[downloader] downloaded both seg and torrent files", "hash", infoHash)
return false, nil
}


@ -230,8 +230,7 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) ([]string, err
defer i.Inc()
defer sem.Release(1)
defer wg.Done()
err = buildTorrentIfNeed(f, snapDir)
if err != nil {
if err := buildTorrentIfNeed(f, snapDir); err != nil {
errs <- err
}
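The hunk above folds the error assignment into the `if` statement inside a worker goroutine that is bounded by a semaphore and tracked by a wait group. A minimal, self-contained sketch of that bounded-worker pattern (function and variable names are illustrative, not the downloader's actual API):

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// buildAll runs build(f) for every file with at most `workers` builds in flight.
func buildAll(ctx context.Context, files []string, workers int64, build func(string) error) error {
	sem := semaphore.NewWeighted(workers)
	errs := make(chan error, len(files)) // buffered so failing workers never block
	var wg sync.WaitGroup
	for _, f := range files {
		if err := sem.Acquire(ctx, 1); err != nil {
			return err
		}
		wg.Add(1)
		go func(f string) {
			defer sem.Release(1)
			defer wg.Done()
			if err := build(f); err != nil { // same shape as `if err := buildTorrentIfNeed(f, snapDir); err != nil`
				errs <- err
			}
		}(f)
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		return err // report the first error, if any
	}
	return nil
}

func main() {
	err := buildAll(context.Background(), []string{"a.seg", "b.seg"}, 2, func(f string) error {
		fmt.Println("building torrent file for", f)
		return nil
	})
	fmt.Println("done, err =", err)
}
```

The buffered error channel is drained only after `wg.Wait()`, so a failed build never deadlocks the remaining workers.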


@ -1,6 +1,7 @@
package commands
import (
"github.com/ledgerwatch/erigon/turbo/cli"
"github.com/spf13/cobra"
"github.com/ledgerwatch/erigon/cmd/utils"
@ -110,7 +111,7 @@ func withDataDir(cmd *cobra.Command) {
}
func withBatchSize(cmd *cobra.Command) {
cmd.Flags().StringVar(&batchSizeStr, "batchSize", "512M", "batch size for execution stage")
cmd.Flags().StringVar(&batchSizeStr, "batchSize", cli.BatchSizeFlag.Value, cli.BatchSizeFlag.Usage)
}
func withIntegrityChecks(cmd *cobra.Command) {
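This change stops hard-coding the batch-size default in the cobra command and instead reuses `cli.BatchSizeFlag.Value` and `.Usage` from `turbo/cli`, so both CLIs share one definition. A small sketch of the idea, with a hypothetical `sharedFlag` struct standing in for the urfave/cli flag:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// sharedFlag mimics a single source of truth for a flag's default and usage,
// similar to turbo/cli.BatchSizeFlag referenced in the diff (this struct and
// its values are illustrative, not Erigon's actual definition).
type sharedFlag struct {
	Name, Value, Usage string
}

var batchSizeFlag = sharedFlag{Name: "batchSize", Value: "512M", Usage: "batch size for execution stage"}

var batchSizeStr string

func withBatchSize(cmd *cobra.Command) {
	// Reuse the shared default and usage instead of hard-coding "512M" again.
	cmd.Flags().StringVar(&batchSizeStr, batchSizeFlag.Name, batchSizeFlag.Value, batchSizeFlag.Usage)
}

func main() {
	cmd := &cobra.Command{
		Use: "demo",
		Run: func(_ *cobra.Command, _ []string) { fmt.Println("batchSize =", batchSizeStr) },
	}
	withBatchSize(cmd)
	_ = cmd.Execute()
}
```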


@ -491,7 +491,8 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error {
}
if reset {
if err := reset2.ResetBlocks(tx, sn, br); err != nil {
dirs := datadir.New(datadirCli)
if err := reset2.ResetBlocks(tx, db, sn, br, dirs.Tmp); err != nil {
return err
}
return nil
@ -1168,7 +1169,7 @@ func allSnapshots(db kv.RoDB) (*snapshotsync.RoSnapshots, *libstate.Aggregator22
}
_allSnapshotsSingleton.LogStat()
db.View(context.Background(), func(tx kv.Tx) error {
_aggSingleton.LogStats(func(endTxNumMinimax uint64) uint64 {
_aggSingleton.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress
})


@ -357,7 +357,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
}
db.View(context.Background(), func(tx kv.Tx) error {
agg.LogStats(func(endTxNumMinimax uint64) uint64 {
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress
})
@ -381,7 +381,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
log.Error("[Snapshots] reopen", "err", err)
} else {
db.View(context.Background(), func(tx kv.Tx) error {
agg.LogStats(func(endTxNumMinimax uint64) uint64 {
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress
})
@ -437,16 +437,6 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
}()
ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot)
if cfg.WithDatadir {
dirs := datadir.New(cfg.DataDir)
if agg, err = libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
}
if err = agg.ReopenFiles(); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
}
}
return db, borDb, eth, txPool, mining, stateCache, blockReader, ff, agg, err
}


@ -288,7 +288,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
ac := api._agg.MakeContext()
ac.SetTx(tx)
topicsBitmap, err := getTopicsBitmap2(ac, tx, crit.Topics, fromTxNum, toTxNum)
topicsBitmap, err := getTopicsBitmapV3(ac, tx, crit.Topics, fromTxNum, toTxNum)
if err != nil {
return nil, err
}
@ -325,6 +325,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
var lastRules *params.Rules
stateReader := state.NewHistoryReader22(ac)
stateReader.SetTx(tx)
//stateReader.SetTrace(true)
iter := txNumbers.Iterator()
chainConfig, err := api.chainConfig(tx)
@ -383,7 +384,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
ibs.Prepare(txHash, lastBlockHash, txIndex)
_, err = core.ApplyMessage(evm, msg, gp, true /* refunds */, false /* gasBailout */)
if err != nil {
return nil, err
return nil, fmt.Errorf("%w: blockNum=%d, txNum=%d", err, blockNum, txNum)
}
rawLogs := ibs.GetLogs(txHash)
var logIndex uint
@ -415,7 +416,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
// {{}, {B}} matches any topic in first position AND B in second position
// {{A}, {B}} matches topic A in first position AND B in second position
// {{A, B}, {C, D}} matches topic (A OR B) in first position AND (C OR D) in second position
func getTopicsBitmap2(ac *libstate.Aggregator22Context, tx kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) {
func getTopicsBitmapV3(ac *libstate.Aggregator22Context, tx kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) {
var result *roaring64.Bitmap
for _, sub := range topics {
var bitmapForORing roaring64.Bitmap
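Besides renaming `getTopicsBitmap2` to `getTopicsBitmapV3`, the hunk wraps the `ApplyMessage` error with block and transaction context while keeping the original error inspectable through `%w`. A standalone illustration of why this wrapping preserves `errors.Is`:

```go
package main

import (
	"errors"
	"fmt"
)

// errExec is a stand-in for whatever ApplyMessage might return.
var errExec = errors.New("execution reverted")

func apply(blockNum, txNum uint64) error { return errExec }

func main() {
	if err := apply(14_000_000, 42); err != nil {
		// Same shape as the diff: %w keeps the wrapped error in the chain,
		// the extra fields add block/tx context for debugging.
		wrapped := fmt.Errorf("%w: blockNum=%d, txNum=%d", err, 14_000_000, 42)
		fmt.Println(wrapped)                     // execution reverted: blockNum=14000000, txNum=42
		fmt.Println(errors.Is(wrapped, errExec)) // true
	}
}
```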


@ -293,7 +293,7 @@ func (cr EpochReader) FindBeforeOrEqualNumber(number uint64) (blockNum uint64, b
}
func NewWorkersPool(lock sync.Locker, background bool, chainDb kv.RoDB, wg *sync.WaitGroup, rs *state.State22, blockReader services.FullBlockReader, chainConfig *params.ChainConfig, logger log.Logger, genesis *core.Genesis, engine consensus.Engine, workerCount int) (reconWorkers []*Worker22, resultCh chan *state.TxTask, clear func()) {
queueSize := workerCount * 32
queueSize := workerCount * 64
reconWorkers = make([]*Worker22, workerCount)
resultCh = make(chan *state.TxTask, queueSize)
for i := 0; i < workerCount; i++ {


@ -359,7 +359,7 @@ func (rw *ReconWorker) runTxTask(txTask *state2.TxTask) {
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, evm=%p\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex, vmenv)
_, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */)
if err != nil {
panic(fmt.Errorf("could not apply tx %d [%x] failed: %w", txTask.TxIndex, txHash, err))
panic(fmt.Errorf("could not apply blockNum=%d, txIdx=%d [%x] failed: %w", txTask.BlockNum, txTask.TxIndex, txHash, err))
}
if err = ibs.FinalizeTx(rules, noop); err != nil {
panic(err)


@ -384,7 +384,7 @@ var (
DBReadConcurrencyFlag = cli.IntFlag{
Name: "db.read.concurrency",
Usage: "Does limit amount of parallel db reads. Default: equal to GOMAXPROCS (or number of CPU)",
Value: cmp.Max(10, runtime.GOMAXPROCS(-1)*4),
Value: cmp.Max(10, runtime.GOMAXPROCS(-1)*8),
}
RpcAccessListFlag = cli.StringFlag{
Name: "rpc.accessList",


@ -12,6 +12,7 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
)
func ResetState(db kv.RwDB, ctx context.Context, chain string) error {
@ -44,7 +45,23 @@ func ResetState(db kv.RwDB, ctx context.Context, chain string) error {
return nil
}
func ResetBlocks(tx kv.RwTx, snapshots *snapshotsync.RoSnapshots, br services.HeaderAndCanonicalReader) error {
func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, br services.HeaderAndCanonicalReader, tmpdir string) error {
go func() { //inverted read-ahead - to warmup data
_ = db.View(context.Background(), func(tx kv.Tx) error {
c, err := tx.Cursor(kv.EthTx)
if err != nil {
return err
}
defer c.Close()
for k, _, err := c.Last(); k != nil; k, _, err = c.Prev() {
if err != nil {
return err
}
}
return nil
})
}()
// keep Genesis
if err := rawdb.TruncateBlocks(context.Background(), tx, 1); err != nil {
return err
@ -95,10 +112,13 @@ func ResetBlocks(tx kv.RwTx, snapshots *snapshotsync.RoSnapshots, br services.He
}
if snapshots != nil && snapshots.Cfg().Enabled && snapshots.BlocksAvailable() > 0 {
if err := stagedsync.FillDBFromSnapshots("fillind_db_from_snapshots", context.Background(), tx, "", snapshots, br); err != nil {
if err := stagedsync.FillDBFromSnapshots("fillind_db_from_snapshots", context.Background(), tx, tmpdir, snapshots, br); err != nil {
return err
}
_ = stages.SaveStageProgress(tx, stages.Snapshots, snapshots.BlocksAvailable())
_ = stages.SaveStageProgress(tx, stages.Headers, snapshots.BlocksAvailable())
_ = stages.SaveStageProgress(tx, stages.Bodies, snapshots.BlocksAvailable())
_ = stages.SaveStageProgress(tx, stages.Senders, snapshots.BlocksAvailable())
}
return nil
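The new goroutine added to `ResetBlocks` walks `kv.EthTx` backwards in a read-only view purely to pull pages into the OS cache before the truncation work starts ("inverted read-ahead"). A sketch of the same warm-up trick as a reusable helper, using only the `kv` calls already visible in the diff (the helper itself is illustrative, not part of Erigon):

```go
package warmup

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/kv"
)

// WarmTable iterates `table` backwards in a read-only view so the OS page
// cache is populated before a heavier job touches the same data.
func WarmTable(ctx context.Context, db kv.RoDB, table string) {
	go func() {
		_ = db.View(ctx, func(tx kv.Tx) error {
			c, err := tx.Cursor(table)
			if err != nil {
				return err
			}
			defer c.Close()
			// Values are discarded - the point is the page reads themselves.
			for k, _, err := c.Last(); k != nil; k, _, err = c.Prev() {
				if err != nil {
					return err
				}
			}
			return nil
		})
	}()
}
```

In the diff this runs concurrently with `TruncateBlocks`, which then hits mostly warm pages.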
@ -142,6 +162,7 @@ func ResetExec(tx kv.RwTx, chain string) (err error) {
kv.Code, kv.PlainContractCode, kv.ContractCode, kv.IncarnationMap,
}
for _, b := range stateBuckets {
log.Info("Clear", "table", b)
if err := tx.ClearBucket(b); err != nil {
return err
}
@ -165,6 +186,7 @@ func ResetExec(tx kv.RwTx, chain string) (err error) {
kv.TracesToKeys, kv.TracesToIdx,
}
for _, b := range buckets {
log.Info("Clear", "table", b)
if err := tx.ClearBucket(b); err != nil {
return err
}


@ -860,6 +860,9 @@ func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snCo
if err != nil {
return nil, nil, nil, err
}
if err = agg.ReopenFiles(); err != nil {
return nil, nil, nil, err
}
return blockReader, allSnapshots, agg, nil
}


@ -31,7 +31,6 @@ import (
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
)
@ -82,7 +81,6 @@ func (p *Progress) Log(logPrefix string, rs *state.State22, rws state.TxTaskQueu
func Exec3(ctx context.Context,
execStage *StageState, workerCount int, batchSize datasize.ByteSize, chainDb kv.RwDB, applyTx kv.RwTx,
rs *state.State22, blockReader services.FullBlockReader,
allSnapshots *snapshotsync.RoSnapshots,
logger log.Logger, agg *state2.Aggregator22, engine consensus.Engine,
maxBlockNum uint64, chainConfig *params.ChainConfig,
genesis *core.Genesis,
@ -96,6 +94,9 @@ func Exec3(ctx context.Context,
}
defer applyTx.Rollback()
}
if !useExternalTx {
defer blockReader.(WithSnapshots).Snapshots().EnableMadvNormal().DisableReadAhead()
}
var block, stageProgress uint64
var outputTxNum, inputTxNum, maxTxNum uint64
@ -276,18 +277,18 @@ loop:
if err != nil {
return err
}
if parallel {
func() {
rwsLock.Lock()
defer rwsLock.Unlock()
for rws.Len() > queueSize || atomic.LoadInt64(&resultsSize) >= resultsThreshold || rs.SizeEstimate() >= commitThreshold {
rwsReceiveCond.Wait()
}
}()
}
txs := b.Transactions()
for txIndex := -1; txIndex <= len(txs); txIndex++ {
// Do not oversend, wait for the result heap to go under certain size
if parallel {
func() {
rwsLock.Lock()
defer rwsLock.Unlock()
for rws.Len() > queueSize || atomic.LoadInt64(&resultsSize) >= resultsThreshold || rs.SizeEstimate() >= commitThreshold {
rwsReceiveCond.Wait()
}
}()
}
txTask := &state.TxTask{
BlockNum: blockNum,
Rules: rules,
@ -332,41 +333,39 @@ loop:
}
stageProgress = blockNum
if txTask.Final && rs.SizeEstimate() >= commitThreshold {
commitStart := time.Now()
log.Info("Committing...")
if err := rs.Flush(applyTx); err != nil {
return err
}
if !useExternalTx {
if err = execStage.Update(applyTx, stageProgress); err != nil {
return err
}
applyTx.CollectMetrics()
if err := applyTx.Commit(); err != nil {
return err
}
if applyTx, err = chainDb.BeginRw(ctx); err != nil {
return err
}
defer applyTx.Rollback()
agg.SetTx(applyTx)
reconWorkers[0].ResetTx(applyTx)
log.Info("Committed", "time", time.Since(commitStart), "toProgress", stageProgress)
}
}
select {
case <-logEvery.C:
progress.Log(execStage.LogPrefix(), rs, rws, count, inputBlockNum, outputBlockNum, repeatCount, uint64(atomic.LoadInt64(&resultsSize)), resultCh)
default:
}
}
inputTxNum++
}
if rs.SizeEstimate() >= commitThreshold {
commitStart := time.Now()
log.Info("Committing...")
if err := rs.Flush(applyTx); err != nil {
return err
}
if !useExternalTx {
if err = execStage.Update(applyTx, stageProgress); err != nil {
return err
}
applyTx.CollectMetrics()
if err := applyTx.Commit(); err != nil {
return err
}
if applyTx, err = chainDb.BeginRw(ctx); err != nil {
return err
}
defer applyTx.Rollback()
agg.SetTx(applyTx)
reconWorkers[0].ResetTx(applyTx)
log.Info("Committed", "time", time.Since(commitStart), "toProgress", stageProgress)
}
}
// Check for interrupts
select {
case <-logEvery.C:
progress.Log(execStage.LogPrefix(), rs, rws, count, inputBlockNum, outputBlockNum, repeatCount, uint64(atomic.LoadInt64(&resultsSize)), resultCh)
case <-interruptCh:
log.Info(fmt.Sprintf("interrupted, please wait for cleanup, next run will start with block %d", blockNum))
atomic.StoreUint64(&maxTxNum, inputTxNum)
@ -443,14 +442,20 @@ func processResultQueue(rws *state.TxTaskQueue, outputTxNum *uint64, rs *state.S
}
}
func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, workerCount int, chainDb kv.RwDB,
func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, workerCount int, batchSize datasize.ByteSize, chainDb kv.RwDB,
blockReader services.FullBlockReader,
logger log.Logger, agg *state2.Aggregator22, engine consensus.Engine,
chainConfig *params.ChainConfig, genesis *core.Genesis) (err error) {
defer agg.EnableMadvNormal().DisableReadAhead()
reconDbPath := filepath.Join(dirs.DataDir, "recondb")
dir.Recreate(reconDbPath)
limiterB := semaphore.NewWeighted(int64(runtime.NumCPU()*2 + 1))
db, err := kv2.NewMDBX(log.New()).Path(reconDbPath).RoTxsLimiter(limiterB).WriteMap().WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.ReconTablesCfg }).Open()
db, err := kv2.NewMDBX(log.New()).Path(reconDbPath).RoTxsLimiter(limiterB).
WriteMergeThreshold(8192).
PageSize(uint64(16 * datasize.KB)).
WriteMap().WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.ReconTablesCfg }).
Open()
if err != nil {
return err
}
@ -490,7 +495,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
fmt.Printf("Corresponding block num = %d, txNum = %d\n", blockNum, txNum)
var wg sync.WaitGroup
workCh := make(chan *state.TxTask, 128)
workCh := make(chan *state.TxTask, workerCount*64)
rs := state.NewReconState(workCh)
var fromKey, toKey []byte
bigCount := big.NewInt(int64(workerCount))
@ -552,9 +557,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
accountCollectorsX[i] = nil
}
if err = db.Update(ctx, func(tx kv.RwTx) error {
return accountCollectorX.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
return tx.Put(kv.XAccount, k, v)
}, etl.TransformArgs{})
return accountCollectorX.Load(tx, kv.XAccount, etl.IdentityLoadFunc, etl.TransformArgs{})
}); err != nil {
return err
}
@ -599,9 +602,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
storageCollectorsX[i] = nil
}
if err = db.Update(ctx, func(tx kv.RwTx) error {
return storageCollectorX.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
return tx.Put(kv.XStorage, k, v)
}, etl.TransformArgs{})
return storageCollectorX.Load(tx, kv.XStorage, etl.IdentityLoadFunc, etl.TransformArgs{})
}); err != nil {
return err
}
@ -645,9 +646,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
codeCollectorsX[i] = nil
}
if err = db.Update(ctx, func(tx kv.RwTx) error {
return codeCollectorX.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
return tx.Put(kv.XCode, k, v)
}, etl.TransformArgs{})
return codeCollectorX.Load(tx, kv.XCode, etl.IdentityLoadFunc, etl.TransformArgs{})
}); err != nil {
return err
}
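The three collector hunks above replace a hand-written load callback that called `tx.Put` per key with `etl.IdentityLoadFunc`, which writes each pair into the target table unchanged. A hedged sketch of the resulting call shape (the helper and its name are illustrative; only the `Load(tx, table, etl.IdentityLoadFunc, etl.TransformArgs{})` form comes from the diff):

```go
package resetsketch

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/etl"
	"github.com/ledgerwatch/erigon-lib/kv"
)

// flushInto writes everything buffered in a collector straight into `table`.
// etl.IdentityLoadFunc stores each (k, v) pair as-is, replacing the manual
// tx.Put callbacks removed above (the real code targets kv.XAccount,
// kv.XStorage and kv.XCode).
func flushInto(ctx context.Context, db kv.RwDB, c *etl.Collector, table string) error {
	return db.Update(ctx, func(tx kv.RwTx) error {
		return c.Load(tx, table, etl.IdentityLoadFunc, etl.TransformArgs{})
	})
}
```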
@ -687,7 +686,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
for i := 0; i < workerCount; i++ {
go reconWorkers[i].Run()
}
commitThreshold := uint64(256 * 1024 * 1024)
commitThreshold := batchSize.Bytes() * 4
prevCount := uint64(0)
prevRollbackCount := uint64(0)
prevTime := time.Now()
@ -715,7 +714,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
prevCount = count
prevRollbackCount = rollbackCount
log.Info("State reconstitution", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", progress),
"tx/s", fmt.Sprintf("%.1f", speedTx),
"tx/s", fmt.Sprintf("%.1f", speedTx), "workCh", fmt.Sprintf("%d/%d", len(workCh), cap(workCh)),
"repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio),
"buffer", fmt.Sprintf("%s/%s", common.ByteCount(sizeEstimate), common.ByteCount(commitThreshold)),
"alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys))
@ -749,6 +748,9 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
}
}
}()
defer blockReader.(WithSnapshots).Snapshots().EnableReadAhead().DisableReadAhead()
var inputTxNum uint64
var b *types.Block
var txKey [8]byte
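The rewritten block above keeps the same idea in both the serial and parallel paths: once the in-memory state buffer grows past `commitThreshold`, flush it, commit the write transaction and open a fresh one, re-attaching the aggregator and the first worker. A simplified sketch of that commit-and-reopen pattern, with `stateBuffer` standing in for `state.State22` and the stage-progress/metrics calls omitted:

```go
package exec3sketch

import (
	"context"
	"time"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/log/v3"
)

// stateBuffer abstracts the two calls made on state.State22 here (illustrative).
type stateBuffer interface {
	SizeEstimate() uint64
	Flush(tx kv.RwTx) error
}

// maybeCommit flushes and commits once the buffer exceeds `threshold`, then
// starts a new write transaction so MDBX can release dirty pages. It returns
// the (possibly new) transaction the caller should continue with.
func maybeCommit(ctx context.Context, db kv.RwDB, tx kv.RwTx, rs stateBuffer, threshold uint64) (kv.RwTx, error) {
	if rs.SizeEstimate() < threshold {
		return tx, nil
	}
	start := time.Now()
	if err := rs.Flush(tx); err != nil {
		return nil, err
	}
	if err := tx.Commit(); err != nil {
		return nil, err
	}
	newTx, err := db.BeginRw(ctx)
	if err != nil {
		return nil, err
	}
	log.Info("Committed", "time", time.Since(start))
	return newTx, nil
}
```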


@ -48,6 +48,9 @@ func (s *StageState) LogPrefix() string { return s.state.LogPrefix() }
// Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution.
func (s *StageState) Update(db kv.Putter, newBlockNum uint64) error {
if s.ID == stages.Execution && newBlockNum == 0 {
panic(newBlockNum)
}
if m, ok := syncMetrics[s.ID]; ok {
m.Set(newBlockNum)
}


@ -92,7 +92,7 @@ func StageExecuteBlocksCfg(
stateStream bool,
badBlockHalt bool,
exec22 bool,
historyV3 bool,
dirs datadir.Dirs,
blockReader services.FullBlockReader,
hd *headerdownload.HeaderDownload,
@ -115,7 +115,7 @@ func StageExecuteBlocksCfg(
blockReader: blockReader,
hd: hd,
genesis: genesis,
historyV3: exec22,
historyV3: historyV3,
workersCount: workersCount,
agg: agg,
}
@ -247,15 +247,15 @@ func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx cont
}
cfg.agg.SetWorkers(cmp.Max(1, runtime.NumCPU()-1))
allSnapshots := cfg.blockReader.(WithSnapshots).Snapshots()
if initialCycle {
if initialCycle && s.BlockNumber == 0 {
reconstituteToBlock, found, err := reconstituteBlock(cfg.agg, cfg.db, tx)
if err != nil {
return err
}
if found && reconstituteToBlock > s.BlockNumber+1 {
if err := ReconstituteState(execCtx, s, cfg.dirs, workersCount, cfg.db, cfg.blockReader, log.New(), cfg.agg, cfg.engine, cfg.chainConfig, cfg.genesis); err != nil {
log.Info(fmt.Sprintf("[%s] Blocks execution, reconstitution", s.LogPrefix()), "from", s.BlockNumber, "to", reconstituteToBlock)
if err := ReconstituteState(execCtx, s, cfg.dirs, workersCount, cfg.batchSize, cfg.db, cfg.blockReader, log.New(), cfg.agg, cfg.engine, cfg.chainConfig, cfg.genesis); err != nil {
return err
}
}
@ -279,7 +279,7 @@ func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx cont
}
rs := state.NewState22()
if err := Exec3(execCtx, s, workersCount, cfg.batchSize, cfg.db, tx, rs,
cfg.blockReader, allSnapshots, log.New(), cfg.agg, cfg.engine,
cfg.blockReader, log.New(), cfg.agg, cfg.engine,
to,
cfg.chainConfig, cfg.genesis); err != nil {
return err


@ -166,7 +166,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
if j != nil {
n += uint64(j.index)
}
log.Info(fmt.Sprintf("[%s] Recovery", logPrefix), "block_number", n)
log.Info(fmt.Sprintf("[%s] Recovery", logPrefix), "block_number", n, "ch", fmt.Sprintf("%d/%d", len(jobs), cap(jobs)))
case j, ok = <-out:
if !ok {
return


@ -120,7 +120,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
}
cfg.snapshots.LogStat()
cfg.agg.LogStats(func(endTxNumMinimax uint64) uint64 {
cfg.agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress
})

go.mod

@ -4,7 +4,7 @@ go 1.18
require (
github.com/gballet/go-verkle v0.0.0-20220923150140-6c08cd337774
github.com/ledgerwatch/erigon-lib v0.0.0-20221004095151-0d6bc2eca418
github.com/ledgerwatch/erigon-lib v0.0.0-20221005081654-85faee1c1817
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220913092204-de54ee30c7b9
github.com/ledgerwatch/log/v3 v3.4.2
github.com/ledgerwatch/secp256k1 v1.0.0
@ -73,7 +73,7 @@ require (
github.com/xsleonard/go-merkle v1.1.0
go.uber.org/atomic v1.10.0
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b
golang.org/x/exp v0.0.0-20220921164117-439092de6870
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8
golang.org/x/time v0.0.0-20220609170525-579cf78fd858

go.sum

@ -561,8 +561,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20221004095151-0d6bc2eca418 h1:Kf/A7uhkGlvTuVLhawFFYc9V9Y5SO7Xbn38TgOCwNiI=
github.com/ledgerwatch/erigon-lib v0.0.0-20221004095151-0d6bc2eca418/go.mod h1:nMyol2RVCPF0TSR/VU4bK40JB35R/Q+IdodGW8j1kzU=
github.com/ledgerwatch/erigon-lib v0.0.0-20221005081654-85faee1c1817 h1:9GMsCin3fT1cUX4rwx3HaNkMmlETgQJqwJIp/9+W7eY=
github.com/ledgerwatch/erigon-lib v0.0.0-20221005081654-85faee1c1817/go.mod h1:YDP7ECNyjKo1dE7J5n8GXKBIYOWnmchvGCfALuwhBQg=
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220913092204-de54ee30c7b9 h1:iWjzYLtOsp/Wpo9ZWV/eMIlnFzk8bm7POSzrXAILw24=
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220913092204-de54ee30c7b9/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.4.2 h1:chvjB7c100rlIFgPv+Col2eerxIrHL88OiZRuPZDkxw=
@ -1052,8 +1052,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4=
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/exp v0.0.0-20220921164117-439092de6870 h1:j8b6j9gzSigH28O5SjSpQSSh9lFd6f5D/q0aHjNTulc=
golang.org/x/exp v0.0.0-20220921164117-439092de6870/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=


@ -110,7 +110,7 @@ var resetBlocks4 = Migration{
log.Warn("NOTE: this migration will remove recent blocks (and senders) to fix several recent bugs. Your node will re-download last ~400K blocks, should not take very long")
}
if err := rawdbreset.ResetBlocks(tx, nil, nil); err != nil {
if err := rawdbreset.ResetBlocks(tx, db, nil, nil, dirs.Tmp); err != nil {
return err
}


@ -320,7 +320,7 @@ func OpenDatabase(config *nodecfg.Config, logger log.Logger, label kv.Label) (kv
var openFunc func(exclusive bool) (kv.RwDB, error)
log.Info("Opening Database", "label", name, "path", dbPath)
openFunc = func(exclusive bool) (kv.RwDB, error) {
roTxLimit := int64(16)
roTxLimit := int64(32)
if config.Http.DBReadConcurrency > 0 {
roTxLimit = int64(config.Http.DBReadConcurrency)
}


@ -43,7 +43,9 @@ func TestBlockchain(t *testing.T) {
bt.skipLoad(`^TransitionTests/bcArrowGlacierToMerge/powToPosBlockRejection\.json`)
if ethconfig.EnableHistoryV3InTest {
// HistoryV3: doesn't produce receipts on execution by design
bt.skipLoad(`^TestBlockchain/InvalidBlocks/bcInvalidHeaderTest/log1_wrongBloom.json`)
bt.skipLoad(`^TestBlockchain/InvalidBlocks/bcInvalidHeaderTest/log1_wrongBloom\.json`)
bt.skipLoad(`^TestBlockchain/InvalidBlocks/bcInvalidHeaderTest/wrongReceiptTrie\.json`)
//bt.skipLoad(`^TestBlockchain/InvalidBlocks/bcInvalidHeaderTest/wrongGasUsed\.json`)
}
bt.walk(t, blockTestDir, func(t *testing.T, name string, test *BlockTest) {
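The test skips now escape the dot in the file names (`log1_wrongBloom\.json`), because in a regular expression an unescaped `.` matches any character and the pattern could skip more tests than intended. A tiny example of the difference:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	loose := regexp.MustCompile(`log1_wrongBloom.json`)
	strict := regexp.MustCompile(`log1_wrongBloom\.json`)
	name := "log1_wrongBloomXjson" // hypothetical name differing only at the dot
	fmt.Println(loose.MatchString(name))  // true  - accidental match
	fmt.Println(strict.MatchString(name)) // false - only the literal ".json" matches
}
```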


@ -8,9 +8,11 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"time"
"github.com/c2h5oh/datasize"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
@ -211,10 +213,11 @@ func doIndicesCommand(cliCtx *cli.Context) error {
workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1)
if rebuild {
cfg := ethconfig.NewSnapCfg(true, true, false)
if err := rebuildIndices("Indexing", ctx, chainDB, cfg, dirs, from, workers); err != nil {
log.Error("Error", "err", err)
}
panic("not implemented")
}
cfg := ethconfig.NewSnapCfg(true, true, false)
if err := rebuildIndices("Indexing", ctx, chainDB, cfg, dirs, from, workers); err != nil {
log.Error("Error", "err", err)
}
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep)
if err != nil {
@ -248,13 +251,17 @@ func doUncompress(cliCtx *cli.Context) error {
return err
}
defer decompressor.Close()
wr := bufio.NewWriterSize(os.Stdout, 512*1024*1024)
wr := bufio.NewWriterSize(os.Stdout, int(128*datasize.MB))
defer wr.Flush()
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
var i uint
var numBuf [binary.MaxVarintLen64]byte
defer decompressor.EnableReadAhead().DisableReadAhead()
g := decompressor.MakeGetter()
buf := make([]byte, 0, 16*etl.BufIOSize)
buf := make([]byte, 0, 1*datasize.MB)
for g.HasNext() {
buf, _ = g.Next(buf[:0])
n := binary.PutUvarint(numBuf[:], uint64(len(buf)))
@ -264,7 +271,12 @@ func doUncompress(cliCtx *cli.Context) error {
if _, err := wr.Write(buf); err != nil {
return err
}
i++
select {
case <-logEvery.C:
_, fileName := filepath.Split(decompressor.FilePath())
progress := 100 * float64(i) / float64(decompressor.Count())
log.Info("[uncompress] ", "progress", fmt.Sprintf("%.2f%%", progress), "file", fileName)
case <-ctx.Done():
return ctx.Err()
default:
@ -281,18 +293,14 @@ func doCompress(cliCtx *cli.Context) error {
}
f := args[0]
dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
workers := runtime.GOMAXPROCS(-1) - 1
if workers < 1 {
workers = 1
}
c, err := compress.NewCompressor(ctx, "", f, dirs.Tmp, compress.MinPatternScore, workers, log.LvlInfo)
workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1)
c, err := compress.NewCompressor(ctx, "compress", f, dirs.Tmp, compress.MinPatternScore, workers, log.LvlInfo)
if err != nil {
return err
}
defer c.Close()
r := bufio.NewReaderSize(os.Stdin, 512*1024*1024)
buf := make([]byte, 0, 32*1024*1024)
r := bufio.NewReaderSize(os.Stdin, int(128*datasize.MB))
buf := make([]byte, 0, int(1*datasize.MB))
var l uint64
for l, err = binary.ReadUvarint(r); err == nil; l, err = binary.ReadUvarint(r) {
if cap(buf) < int(l) {
@ -351,8 +359,11 @@ func doRetireCommand(cliCtx *cli.Context) error {
if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files()); err != nil {
return err
}
if err := br.PruneAncientBlocks(tx); err != nil {
return err
log.Info("prune blocks from db\n")
for j := 0; j < 10_000; j++ { // prune happens by small steps, so need many runs
if err := br.PruneAncientBlocks(tx); err != nil {
return err
}
}
return nil
}); err != nil {
@ -406,6 +417,7 @@ func rebuildIndices(logPrefix string, ctx context.Context, db kv.RoDB, cfg ethco
if err := allSnapshots.ReopenFolder(); err != nil {
return err
}
allSnapshots.LogStat()
if err := snapshotsync.BuildMissedIndices(logPrefix, ctx, dirs, *chainID, workers); err != nil {
return err
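`doCompress` and `doUncompress` above exchange records over stdin/stdout as `<uvarint length><payload>` frames, now with `datasize`-based buffer sizes and periodic progress logging. A standard-library-only sketch of that framing, round-tripped through an in-memory buffer instead of a real pipe:

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// writeRecord frames one record as <uvarint length><payload>.
func writeRecord(w *bufio.Writer, rec []byte) error {
	var lenBuf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(lenBuf[:], uint64(len(rec)))
	if _, err := w.Write(lenBuf[:n]); err != nil {
		return err
	}
	_, err := w.Write(rec)
	return err
}

// readRecords reads framed records until EOF.
func readRecords(r *bufio.Reader) ([][]byte, error) {
	var out [][]byte
	for {
		l, err := binary.ReadUvarint(r)
		if errors.Is(err, io.EOF) {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		buf := make([]byte, l)
		if _, err := io.ReadFull(r, buf); err != nil {
			return nil, err
		}
		out = append(out, buf)
	}
}

func main() {
	var pipe bytes.Buffer
	w := bufio.NewWriter(&pipe)
	for _, rec := range [][]byte{[]byte("hello"), []byte("world")} {
		if err := writeRecord(w, rec); err != nil {
			panic(err)
		}
	}
	w.Flush() // make sure buffered bytes reach the "pipe"
	recs, err := readRecords(bufio.NewReader(&pipe))
	fmt.Println(len(recs), string(recs[0]), string(recs[1]), err) // 2 hello world <nil>
}
```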


@ -820,6 +820,8 @@ func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (fo
}
func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, p *background.Progress, lvl log.Lvl) error {
_, fName := filepath.Split(sn.Path)
log.Debug("[snapshots] build idx", "file", fName)
switch sn.T {
case snap.Headers:
if err := HeadersIdx(ctx, sn.Path, sn.From, tmpDir, p, lvl); err != nil {