From ca9aa4723cdce418b6a092dca7e40154ed5ef0dd Mon Sep 17 00:00:00 2001
From: Alex Sharov
Date: Wed, 5 Oct 2022 17:54:54 +0700
Subject: [PATCH] Compress params change (#5631)

as of https://github.com/ledgerwatch/erigon-lib/pull/651
---
 README.md                                 | 125 +++++++++++-------
 .../downloader/downloader_grpc_server.go  |   6 +-
 cmd/downloader/downloader/util.go         |   3 +-
 cmd/integration/commands/flags.go         |   3 +-
 cmd/integration/commands/stages.go        |   5 +-
 cmd/rpcdaemon/cli/config.go               |  14 +-
 cmd/rpcdaemon/commands/eth_receipts.go    |   7 +-
 cmd/state/exec3/state.go                  |   2 +-
 cmd/state/exec3/state_recon.go            |   2 +-
 cmd/utils/flags.go                        |   2 +-
 core/rawdb/rawdbreset/reset_stages.go     |  26 +++-
 eth/backend.go                            |   3 +
 eth/stagedsync/exec3.go                   | 112 ++++++++--------
 eth/stagedsync/stage.go                   |   3 +
 eth/stagedsync/stage_execute.go           |  12 +-
 eth/stagedsync/stage_senders.go           |   2 +-
 eth/stagedsync/stage_snapshots.go         |   2 +-
 go.mod                                    |   4 +-
 go.sum                                    |   8 +-
 migrations/reset_blocks.go                |   2 +-
 node/node.go                              |   2 +-
 tests/block_test.go                       |   4 +-
 turbo/app/snapshots.go                    |  44 +++---
 turbo/snapshotsync/block_snapshots.go     |   2 +
 24 files changed, 234 insertions(+), 161 deletions(-)

diff --git a/README.md b/README.md
index 82ff77ebe..2fe681c61 100644
--- a/README.md
+++ b/README.md
@@ -49,7 +49,8 @@ System Requirements
 ===================
 
 * For an Archive node of Ethereum Mainnet we recommend >=3TB storage space: 1.8TB state (as of March 2022),
-200GB temp files (can symlink or mount folder `/etl-tmp` to another disk). Ethereum Mainnet Full node (see `--prune*` flags): 400Gb (April 2022).
+  200GB temp files (can symlink or mount folder `/etl-tmp` to another disk). Ethereum Mainnet Full node (
+  see `--prune*` flags): 400GB (April 2022).
 
 * Goerli Full node (see `--prune*` flags): 189GB on Beta, 114GB on Alpha (April 2022).
 
@@ -62,7 +63,8 @@ Bear in mind that SSD performance deteriorates when close to capacity.
 
 RAM: >=16GB, 64-bit architecture, [Golang version >= 1.18](https://golang.org/doc/install), GCC 10+
 
-🔬 more details on disk storage [here](https://erigon.substack.com/p/disk-footprint-changes-in-new-erigon?s=r) and [here](https://ledgerwatch.github.io/turbo_geth_release.html#Disk-space).
+🔬 more details on disk storage [here](https://erigon.substack.com/p/disk-footprint-changes-in-new-erigon?s=r)
+and [here](https://ledgerwatch.github.io/turbo_geth_release.html#Disk-space).
 
 Usage
 =====
@@ -70,6 +72,7 @@ Usage
 ### Getting Started
 
 For building the latest alpha release (this will be suitable for most users just wanting to run a node):
+
 ```sh
 git clone https://github.com/ledgerwatch/erigon.git
 cd erigon
@@ -77,9 +80,11 @@ git checkout alpha
 make erigon
 ./build/bin/erigon
 ```
+
 You can check [the list of releases](https://github.com/ledgerwatch/erigon/releases) for release notes.
 
 For building the bleeding edge development branch:
+
 ```sh
 git clone --recurse-submodules https://github.com/ledgerwatch/erigon.git
 cd erigon
@@ -88,7 +93,8 @@ make erigon
 ./build/bin/erigon
 ```
 
-Default `--snapshots` for `mainnet`, `goerli`, `bsc`. Other networks now have default `--snapshots=false`. Increase download speed by flag `--torrent.download.rate=20mb`. 🔬 See [Downloader docs](./cmd/downloader/readme.md)
+Default `--snapshots` for `mainnet`, `goerli`, `bsc`. Other networks now have default `--snapshots=false`. Increase
+download speed by flag `--torrent.download.rate=20mb`. 🔬 See [Downloader docs](./cmd/downloader/readme.md)
 
 Use `--datadir` to choose where to store data.
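For example, to keep all node data on a dedicated disk and speed up the initial snapshot download, combining the flags documented above (a sketch; the path is illustrative):

```sh
# node data on a dedicated disk; faster initial snapshot download
./build/bin/erigon --datadir=/mnt/erigon --chain=mainnet --torrent.download.rate=20mb
```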
@@ -108,7 +114,8 @@ How to start Erigon's services as separate processes, see in [docker-compose.ym
 There is an optional stage that can be enabled through flags:
 
-* `--watch-the-burn`, Enable WatchTheBurn stage which keeps track of ETH issuance and is required to use `erigon_watchTheBurn`.
+* `--watch-the-burn`, Enable WatchTheBurn stage which keeps track of ETH issuance and is required to
+  use `erigon_watchTheBurn`.
 
 ### Testnets
 
@@ -162,8 +169,9 @@ Windows users may run erigon in 3 possible ways:
   following point)
 * If you need to build MDBX tools (i.e. `.\wmake.ps1 db-tools`)
   then [Chocolatey package manager](https://chocolatey.org/) for Windows must be installed. With Chocolatey, you need
-  to install the following components : `cmake`, `make`, `mingw` by `choco install cmake make mingw`. Make sure Windows System "Path" variable has:
-C:\ProgramData\chocolatey\lib\mingw\tools\install\mingw64\bin
+  to install the following components: `cmake`, `make`, `mingw` by `choco install cmake make mingw`. Make sure
+  Windows System "Path" variable has:
+  C:\ProgramData\chocolatey\lib\mingw\tools\install\mingw64\bin
 
 **Important note about Anti-Viruses**
 During MinGW's compiler detection phase some temporary executables are generated to test compiler capabilities. It's
@@ -189,17 +197,19 @@
 ### Using TOML or YAML Config Files
 
-You can set Erigon flags through a YAML or TOML configuration file with the flag `--config`. The flags set in the configuration
+You can set Erigon flags through a YAML or TOML configuration file with the flag `--config`. The flags set in the
+configuration
 file can be overwritten by writing the flags directly on the Erigon command line.
 
 ### Example
 
 `./build/bin/erigon --config ./config.yaml --chain=goerli`
 
-Assuming we have `chain : "mainnet" in our configuration file, by adding `--chain=goerli` allows the overwrite of the flag inside
+Assuming we have `chain : "mainnet"` in our configuration file, adding `--chain=goerli` overwrites the
+flag inside
 of the yaml configuration file and sets the chain to goerli.
 
-### TOML 
+### TOML
 
 Example of setting up TOML config file
 
@@ -213,7 +223,7 @@ http = true
 "http.api" = ["eth","debug","net"]
 ```
 
-### YAML 
+### YAML
 
 Example of setting up a YAML config file
 
@@ -227,8 +237,6 @@ private.api.addr : "localhost:9090"
 http.api : ["eth","debug","net"]
 ```
 
-
-
 ### Beacon Chain (Consensus Layer)
 
 Erigon can be used as an Execution Layer (EL) for Consensus Layer clients (CL). Default configuration is OK.
 
@@ -238,18 +246,23 @@ as well as `--authrpc.vhosts `.
 
 [Engine API]: https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md
 
-In order to establish a secure connection between the Consensus Layer and the Execution Layer, a JWT secret key is automatically generated.
+In order to establish a secure connection between the Consensus Layer and the Execution Layer, a JWT secret key is
+automatically generated.
 
-The JWT secret key will be present in the datadir by default under the name of `jwt.hex` and its path can be specified with the flag `--authrpc.jwtsecret`.
+The JWT secret key will be present in the datadir by default under the name of `jwt.hex` and its path can be specified
+with the flag `--authrpc.jwtsecret`.
 
-This piece of info needs to be specified in the Consensus Layer as well in order to establish connection successfully. More information can be found [here](https://github.com/ethereum/execution-apis/blob/main/src/engine/authentication.md).
+This piece of info needs to be specified in the Consensus Layer as well in order to establish connection successfully.
+More information can be found [here](https://github.com/ethereum/execution-apis/blob/main/src/engine/authentication.md).
 
 Once Erigon is running, you need to point your CL client to `<erigon address>:8551`,
-where `<erigon address>` is either `localhost` or the IP address of the device running Erigon, and also point to the JWT secret path created by Erigon.
+where `<erigon address>` is either `localhost` or the IP address of the device running Erigon, and also point to the JWT
+secret path created by Erigon.
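To illustrate the wiring, a sketch with Lighthouse as the CL client (CL flag names vary by client and version; the paths here are illustrative):

```sh
# Erigon side: Engine API listens on 8551 by default; JWT secret lives in the datadir
./build/bin/erigon --datadir=/mnt/erigon --authrpc.jwtsecret=/mnt/erigon/jwt.hex

# CL side: point the beacon node at Erigon's Engine API and the same JWT secret
lighthouse bn --execution-endpoint http://localhost:8551 --execution-jwt /mnt/erigon/jwt.hex
```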
 ### Multiple Instances / One Machine
 
-Define 6 flags to avoid conflicts: `--datadir --port --http.port --authrpc.port --torrent.port --private.api.addr`. Example of multiple chains on the same machine:
+Define 6 flags to avoid conflicts: `--datadir --port --http.port --authrpc.port --torrent.port --private.api.addr`.
+Example of multiple chains on the same machine:
 
 ```
 # mainnet
@@ -263,6 +276,7 @@ Define 6 flags to avoid conflicts: `--datadir --port --http.port --authrpc.port
 
 Quote your path if it has spaces.
 
 ### Dev Chain
+
 🔬 Detailed explanation is [DEV_CHAIN](/DEV_CHAIN.md).
 
 Key features
@@ -317,11 +331,12 @@ Examples of stages are:
 
 ### JSON-RPC daemon
 
-Most of Erigon's components (sentry, txpool, snapshots downloader, can work inside Erigon and as independent process. 
+Most of Erigon's components (sentry, txpool, snapshots downloader) can work inside Erigon or as independent processes.
 
 To enable built-in RPC server: `--http` and `--ws` (sharing same port with http)
 
-Run RPCDaemon as separated process: this daemon can use local DB (with running Erigon or on snapshot of a database) or remote DB (run on another server). 🔬 See [RPC-Daemon docs](./cmd/rpcdaemon/README.md)
+Run RPCDaemon as a separate process: this daemon can use local DB (with running Erigon or on snapshot of a database) or
+remote DB (run on another server). 🔬 See [RPC-Daemon docs](./cmd/rpcdaemon/README.md)
 
 #### **For remote DB**
 
@@ -346,12 +361,16 @@ For details on the implementation status of each command,
 [see this table](./cmd/rpcdaemon/README.md#rpc-implementation-status).
 
 ### Run all components by docker-compose
-Docker allows for building and running Erigon via containers. This alleviates the need for installing build dependencies onto the host OS.
+
+Docker allows for building and running Erigon via containers. This alleviates the need for installing build dependencies
+onto the host OS.
 
 #### Optional: Setup dedicated user
+
 User UID/GID need to be synchronized between the host OS and container so files are written with correct permission.
 
 You may wish to set up a dedicated user/group on the host OS, in which case the following `make` targets are available.
+
 ```sh
 # create "erigon" user
 make user_linux
 # or
 make user_macos
 ```
 
 #### Environment Variables
+
 There is a `.env.example` file in the root of the repo.
+
 * `DOCKER_UID` - The UID of the docker user
 * `DOCKER_GID` - The GID of the docker user
 * `XDG_DATA_HOME` - The data directory which will be mounted to the docker containers
 
 If not specified, the UID/GID will use the current user.
 
-A good choice for `XDG_DATA_HOME` is to use the `~erigon/.ethereum` directory created by helper targets `make user_linux` or `make user_macos`.
+A good choice for `XDG_DATA_HOME` is to use the `~erigon/.ethereum` directory created by helper
+targets `make user_linux` or `make user_macos`.
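Putting the pieces together, a dedicated-user setup could look like this (a sketch; the `id` lookups assume the `erigon` user created by the helper target above):

```sh
# create the "erigon" user, then run the stack with matching UID/GID and data dir
make user_linux
XDG_DATA_HOME=~erigon/.ethereum DOCKER_UID=$(id -u erigon) DOCKER_GID=$(id -g erigon) make docker-compose
```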
 #### Check: Permissions
 
-In all cases, `XDG_DATA_HOME` (specified or default) must be writeable by the user UID/GID in docker, which will be determined by the `DOCKER_UID` and `DOCKER_GID` at build time.
-If a build or service startup is failing due to permissions, check that all the directories, UID, and GID controlled by these environment variables are correct.
+In all cases, `XDG_DATA_HOME` (specified or default) must be writeable by the user UID/GID in docker, which will be
+determined by the `DOCKER_UID` and `DOCKER_GID` at build time.
+
+If a build or service startup is failing due to permissions, check that all the directories, UID, and GID controlled by
+these environment variables are correct.
 
 #### Run
+
 The next command starts: Erigon on port 30303, rpcdaemon on port 8545, prometheus on port 9090, and grafana on port 3000.
 
 ```sh
@@ -457,42 +483,45 @@ Detailed explanation: [./docs/programmers_guide/db_faq.md](./docs/programmers_gu
 
 #### `erigon` ports
 
 | Port  | Protocol  |        Purpose         | Expose  |
 |:-----:|:---------:|:----------------------:|:-------:|
-| 30303 | TCP & UDP | eth/66 or 67 peering | Public |
-| 9090 | TCP | gRPC Connections | Private |
-| 42069 | TCP & UDP | Snap sync (Bittorrent) | Public |
-| 6060 | TCP | Metrics or Pprof | Private |
-| 8551 | TCP | Engine API (JWT auth) | Private |
+| 30303 | TCP & UDP |  eth/66 or 67 peering  | Public  |
+| 9090  |    TCP    |    gRPC Connections    | Private |
+| 42069 | TCP & UDP | Snap sync (Bittorrent) | Public  |
+| 6060  |    TCP    |    Metrics or Pprof    | Private |
+| 8551  |    TCP    | Engine API (JWT auth)  | Private |
 
 Typically, 30303 is exposed to the internet to allow incoming peering connections. 9090 is exposed only
 internally for rpcdaemon or other connections (e.g. rpcdaemon -> erigon).
 
-Port 8551 (JWT authenticated) is exposed only internally for [Engine API] JSON-RPC queries from the Consensus Layer node.
+Port 8551 (JWT authenticated) is exposed only internally for [Engine API] JSON-RPC queries from the Consensus Layer
+node.
 
 #### `RPC` ports
 
-| Port | Protocol | Purpose | Expose |
-|:-----:|:---------:|:------------------:|:-------:|
-| 8545 | TCP | HTTP & WebSockets | Private |
+| Port | Protocol |      Purpose      | Expose  |
+|:----:|:--------:|:-----------------:|:-------:|
+| 8545 |   TCP    | HTTP & WebSockets | Private |
 
-Typically, 8545 is exposed only internally for JSON-RPC queries. Both HTTP and WebSocket connections are on the same port.
+Typically, 8545 is exposed only internally for JSON-RPC queries. Both HTTP and WebSocket connections are on the same
+port.
 
 #### `sentry` ports
 
 | Port  | Protocol  |     Purpose      | Expose  |
 |:-----:|:---------:|:----------------:|:-------:|
-| 30303 | TCP & UDP | Peering | Public |
-| 9091 | TCP | gRPC Connections | Private |
+| 30303 | TCP & UDP |     Peering      | Public  |
+| 9091  |    TCP    | gRPC Connections | Private |
 
-Typically, a sentry process will run one eth/xx protocol (e.g. eth/66) and will be exposed to the internet on 30303. Port
+Typically, a sentry process will run one eth/xx protocol (e.g. eth/66) and will be exposed to the internet on 30303.
+Port
 9091 is for internal gRPC connections (e.g. erigon -> sentry).
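On a public-facing host, only the peering and snap-sync ports need to be reachable from outside. A sketch with ufw (assuming ufw is the firewall in use; adapt to yours):

```sh
# allow the public p2p ports; gRPC, RPC and Engine API ports stay internal
sudo ufw allow 30303
sudo ufw allow 42069
```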
 #### Other ports
 
-| Port | Protocol | Purpose | Expose |
 |:----:|:--------:|:-------:|:-------:|
-| 6060 | TCP | pprof | Private |
-| 6060 | TCP | metrics | Private |
+| 6060 |   TCP    |  pprof  | Private |
+| 6060 |   TCP    | metrics | Private |
 
 Optional flags can enable pprof or metrics (or both) - however, they both run on 6060 by default, so
 you'll have to change one if you want to run both at the same time. Use `--help` with the binary for more info.
 
@@ -508,13 +537,16 @@ Reserved for future use: **gRPC ports**: `9092` consensus engine, `9093` snapsho
 
 run `go tool pprof -inuse_space -png http://127.0.0.1:6060/debug/pprof/heap > mem.png`
 
 ### How to run local devnet?
+
 🔬 Detailed explanation is [here](/DEV_CHAIN.md).
 
 ### Docker permissions error
 
 Docker uses user erigon with UID/GID 1000 (for security reasons). You can see this user being created in the Dockerfile.
-Can fix by giving a host's user ownership of the folder, where the host's user UID/GID is the same as the docker's user UID/GID (1000).
-More details in [post](https://www.fullstaq.com/knowledge-hub/blogs/docker-and-the-host-filesystem-owner-matching-problem)
+You can fix this by giving the host's user ownership of the folder, where the host user's UID/GID is the same as the
+docker user's UID/GID (1000).
+More details in this
+[post](https://www.fullstaq.com/knowledge-hub/blogs/docker-and-the-host-filesystem-owner-matching-problem)
 
 ### Run Raspberry Pi
 
@@ -597,8 +629,10 @@ memory.
 
 **Warning:** Multiple instances of Erigon on the same machine will touch the Disk concurrently, which impacts performance - one of
 the main Erigon optimisations is to "reduce Disk random access".
-"Blocks Execution stage" still does many random reads - this is reason why it's slowest stage. We do not recommend running
-multiple genesis syncs on same Disk. If genesis sync passed, then it's fine to run multiple Erigon instances on same Disk.
+"Blocks Execution stage" still does many random reads - this is the reason it is the slowest stage. We do not recommend
+running
+multiple genesis syncs on the same Disk. If a genesis sync has passed, then it's fine to run multiple Erigon instances
+on the same Disk.
 
 ### Blocks Execution is slow on cloud-network-drives
 
@@ -617,6 +651,7 @@ For example: btrfs's autodefrag option - may increase write IO 100x times
 
 ### the --mount option requires BuildKit error
 
 For anyone else who was getting the BuildKit error when trying to start Erigon the old way, you can use the below:
+
 ```
 XDG_DATA_HOME=/preferred/data/folder DOCKER_BUILDKIT=1 COMPOSE_DOCKER_CLI_BUILD=1 make docker-compose
 ```
diff --git a/cmd/downloader/downloader/downloader_grpc_server.go b/cmd/downloader/downloader/downloader_grpc_server.go
index b1f245e29..79d9e1185 100644
--- a/cmd/downloader/downloader/downloader_grpc_server.go
+++ b/cmd/downloader/downloader/downloader_grpc_server.go
@@ -127,10 +127,10 @@ func createMagnetLinkWithInfoHash(hash *prototypes.H160, torrentClient *torrent.
 		return false, nil
 	}
 	infoHash := Proto2InfoHash(hash)
-	log.Debug("[downloader] downloading torrent and seg file", "hash", infoHash)
+	//log.Debug("[downloader] downloading torrent and seg file", "hash", infoHash)
 
 	if _, ok := torrentClient.Torrent(infoHash); ok {
-		log.Debug("[downloader] torrent client related to hash found", "hash", infoHash)
+		//log.Debug("[downloader] torrent client related to hash found", "hash", infoHash)
 		return true, nil
 	}
 
@@ -151,6 +151,6 @@ func createMagnetLinkWithInfoHash(hash *prototypes.H160, torrentClient *torrent.
return } }(magnet.String()) - log.Debug("[downloader] downloaded both seg and torrent files", "hash", infoHash) + //log.Debug("[downloader] downloaded both seg and torrent files", "hash", infoHash) return false, nil } diff --git a/cmd/downloader/downloader/util.go b/cmd/downloader/downloader/util.go index 3493083d0..cc2a086b1 100644 --- a/cmd/downloader/downloader/util.go +++ b/cmd/downloader/downloader/util.go @@ -230,8 +230,7 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) ([]string, err defer i.Inc() defer sem.Release(1) defer wg.Done() - err = buildTorrentIfNeed(f, snapDir) - if err != nil { + if err := buildTorrentIfNeed(f, snapDir); err != nil { errs <- err } diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index d9a4bbe62..e8b420b6a 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -1,6 +1,7 @@ package commands import ( + "github.com/ledgerwatch/erigon/turbo/cli" "github.com/spf13/cobra" "github.com/ledgerwatch/erigon/cmd/utils" @@ -110,7 +111,7 @@ func withDataDir(cmd *cobra.Command) { } func withBatchSize(cmd *cobra.Command) { - cmd.Flags().StringVar(&batchSizeStr, "batchSize", "512M", "batch size for execution stage") + cmd.Flags().StringVar(&batchSizeStr, "batchSize", cli.BatchSizeFlag.Value, cli.BatchSizeFlag.Usage) } func withIntegrityChecks(cmd *cobra.Command) { diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 41bec748d..a38de97ac 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -491,7 +491,8 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error { } if reset { - if err := reset2.ResetBlocks(tx, sn, br); err != nil { + dirs := datadir.New(datadirCli) + if err := reset2.ResetBlocks(tx, db, sn, br, dirs.Tmp); err != nil { return err } return nil @@ -1168,7 +1169,7 @@ func allSnapshots(db kv.RoDB) (*snapshotsync.RoSnapshots, *libstate.Aggregator22 } _allSnapshotsSingleton.LogStat() db.View(context.Background(), func(tx kv.Tx) error { - _aggSingleton.LogStats(func(endTxNumMinimax uint64) uint64 { + _aggSingleton.LogStats(tx, func(endTxNumMinimax uint64) uint64 { _, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax) return histBlockNumProgress }) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 53f86b8f0..fd66777ff 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -357,7 +357,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, } db.View(context.Background(), func(tx kv.Tx) error { - agg.LogStats(func(endTxNumMinimax uint64) uint64 { + agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { _, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax) return histBlockNumProgress }) @@ -381,7 +381,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, log.Error("[Snapshots] reopen", "err", err) } else { db.View(context.Background(), func(tx kv.Tx) error { - agg.LogStats(func(endTxNumMinimax uint64) uint64 { + agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { _, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax) return histBlockNumProgress }) @@ -437,16 +437,6 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, }() ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot) - if cfg.WithDatadir { - dirs := datadir.New(cfg.DataDir) - if agg, err = libstate.NewAggregator22(dirs.SnapHistory, 
ethconfig.HistoryV3AggregationStep); err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err) - } - if err = agg.ReopenFiles(); err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err) - } - - } return db, borDb, eth, txPool, mining, stateCache, blockReader, ff, agg, err } diff --git a/cmd/rpcdaemon/commands/eth_receipts.go b/cmd/rpcdaemon/commands/eth_receipts.go index 8371e82ff..fee31b233 100644 --- a/cmd/rpcdaemon/commands/eth_receipts.go +++ b/cmd/rpcdaemon/commands/eth_receipts.go @@ -288,7 +288,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64, ac := api._agg.MakeContext() ac.SetTx(tx) - topicsBitmap, err := getTopicsBitmap2(ac, tx, crit.Topics, fromTxNum, toTxNum) + topicsBitmap, err := getTopicsBitmapV3(ac, tx, crit.Topics, fromTxNum, toTxNum) if err != nil { return nil, err } @@ -325,6 +325,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64, var lastRules *params.Rules stateReader := state.NewHistoryReader22(ac) stateReader.SetTx(tx) + //stateReader.SetTrace(true) iter := txNumbers.Iterator() chainConfig, err := api.chainConfig(tx) @@ -383,7 +384,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64, ibs.Prepare(txHash, lastBlockHash, txIndex) _, err = core.ApplyMessage(evm, msg, gp, true /* refunds */, false /* gasBailout */) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: blockNum=%d, txNum=%d", err, blockNum, txNum) } rawLogs := ibs.GetLogs(txHash) var logIndex uint @@ -415,7 +416,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64, // {{}, {B}} matches any topic in first position AND B in second position // {{A}, {B}} matches topic A in first position AND B in second position // {{A, B}, {C, D}} matches topic (A OR B) in first position AND (C OR D) in second position -func getTopicsBitmap2(ac *libstate.Aggregator22Context, tx kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) { +func getTopicsBitmapV3(ac *libstate.Aggregator22Context, tx kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) { var result *roaring64.Bitmap for _, sub := range topics { var bitmapForORing roaring64.Bitmap diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go index 14dba380c..b7fd24371 100644 --- a/cmd/state/exec3/state.go +++ b/cmd/state/exec3/state.go @@ -293,7 +293,7 @@ func (cr EpochReader) FindBeforeOrEqualNumber(number uint64) (blockNum uint64, b } func NewWorkersPool(lock sync.Locker, background bool, chainDb kv.RoDB, wg *sync.WaitGroup, rs *state.State22, blockReader services.FullBlockReader, chainConfig *params.ChainConfig, logger log.Logger, genesis *core.Genesis, engine consensus.Engine, workerCount int) (reconWorkers []*Worker22, resultCh chan *state.TxTask, clear func()) { - queueSize := workerCount * 32 + queueSize := workerCount * 64 reconWorkers = make([]*Worker22, workerCount) resultCh = make(chan *state.TxTask, queueSize) for i := 0; i < workerCount; i++ { diff --git a/cmd/state/exec3/state_recon.go b/cmd/state/exec3/state_recon.go index 8bbb7df65..013891998 100644 --- a/cmd/state/exec3/state_recon.go +++ b/cmd/state/exec3/state_recon.go @@ -359,7 +359,7 @@ func (rw *ReconWorker) runTxTask(txTask *state2.TxTask) { //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, evm=%p\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex, vmenv) _, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds 
*/, false /* gasBailout */) if err != nil { - panic(fmt.Errorf("could not apply tx %d [%x] failed: %w", txTask.TxIndex, txHash, err)) + panic(fmt.Errorf("could not apply blockNum=%d, txIdx=%d [%x] failed: %w", txTask.BlockNum, txTask.TxIndex, txHash, err)) } if err = ibs.FinalizeTx(rules, noop); err != nil { panic(err) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index cd87b682c..a59f2490d 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -384,7 +384,7 @@ var ( DBReadConcurrencyFlag = cli.IntFlag{ Name: "db.read.concurrency", Usage: "Does limit amount of parallel db reads. Default: equal to GOMAXPROCS (or number of CPU)", - Value: cmp.Max(10, runtime.GOMAXPROCS(-1)*4), + Value: cmp.Max(10, runtime.GOMAXPROCS(-1)*8), } RpcAccessListFlag = cli.StringFlag{ Name: "rpc.accessList", diff --git a/core/rawdb/rawdbreset/reset_stages.go b/core/rawdb/rawdbreset/reset_stages.go index 4e0d4f226..36c35ea69 100644 --- a/core/rawdb/rawdbreset/reset_stages.go +++ b/core/rawdb/rawdbreset/reset_stages.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/snapshotsync" + "github.com/ledgerwatch/log/v3" ) func ResetState(db kv.RwDB, ctx context.Context, chain string) error { @@ -44,7 +45,23 @@ func ResetState(db kv.RwDB, ctx context.Context, chain string) error { return nil } -func ResetBlocks(tx kv.RwTx, snapshots *snapshotsync.RoSnapshots, br services.HeaderAndCanonicalReader) error { +func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, br services.HeaderAndCanonicalReader, tmpdir string) error { + go func() { //inverted read-ahead - to warmup data + _ = db.View(context.Background(), func(tx kv.Tx) error { + c, err := tx.Cursor(kv.EthTx) + if err != nil { + return err + } + defer c.Close() + for k, _, err := c.Last(); k != nil; k, _, err = c.Prev() { + if err != nil { + return err + } + } + return nil + }) + }() + // keep Genesis if err := rawdb.TruncateBlocks(context.Background(), tx, 1); err != nil { return err @@ -95,10 +112,13 @@ func ResetBlocks(tx kv.RwTx, snapshots *snapshotsync.RoSnapshots, br services.He } if snapshots != nil && snapshots.Cfg().Enabled && snapshots.BlocksAvailable() > 0 { - if err := stagedsync.FillDBFromSnapshots("fillind_db_from_snapshots", context.Background(), tx, "", snapshots, br); err != nil { + if err := stagedsync.FillDBFromSnapshots("fillind_db_from_snapshots", context.Background(), tx, tmpdir, snapshots, br); err != nil { return err } _ = stages.SaveStageProgress(tx, stages.Snapshots, snapshots.BlocksAvailable()) + _ = stages.SaveStageProgress(tx, stages.Headers, snapshots.BlocksAvailable()) + _ = stages.SaveStageProgress(tx, stages.Bodies, snapshots.BlocksAvailable()) + _ = stages.SaveStageProgress(tx, stages.Senders, snapshots.BlocksAvailable()) } return nil @@ -142,6 +162,7 @@ func ResetExec(tx kv.RwTx, chain string) (err error) { kv.Code, kv.PlainContractCode, kv.ContractCode, kv.IncarnationMap, } for _, b := range stateBuckets { + log.Info("Clear", "table", b) if err := tx.ClearBucket(b); err != nil { return err } @@ -165,6 +186,7 @@ func ResetExec(tx kv.RwTx, chain string) (err error) { kv.TracesToKeys, kv.TracesToIdx, } for _, b := range buckets { + log.Info("Clear", "table", b) if err := tx.ClearBucket(b); err != nil { return err } diff --git a/eth/backend.go b/eth/backend.go index 1d191e647..961a49b34 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -860,6 +860,9 @@ func (s *Ethereum) setUpBlockReader(ctx 
context.Context, dirs datadir.Dirs, snCo if err != nil { return nil, nil, nil, err } + if err = agg.ReopenFiles(); err != nil { + return nil, nil, nil, err + } return blockReader, allSnapshots, agg, nil } diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 48eede0b1..7c741e1a5 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -31,7 +31,6 @@ import ( "github.com/ledgerwatch/erigon/node/nodecfg/datadir" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/services" - "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/log/v3" "golang.org/x/sync/semaphore" ) @@ -82,7 +81,6 @@ func (p *Progress) Log(logPrefix string, rs *state.State22, rws state.TxTaskQueu func Exec3(ctx context.Context, execStage *StageState, workerCount int, batchSize datasize.ByteSize, chainDb kv.RwDB, applyTx kv.RwTx, rs *state.State22, blockReader services.FullBlockReader, - allSnapshots *snapshotsync.RoSnapshots, logger log.Logger, agg *state2.Aggregator22, engine consensus.Engine, maxBlockNum uint64, chainConfig *params.ChainConfig, genesis *core.Genesis, @@ -96,6 +94,9 @@ func Exec3(ctx context.Context, } defer applyTx.Rollback() } + if !useExternalTx { + defer blockReader.(WithSnapshots).Snapshots().EnableMadvNormal().DisableReadAhead() + } var block, stageProgress uint64 var outputTxNum, inputTxNum, maxTxNum uint64 @@ -276,18 +277,18 @@ loop: if err != nil { return err } + if parallel { + func() { + rwsLock.Lock() + defer rwsLock.Unlock() + for rws.Len() > queueSize || atomic.LoadInt64(&resultsSize) >= resultsThreshold || rs.SizeEstimate() >= commitThreshold { + rwsReceiveCond.Wait() + } + }() + } txs := b.Transactions() for txIndex := -1; txIndex <= len(txs); txIndex++ { // Do not oversend, wait for the result heap to go under certain size - if parallel { - func() { - rwsLock.Lock() - defer rwsLock.Unlock() - for rws.Len() > queueSize || atomic.LoadInt64(&resultsSize) >= resultsThreshold || rs.SizeEstimate() >= commitThreshold { - rwsReceiveCond.Wait() - } - }() - } txTask := &state.TxTask{ BlockNum: blockNum, Rules: rules, @@ -332,41 +333,39 @@ loop: } stageProgress = blockNum - - if txTask.Final && rs.SizeEstimate() >= commitThreshold { - commitStart := time.Now() - log.Info("Committing...") - if err := rs.Flush(applyTx); err != nil { - return err - } - if !useExternalTx { - if err = execStage.Update(applyTx, stageProgress); err != nil { - return err - } - applyTx.CollectMetrics() - if err := applyTx.Commit(); err != nil { - return err - } - if applyTx, err = chainDb.BeginRw(ctx); err != nil { - return err - } - defer applyTx.Rollback() - agg.SetTx(applyTx) - reconWorkers[0].ResetTx(applyTx) - log.Info("Committed", "time", time.Since(commitStart), "toProgress", stageProgress) - } - } - - select { - case <-logEvery.C: - progress.Log(execStage.LogPrefix(), rs, rws, count, inputBlockNum, outputBlockNum, repeatCount, uint64(atomic.LoadInt64(&resultsSize)), resultCh) - default: - } } + inputTxNum++ } + + if rs.SizeEstimate() >= commitThreshold { + commitStart := time.Now() + log.Info("Committing...") + if err := rs.Flush(applyTx); err != nil { + return err + } + if !useExternalTx { + if err = execStage.Update(applyTx, stageProgress); err != nil { + return err + } + applyTx.CollectMetrics() + if err := applyTx.Commit(); err != nil { + return err + } + if applyTx, err = chainDb.BeginRw(ctx); err != nil { + return err + } + defer applyTx.Rollback() + agg.SetTx(applyTx) + reconWorkers[0].ResetTx(applyTx) + log.Info("Committed", "time", 
time.Since(commitStart), "toProgress", stageProgress) + } + } + // Check for interrupts select { + case <-logEvery.C: + progress.Log(execStage.LogPrefix(), rs, rws, count, inputBlockNum, outputBlockNum, repeatCount, uint64(atomic.LoadInt64(&resultsSize)), resultCh) case <-interruptCh: log.Info(fmt.Sprintf("interrupted, please wait for cleanup, next run will start with block %d", blockNum)) atomic.StoreUint64(&maxTxNum, inputTxNum) @@ -443,14 +442,20 @@ func processResultQueue(rws *state.TxTaskQueue, outputTxNum *uint64, rs *state.S } } -func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, workerCount int, chainDb kv.RwDB, +func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, workerCount int, batchSize datasize.ByteSize, chainDb kv.RwDB, blockReader services.FullBlockReader, logger log.Logger, agg *state2.Aggregator22, engine consensus.Engine, chainConfig *params.ChainConfig, genesis *core.Genesis) (err error) { + defer agg.EnableMadvNormal().DisableReadAhead() + reconDbPath := filepath.Join(dirs.DataDir, "recondb") dir.Recreate(reconDbPath) limiterB := semaphore.NewWeighted(int64(runtime.NumCPU()*2 + 1)) - db, err := kv2.NewMDBX(log.New()).Path(reconDbPath).RoTxsLimiter(limiterB).WriteMap().WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.ReconTablesCfg }).Open() + db, err := kv2.NewMDBX(log.New()).Path(reconDbPath).RoTxsLimiter(limiterB). + WriteMergeThreshold(8192). + PageSize(uint64(16 * datasize.KB)). + WriteMap().WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.ReconTablesCfg }). + Open() if err != nil { return err } @@ -490,7 +495,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo fmt.Printf("Corresponding block num = %d, txNum = %d\n", blockNum, txNum) var wg sync.WaitGroup - workCh := make(chan *state.TxTask, 128) + workCh := make(chan *state.TxTask, workerCount*64) rs := state.NewReconState(workCh) var fromKey, toKey []byte bigCount := big.NewInt(int64(workerCount)) @@ -552,9 +557,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo accountCollectorsX[i] = nil } if err = db.Update(ctx, func(tx kv.RwTx) error { - return accountCollectorX.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { - return tx.Put(kv.XAccount, k, v) - }, etl.TransformArgs{}) + return accountCollectorX.Load(tx, kv.XAccount, etl.IdentityLoadFunc, etl.TransformArgs{}) }); err != nil { return err } @@ -599,9 +602,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo storageCollectorsX[i] = nil } if err = db.Update(ctx, func(tx kv.RwTx) error { - return storageCollectorX.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { - return tx.Put(kv.XStorage, k, v) - }, etl.TransformArgs{}) + return storageCollectorX.Load(tx, kv.XStorage, etl.IdentityLoadFunc, etl.TransformArgs{}) }); err != nil { return err } @@ -645,9 +646,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo codeCollectorsX[i] = nil } if err = db.Update(ctx, func(tx kv.RwTx) error { - return codeCollectorX.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { - return tx.Put(kv.XCode, k, v) - }, etl.TransformArgs{}) + return codeCollectorX.Load(tx, kv.XCode, etl.IdentityLoadFunc, etl.TransformArgs{}) }); err != nil { return err } @@ -687,7 +686,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs 
datadir.Dirs, wo for i := 0; i < workerCount; i++ { go reconWorkers[i].Run() } - commitThreshold := uint64(256 * 1024 * 1024) + commitThreshold := batchSize.Bytes() * 4 prevCount := uint64(0) prevRollbackCount := uint64(0) prevTime := time.Now() @@ -715,7 +714,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo prevCount = count prevRollbackCount = rollbackCount log.Info("State reconstitution", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", progress), - "tx/s", fmt.Sprintf("%.1f", speedTx), + "tx/s", fmt.Sprintf("%.1f", speedTx), "workCh", fmt.Sprintf("%d/%d", len(workCh), cap(workCh)), "repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio), "buffer", fmt.Sprintf("%s/%s", common.ByteCount(sizeEstimate), common.ByteCount(commitThreshold)), "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys)) @@ -749,6 +748,9 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo } } }() + + defer blockReader.(WithSnapshots).Snapshots().EnableReadAhead().DisableReadAhead() + var inputTxNum uint64 var b *types.Block var txKey [8]byte diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index 88c008f6a..37994bac0 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -48,6 +48,9 @@ func (s *StageState) LogPrefix() string { return s.state.LogPrefix() } // Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution. func (s *StageState) Update(db kv.Putter, newBlockNum uint64) error { + if s.ID == stages.Execution && newBlockNum == 0 { + panic(newBlockNum) + } if m, ok := syncMetrics[s.ID]; ok { m.Set(newBlockNum) } diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 154bdce7e..fbe3f7195 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -92,7 +92,7 @@ func StageExecuteBlocksCfg( stateStream bool, badBlockHalt bool, - exec22 bool, + historyV3 bool, dirs datadir.Dirs, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload, @@ -115,7 +115,7 @@ func StageExecuteBlocksCfg( blockReader: blockReader, hd: hd, genesis: genesis, - historyV3: exec22, + historyV3: historyV3, workersCount: workersCount, agg: agg, } @@ -247,15 +247,15 @@ func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx cont } cfg.agg.SetWorkers(cmp.Max(1, runtime.NumCPU()-1)) - allSnapshots := cfg.blockReader.(WithSnapshots).Snapshots() - if initialCycle { + if initialCycle && s.BlockNumber == 0 { reconstituteToBlock, found, err := reconstituteBlock(cfg.agg, cfg.db, tx) if err != nil { return err } if found && reconstituteToBlock > s.BlockNumber+1 { - if err := ReconstituteState(execCtx, s, cfg.dirs, workersCount, cfg.db, cfg.blockReader, log.New(), cfg.agg, cfg.engine, cfg.chainConfig, cfg.genesis); err != nil { + log.Info(fmt.Sprintf("[%s] Blocks execution, reconstitution", s.LogPrefix()), "from", s.BlockNumber, "to", reconstituteToBlock) + if err := ReconstituteState(execCtx, s, cfg.dirs, workersCount, cfg.batchSize, cfg.db, cfg.blockReader, log.New(), cfg.agg, cfg.engine, cfg.chainConfig, cfg.genesis); err != nil { return err } } @@ -279,7 +279,7 @@ func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx cont } rs := state.NewState22() if err := Exec3(execCtx, s, workersCount, cfg.batchSize, cfg.db, tx, rs, - cfg.blockReader, allSnapshots, log.New(), cfg.agg, cfg.engine, + cfg.blockReader, log.New(), cfg.agg, cfg.engine, to, cfg.chainConfig, cfg.genesis); 
err != nil { return err diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 9d63fa42a..7e0174ad4 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -166,7 +166,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R if j != nil { n += uint64(j.index) } - log.Info(fmt.Sprintf("[%s] Recovery", logPrefix), "block_number", n) + log.Info(fmt.Sprintf("[%s] Recovery", logPrefix), "block_number", n, "ch", fmt.Sprintf("%d/%d", len(jobs), cap(jobs))) case j, ok = <-out: if !ok { return diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index b1095fd58..b76b2ea26 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -120,7 +120,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R } cfg.snapshots.LogStat() - cfg.agg.LogStats(func(endTxNumMinimax uint64) uint64 { + cfg.agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { _, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax) return histBlockNumProgress }) diff --git a/go.mod b/go.mod index 8dbf3ec96..badf90bde 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/gballet/go-verkle v0.0.0-20220923150140-6c08cd337774 - github.com/ledgerwatch/erigon-lib v0.0.0-20221004095151-0d6bc2eca418 + github.com/ledgerwatch/erigon-lib v0.0.0-20221005081654-85faee1c1817 github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220913092204-de54ee30c7b9 github.com/ledgerwatch/log/v3 v3.4.2 github.com/ledgerwatch/secp256k1 v1.0.0 @@ -73,7 +73,7 @@ require ( github.com/xsleonard/go-merkle v1.1.0 go.uber.org/atomic v1.10.0 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d - golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b + golang.org/x/exp v0.0.0-20220921164117-439092de6870 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 golang.org/x/time v0.0.0-20220609170525-579cf78fd858 diff --git a/go.sum b/go.sum index 7a930406d..0dc26aeb2 100644 --- a/go.sum +++ b/go.sum @@ -561,8 +561,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20221004095151-0d6bc2eca418 h1:Kf/A7uhkGlvTuVLhawFFYc9V9Y5SO7Xbn38TgOCwNiI= -github.com/ledgerwatch/erigon-lib v0.0.0-20221004095151-0d6bc2eca418/go.mod h1:nMyol2RVCPF0TSR/VU4bK40JB35R/Q+IdodGW8j1kzU= +github.com/ledgerwatch/erigon-lib v0.0.0-20221005081654-85faee1c1817 h1:9GMsCin3fT1cUX4rwx3HaNkMmlETgQJqwJIp/9+W7eY= +github.com/ledgerwatch/erigon-lib v0.0.0-20221005081654-85faee1c1817/go.mod h1:YDP7ECNyjKo1dE7J5n8GXKBIYOWnmchvGCfALuwhBQg= github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220913092204-de54ee30c7b9 h1:iWjzYLtOsp/Wpo9ZWV/eMIlnFzk8bm7POSzrXAILw24= github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220913092204-de54ee30c7b9/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.4.2 h1:chvjB7c100rlIFgPv+Col2eerxIrHL88OiZRuPZDkxw= @@ -1052,8 +1052,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp 
v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4= -golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= +golang.org/x/exp v0.0.0-20220921164117-439092de6870 h1:j8b6j9gzSigH28O5SjSpQSSh9lFd6f5D/q0aHjNTulc= +golang.org/x/exp v0.0.0-20220921164117-439092de6870/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/migrations/reset_blocks.go b/migrations/reset_blocks.go index cfc1ce6ad..c42ed0fb7 100644 --- a/migrations/reset_blocks.go +++ b/migrations/reset_blocks.go @@ -110,7 +110,7 @@ var resetBlocks4 = Migration{ log.Warn("NOTE: this migration will remove recent blocks (and senders) to fix several recent bugs. Your node will re-download last ~400K blocks, should not take very long") } - if err := rawdbreset.ResetBlocks(tx, nil, nil); err != nil { + if err := rawdbreset.ResetBlocks(tx, db, nil, nil, dirs.Tmp); err != nil { return err } diff --git a/node/node.go b/node/node.go index 4d6237508..b4700321b 100644 --- a/node/node.go +++ b/node/node.go @@ -320,7 +320,7 @@ func OpenDatabase(config *nodecfg.Config, logger log.Logger, label kv.Label) (kv var openFunc func(exclusive bool) (kv.RwDB, error) log.Info("Opening Database", "label", name, "path", dbPath) openFunc = func(exclusive bool) (kv.RwDB, error) { - roTxLimit := int64(16) + roTxLimit := int64(32) if config.Http.DBReadConcurrency > 0 { roTxLimit = int64(config.Http.DBReadConcurrency) } diff --git a/tests/block_test.go b/tests/block_test.go index d2ab4efa4..c726acf17 100644 --- a/tests/block_test.go +++ b/tests/block_test.go @@ -43,7 +43,9 @@ func TestBlockchain(t *testing.T) { bt.skipLoad(`^TransitionTests/bcArrowGlacierToMerge/powToPosBlockRejection\.json`) if ethconfig.EnableHistoryV3InTest { // HistoryV3: doesn't produce receipts on execution by design - bt.skipLoad(`^TestBlockchain/InvalidBlocks/bcInvalidHeaderTest/log1_wrongBloom.json`) + bt.skipLoad(`^TestBlockchain/InvalidBlocks/bcInvalidHeaderTest/log1_wrongBloom\.json`) + bt.skipLoad(`^TestBlockchain/InvalidBlocks/bcInvalidHeaderTest/wrongReceiptTrie\.json`) + //bt.skipLoad(`^TestBlockchain/InvalidBlocks/bcInvalidHeaderTest/wrongGasUsed\.json`) } bt.walk(t, blockTestDir, func(t *testing.T, name string, test *BlockTest) { diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index 9e445f63a..0cad5d633 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -8,9 +8,11 @@ import ( "fmt" "io" "os" + "path/filepath" "runtime" "time" + "github.com/c2h5oh/datasize" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/cmp" @@ -211,10 +213,11 @@ func doIndicesCommand(cliCtx *cli.Context) error { workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1) if rebuild { - cfg := ethconfig.NewSnapCfg(true, true, false) - if err := rebuildIndices("Indexing", ctx, chainDB, cfg, dirs, from, workers); err != nil { - log.Error("Error", "err", err) - } + panic("not implemented") + } + cfg := 
ethconfig.NewSnapCfg(true, true, false) + if err := rebuildIndices("Indexing", ctx, chainDB, cfg, dirs, from, workers); err != nil { + log.Error("Error", "err", err) } agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep) if err != nil { @@ -248,13 +251,17 @@ func doUncompress(cliCtx *cli.Context) error { return err } defer decompressor.Close() - wr := bufio.NewWriterSize(os.Stdout, 512*1024*1024) + wr := bufio.NewWriterSize(os.Stdout, int(128*datasize.MB)) defer wr.Flush() + logEvery := time.NewTicker(30 * time.Second) + defer logEvery.Stop() + + var i uint var numBuf [binary.MaxVarintLen64]byte defer decompressor.EnableReadAhead().DisableReadAhead() g := decompressor.MakeGetter() - buf := make([]byte, 0, 16*etl.BufIOSize) + buf := make([]byte, 0, 1*datasize.MB) for g.HasNext() { buf, _ = g.Next(buf[:0]) n := binary.PutUvarint(numBuf[:], uint64(len(buf))) @@ -264,7 +271,12 @@ func doUncompress(cliCtx *cli.Context) error { if _, err := wr.Write(buf); err != nil { return err } + i++ select { + case <-logEvery.C: + _, fileName := filepath.Split(decompressor.FilePath()) + progress := 100 * float64(i) / float64(decompressor.Count()) + log.Info("[uncompress] ", "progress", fmt.Sprintf("%.2f%%", progress), "file", fileName) case <-ctx.Done(): return ctx.Err() default: @@ -281,18 +293,14 @@ func doCompress(cliCtx *cli.Context) error { } f := args[0] dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name)) - workers := runtime.GOMAXPROCS(-1) - 1 - if workers < 1 { - workers = 1 - } - c, err := compress.NewCompressor(ctx, "", f, dirs.Tmp, compress.MinPatternScore, workers, log.LvlInfo) + workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1) + c, err := compress.NewCompressor(ctx, "compress", f, dirs.Tmp, compress.MinPatternScore, workers, log.LvlInfo) if err != nil { return err } defer c.Close() - - r := bufio.NewReaderSize(os.Stdin, 512*1024*1024) - buf := make([]byte, 0, 32*1024*1024) + r := bufio.NewReaderSize(os.Stdin, int(128*datasize.MB)) + buf := make([]byte, 0, int(1*datasize.MB)) var l uint64 for l, err = binary.ReadUvarint(r); err == nil; l, err = binary.ReadUvarint(r) { if cap(buf) < int(l) { @@ -351,8 +359,11 @@ func doRetireCommand(cliCtx *cli.Context) error { if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files()); err != nil { return err } - if err := br.PruneAncientBlocks(tx); err != nil { - return err + log.Info("prune blocks from db\n") + for j := 0; j < 10_000; j++ { // prune happens by small steps, so need many runs + if err := br.PruneAncientBlocks(tx); err != nil { + return err + } } return nil }); err != nil { @@ -406,6 +417,7 @@ func rebuildIndices(logPrefix string, ctx context.Context, db kv.RoDB, cfg ethco if err := allSnapshots.ReopenFolder(); err != nil { return err } + allSnapshots.LogStat() if err := snapshotsync.BuildMissedIndices(logPrefix, ctx, dirs, *chainID, workers); err != nil { return err diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 6600c6b5e..6f76adc3d 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -820,6 +820,8 @@ func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (fo } func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, p *background.Progress, lvl log.Lvl) error { + _, fName := filepath.Split(sn.Path) + log.Debug("[snapshots] build idx", "file", fName) switch sn.T { case snap.Headers: if err := HeadersIdx(ctx, sn.Path, sn.From, tmpDir, p, lvl); err != nil {