RetireBlocks: fewer arguments (#4785)

* save

* save
This commit is contained in:
Alex Sharov 2022-07-22 13:44:42 +07:00 committed by GitHub
parent 6060b87840
commit 66758c7960
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 37 additions and 60 deletions

View File

@ -130,8 +130,7 @@ func History22(genesis *core.Genesis, logger log.Logger) error {
prevTime := time.Now()
var blockReader services.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
if err := allSnapshots.Reopen(); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)

View File

@ -376,8 +376,7 @@ func Recon(genesis *core.Genesis, logger log.Logger) error {
return err
}
var blockReader services.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
if err := allSnapshots.Reopen(); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)

View File

@ -156,7 +156,7 @@ func (rs *ReconState) RollbackTx(txTask TxTask, dependency uint64) {
if rs.doneBitmap.Contains(dependency) {
heap.Push(&rs.queue, txTask)
} else {
tt, _ := rs.triggers[dependency]
tt := rs.triggers[dependency]
tt = append(tt, txTask)
rs.triggers[dependency] = tt
}

View File

@ -118,7 +118,7 @@ func (pl PathToLeaf) isRightmost() bool {
}
// isEmpty reports whether the path contains no inner nodes.
// len of a nil slice is 0 in Go, so a single len check covers both the
// nil and the empty case; the extra `pl == nil ||` guard was redundant.
func (pl PathToLeaf) isEmpty() bool {
	return len(pl) == 0
}
func (pl PathToLeaf) dropRoot() PathToLeaf {

View File

@ -1311,7 +1311,7 @@ func WaitForDownloader(ctx context.Context, cfg HeadersCfg, tx kv.RwTx) error {
return err
}
dbEmpty := len(snInDB) == 0
var missingSnapshots []snapshotsync.MergeRange
var missingSnapshots []snapshotsync.Range
if !dbEmpty {
_, missingSnapshots, err = snapshotsync.Segments(cfg.snapshots.Dir())
if err != nil {

View File

@ -9,7 +9,6 @@ import (
"sync"
"time"
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/length"
@ -430,8 +429,7 @@ func retireBlocksInSingleBackgroundThread(s *PruneState, cfg SendersCfg, ctx con
}
}
chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, *chainID, log.LvlInfo)
cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, log.LvlInfo)
return nil
}

View File

@ -383,10 +383,7 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage, stream *json
func (h *handler) isMethodAllowedByGranularControl(method string) bool {
_, isForbidden := h.forbiddenList[method]
if len(h.allowList) == 0 {
if isForbidden {
return false
}
return true
return !isForbidden
}
_, ok := h.allowList[method]

View File

@ -244,8 +244,6 @@ func doRetireCommand(cliCtx *cli.Context) error {
defer chainDB.Close()
cfg := ethconfig.NewSnapCfg(true, true, true)
chainConfig := tool.ChainConfigFromDB(chainDB)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
snapshots := snapshotsync.NewRoSnapshots(cfg, dirs.Snap)
if err := snapshots.Reopen(); err != nil {
return err
@ -256,7 +254,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
log.Info("Params", "from", from, "to", to, "every", every)
for i := from; i < to; i += every {
if err := br.RetireBlocks(ctx, i, i+every, *chainID, log.LvlInfo); err != nil {
if err := br.RetireBlocks(ctx, i, i+every, log.LvlInfo); err != nil {
panic(err)
}
if err := chainDB.Update(ctx, func(tx kv.RwTx) error {

View File

@ -42,7 +42,7 @@ import (
)
type DownloadRequest struct {
ranges *MergeRange
ranges *Range
path string
torrentHash string
}
@ -50,20 +50,20 @@ type DownloadRequest struct {
type HeaderSegment struct {
seg *compress.Decompressor // value: first_byte_of_header_hash + header_rlp
idxHeaderHash *recsplit.Index // header_hash -> headers_segment_offset
ranges MergeRange
ranges Range
}
type BodySegment struct {
seg *compress.Decompressor // value: rlp(types.BodyForStorage)
idxBodyNumber *recsplit.Index // block_num_u64 -> bodies_segment_offset
ranges MergeRange
ranges Range
}
type TxnSegment struct {
Seg *compress.Decompressor // value: first_byte_of_transaction_hash + sender_address + transaction_rlp
IdxTxnHash *recsplit.Index // transaction_hash -> transactions_segment_offset
IdxTxnHash2BlockNum *recsplit.Index // transaction_hash -> block_number
ranges MergeRange
ranges Range
}
func (sn *HeaderSegment) close() {
@ -382,22 +382,6 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...snap.Type) (err error) {
return nil
}
// AsyncOpenAll spawns a background goroutine that keeps retrying Reopen
// every 15 seconds until both segments and indices report ready, or until
// ctx is cancelled. os.ErrNotExist from Reopen is expected while snapshot
// files are still being downloaded and is therefore not logged.
func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) {
	go func() {
		for {
			// Stop once everything is open and indexed.
			if s.segmentsReady.Load() && s.indicesReady.Load() {
				return
			}
			// Non-blocking cancellation check between attempts.
			select {
			case <-ctx.Done():
				return
			default:
			}
			err := s.Reopen()
			if err != nil && !errors.Is(err, os.ErrNotExist) {
				log.Error("AsyncOpenAll", "err", err)
			}
			time.Sleep(15 * time.Second)
		}
	}()
}
// OptimisticReopen - optimistically open snapshots (ignoring error), useful at App startup because:
// - user must be able: delete any snapshot file and Erigon will self-heal by re-downloading
// - RPC return Nil for historical blocks if snapshots are not open
@ -422,7 +406,7 @@ func (s *RoSnapshots) Reopen() error {
s.Txs.segments = s.Txs.segments[:0]
for _, f := range files {
{
seg := &BodySegment{ranges: MergeRange{f.From, f.To}}
seg := &BodySegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Bodies)
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
@ -434,7 +418,7 @@ func (s *RoSnapshots) Reopen() error {
s.Bodies.segments = append(s.Bodies.segments, seg)
}
{
seg := &HeaderSegment{ranges: MergeRange{f.From, f.To}}
seg := &HeaderSegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Headers)
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
@ -446,7 +430,7 @@ func (s *RoSnapshots) Reopen() error {
s.Headers.segments = append(s.Headers.segments, seg)
}
{
seg := &TxnSegment{ranges: MergeRange{f.From, f.To}}
seg := &TxnSegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Transactions)
seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
@ -517,7 +501,7 @@ func (s *RoSnapshots) ReopenSegments() error {
var segmentsMaxSet bool
for _, f := range files {
{
seg := &BodySegment{ranges: MergeRange{f.From, f.To}}
seg := &BodySegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Bodies)
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
@ -529,7 +513,7 @@ func (s *RoSnapshots) ReopenSegments() error {
s.Bodies.segments = append(s.Bodies.segments, seg)
}
{
seg := &HeaderSegment{ranges: MergeRange{f.From, f.To}}
seg := &HeaderSegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Headers)
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
@ -541,7 +525,7 @@ func (s *RoSnapshots) ReopenSegments() error {
s.Headers.segments = append(s.Headers.segments, seg)
}
{
seg := &TxnSegment{ranges: MergeRange{f.From, f.To}}
seg := &TxnSegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Transactions)
seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
@ -793,14 +777,14 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, chainID uint256.Int, tmpD
return nil
}
func noGaps(in []snap.FileInfo) (out []snap.FileInfo, missingSnapshots []MergeRange) {
func noGaps(in []snap.FileInfo) (out []snap.FileInfo, missingSnapshots []Range) {
var prevTo uint64
for _, f := range in {
if f.To <= prevTo {
continue
}
if f.From != prevTo { // no gaps
missingSnapshots = append(missingSnapshots, MergeRange{prevTo, f.From})
missingSnapshots = append(missingSnapshots, Range{prevTo, f.From})
continue
}
prevTo = f.To
@ -854,7 +838,7 @@ func noOverlaps(in []snap.FileInfo) (res []snap.FileInfo) {
return res
}
func Segments(dir string) (res []snap.FileInfo, missingSnapshots []MergeRange, err error) {
func Segments(dir string) (res []snap.FileInfo, missingSnapshots []Range, err error) {
list, err := snap.Segments(dir)
if err != nil {
return nil, missingSnapshots, err
@ -944,10 +928,12 @@ func CanDeleteTo(curBlockNum uint64, snapshots *RoSnapshots) (blockTo uint64) {
hardLimit := (curBlockNum/1_000)*1_000 - params.FullImmutabilityThreshold
return cmp.Min(hardLimit, snapshots.BlocksAvailable()+1)
}
func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, lvl log.Lvl) error {
return retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier)
func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl) error {
chainConfig := tool.ChainConfigFromDB(br.db)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
return retireBlocks(ctx, blockFrom, blockTo, *chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier)
}
func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, chainID uint256.Int, lvl log.Lvl) {
func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, lvl log.Lvl) {
if br.working.Load() {
// go-routine is still working
return
@ -968,7 +954,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg
return
}
err := br.RetireBlocks(ctx, blockFrom, blockTo, chainID, lvl)
err := br.RetireBlocks(ctx, blockFrom, blockTo, lvl)
br.result = &BlockRetireResult{
BlockFrom: blockFrom,
BlockTo: blockTo,
@ -988,7 +974,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
return fmt.Errorf("DumpBlocks: %w", err)
}
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
return fmt.Errorf("reopen: %w", err)
}
idxWorkers := workers
if idxWorkers > 4 {
@ -998,7 +984,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
return err
}
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
return fmt.Errorf("reopen: %w", err)
}
merger := NewMerger(tmpDir, workers, lvl, chainID, notifier)
ranges := merger.FindMergeRanges(snapshots)
@ -1010,7 +996,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
return err
}
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
return fmt.Errorf("reopen: %w", err)
}
var downloadRequest []DownloadRequest
@ -1723,13 +1709,13 @@ func NewMerger(tmpDir string, workers int, lvl log.Lvl, chainID uint256.Int, not
return &Merger{tmpDir: tmpDir, workers: workers, lvl: lvl, chainID: chainID, notifier: notifier}
}
// Range is a block-number interval [from, to) covered by one snapshot segment.
type Range struct {
	from, to uint64
}

// String renders the range in thousands of blocks, e.g. "500k-1000k".
func (r Range) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) }
func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []MergeRange) {
func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []Range) {
for i := len(snapshots.Headers.segments) - 1; i > 0; i-- {
sn := snapshots.Headers.segments[i]
if sn.ranges.to-sn.ranges.from >= snap.DEFAULT_SEGMENT_SIZE { // is complete .seg
@ -1744,14 +1730,14 @@ func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []MergeRange) {
break
}
aggFrom := sn.ranges.to - span
res = append(res, MergeRange{from: aggFrom, to: sn.ranges.to})
res = append(res, Range{from: aggFrom, to: sn.ranges.to})
for snapshots.Headers.segments[i].ranges.from > aggFrom {
i--
}
break
}
}
slices.SortFunc(res, func(i, j MergeRange) bool { return i.from < j.from })
slices.SortFunc(res, func(i, j Range) bool { return i.from < j.from })
return res
}
func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeHeaders, toMergeBodies, toMergeTxs []string, err error) {
@ -1779,7 +1765,7 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeH
}
// Merge does merge segments in given ranges
func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []MergeRange, snapDir string, doIndex bool) error {
func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []Range, snapDir string, doIndex bool) error {
if len(mergeRanges) == 0 {
return nil
}
@ -1943,7 +1929,7 @@ func assertSegment(segmentFile string) {
}
}
func NewDownloadRequest(ranges *MergeRange, path string, torrentHash string) DownloadRequest {
func NewDownloadRequest(ranges *Range, path string, torrentHash string) DownloadRequest {
return DownloadRequest{
ranges: ranges,
path: path,