mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
retire: handle case when bor snaps are behind block snaps (#9061)
This commit is contained in:
parent
ec970ac6bc
commit
2b87d65285
@ -127,6 +127,7 @@ func (c *Compressor) Close() {
|
||||
}
|
||||
|
||||
func (c *Compressor) SetTrace(trace bool) { c.trace = trace }
|
||||
func (c *Compressor) Workers() int { return c.workers }
|
||||
|
||||
func (c *Compressor) Count() int { return int(c.wordsCount) }
|
||||
|
||||
|
@ -1369,21 +1369,35 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg
|
||||
}
|
||||
|
||||
func (br *BlockRetire) RetireBlocks(ctx context.Context, forwardProgress uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDeleteSnapshots func(l []string) error) error {
|
||||
blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBlocks())
|
||||
if ok {
|
||||
if err := br.retireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil {
|
||||
//br.logger.Warn("[snapshots] retire blocks", "err", err, "fromBlock", blockFrom, "toBlock", blockTo)
|
||||
return err
|
||||
includeBor := br.chainConfig.Bor != nil
|
||||
if includeBor {
|
||||
// "bor snaps" can be behind "block snaps", it's ok: for example because of `kill -9` in the middle of merge
|
||||
for br.blockReader.FrozenBorBlocks() < br.blockReader.FrozenBlocks() {
|
||||
blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks())
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if err := br.retireBorBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
includeBor := br.chainConfig.Bor != nil
|
||||
if includeBor {
|
||||
blockFrom, blockTo, ok = CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks())
|
||||
if ok {
|
||||
if err := br.retireBorBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil {
|
||||
return err
|
||||
//br.logger.Warn("[bor snapshots] retire blocks", "err", err, "fromBlock", blockFrom, "toBlock", blockTo)
|
||||
for {
|
||||
blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBlocks())
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if err := br.retireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if includeBor {
|
||||
blockFrom, blockTo, ok = CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks())
|
||||
if ok {
|
||||
if err := br.retireBorBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1558,8 +1572,8 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna
|
||||
}
|
||||
snapDir, fileName := filepath.Split(f.Path)
|
||||
ext := filepath.Ext(fileName)
|
||||
logger.Log(lvl, "[snapshots] Compression", "ratio", sn.Ratio.String(), "file", fileName[:len(fileName)-len(ext)])
|
||||
|
||||
logger.Log(lvl, "[snapshots] Compression start", "file", fileName[:len(fileName)-len(ext)], "workers", sn.Workers())
|
||||
t := time.Now()
|
||||
_, expectedCount, err = txsAmountBasedOnBodiesSnapshots(snapDir, blockFrom, blockTo)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1570,6 +1584,7 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna
|
||||
if err := sn.Compress(); err != nil {
|
||||
return fmt.Errorf("compress: %w", err)
|
||||
}
|
||||
logger.Log(lvl, "[snapshots] Compression", "took", time.Since(t), "ratio", sn.Ratio.String(), "file", fileName[:len(fileName)-len(ext)])
|
||||
|
||||
p := &background.Progress{}
|
||||
if err := buildIdx(ctx, f, &chainConfig, tmpDir, p, lvl, logger); err != nil {
|
||||
@ -2384,9 +2399,6 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
time.Sleep(1 * time.Second) // i working on blocking API - to ensure client does not use old snapsthos - and then delete them
|
||||
for _, t := range snaptype.BlockSnapshotTypes {
|
||||
m.removeOldFiles(toMerge[t], snapDir)
|
||||
}
|
||||
}
|
||||
@ -2446,6 +2458,7 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string,
|
||||
func (m *Merger) removeOldFiles(toDel []string, snapDir string) {
|
||||
for _, f := range toDel {
|
||||
_ = os.Remove(f)
|
||||
_ = os.Remove(f + ".torrent")
|
||||
ext := filepath.Ext(f)
|
||||
withoutExt := f[:len(f)-len(ext)]
|
||||
_ = os.Remove(withoutExt + ".idx")
|
||||
|
Loading…
Reference in New Issue
Block a user