e3: prune limited amount before commit #675 (#5693)

This commit is contained in:
Alex Sharov 2022-10-11 11:25:13 +07:00 committed by GitHub
parent f09084f45d
commit a2e51a2469
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 27 additions and 14 deletions

View File

@ -1154,7 +1154,7 @@ func allSnapshots(db kv.RoDB) (*snapshotsync.RoSnapshots, *libstate.Aggregator22
aggDir := path.Join(datadirCli, "snapshots", "history")
dir.MustExist(aggDir)
var err error
_aggSingleton, err = libstate.NewAggregator22(aggDir, ethconfig.HistoryV3AggregationStep)
_aggSingleton, err = libstate.NewAggregator22(aggDir, ethconfig.HistoryV3AggregationStep, db)
if err != nil {
panic(err)
}

View File

@ -349,7 +349,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
allSnapshots.OptimisticReopenWithDB(db)
allSnapshots.LogStat()
if agg, err = libstate.NewAggregator22(cfg.Dirs.SnapHistory, ethconfig.HistoryV3AggregationStep); err != nil {
if agg, err = libstate.NewAggregator22(cfg.Dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
}
if err = agg.ReopenFiles(); err != nil {

View File

@ -104,7 +104,7 @@ func ReplayTx(genesis *core.Genesis) error {
}
fmt.Printf("txNum = %d\n", txNum)
dirs := datadir2.New(datadir)
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep)
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, nil)
if err != nil {
return fmt.Errorf("create history: %w", err)
}

View File

@ -885,7 +885,7 @@ func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snCo
}
dir.MustExist(dirs.SnapHistory)
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep)
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, s.chainDB)
if err != nil {
return nil, nil, nil, err
}

View File

@ -27,6 +27,7 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
"github.com/ledgerwatch/erigon/params"
@ -249,6 +250,10 @@ func Exec3(ctx context.Context,
return err
}
//TODO: can't commit - because we are in the middle of the block. Need make sure that we are always processed whole block.
if err = agg.Prune(ethconfig.HistoryV3AggregationStep / 10); err != nil { // prune part of retired data, before commit
return err
}
if err = tx.Commit(); err != nil {
return err
}
@ -356,6 +361,9 @@ loop:
return err
}
applyTx.CollectMetrics()
if err = agg.Prune(ethconfig.HistoryV3AggregationStep / 10); err != nil {
return err
}
if err := applyTx.Commit(); err != nil {
return err
}

View File

@ -155,7 +155,7 @@ func apply(tx kv.RwTx, agg *libstate.Aggregator22) (beforeBlock, afterBlock test
func newAgg(t *testing.T) *libstate.Aggregator22 {
t.Helper()
agg, err := libstate.NewAggregator22(t.TempDir(), ethconfig.HistoryV3AggregationStep)
agg, err := libstate.NewAggregator22(t.TempDir(), ethconfig.HistoryV3AggregationStep, nil)
require.NoError(t, err)
err = agg.ReopenFiles()
require.NoError(t, err)

6
go.mod
View File

@ -4,7 +4,7 @@ go 1.18
require (
github.com/gballet/go-verkle v0.0.0-20220923150140-6c08cd337774
github.com/ledgerwatch/erigon-lib v0.0.0-20221010021359-744fd18718b4
github.com/ledgerwatch/erigon-lib v0.0.0-20221011042341-e64b16fbfcb9
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20221010023441-7e1da3031ab0
github.com/ledgerwatch/log/v3 v3.4.2
github.com/ledgerwatch/secp256k1 v1.0.0
@ -72,6 +72,7 @@ require (
github.com/valyala/fastjson v1.6.3
github.com/xsleonard/go-merkle v1.1.0
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.23.0
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/exp v0.0.0-20220921164117-439092de6870
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
@ -155,7 +156,6 @@ require (
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
@ -239,7 +239,7 @@ require (
golang.org/x/tools v0.1.12 // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.3.0 // indirect
lukechampine.com/uint128 v1.1.1 // indirect
modernc.org/cc/v3 v3.38.1 // indirect

4
go.sum
View File

@ -561,8 +561,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20221010021359-744fd18718b4 h1:j0k2Ww+udU4jgYFq0tBkqNZ1PP8ds2aG6JrY/Dy16iI=
github.com/ledgerwatch/erigon-lib v0.0.0-20221010021359-744fd18718b4/go.mod h1:YDP7ECNyjKo1dE7J5n8GXKBIYOWnmchvGCfALuwhBQg=
github.com/ledgerwatch/erigon-lib v0.0.0-20221011042341-e64b16fbfcb9 h1:rP+wyunlPGxYOZXOu324ObeRpKIyikh3VgveJOKYMQU=
github.com/ledgerwatch/erigon-lib v0.0.0-20221011042341-e64b16fbfcb9/go.mod h1:YDP7ECNyjKo1dE7J5n8GXKBIYOWnmchvGCfALuwhBQg=
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20221010023441-7e1da3031ab0 h1:72XYCqNaL9hxP5WVw3YzNp2s+MexmcTEWDUSHARj1nE=
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20221010023441-7e1da3031ab0/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.4.2 h1:chvjB7c100rlIFgPv+Col2eerxIrHL88OiZRuPZDkxw=

View File

@ -325,7 +325,9 @@ func OpenDatabase(config *nodecfg.Config, logger log.Logger, label kv.Label) (kv
roTxLimit = int64(config.Http.DBReadConcurrency)
}
roTxsLimiter := semaphore.NewWeighted(roTxLimit) // 1 less than max to allow unlocking to happen
opts := mdbx.NewMDBX(logger).Path(dbPath).Label(label).DBVerbosity(config.DatabaseVerbosity).RoTxsLimiter(roTxsLimiter)
opts := mdbx.NewMDBX(logger).
WriteMergeThreshold(4 * 8192).
Path(dbPath).Label(label).DBVerbosity(config.DatabaseVerbosity).RoTxsLimiter(roTxsLimiter)
if exclusive {
opts = opts.Exclusive()
}

View File

@ -219,7 +219,7 @@ func doIndicesCommand(cliCtx *cli.Context) error {
if err := rebuildIndices("Indexing", ctx, chainDB, cfg, dirs, from, workers); err != nil {
log.Error("Error", "err", err)
}
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep)
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, chainDB)
if err != nil {
return err
}
@ -371,7 +371,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
}
}
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep)
agg, err := libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db)
if err != nil {
return err
}
@ -380,6 +380,9 @@ func doRetireCommand(cliCtx *cli.Context) error {
return err
}
agg.SetWorkers(estimate.CompressSnapshot.Workers())
if err = agg.BuildMissedIndices(); err != nil {
return err
}
if err = agg.Merge(); err != nil {
return err
}

View File

@ -254,7 +254,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
if cfg.HistoryV3 {
dir.MustExist(dirs.SnapHistory)
mock.agg, err = libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep)
mock.agg, err = libstate.NewAggregator22(dirs.SnapHistory, ethconfig.HistoryV3AggregationStep, db)
if err != nil {
panic(err)
}