Mirror of https://gitlab.com/pulsechaincom/erigon-pulse.git (synced 2024-12-22 03:30:37 +00:00)
prune speedup. stage_senders: don't re-calc existing senders (#7643)
- stage_senders: don't re-calculate senders that already exist in the DB.
- stage_tx_lookup: prune fewer blocks per iteration, because random deletes are expensive and pruning must not slow down sync.
- Prune data even if --snap.stop is set.
- "Prune as much as possible at startup" is not a good idea: at the initial cycle the machine can be cold, pruning would cause long downtime, and there is no reason to produce a large freelist in one transaction. People may also restart Erigon because of some bug, and that would cause unexpected downtime (normally Erigon starts up very fast). So all `initialSync`-related logic is removed from pruning.
- Fix lost metrics about disk write bytes/sec.
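Below is a minimal, self-contained Go sketch (not Erigon's actual code) of the two ideas above: skip blocks whose senders are already stored, which the real stage does via cfg.blockReader.HasSenders in the diff further down, and cap how many blocks a single prune pass may delete. The memStore type and every name in this sketch are hypothetical stand-ins for the real kv tables and stage plumbing.

// Hedged illustration only; memStore and all names are hypothetical.
package main

import (
	"context"
	"fmt"
)

// memStore stands in for the senders table.
type memStore struct {
	senders map[uint64]bool // blockNum -> senders already recovered
}

func (m *memStore) hasSenders(_ context.Context, n uint64) bool { return m.senders[n] }

// recoverSenders is the expensive step (ECDSA recovery in the real stage).
func (m *memStore) recoverSenders(_ context.Context, n uint64) { m.senders[n] = true }

func (m *memStore) deleteSenders(_ context.Context, n uint64) { delete(m.senders, n) }

// recoverRange only recovers senders for blocks that don't have them yet:
// the "don't re-calc existing senders" idea.
func recoverRange(ctx context.Context, s *memStore, from, to uint64) int {
	recovered := 0
	for n := from; n <= to; n++ {
		if s.hasSenders(ctx, n) {
			continue // already in the DB: skip the expensive recovery
		}
		s.recoverSenders(ctx, n)
		recovered++
	}
	return recovered
}

// pruneBounded deletes at most `limit` blocks per call, mirroring the
// "prune a small, fixed amount per iteration" idea.
func pruneBounded(ctx context.Context, s *memStore, from, pruneTo, limit uint64) uint64 {
	to := pruneTo
	if from+limit < to {
		to = from + limit // cap per-iteration work so pruning can't dominate a sync cycle
	}
	for n := from; n < to; n++ {
		s.deleteSenders(ctx, n)
	}
	return to // the next iteration continues from here
}

func main() {
	ctx := context.Background()
	s := &memStore{senders: map[uint64]bool{1: true, 2: true}} // blocks 1-2 already done
	fmt.Println("newly recovered:", recoverRange(ctx, s, 1, 10)) // 8: only blocks 3..10
	fmt.Println("pruned up to:", pruneBounded(ctx, s, 1, 9, 3))  // 4: only 3 blocks deleted
}

Bounding per-iteration prune work trades a longer background-pruning tail for predictable stage timing, which is the trade-off the commit message argues for; in the diff, the real code uses caps such as blockFrom+10 in PruneTxLookup and a 100-block PruneAncientBlocks call.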
This commit is contained in:
parent 6f54cc6ef0
commit ad72b7178e
@@ -47,7 +47,7 @@ func NewStagedSync(
blockWriter *blockio.BlockWriter,
) (*stagedsync.Sync, error) {
dirs := cfg.Dirs
blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, db, snapDownloader, notifications.Events, logger)
blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, blockWriter, db, snapDownloader, notifications.Events, logger)

// During Import we don't want other services like header requests, body requests etc. to be running.
// Hence we run it in the test mode.
@@ -36,6 +36,19 @@ func PruneMode(db kv.RoDB) (pm prune.Mode) {
return
}
func TxsV3(db kv.RoDB) (enabled bool) {
if err := db.View(context.Background(), func(tx kv.Tx) error {
var err error
enabled, err = kvcfg.TransactionsV3.Enabled(tx)
if err != nil {
return err
}
return nil
}); err != nil {
panic(err)
}
return
}
func HistV3(db kv.RoDB) (enabled bool) {
if err := db.View(context.Background(), func(tx kv.Tx) error {
var err error
enabled, err = kvcfg.HistoryV3.Enabled(tx)
@@ -868,7 +868,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {

var blockRetire *snapshotsync.BlockRetire
if sn.Cfg().Enabled {
blockRetire = snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), tmpdir, br, db, nil, nil, logger)
blockRetire = snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), tmpdir, br, bw, db, nil, nil, logger)
}

pm, err := prune.Get(tx)
@@ -16,6 +16,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
@@ -41,7 +42,7 @@ func init() {
withSnapshotBlocks(checkChangeSetsCmd)
checkChangeSetsCmd.Flags().StringVar(&historyfile, "historyfile", "", "path to the file where the changesets and history are expected to be. If omitted, the same as <datadir>/erion/chaindata")
checkChangeSetsCmd.Flags().BoolVar(&nocheck, "nocheck", false, "set to turn off the changeset checking and only execute transaction (for performance testing)")
checkChangeSetsCmd.Flags().BoolVar(&transactionsV3, "experimental.transactions.v3", false, "(this flag is in testing stage) Not recommended yet: Can't change this flag after node creation. New DB table for transactions allows keeping multiple branches of block bodies in the DB simultaneously")
checkChangeSetsCmd.Flags().BoolVar(&transactionsV3, utils.TransactionV3Flag.Name, utils.TransactionV3Flag.Value, utils.TransactionV3Flag.Usage)
rootCmd.AddCommand(checkChangeSetsCmd)
}
@@ -1291,12 +1291,15 @@ func DeleteAncientBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) erro
// Copying k because otherwise the same memory will be reused
// for the next key and Delete below will end up deleting 1 more record than required
kCopy := common.CopyBytes(k)
if err = tx.Delete(kv.Headers, kCopy); err != nil {
if err = tx.Delete(kv.Senders, kCopy); err != nil {
return err
}
if err = tx.Delete(kv.BlockBody, kCopy); err != nil {
return err
}
if err = tx.Delete(kv.Headers, kCopy); err != nil {
return err
}
}

return nil
@@ -19,6 +19,10 @@ import (
"github.com/ledgerwatch/log/v3"
)

//Naming:
// Prune: delete old data
// Unwind: delete recent data

// BlockReader can read blocks from db and snapshots
type BlockWriter struct {
historyV3 bool
@@ -114,6 +118,11 @@ func (w *BlockWriter) MakeBodiesNonCanonical(tx kv.RwTx, from uint64, deleteBodi
return nil
}

//if deleteBodies {
//if err := rawdb.MakeBodiesNonCanonical(tx, from, deleteBodies, ctx, logPrefix, logEvery); err != nil {
// return err
//}
//}
if w.historyV3 {
if err := rawdbv3.TxNums.Truncate(tx, from); err != nil {
return err
@@ -122,7 +131,7 @@ func (w *BlockWriter) MakeBodiesNonCanonical(tx kv.RwTx, from uint64, deleteBodi
return nil
}

func extractHeaders(k []byte, v []byte, next etl.ExtractNextFunc) error {
func extractHeaders(k []byte, _ []byte, next etl.ExtractNextFunc) error {
// We only want to extract entries composed by Block Number + Header Hash
if len(k) != 40 {
return nil
@@ -169,10 +178,10 @@ func (w *BlockWriter) ResetSenders(ctx context.Context, db kv.RoDB, tx kv.RwTx)
return backup.ClearTables(ctx, db, tx, kv.Senders)
}

// Prune - [1, to) old blocks after moving it to snapshots.
// PruneBlocks - [1, to) old blocks after moving it to snapshots.
// keeps genesis in db
// doesn't change sequences of kv.EthTx and kv.NonCanonicalTxs
// doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty
func (w *BlockWriter) Prune(ctx context.Context, tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) error {
func (w *BlockWriter) PruneBlocks(ctx context.Context, tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) error {
return rawdb.DeleteAncientBlocks(tx, blockTo, blocksDeleteLimit)
}
@@ -368,7 +368,6 @@ var StateUnwindOrder = UnwindOrder{

var DefaultPruneOrder = PruneOrder{
stages.Finish,
stages.Snapshots,
stages.TxLookup,
stages.LogIndex,
stages.StorageHistoryIndex,
@@ -386,6 +385,7 @@ var DefaultPruneOrder = PruneOrder{
stages.Bodies,
stages.BlockHashes,
stages.Headers,
stages.Snapshots,
}

var MiningUnwindOrder = UnwindOrder{} // nothing to unwind in mining - because mining does not commit db changes
@@ -711,7 +711,9 @@ func saveDownloadedPoSHeaders(tx kv.RwTx, cfg HeadersCfg, headerInserter *header
logger.Info("PoS headers verified and saved", "requestId", cfg.hd.RequestId(), "fork head", lastValidHash)
}

cfg.hd.HeadersCollector().Close()
if cfg.hd.HeadersCollector() != nil {
cfg.hd.HeadersCollector().Close()
}
cfg.hd.SetHeadersCollector(nil)
cfg.hd.SetPosStatus(headerdownload.Idle)
}
@@ -75,7 +75,6 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
if cfg.blockRetire != nil && cfg.blockRetire.Snapshots() != nil && cfg.blockRetire.Snapshots().Cfg().Enabled && s.BlockNumber < cfg.blockRetire.Snapshots().BlocksAvailable() {
s.BlockNumber = cfg.blockRetire.Snapshots().BlocksAvailable()
}
txsV3Enabled := cfg.blockWriter.TxsV3Enabled() // allow stor senders for non-canonical blocks

quitCh := ctx.Done()
useExternalTx := tx != nil
@@ -108,40 +107,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()

canonicalC, err := tx.Cursor(kv.HeaderCanonical)
if err != nil {
return err
}
defer canonicalC.Close()

startFrom := s.BlockNumber + 1
currentHeaderIdx := uint64(0)
canonical := make([]libcommon.Hash, to-s.BlockNumber)

if !txsV3Enabled {
for k, v, err := canonicalC.Seek(hexutility.EncodeTs(startFrom)); k != nil; k, v, err = canonicalC.Next() {
if err != nil {
return err
}
if err := libcommon.Stopped(quitCh); err != nil {
return err
}

if currentHeaderIdx >= to-s.BlockNumber { // if header stage is ehead of body stage
break
}

copy(canonical[currentHeaderIdx][:], v)
currentHeaderIdx++

select {
default:
case <-logEvery.C:
logger.Info(fmt.Sprintf("[%s] Preload headers", logPrefix), "block_number", binary.BigEndian.Uint64(k))
}
}
}
logger.Trace(fmt.Sprintf("[%s] Read canonical hashes", logPrefix), "amount", len(canonical))

jobs := make(chan *senderRecoveryJob, cfg.batchSize)
out := make(chan *senderRecoveryJob, cfg.batchSize)
@@ -214,14 +180,14 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
return nil
}

bodiesC, err := tx.Cursor(kv.BlockBody)
bodiesC, err := tx.Cursor(kv.HeaderCanonical)
if err != nil {
return err
}
defer bodiesC.Close()

Loop:
for k, _, err := bodiesC.Seek(hexutility.EncodeTs(startFrom)); k != nil; k, _, err = bodiesC.Next() {
for k, v, err := bodiesC.Seek(hexutility.EncodeTs(startFrom)); k != nil; k, v, err = bodiesC.Next() {
if err != nil {
return err
}
@@ -229,32 +195,28 @@ Loop:
return err
}

blockNumber := binary.BigEndian.Uint64(k[:8])
blockHash := libcommon.BytesToHash(k[8:])
blockNumber := binary.BigEndian.Uint64(k)
blockHash := libcommon.BytesToHash(v)

if blockNumber > to {
break
}

has, err := cfg.blockReader.HasSenders(ctx, tx, blockHash, blockNumber)
if err != nil {
return err
}
if has {
continue
}

var body *types.Body
if txsV3Enabled {
if body, err = cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber); err != nil {
return err
}
if body == nil {
logger.Warn(fmt.Sprintf("[%s] blockReader.BodyWithTransactions can't find block", logPrefix), "num", blockNumber, "hash", blockHash)
continue
}
} else {
if canonical[blockNumber-s.BlockNumber-1] != blockHash {
// non-canonical case
continue
}
body = rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blockNumber)
if body == nil {
logger.Warn(fmt.Sprintf("[%s] ReadCanonicalBodyWithTransactions can't find block", logPrefix), "num", blockNumber, "hash", blockHash)
continue
}
if body, err = cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber); err != nil {
return err
}
if body == nil {
logger.Warn(fmt.Sprintf("[%s] ReadBodyWithTransactions can't find block", logPrefix), "num", blockNumber, "hash", blockHash)
continue
}

select {
@@ -397,8 +359,6 @@ func UnwindSendersStage(s *UnwindState, tx kv.RwTx, cfg SendersCfg, ctx context.
}

func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error) {
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
@@ -408,9 +368,11 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
defer tx.Rollback()
}
sn := cfg.blockRetire.Snapshots()
if !(sn != nil && sn.Cfg().Enabled && sn.Cfg().Produce) && cfg.prune.TxIndex.Enabled() {
if sn.Cfg().Enabled {
// noop. in this case senders will be deleted by BlockRetire.PruneAncientBlocks after data-freezing.
} else if cfg.prune.TxIndex.Enabled() {
to := cfg.prune.TxIndex.PruneTo(s.ForwardProgress)
if err = rawdb.PruneTable(tx, kv.Senders, to, ctx, 1_000); err != nil {
if err = rawdb.PruneTable(tx, kv.Senders, to, ctx, 100); err != nil {
return err
}
}
@@ -130,7 +130,7 @@ func TestSenders(t *testing.T) {

require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))

blockRetire := snapshotsync.NewBlockRetire(1, "", br, db, nil, nil, logger)
blockRetire := snapshotsync.NewBlockRetire(1, "", br, bw, db, nil, nil, logger)
cfg := stagedsync.StageSendersCfg(db, params.TestChainConfig, false, "", prune.Mode{}, blockRetire, bw, br, nil)
err = stagedsync.SpawnRecoverSendersStage(cfg, &stagedsync.StageState{ID: stages.Senders}, nil, tx, 3, m.Ctx, log.New())
require.NoError(err)
@@ -14,7 +14,6 @@ import (

"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
@@ -269,23 +268,11 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx,
return err
}
if historyV3 {
var toBlock uint64
if sn != nil {
toBlock = sn.BlocksAvailable()
}
toBlock = cmp.Max(toBlock, progress)

_ = tx.ClearBucket(kv.MaxTxNum)
if err := rawdbv3.TxNums.WriteForGenesis(tx, 1); err != nil {
return err
}
type IterBody interface {
IterateBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error
IterateFrozenBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error
}
if err := blockReader.(IterBody).IterateBodies(func(blockNum, baseTxNum, txAmount uint64) error {
if blockNum == 0 || blockNum > toBlock {
return nil
}
if err := blockReader.(IterBody).IterateFrozenBodies(func(blockNum, baseTxNum, txAmount uint64) error {
select {
case <-ctx.Done():
return ctx.Err()
@@ -302,6 +289,15 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx,
}); err != nil {
return fmt.Errorf("build txNum => blockNum mapping: %w", err)
}
if blockReader.Snapshots().BlocksAvailable() > 0 {
if err := rawdb.AppendCanonicalTxNums(tx, blockReader.Snapshots().BlocksAvailable()+1); err != nil {
return err
}
} else {
if err := rawdb.AppendCanonicalTxNums(tx, 0); err != nil {
return err
}
}
}
if err := rawdb.WriteSnapshots(tx, sn.Files(), agg.Files()); err != nil {
return err
@@ -498,13 +494,14 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv.
defer tx.Rollback()
}

sn := cfg.blockRetire.Snapshots()
if sn != nil && sn.Cfg().Enabled && sn.Cfg().Produce {
br := cfg.blockRetire
br := cfg.blockRetire
sn := br.Snapshots()
if sn.Cfg().Enabled {
if err := br.PruneAncientBlocks(tx, 100); err != nil {
return err
}

}
if sn.Cfg().Enabled && sn.Cfg().Produce {
//TODO: initialSync maybe save files progress here
if cfg.agg.NeedSaveFilesListInDB() || br.NeedSaveFilesListInDB() {
if err := rawdb.WriteSnapshots(tx, br.Snapshots().Files(), cfg.agg.Files()); err != nil {
@@ -117,14 +117,12 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c
// txnLookupTransform - [startKey, endKey)
func txnLookupTransform(logPrefix string, tx kv.RwTx, blockFrom, blockTo uint64, ctx context.Context, cfg TxLookupCfg, logger log.Logger) (err error) {
bigNum := new(big.Int)
txsV3Enabled := cfg.blockReader.TxsV3Enabled()
if txsV3Enabled {
panic("implement me. need iterate by kv.BlockID instead of kv.HeaderCanonical")
}

return etl.Transform(logPrefix, tx, kv.HeaderCanonical, kv.TxLookup, cfg.tmpdir, func(k, v []byte, next etl.ExtractNextFunc) error {
blocknum, blockHash := binary.BigEndian.Uint64(k), libcommon.CastToHash(v)
body := rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blocknum)
body, err := cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blocknum)
if err != nil {
return err
}
if body == nil {
return fmt.Errorf("transform: empty block body %d, hash %x", blocknum, v)
}
@@ -236,10 +234,8 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte
} else if blockSnapshots != nil && blockSnapshots.Cfg().Enabled {
blockTo = snapshotsync.CanDeleteTo(s.ForwardProgress, blockSnapshots)
}

if !initialCycle { // limit time for pruning
blockTo = cmp.Min(blockTo, blockFrom+100)
}
// can't prune much here: because tx_lookup index has crypto-hashed-keys, and 1 block producing hundreds of deletes
blockTo = cmp.Min(blockTo, blockFrom+10)

if blockFrom < blockTo {
if err = deleteTxLookupRange(tx, logPrefix, blockFrom, blockTo, ctx, cfg, logger); err != nil {
@@ -269,7 +265,10 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte
func deleteTxLookupRange(tx kv.RwTx, logPrefix string, blockFrom, blockTo uint64, ctx context.Context, cfg TxLookupCfg, logger log.Logger) error {
return etl.Transform(logPrefix, tx, kv.HeaderCanonical, kv.TxLookup, cfg.tmpdir, func(k, v []byte, next etl.ExtractNextFunc) error {
blocknum, blockHash := binary.BigEndian.Uint64(k), libcommon.CastToHash(v)
body := rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blocknum)
body, err := cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blocknum)
if err != nil {
return err
}
if body == nil {
log.Debug("TxLookup pruning, empty block body", "height", blocknum)
return nil
@@ -17,7 +17,7 @@ import (

func Setup(address string) {
http.HandleFunc("/debug/metrics/prometheus", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
metrics2.WritePrometheus(w, false)
metrics2.WritePrometheus(w, true)
contentType := expfmt.Negotiate(r.Header)
enc := expfmt.NewEncoder(w, contentType)
mf, err := prometheus.DefaultGatherer.Gather()
@@ -28,6 +28,7 @@ import (
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
@@ -415,8 +416,9 @@ func doRetireCommand(cliCtx *cli.Context) error {
return err
}
blockReader := snapshotsync.NewBlockReader(snapshots, fromdb.TxsV3(db))
blockWriter := blockio.NewBlockWriter(fromdb.HistV3(db), fromdb.TxsV3(db))

br := snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs.Tmp, blockReader, db, nil, nil, logger)
br := snapshotsync.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs.Tmp, blockReader, blockWriter, db, nil, nil, logger)
agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger)
if err != nil {
return err
@@ -788,7 +788,7 @@ func (r *BlockReader) FirstTxNumNotInSnapshots() uint64 {
return lastTxnID
}

func (r *BlockReader) IterateBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error {
func (r *BlockReader) IterateFrozenBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error {
view := r.sn.View()
defer view.Close()
@@ -36,6 +36,7 @@ import (
types2 "github.com/ledgerwatch/erigon-lib/types"
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/crypto/cryptopool"
@@ -1008,10 +1009,11 @@ type BlockRetire struct {
notifier DBEventNotifier
logger log.Logger
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
}

func NewBlockRetire(workers int, tmpDir string, blockReader services.FullBlockReader, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier, logger log.Logger) *BlockRetire {
return &BlockRetire{workers: workers, tmpDir: tmpDir, blockReader: blockReader, db: db, downloader: downloader, notifier: notifier, logger: logger}
func NewBlockRetire(workers int, tmpDir string, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier, logger log.Logger) *BlockRetire {
return &BlockRetire{workers: workers, tmpDir: tmpDir, blockReader: blockReader, blockWriter: blockWriter, db: db, downloader: downloader, notifier: notifier, logger: logger}
}
func (br *BlockRetire) Snapshots() *RoSnapshots { return br.blockReader.Snapshots().(*RoSnapshots) }
func (br *BlockRetire) NeedSaveFilesListInDB() bool {
@@ -1122,12 +1124,9 @@ func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int) error {
return err
}
canDeleteTo := CanDeleteTo(currentProgress, blockSnapshots)
if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, limit); err != nil {
if err := br.blockWriter.PruneBlocks(context.Background(), tx, canDeleteTo, limit); err != nil {
return nil
}
if err := rawdb.PruneTable(tx, kv.Senders, canDeleteTo, context.Background(), limit); err != nil {
return err
}
return nil
}
@@ -370,7 +370,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK

var snapshotsDownloader proto_downloader.DownloaderClient

blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, mock.DB, snapshotsDownloader, mock.Notifications.Events, logger)
blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, blockWriter, mock.DB, snapshotsDownloader, mock.Notifications.Events, logger)
mock.Sync = stagedsync.New(
stagedsync.DefaultStages(mock.Ctx,
stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, dirs, blockRetire, snapshotsDownloader, blockReader, mock.Notifications.Events, mock.HistoryV3, mock.agg),
@@ -406,7 +406,7 @@ func NewDefaultStages(ctx context.Context,
) []*stagedsync.Stage {
dirs := cfg.Dirs
blockWriter := blockio.NewBlockWriter(cfg.HistoryV3, cfg.TransactionsV3)
blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, db, snapDownloader, notifications.Events, logger)
blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, blockReader, blockWriter, db, snapDownloader, notifications.Events, logger)

// During Import we don't want other services like header requests, body requests etc. to be running.
// Hence we run it in the test mode.