diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 01074716d..9a5c7538d 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -352,9 +352,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, if agg, err = libstate.NewAggregator22(cfg.Dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db); err != nil { return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err) } - if err = agg.ReopenFiles(); err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err) - } + _ = agg.ReopenFiles() db.View(context.Background(), func(tx kv.Tx) error { agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 7e610a459..2ba5c3b18 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -1629,32 +1629,38 @@ func IsPosBlock(db kv.Getter, blockHash common.Hash) (trans bool, err error) { var SnapshotsKey = []byte("snapshots") var SnapshotsHistoryKey = []byte("snapshots_history") -func ReadSnapshots(tx kv.Tx) ([]string, error) { +func ReadSnapshots(tx kv.Tx) ([]string, []string, error) { v, err := tx.GetOne(kv.DatabaseInfo, SnapshotsKey) if err != nil { - return nil, err + return nil, nil, err } - var res []string + var res, resHist []string _ = json.Unmarshal(v, &res) - return res, nil -} -func ReadHistorySnapshots(tx kv.Tx) ([]string, error) { - v, err := tx.GetOne(kv.DatabaseInfo, SnapshotsHistoryKey) + v, err = tx.GetOne(kv.DatabaseInfo, SnapshotsHistoryKey) if err != nil { - return nil, err + return nil, nil, err } - var res []string - _ = json.Unmarshal(v, &res) - return res, nil + _ = json.Unmarshal(v, &resHist) + return res, resHist, nil } -func WriteSnapshots(tx kv.RwTx, list []string) error { +func WriteSnapshots(tx kv.RwTx, list, histList []string) error { res, err := json.Marshal(list) if err != nil { return err } - return tx.Put(kv.DatabaseInfo, SnapshotsKey, res) + if err := tx.Put(kv.DatabaseInfo, SnapshotsKey, res); err != nil { + return err + } + res, err = json.Marshal(histList) + if err != nil { + return err + } + if err := tx.Put(kv.DatabaseInfo, SnapshotsHistoryKey, res); err != nil { + return err + } + return nil } func WriteHistorySnapshots(tx kv.RwTx, list []string) error { res, err := json.Marshal(list) diff --git a/docker-compose.yml b/docker-compose.yml index 7c59558c1..a24c46f76 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -67,7 +67,7 @@ services: prometheus: - image: prom/prometheus:v2.37.1 + image: prom/prometheus:v2.39.1 user: ${DOCKER_UID:-1000}:${DOCKER_GID:-1000} # Uses erigon user from Dockerfile command: --log.level=warn --config.file=/etc/prometheus/prometheus.yml --storage.tsdb.path=/prometheus --storage.tsdb.retention.time=150d --web.console.libraries=/usr/share/prometheus/console_libraries --web.console.templates=/usr/share/prometheus/consoles ports: [ "9090:9090" ] @@ -77,7 +77,7 @@ services: restart: unless-stopped grafana: - image: grafana/grafana:9.1.6 + image: grafana/grafana:9.2.0 user: "472:0" # required for grafana version >= 7.3 ports: [ "3000:3000" ] volumes: diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index e28208cfd..6b030a61b 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -310,7 +310,7 @@ func WaitForDownloader(s *StageState, ctx context.Context, cfg SnapshotsCfg, tx return nil } - snInDB, err := rawdb.ReadSnapshots(tx) + snInDB, snHistInDB, err := rawdb.ReadSnapshots(tx) if err != nil { return err } @@ -325,10 +325,6 @@ func WaitForDownloader(s *StageState, ctx context.Context, cfg SnapshotsCfg, tx if len(missingSnapshots) > 0 { log.Warn(fmt.Sprintf("[%s] downloading missing snapshots", s.LogPrefix())) } - snHistInDB, err := rawdb.ReadHistorySnapshots(tx) - if err != nil { - return err - } // send all hashes to the Downloader service preverifiedBlockSnapshots := snapcfg.KnownCfg(cfg.chainConfig.ChainName, snInDB, snHistInDB).Preverified @@ -430,7 +426,7 @@ Finish: return err } - if err := rawdb.WriteSnapshots(tx, cfg.snapshots.Files()); err != nil { + if err := rawdb.WriteSnapshots(tx, cfg.snapshots.Files(), cfg.agg.Files()); err != nil { return err } if cfg.dbEventNotifier != nil { // can notify right here, even that write txn is not commit @@ -452,15 +448,14 @@ Finish: func calculateTime(amountLeft, rate uint64) string { if rate == 0 { - return "999hrs:99m:99s" + return "999hrs:99m" } timeLeftInSeconds := amountLeft / rate hours := timeLeftInSeconds / 3600 minutes := (timeLeftInSeconds / 60) % 60 - seconds := timeLeftInSeconds % 60 - return fmt.Sprintf("%dhrs:%dm:%ds", hours, minutes, seconds) + return fmt.Sprintf("%dhrs:%dm", hours, minutes) } /* ====== PRUNING ====== */ @@ -482,7 +477,7 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv. return err } - if err := retireBlocksInSingleBackgroundThread(s, cfg.blockRetire, ctx, tx); err != nil { + if err := retireBlocksInSingleBackgroundThread(s, cfg.blockRetire, cfg.agg, ctx, tx); err != nil { return fmt.Errorf("retireBlocksInSingleBackgroundThread: %w", err) } } @@ -497,7 +492,7 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv. } // retiring blocks in a single thread in the brackground -func retireBlocksInSingleBackgroundThread(s *PruneState, blockRetire *snapshotsync.BlockRetire, ctx context.Context, tx kv.RwTx) (err error) { +func retireBlocksInSingleBackgroundThread(s *PruneState, blockRetire *snapshotsync.BlockRetire, agg *state.Aggregator22, ctx context.Context, tx kv.RwTx) (err error) { // if something already happens in background - noop if blockRetire.Working() { return nil @@ -507,7 +502,7 @@ func retireBlocksInSingleBackgroundThread(s *PruneState, blockRetire *snapshotsy return fmt.Errorf("[%s] %w", s.LogPrefix(), err) } if ok { - if err := rawdb.WriteSnapshots(tx, blockRetire.Snapshots().Files()); err != nil { + if err := rawdb.WriteSnapshots(tx, blockRetire.Snapshots().Files(), agg.Files()); err != nil { return err } } diff --git a/go.mod b/go.mod index 4335628c1..aab4ccc0c 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/gballet/go-verkle v0.0.0-20220829125900-a702d458d33c - github.com/ledgerwatch/erigon-lib v0.0.0-20221012183632-cfd89757dbff + github.com/ledgerwatch/erigon-lib v0.0.0-20221013021436-af62ccaca9cc github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20221012092130-0962bd35abe1 github.com/ledgerwatch/log/v3 v3.4.2 github.com/ledgerwatch/secp256k1 v1.0.0 diff --git a/go.sum b/go.sum index a46357d76..ca29c7257 100644 --- a/go.sum +++ b/go.sum @@ -563,8 +563,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20221012183632-cfd89757dbff h1:qbBv5CevdfZiRkArXXwjkCNRwN5RTGNX2OIj17zmIFo= -github.com/ledgerwatch/erigon-lib v0.0.0-20221012183632-cfd89757dbff/go.mod h1:GT9MS8/L8VEWWJFsyJ3s5uhGyR4+rtws8KBgrWyrOwY= +github.com/ledgerwatch/erigon-lib v0.0.0-20221013021436-af62ccaca9cc h1:pO9T6xQZum5FYmQgBq3WtnM1jpNzCkxfQ/Z4rmGXKmo= +github.com/ledgerwatch/erigon-lib v0.0.0-20221013021436-af62ccaca9cc/go.mod h1:GT9MS8/L8VEWWJFsyJ3s5uhGyR4+rtws8KBgrWyrOwY= github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20221012092130-0962bd35abe1 h1:UZFTr9Zee9JdjVch0TEQsBd9fI4xIMQXyB3DjgKT1gw= github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20221012092130-0962bd35abe1/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.4.2 h1:chvjB7c100rlIFgPv+Col2eerxIrHL88OiZRuPZDkxw= diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index 410c05e55..d50ef042e 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -347,8 +347,17 @@ func doRetireCommand(cliCtx *cli.Context) error { return err } - workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1) - br := snapshotsync.NewBlockRetire(workers, dirs.Tmp, snapshots, db, nil, nil) + br := snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs.Tmp, snapshots, db, nil, nil) + + agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db) + if err != nil { + return err + } + err = agg.ReopenFiles() + if err != nil { + return err + } + agg.SetWorkers(estimate.CompressSnapshot.Workers()) log.Info("Params", "from", from, "to", to, "every", every) for i := from; i < to; i += every { @@ -356,7 +365,7 @@ func doRetireCommand(cliCtx *cli.Context) error { panic(err) } if err := db.Update(ctx, func(tx kv.RwTx) error { - if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files()); err != nil { + if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files(), agg.Files()); err != nil { return err } log.Info("prune blocks from db\n") @@ -371,15 +380,6 @@ func doRetireCommand(cliCtx *cli.Context) error { } } - agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db) - if err != nil { - return err - } - err = agg.ReopenFiles() - if err != nil { - return err - } - agg.SetWorkers(estimate.CompressSnapshot.Workers()) if err = agg.BuildMissedIndices(); err != nil { return err } @@ -416,8 +416,19 @@ func doSnapshotCommand(cliCtx *cli.Context) error { if err := allSnapshots.ReopenFolder(); err != nil { return err } + + agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db) + if err != nil { + return err + } + err = agg.ReopenFiles() + if err != nil { + return err + } + agg.SetWorkers(estimate.CompressSnapshot.Workers()) + if err := db.Update(ctx, func(tx kv.RwTx) error { - return rawdb.WriteSnapshots(tx, allSnapshots.Files()) + return rawdb.WriteSnapshots(tx, allSnapshots.Files(), agg.Files()) }); err != nil { return err } diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 54f25df37..76fc8d86f 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -470,7 +470,7 @@ func (s *RoSnapshots) idxAvailability() uint64 { // - RPC return Nil for historical blocks if snapshots are not open func (s *RoSnapshots) OptimisticReopenWithDB(db kv.RoDB) { _ = db.View(context.Background(), func(tx kv.Tx) error { - snList, err := rawdb.ReadSnapshots(tx) + snList, _, err := rawdb.ReadSnapshots(tx) if err != nil { return err } @@ -691,7 +691,7 @@ func (s *RoSnapshots) ReopenFolder() error { } func (s *RoSnapshots) ReopenWithDB(db kv.RoDB) error { if err := db.View(context.Background(), func(tx kv.Tx) error { - snList, err := rawdb.ReadSnapshots(tx) + snList, _, err := rawdb.ReadSnapshots(tx) if err != nil { return err }