mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-03 01:27:38 +00:00
e3: history bsc and mainnet snapshots (#5725)
This commit is contained in:
parent
d2436ab344
commit
b12b0d4627
@ -352,9 +352,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
|
||||
if agg, err = libstate.NewAggregator22(cfg.Dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db); err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
|
||||
}
|
||||
if err = agg.ReopenFiles(); err != nil {
|
||||
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
|
||||
}
|
||||
_ = agg.ReopenFiles()
|
||||
|
||||
db.View(context.Background(), func(tx kv.Tx) error {
|
||||
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
|
||||
|
@ -1629,32 +1629,38 @@ func IsPosBlock(db kv.Getter, blockHash common.Hash) (trans bool, err error) {
|
||||
var SnapshotsKey = []byte("snapshots")
|
||||
var SnapshotsHistoryKey = []byte("snapshots_history")
|
||||
|
||||
func ReadSnapshots(tx kv.Tx) ([]string, error) {
|
||||
func ReadSnapshots(tx kv.Tx) ([]string, []string, error) {
|
||||
v, err := tx.GetOne(kv.DatabaseInfo, SnapshotsKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
var res []string
|
||||
var res, resHist []string
|
||||
_ = json.Unmarshal(v, &res)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func ReadHistorySnapshots(tx kv.Tx) ([]string, error) {
|
||||
v, err := tx.GetOne(kv.DatabaseInfo, SnapshotsHistoryKey)
|
||||
v, err = tx.GetOne(kv.DatabaseInfo, SnapshotsHistoryKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
var res []string
|
||||
_ = json.Unmarshal(v, &res)
|
||||
return res, nil
|
||||
_ = json.Unmarshal(v, &resHist)
|
||||
return res, resHist, nil
|
||||
}
|
||||
|
||||
func WriteSnapshots(tx kv.RwTx, list []string) error {
|
||||
func WriteSnapshots(tx kv.RwTx, list, histList []string) error {
|
||||
res, err := json.Marshal(list)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Put(kv.DatabaseInfo, SnapshotsKey, res)
|
||||
if err := tx.Put(kv.DatabaseInfo, SnapshotsKey, res); err != nil {
|
||||
return err
|
||||
}
|
||||
res, err = json.Marshal(histList)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := tx.Put(kv.DatabaseInfo, SnapshotsHistoryKey, res); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func WriteHistorySnapshots(tx kv.RwTx, list []string) error {
|
||||
res, err := json.Marshal(list)
|
||||
|
@ -67,7 +67,7 @@ services:
|
||||
|
||||
|
||||
prometheus:
|
||||
image: prom/prometheus:v2.37.1
|
||||
image: prom/prometheus:v2.39.1
|
||||
user: ${DOCKER_UID:-1000}:${DOCKER_GID:-1000} # Uses erigon user from Dockerfile
|
||||
command: --log.level=warn --config.file=/etc/prometheus/prometheus.yml --storage.tsdb.path=/prometheus --storage.tsdb.retention.time=150d --web.console.libraries=/usr/share/prometheus/console_libraries --web.console.templates=/usr/share/prometheus/consoles
|
||||
ports: [ "9090:9090" ]
|
||||
@ -77,7 +77,7 @@ services:
|
||||
restart: unless-stopped
|
||||
|
||||
grafana:
|
||||
image: grafana/grafana:9.1.6
|
||||
image: grafana/grafana:9.2.0
|
||||
user: "472:0" # required for grafana version >= 7.3
|
||||
ports: [ "3000:3000" ]
|
||||
volumes:
|
||||
|
@ -310,7 +310,7 @@ func WaitForDownloader(s *StageState, ctx context.Context, cfg SnapshotsCfg, tx
|
||||
return nil
|
||||
}
|
||||
|
||||
snInDB, err := rawdb.ReadSnapshots(tx)
|
||||
snInDB, snHistInDB, err := rawdb.ReadSnapshots(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -325,10 +325,6 @@ func WaitForDownloader(s *StageState, ctx context.Context, cfg SnapshotsCfg, tx
|
||||
if len(missingSnapshots) > 0 {
|
||||
log.Warn(fmt.Sprintf("[%s] downloading missing snapshots", s.LogPrefix()))
|
||||
}
|
||||
snHistInDB, err := rawdb.ReadHistorySnapshots(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// send all hashes to the Downloader service
|
||||
preverifiedBlockSnapshots := snapcfg.KnownCfg(cfg.chainConfig.ChainName, snInDB, snHistInDB).Preverified
|
||||
@ -430,7 +426,7 @@ Finish:
|
||||
return err
|
||||
}
|
||||
|
||||
if err := rawdb.WriteSnapshots(tx, cfg.snapshots.Files()); err != nil {
|
||||
if err := rawdb.WriteSnapshots(tx, cfg.snapshots.Files(), cfg.agg.Files()); err != nil {
|
||||
return err
|
||||
}
|
||||
if cfg.dbEventNotifier != nil { // can notify right here, even that write txn is not commit
|
||||
@ -452,15 +448,14 @@ Finish:
|
||||
|
||||
func calculateTime(amountLeft, rate uint64) string {
|
||||
if rate == 0 {
|
||||
return "999hrs:99m:99s"
|
||||
return "999hrs:99m"
|
||||
}
|
||||
timeLeftInSeconds := amountLeft / rate
|
||||
|
||||
hours := timeLeftInSeconds / 3600
|
||||
minutes := (timeLeftInSeconds / 60) % 60
|
||||
seconds := timeLeftInSeconds % 60
|
||||
|
||||
return fmt.Sprintf("%dhrs:%dm:%ds", hours, minutes, seconds)
|
||||
return fmt.Sprintf("%dhrs:%dm", hours, minutes)
|
||||
}
|
||||
|
||||
/* ====== PRUNING ====== */
|
||||
@ -482,7 +477,7 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv.
|
||||
return err
|
||||
}
|
||||
|
||||
if err := retireBlocksInSingleBackgroundThread(s, cfg.blockRetire, ctx, tx); err != nil {
|
||||
if err := retireBlocksInSingleBackgroundThread(s, cfg.blockRetire, cfg.agg, ctx, tx); err != nil {
|
||||
return fmt.Errorf("retireBlocksInSingleBackgroundThread: %w", err)
|
||||
}
|
||||
}
|
||||
@ -497,7 +492,7 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv.
|
||||
}
|
||||
|
||||
// retiring blocks in a single thread in the brackground
|
||||
func retireBlocksInSingleBackgroundThread(s *PruneState, blockRetire *snapshotsync.BlockRetire, ctx context.Context, tx kv.RwTx) (err error) {
|
||||
func retireBlocksInSingleBackgroundThread(s *PruneState, blockRetire *snapshotsync.BlockRetire, agg *state.Aggregator22, ctx context.Context, tx kv.RwTx) (err error) {
|
||||
// if something already happens in background - noop
|
||||
if blockRetire.Working() {
|
||||
return nil
|
||||
@ -507,7 +502,7 @@ func retireBlocksInSingleBackgroundThread(s *PruneState, blockRetire *snapshotsy
|
||||
return fmt.Errorf("[%s] %w", s.LogPrefix(), err)
|
||||
}
|
||||
if ok {
|
||||
if err := rawdb.WriteSnapshots(tx, blockRetire.Snapshots().Files()); err != nil {
|
||||
if err := rawdb.WriteSnapshots(tx, blockRetire.Snapshots().Files(), agg.Files()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -4,7 +4,7 @@ go 1.18
|
||||
|
||||
require (
|
||||
github.com/gballet/go-verkle v0.0.0-20220829125900-a702d458d33c
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221012183632-cfd89757dbff
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221013021436-af62ccaca9cc
|
||||
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20221012092130-0962bd35abe1
|
||||
github.com/ledgerwatch/log/v3 v3.4.2
|
||||
github.com/ledgerwatch/secp256k1 v1.0.0
|
||||
|
4
go.sum
4
go.sum
@ -563,8 +563,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
|
||||
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
|
||||
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221012183632-cfd89757dbff h1:qbBv5CevdfZiRkArXXwjkCNRwN5RTGNX2OIj17zmIFo=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221012183632-cfd89757dbff/go.mod h1:GT9MS8/L8VEWWJFsyJ3s5uhGyR4+rtws8KBgrWyrOwY=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221013021436-af62ccaca9cc h1:pO9T6xQZum5FYmQgBq3WtnM1jpNzCkxfQ/Z4rmGXKmo=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20221013021436-af62ccaca9cc/go.mod h1:GT9MS8/L8VEWWJFsyJ3s5uhGyR4+rtws8KBgrWyrOwY=
|
||||
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20221012092130-0962bd35abe1 h1:UZFTr9Zee9JdjVch0TEQsBd9fI4xIMQXyB3DjgKT1gw=
|
||||
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20221012092130-0962bd35abe1/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
|
||||
github.com/ledgerwatch/log/v3 v3.4.2 h1:chvjB7c100rlIFgPv+Col2eerxIrHL88OiZRuPZDkxw=
|
||||
|
@ -347,8 +347,17 @@ func doRetireCommand(cliCtx *cli.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1)
|
||||
br := snapshotsync.NewBlockRetire(workers, dirs.Tmp, snapshots, db, nil, nil)
|
||||
br := snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs.Tmp, snapshots, db, nil, nil)
|
||||
|
||||
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = agg.ReopenFiles()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agg.SetWorkers(estimate.CompressSnapshot.Workers())
|
||||
|
||||
log.Info("Params", "from", from, "to", to, "every", every)
|
||||
for i := from; i < to; i += every {
|
||||
@ -356,7 +365,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
|
||||
panic(err)
|
||||
}
|
||||
if err := db.Update(ctx, func(tx kv.RwTx) error {
|
||||
if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files()); err != nil {
|
||||
if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files(), agg.Files()); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info("prune blocks from db\n")
|
||||
@ -371,15 +380,6 @@ func doRetireCommand(cliCtx *cli.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = agg.ReopenFiles()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agg.SetWorkers(estimate.CompressSnapshot.Workers())
|
||||
if err = agg.BuildMissedIndices(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -416,8 +416,19 @@ func doSnapshotCommand(cliCtx *cli.Context) error {
|
||||
if err := allSnapshots.ReopenFolder(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = agg.ReopenFiles()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agg.SetWorkers(estimate.CompressSnapshot.Workers())
|
||||
|
||||
if err := db.Update(ctx, func(tx kv.RwTx) error {
|
||||
return rawdb.WriteSnapshots(tx, allSnapshots.Files())
|
||||
return rawdb.WriteSnapshots(tx, allSnapshots.Files(), agg.Files())
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -470,7 +470,7 @@ func (s *RoSnapshots) idxAvailability() uint64 {
|
||||
// - RPC return Nil for historical blocks if snapshots are not open
|
||||
func (s *RoSnapshots) OptimisticReopenWithDB(db kv.RoDB) {
|
||||
_ = db.View(context.Background(), func(tx kv.Tx) error {
|
||||
snList, err := rawdb.ReadSnapshots(tx)
|
||||
snList, _, err := rawdb.ReadSnapshots(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -691,7 +691,7 @@ func (s *RoSnapshots) ReopenFolder() error {
|
||||
}
|
||||
func (s *RoSnapshots) ReopenWithDB(db kv.RoDB) error {
|
||||
if err := db.View(context.Background(), func(tx kv.Tx) error {
|
||||
snList, err := rawdb.ReadSnapshots(tx)
|
||||
snList, _, err := rawdb.ReadSnapshots(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user