mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-18 08:38:46 +00:00
f99f326363
This is a fix for at least one cause of this isssue: https://github.com/ledgerwatch/erigon/issues/8212. It happens after the end of the snapshot load, because the snapshot processing which was introduced a couple of month ago does not deal with validation of the headers at the start of the start of the chain. I have also added a fix to the persistence so that the last snapshot is recorded so that subsequent runs are not forced to process the whole snapshot run from start. The relationship between this an memory usage is that the fact that headers are not processed leads to a queue of pending headers with size of around 5GB. I have not changed any header parameters - so likely a prolonged stop to header processing will result in a similar level of memory growth.
129 lines
3.4 KiB
Go
129 lines
3.4 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/ledgerwatch/erigon-lib/common/background"
|
|
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
|
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
|
|
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
|
|
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
|
|
"github.com/ledgerwatch/erigon/turbo/debug"
|
|
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
|
|
"github.com/ledgerwatch/log/v3"
|
|
"github.com/urfave/cli/v2"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// Build snapshot indexes for given snapshot files.
|
|
// Sample usage:
|
|
// build_idx --datadir erigon-1 --snapshot_path /snapshots/v1-000000-000500-headers.seg,/snapshots/v1-000500-001000-headers.seg
|
|
|
|
func main() {
|
|
|
|
app := &cli.App{
|
|
Flags: []cli.Flag{
|
|
&cli.StringFlag{
|
|
Name: "datadir",
|
|
Value: "./dev",
|
|
Usage: "node data directory",
|
|
},
|
|
&cli.StringSliceFlag{
|
|
Name: "snapshot_path",
|
|
Usage: "pathname of the snapshot file",
|
|
},
|
|
},
|
|
Action: func(cCtx *cli.Context) error {
|
|
return buildIndex(cCtx, cCtx.String("datadir"), cCtx.StringSlice("snapshot_path"))
|
|
},
|
|
}
|
|
|
|
if err := app.Run(os.Args); err != nil {
|
|
log.Crit(err.Error())
|
|
}
|
|
}
|
|
|
|
func FindIf(segments []snaptype.FileInfo, predicate func(snaptype.FileInfo) bool) (snaptype.FileInfo, bool) {
|
|
for _, segment := range segments {
|
|
if predicate(segment) {
|
|
return segment, true
|
|
}
|
|
}
|
|
return snaptype.FileInfo{}, false // Return zero value and false if not found
|
|
}
|
|
|
|
func buildIndex(cliCtx *cli.Context, dataDir string, snapshotPaths []string) error {
|
|
logger, _, err := debug.Setup(cliCtx, true /* rootLogger */)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logLevel := log.LvlInfo
|
|
|
|
ps := background.NewProgressSet()
|
|
|
|
workers := 4
|
|
g, ctx := errgroup.WithContext(cliCtx.Context)
|
|
g.SetLimit(workers)
|
|
|
|
dirs := datadir.New(dataDir)
|
|
|
|
chainDB := mdbx.NewMDBX(logger).Path(dirs.Chaindata).MustOpen()
|
|
defer chainDB.Close()
|
|
|
|
chainConfig := fromdb.ChainConfig(chainDB)
|
|
|
|
segments, _, err := freezeblocks.Segments(dirs.Snap)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fmt.Printf("Building indexes:\n- dataDir: %s\n- snapshots: %s\n", dataDir, snapshotPaths)
|
|
start := time.Now()
|
|
|
|
for _, snapshotPath := range snapshotPaths {
|
|
segment, found := FindIf(segments, func(s snaptype.FileInfo) bool {
|
|
return s.Path == snapshotPath
|
|
})
|
|
if !found {
|
|
return fmt.Errorf("segment %s not found", snapshotPath)
|
|
}
|
|
|
|
switch segment.T {
|
|
case snaptype.Headers:
|
|
g.Go(func() error {
|
|
jobProgress := &background.Progress{}
|
|
ps.Add(jobProgress)
|
|
defer ps.Delete(jobProgress)
|
|
return freezeblocks.HeadersIdx(ctx, chainConfig, segment.Path, segment.From, dirs.Tmp, jobProgress, logLevel, logger)
|
|
})
|
|
case snaptype.Bodies:
|
|
g.Go(func() error {
|
|
jobProgress := &background.Progress{}
|
|
ps.Add(jobProgress)
|
|
defer ps.Delete(jobProgress)
|
|
return freezeblocks.BodiesIdx(ctx, segment.Path, segment.From, dirs.Tmp, jobProgress, logLevel, logger)
|
|
})
|
|
case snaptype.Transactions:
|
|
g.Go(func() error {
|
|
jobProgress := &background.Progress{}
|
|
ps.Add(jobProgress)
|
|
defer ps.Delete(jobProgress)
|
|
dir, _ := filepath.Split(segment.Path)
|
|
return freezeblocks.TransactionsIdx(ctx, chainConfig, segment.From, segment.To, dir, dirs.Tmp, jobProgress, logLevel, logger)
|
|
})
|
|
}
|
|
}
|
|
|
|
if err := g.Wait(); err != nil {
|
|
return err
|
|
}
|
|
|
|
elapsed := time.Since(start)
|
|
fmt.Printf("Indexes for %d snapshots built in %d ms\n", len(snapshotPaths), elapsed.Milliseconds())
|
|
|
|
return nil
|
|
}
|