erigon-pulse/turbo/snapshotsync/freezeblocks/bor_snapshots.go

1232 lines
34 KiB
Go

package freezeblocks
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"reflect"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/background"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/recsplit"
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
)
const (
spanLength = 6400 // Number of blocks in a span
zerothSpanEnd = 255 // End block of 0th span
)
type BorEventSegment struct {
seg *compress.Decompressor // value: event_rlp
IdxBorTxnHash *recsplit.Index // bor_transaction_hash -> bor_event_segment_offset
ranges Range
}
func (sn *BorEventSegment) closeIdx() {
if sn.IdxBorTxnHash != nil {
sn.IdxBorTxnHash.Close()
sn.IdxBorTxnHash = nil
}
}
func (sn *BorEventSegment) closeSeg() {
if sn.seg != nil {
sn.seg.Close()
sn.seg = nil
}
}
func (sn *BorEventSegment) close() {
sn.closeSeg()
sn.closeIdx()
}
func (sn *BorEventSegment) reopenSeg(dir string) (err error) {
sn.closeSeg()
fileName := snaptype.SegmentFileName(sn.ranges.from, sn.ranges.to, snaptype.BorEvents)
sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName))
if err != nil {
return fmt.Errorf("%w, fileName: %s", err, fileName)
}
return nil
}
func (sn *BorEventSegment) reopenIdx(dir string) (err error) {
sn.closeIdx()
if sn.seg == nil {
return nil
}
fileName := snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.BorEvents.String())
sn.IdxBorTxnHash, err = recsplit.OpenIndex(path.Join(dir, fileName))
if err != nil {
return fmt.Errorf("%w, fileName: %s", err, fileName)
}
return nil
}
func (sn *BorEventSegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) {
if sn.IdxBorTxnHash != nil {
return nil
}
err = sn.reopenIdx(dir)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
if optimistic {
log.Warn("[snapshots] open index", "err", err)
} else {
return err
}
}
}
return nil
}
type borEventSegments struct {
lock sync.RWMutex
segments []*BorEventSegment
}
type BorSpanSegment struct {
seg *compress.Decompressor // value: span_json
idx *recsplit.Index // span_id -> offset
ranges Range
}
func (sn *BorSpanSegment) closeIdx() {
if sn.idx != nil {
sn.idx.Close()
sn.idx = nil
}
}
func (sn *BorSpanSegment) closeSeg() {
if sn.seg != nil {
sn.seg.Close()
sn.seg = nil
}
}
func (sn *BorSpanSegment) close() {
sn.closeSeg()
sn.closeIdx()
}
func (sn *BorSpanSegment) reopenSeg(dir string) (err error) {
sn.closeSeg()
fileName := snaptype.SegmentFileName(sn.ranges.from, sn.ranges.to, snaptype.BorSpans)
sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName))
if err != nil {
return fmt.Errorf("%w, fileName: %s", err, fileName)
}
return nil
}
func (sn *BorSpanSegment) reopenIdx(dir string) (err error) {
sn.closeIdx()
if sn.seg == nil {
return nil
}
fileName := snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.BorSpans.String())
sn.idx, err = recsplit.OpenIndex(path.Join(dir, fileName))
if err != nil {
return fmt.Errorf("%w, fileName: %s", err, fileName)
}
return nil
}
func (sn *BorSpanSegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) {
if sn.idx != nil {
return nil
}
err = sn.reopenIdx(dir)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
if optimistic {
log.Warn("[snapshots] open index", "err", err)
} else {
return err
}
}
}
return nil
}
type borSpanSegments struct {
lock sync.RWMutex
segments []*BorSpanSegment
}
func (br *BlockRetire) RetireBorBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error) error {
chainConfig := fromdb.ChainConfig(br.db)
notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers
logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
snapshots := br.borSnapshots()
firstTxNum := blockReader.(*BlockReader).FirstTxNumNotInSnapshots()
if err := DumpBorBlocks(ctx, chainConfig, blockFrom, blockTo, snaptype.Erigon2MergeLimit, tmpDir, snapshots.Dir(), firstTxNum, db, workers, lvl, logger, blockReader); err != nil {
return fmt.Errorf("DumpBorBlocks: %w", err)
}
if err := snapshots.ReopenFolder(); err != nil {
return fmt.Errorf("reopen: %w", err)
}
snapshots.LogStat()
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
merger := NewBorMerger(tmpDir, workers, lvl, db, chainConfig, notifier, logger)
rangesToMerge := merger.FindMergeRanges(snapshots.Ranges(), snapshots.BlocksAvailable())
if len(rangesToMerge) == 0 {
return nil
}
onMerge := func(r Range) error {
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
if seedNewSnapshots != nil {
downloadRequest := []services.DownloadRequest{
services.NewDownloadRequest(&services.Range{From: r.from, To: r.to}, "", "", true /* Bor */),
}
if err := seedNewSnapshots(downloadRequest); err != nil {
return err
}
}
return nil
}
onDelete := func(files []string) error {
//TODO: add Downloader API to delete files
return nil
}
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */, onMerge, onDelete)
if err != nil {
return err
}
return nil
}
func DumpBorBlocks(ctx context.Context, chainConfig *chain.Config, blockFrom, blockTo, blocksPerFile uint64, tmpDir, snapDir string, firstTxNum uint64, chainDB kv.RoDB, workers int, lvl log.Lvl, logger log.Logger, blockReader services.FullBlockReader) error {
if blocksPerFile == 0 {
return nil
}
for i := blockFrom; i < blockTo; i = chooseSegmentEnd(i, blockTo, blocksPerFile) {
if err := dumpBorBlocksRange(ctx, i, chooseSegmentEnd(i, blockTo, blocksPerFile), tmpDir, snapDir, firstTxNum, chainDB, *chainConfig, workers, lvl, logger, blockReader); err != nil {
return err
}
}
return nil
}
func dumpBorBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapDir string, firstTxNum uint64, chainDB kv.RoDB, chainConfig chain.Config, workers int, lvl log.Lvl, logger log.Logger, blockReader services.FullBlockReader) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
{
segName := snaptype.SegmentFileName(blockFrom, blockTo, snaptype.BorEvents)
f, _ := snaptype.ParseFileName(snapDir, segName)
sn, err := compress.NewCompressor(ctx, "Snapshot BorEvents", f.Path, tmpDir, compress.MinPatternScore, workers, log.LvlTrace, logger)
if err != nil {
return err
}
defer sn.Close()
if err := DumpBorEvents(ctx, chainDB, blockFrom, blockTo, workers, lvl, logger, func(v []byte) error {
return sn.AddWord(v)
}); err != nil {
return fmt.Errorf("DumpBorEvents: %w", err)
}
if err := sn.Compress(); err != nil {
return fmt.Errorf("compress: %w", err)
}
p := &background.Progress{}
if err := buildIdx(ctx, f, &chainConfig, tmpDir, p, lvl, logger); err != nil {
return err
}
}
{
segName := snaptype.SegmentFileName(blockFrom, blockTo, snaptype.BorSpans)
f, _ := snaptype.ParseFileName(snapDir, segName)
sn, err := compress.NewCompressor(ctx, "Snapshot BorSpans", f.Path, tmpDir, compress.MinPatternScore, workers, log.LvlTrace, logger)
if err != nil {
return err
}
defer sn.Close()
if err := DumpBorSpans(ctx, chainDB, blockFrom, blockTo, workers, lvl, logger, func(v []byte) error {
return sn.AddWord(v)
}); err != nil {
return fmt.Errorf("DumpBorSpans: %w", err)
}
if err := sn.Compress(); err != nil {
return fmt.Errorf("compress: %w", err)
}
p := &background.Progress{}
if err := buildIdx(ctx, f, &chainConfig, tmpDir, p, lvl, logger); err != nil {
return err
}
}
return nil
}
func dumpBorEventRange(startEventId, endEventId uint64, tx kv.Tx, blockNum uint64, blockHash common2.Hash, collect func([]byte) error) error {
var blockNumBuf [8]byte
var eventIdBuf [8]byte
txnHash := types.ComputeBorTxHash(blockNum, blockHash)
binary.BigEndian.PutUint64(blockNumBuf[:], blockNum)
for eventId := startEventId; eventId < endEventId; eventId++ {
binary.BigEndian.PutUint64(eventIdBuf[:], eventId)
event, err := tx.GetOne(kv.BorEvents, eventIdBuf[:])
if err != nil {
return err
}
snapshotRecord := make([]byte, len(event)+length.Hash+length.BlockNum+8)
copy(snapshotRecord, txnHash[:])
copy(snapshotRecord[length.Hash:], blockNumBuf[:])
binary.BigEndian.PutUint64(snapshotRecord[length.Hash+length.BlockNum:], eventId)
copy(snapshotRecord[length.Hash+length.BlockNum+8:], event)
if err := collect(snapshotRecord); err != nil {
return err
}
}
return nil
}
// DumpBorEvents - [from, to)
func DumpBorEvents(ctx context.Context, db kv.RoDB, blockFrom, blockTo uint64, workers int, lvl log.Lvl, logger log.Logger, collect func([]byte) error) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
from := hexutility.EncodeTs(blockFrom)
var first bool = true
var prevBlockNum uint64
var startEventId uint64
var lastEventId uint64
if err := kv.BigChunks(db, kv.BorEventNums, from, func(tx kv.Tx, blockNumBytes, eventIdBytes []byte) (bool, error) {
blockNum := binary.BigEndian.Uint64(blockNumBytes)
if first {
startEventId = binary.BigEndian.Uint64(eventIdBytes)
first = false
prevBlockNum = blockNum
} else if blockNum != prevBlockNum {
endEventId := binary.BigEndian.Uint64(eventIdBytes)
blockHash, e := rawdb.ReadCanonicalHash(tx, prevBlockNum)
if e != nil {
return false, e
}
if e := dumpBorEventRange(startEventId, endEventId, tx, prevBlockNum, blockHash, collect); e != nil {
return false, e
}
startEventId = endEventId
prevBlockNum = blockNum
}
if blockNum >= blockTo {
return false, nil
}
lastEventId = binary.BigEndian.Uint64(eventIdBytes)
select {
case <-ctx.Done():
return false, ctx.Err()
case <-logEvery.C:
var m runtime.MemStats
if lvl >= log.LvlInfo {
dbg.ReadMemStats(&m)
}
logger.Log(lvl, "[bor snapshots] Dumping bor events", "block num", blockNum,
"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys),
)
default:
}
return true, nil
}); err != nil {
return err
}
if lastEventId > startEventId {
if err := db.View(ctx, func(tx kv.Tx) error {
blockHash, e := rawdb.ReadCanonicalHash(tx, prevBlockNum)
if e != nil {
return e
}
return dumpBorEventRange(startEventId, lastEventId+1, tx, prevBlockNum, blockHash, collect)
}); err != nil {
return err
}
}
return nil
}
// DumpBorSpans - [from, to)
func DumpBorSpans(ctx context.Context, db kv.RoDB, blockFrom, blockTo uint64, workers int, lvl log.Lvl, logger log.Logger, collect func([]byte) error) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
var spanFrom, spanTo uint64
if blockFrom > zerothSpanEnd {
spanFrom = 1 + (blockFrom-zerothSpanEnd-1)/spanLength
}
if blockTo > zerothSpanEnd {
spanTo = 1 + (blockTo-zerothSpanEnd-1)/spanLength
}
from := hexutility.EncodeTs(spanFrom)
if err := kv.BigChunks(db, kv.BorSpans, from, func(tx kv.Tx, spanIdBytes, spanBytes []byte) (bool, error) {
spanId := binary.BigEndian.Uint64(spanIdBytes)
if spanId >= spanTo {
return false, nil
}
if e := collect(spanBytes); e != nil {
return false, e
}
select {
case <-ctx.Done():
return false, ctx.Err()
case <-logEvery.C:
var m runtime.MemStats
if lvl >= log.LvlInfo {
dbg.ReadMemStats(&m)
}
logger.Log(lvl, "[bor snapshots] Dumping bor spans", "spanId", spanId,
"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys),
)
default:
}
return true, nil
}); err != nil {
return err
}
return nil
}
func BorEventsIdx(ctx context.Context, segmentFilePath string, blockFrom, blockTo uint64, snapDir string, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("BorEventsIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack())
}
}()
// Calculate how many records there will be in the index
d, err := compress.NewDecompressor(segmentFilePath)
if err != nil {
return err
}
defer d.Close()
g := d.MakeGetter()
var blockNumBuf [length.BlockNum]byte
var first bool = true
word := make([]byte, 0, 4096)
var blockCount int
var baseEventId uint64
for g.HasNext() {
word, _ = g.Next(word[:0])
if first || !bytes.Equal(blockNumBuf[:], word[length.Hash:length.Hash+length.BlockNum]) {
blockCount++
copy(blockNumBuf[:], word[length.Hash:length.Hash+length.BlockNum])
}
if first {
baseEventId = binary.BigEndian.Uint64(word[length.Hash+length.BlockNum : length.Hash+length.BlockNum+8])
first = false
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
var idxFilePath = filepath.Join(snapDir, snaptype.IdxFileName(blockFrom, blockTo, snaptype.BorEvents.String()))
rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
KeyCount: blockCount,
Enums: blockCount > 0,
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
IndexFile: idxFilePath,
BaseDataID: baseEventId,
}, logger)
if err != nil {
return err
}
rs.LogLvl(log.LvlDebug)
defer d.EnableMadvNormal().DisableReadAhead()
RETRY:
g.Reset(0)
first = true
var i, offset, nextPos uint64
for g.HasNext() {
word, nextPos = g.Next(word[:0])
i++
if first || !bytes.Equal(blockNumBuf[:], word[length.Hash:length.Hash+length.BlockNum]) {
if err = rs.AddKey(word[:length.Hash], offset); err != nil {
return err
}
copy(blockNumBuf[:], word[length.Hash:length.Hash+length.BlockNum])
}
if first {
first = false
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
offset = nextPos
}
if err = rs.Build(ctx); err != nil {
if errors.Is(err, recsplit.ErrCollision) {
logger.Info("Building recsplit. Collision happened. It's ok. Restarting with another salt...", "err", err)
rs.ResetNextSalt()
goto RETRY
}
return err
}
return nil
}
func BorSpansIdx(ctx context.Context, segmentFilePath string, blockFrom, blockTo uint64, snapDir string, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("BorSpansIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack())
}
}()
// Calculate how many records there will be in the index
d, err := compress.NewDecompressor(segmentFilePath)
if err != nil {
return err
}
defer d.Close()
g := d.MakeGetter()
var idxFilePath = filepath.Join(snapDir, snaptype.IdxFileName(blockFrom, blockTo, snaptype.BorSpans.String()))
var baseSpanId uint64
if blockFrom > zerothSpanEnd {
baseSpanId = 1 + (blockFrom-zerothSpanEnd-1)/spanLength
}
rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
KeyCount: d.Count(),
Enums: d.Count() > 0,
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
IndexFile: idxFilePath,
BaseDataID: baseSpanId,
}, logger)
if err != nil {
return err
}
rs.LogLvl(log.LvlDebug)
defer d.EnableMadvNormal().DisableReadAhead()
RETRY:
g.Reset(0)
var i, offset, nextPos uint64
var key [8]byte
for g.HasNext() {
nextPos, _ = g.Skip()
binary.BigEndian.PutUint64(key[:], i)
i++
if err = rs.AddKey(key[:], offset); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
offset = nextPos
}
if err = rs.Build(ctx); err != nil {
if errors.Is(err, recsplit.ErrCollision) {
logger.Info("Building recsplit. Collision happened. It's ok. Restarting with another salt...", "err", err)
rs.ResetNextSalt()
goto RETRY
}
return err
}
return nil
}
type BorRoSnapshots struct {
indicesReady atomic.Bool
segmentsReady atomic.Bool
Events *borEventSegments
Spans *borSpanSegments
dir string
segmentsMax atomic.Uint64 // all types of .seg files are available - up to this number
idxMax atomic.Uint64 // all types of .idx files are available - up to this number
cfg ethconfig.BlocksFreezing
logger log.Logger
}
// NewBorRoSnapshots - opens all bor snapshots. But to simplify everything:
// - it opens snapshots only on App start and immutable after
// - all snapshots of given blocks range must exist - to make this blocks range available
// - gaps are not allowed
// - segment have [from:to) semantic
func NewBorRoSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, logger log.Logger) *BorRoSnapshots {
return &BorRoSnapshots{dir: snapDir, cfg: cfg, Events: &borEventSegments{}, Spans: &borSpanSegments{}, logger: logger}
}
func (s *BorRoSnapshots) Cfg() ethconfig.BlocksFreezing { return s.cfg }
func (s *BorRoSnapshots) Dir() string { return s.dir }
func (s *BorRoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() }
func (s *BorRoSnapshots) IndicesReady() bool { return s.indicesReady.Load() }
func (s *BorRoSnapshots) IndicesMax() uint64 { return s.idxMax.Load() }
func (s *BorRoSnapshots) SegmentsMax() uint64 { return s.segmentsMax.Load() }
func (s *BorRoSnapshots) BlocksAvailable() uint64 {
return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load())
}
func (s *BorRoSnapshots) LogStat() {
var m runtime.MemStats
dbg.ReadMemStats(&m)
s.logger.Info("[bor snapshots] Blocks Stat",
"blocks", fmt.Sprintf("%dk", (s.SegmentsMax()+1)/1000),
"indices", fmt.Sprintf("%dk", (s.IndicesMax()+1)/1000),
"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
}
func BorSegments(dir string) (res []snaptype.FileInfo, missingSnapshots []Range, err error) {
list, err := snaptype.Segments(dir)
if err != nil {
return nil, missingSnapshots, err
}
{
var l []snaptype.FileInfo
var m []Range
for _, f := range list {
if f.T != snaptype.BorEvents {
continue
}
l = append(l, f)
}
l, m = noGaps(noOverlaps(borSegmentsMustExist(dir, l)))
res = append(res, l...)
missingSnapshots = append(missingSnapshots, m...)
}
{
var l []snaptype.FileInfo
for _, f := range list {
if f.T != snaptype.BorSpans {
continue
}
l = append(l, f)
}
l, _ = noGaps(noOverlaps(borSegmentsMustExist(dir, l)))
res = append(res, l...)
}
return res, missingSnapshots, nil
}
func (s *BorRoSnapshots) ScanDir() (map[string]struct{}, []*services.Range, error) {
existingFiles, missingSnapshots, err := BorSegments(s.dir)
if err != nil {
return nil, nil, err
}
existingFilesMap := map[string]struct{}{}
for _, existingFile := range existingFiles {
_, fname := filepath.Split(existingFile.Path)
existingFilesMap[fname] = struct{}{}
}
res := make([]*services.Range, 0, len(missingSnapshots))
for _, sn := range missingSnapshots {
res = append(res, &services.Range{From: sn.from, To: sn.to})
}
return existingFilesMap, res, nil
}
func (s *BorRoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapcfg.Cfg) error {
if s.BlocksAvailable() < cfg.ExpectBlocks {
return fmt.Errorf("app must wait until all expected bor snapshots are available. Expected: %d, Available: %d", cfg.ExpectBlocks, s.BlocksAvailable())
}
return nil
}
// DisableReadAhead - usage: `defer d.EnableReadAhead().DisableReadAhead()`. Please don't use this funcs without `defer` to avoid leak.
func (s *BorRoSnapshots) DisableReadAhead() {
s.Events.lock.RLock()
defer s.Events.lock.RUnlock()
s.Spans.lock.RLock()
defer s.Spans.lock.RUnlock()
for _, sn := range s.Events.segments {
sn.seg.DisableReadAhead()
}
for _, sn := range s.Spans.segments {
sn.seg.DisableReadAhead()
}
}
func (s *BorRoSnapshots) EnableReadAhead() *BorRoSnapshots {
s.Events.lock.RLock()
defer s.Events.lock.RUnlock()
s.Spans.lock.RLock()
defer s.Spans.lock.RUnlock()
for _, sn := range s.Events.segments {
sn.seg.EnableReadAhead()
}
for _, sn := range s.Spans.segments {
sn.seg.EnableReadAhead()
}
return s
}
func (s *BorRoSnapshots) EnableMadvWillNeed() *BorRoSnapshots {
s.Events.lock.RLock()
defer s.Events.lock.RUnlock()
s.Spans.lock.RLock()
defer s.Spans.lock.RUnlock()
for _, sn := range s.Events.segments {
sn.seg.EnableWillNeed()
}
for _, sn := range s.Spans.segments {
sn.seg.EnableWillNeed()
}
return s
}
func (s *BorRoSnapshots) EnableMadvNormal() *BorRoSnapshots {
s.Events.lock.RLock()
defer s.Events.lock.RUnlock()
s.Spans.lock.RLock()
defer s.Spans.lock.RUnlock()
for _, sn := range s.Events.segments {
sn.seg.EnableMadvNormal()
}
for _, sn := range s.Spans.segments {
sn.seg.EnableMadvNormal()
}
return s
}
func (s *BorRoSnapshots) idxAvailability() uint64 {
var events, spans uint64
for _, seg := range s.Events.segments {
if seg.IdxBorTxnHash == nil {
break
}
events = seg.ranges.to - 1
}
for _, seg := range s.Spans.segments {
if seg.idx == nil {
break
}
spans = seg.ranges.to - 1
}
return cmp.Min(events, spans)
}
// OptimisticReopenWithDB - optimistically open snapshots (ignoring error), useful at App startup because:
// - user must be able: delete any snapshot file and Erigon will self-heal by re-downloading
// - RPC return Nil for historical blocks if snapshots are not open
func (s *BorRoSnapshots) OptimisticReopenWithDB(db kv.RoDB) {
_ = db.View(context.Background(), func(tx kv.Tx) error {
snList, _, err := rawdb.ReadSnapshots(tx)
if err != nil {
return err
}
return s.ReopenList(snList, true)
})
}
func (s *BorRoSnapshots) Files() (list []string) {
s.Events.lock.RLock()
defer s.Events.lock.RUnlock()
s.Spans.lock.RLock()
defer s.Spans.lock.RUnlock()
max := s.BlocksAvailable()
for _, seg := range s.Events.segments {
if seg.seg == nil {
continue
}
if seg.ranges.from > max {
continue
}
_, fName := filepath.Split(seg.seg.FilePath())
list = append(list, fName)
}
for _, seg := range s.Spans.segments {
if seg.seg == nil {
continue
}
if seg.ranges.from > max {
continue
}
_, fName := filepath.Split(seg.seg.FilePath())
list = append(list, fName)
}
slices.Sort(list)
return list
}
// ReopenList stops on optimistic=false, continue opening files on optimistic=true
func (s *BorRoSnapshots) ReopenList(fileNames []string, optimistic bool) error {
s.Events.lock.Lock()
defer s.Events.lock.Unlock()
s.Spans.lock.Lock()
defer s.Spans.lock.Unlock()
s.closeWhatNotInList(fileNames)
var segmentsMax uint64
var segmentsMaxSet bool
Loop:
for _, fName := range fileNames {
f, ok := snaptype.ParseFileName(s.dir, fName)
if !ok {
s.logger.Trace("BorRoSnapshots.ReopenList: skip", "file", fName)
continue
}
var processed bool = true
switch f.T {
case snaptype.BorEvents:
var sn *BorEventSegment
var exists bool
for _, sn2 := range s.Events.segments {
if sn2.seg == nil { // it's ok if some segment was not able to open
continue
}
if fName == sn2.seg.FileName() {
sn = sn2
exists = true
break
}
}
if !exists {
sn = &BorEventSegment{ranges: Range{f.From, f.To}}
}
if err := sn.reopenSeg(s.dir); err != nil {
if errors.Is(err, os.ErrNotExist) {
if optimistic {
continue Loop
} else {
break Loop
}
}
if optimistic {
s.logger.Warn("[bor snapshots] open segment", "err", err)
continue Loop
} else {
return err
}
}
if !exists {
// it's possible to iterate over .seg file even if you don't have index
// then make segment availabe even if index open may fail
s.Events.segments = append(s.Events.segments, sn)
}
if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil {
return err
}
case snaptype.BorSpans:
var sn *BorSpanSegment
var exists bool
for _, sn2 := range s.Spans.segments {
if sn2.seg == nil { // it's ok if some segment was not able to open
continue
}
if fName == sn2.seg.FileName() {
sn = sn2
exists = true
break
}
}
if !exists {
sn = &BorSpanSegment{ranges: Range{f.From, f.To}}
}
if err := sn.reopenSeg(s.dir); err != nil {
if errors.Is(err, os.ErrNotExist) {
if optimistic {
continue Loop
} else {
break Loop
}
}
if optimistic {
s.logger.Warn("[bor snapshots] open segment", "err", err)
continue Loop
} else {
return err
}
}
if !exists {
// it's possible to iterate over .seg file even if you don't have index
// then make segment availabe even if index open may fail
s.Spans.segments = append(s.Spans.segments, sn)
}
if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil {
return err
}
default:
processed = false
}
if processed {
if f.To > 0 {
segmentsMax = f.To - 1
} else {
segmentsMax = 0
}
segmentsMaxSet = true
}
}
if segmentsMaxSet {
s.segmentsMax.Store(segmentsMax)
}
s.segmentsReady.Store(true)
s.idxMax.Store(s.idxAvailability())
s.indicesReady.Store(true)
return nil
}
func (s *BorRoSnapshots) Ranges() (ranges []Range) {
view := s.View()
defer view.Close()
for _, sn := range view.Events() {
ranges = append(ranges, sn.ranges)
}
return ranges
}
func (s *BorRoSnapshots) OptimisticalyReopenFolder() { _ = s.ReopenFolder() }
func (s *BorRoSnapshots) OptimisticalyReopenWithDB(db kv.RoDB) { _ = s.ReopenWithDB(db) }
func (s *BorRoSnapshots) ReopenFolder() error {
files, _, err := BorSegments(s.dir)
if err != nil {
return err
}
list := make([]string, 0, len(files))
for _, f := range files {
_, fName := filepath.Split(f.Path)
list = append(list, fName)
}
return s.ReopenList(list, false)
}
func (s *BorRoSnapshots) ReopenWithDB(db kv.RoDB) error {
if err := db.View(context.Background(), func(tx kv.Tx) error {
snList, _, err := rawdb.ReadSnapshots(tx)
if err != nil {
return err
}
return s.ReopenList(snList, true)
}); err != nil {
return err
}
return nil
}
func (s *BorRoSnapshots) Close() {
s.Events.lock.Lock()
defer s.Events.lock.Unlock()
s.Spans.lock.Lock()
defer s.Spans.lock.Unlock()
s.closeWhatNotInList(nil)
}
func (s *BorRoSnapshots) closeWhatNotInList(l []string) {
Loop1:
for i, sn := range s.Events.segments {
if sn.seg == nil {
continue Loop1
}
_, name := filepath.Split(sn.seg.FilePath())
for _, fName := range l {
if fName == name {
continue Loop1
}
}
sn.close()
s.Events.segments[i] = nil
}
Loop2:
for i, sn := range s.Spans.segments {
if sn.seg == nil {
continue Loop2
}
_, name := filepath.Split(sn.seg.FilePath())
for _, fName := range l {
if fName == name {
continue Loop2
}
}
sn.close()
s.Spans.segments[i] = nil
}
var i int
for i = 0; i < len(s.Events.segments) && s.Events.segments[i] != nil && s.Events.segments[i].seg != nil; i++ {
}
tail := s.Events.segments[i:]
s.Events.segments = s.Events.segments[:i]
for i = 0; i < len(tail); i++ {
if tail[i] != nil {
tail[i].close()
tail[i] = nil
}
}
for i = 0; i < len(s.Spans.segments) && s.Spans.segments[i] != nil && s.Spans.segments[i].seg != nil; i++ {
}
tailS := s.Spans.segments[i:]
s.Spans.segments = s.Spans.segments[:i]
for i = 0; i < len(tailS); i++ {
if tailS[i] != nil {
tailS[i].close()
tailS[i] = nil
}
}
}
func (s *BorRoSnapshots) PrintDebug() {
s.Events.lock.RLock()
defer s.Events.lock.RUnlock()
s.Spans.lock.RLock()
defer s.Spans.lock.RUnlock()
fmt.Println(" == BorSnapshots, Event")
for _, sn := range s.Events.segments {
fmt.Printf("%d, %t\n", sn.ranges.from, sn.IdxBorTxnHash == nil)
}
fmt.Println(" == BorSnapshots, Span")
for _, sn := range s.Spans.segments {
fmt.Printf("%d, %t\n", sn.ranges.from, sn.idx == nil)
}
}
type BorView struct {
s *BorRoSnapshots
closed bool
}
func (s *BorRoSnapshots) View() *BorView {
v := &BorView{s: s}
v.s.Events.lock.RLock()
v.s.Spans.lock.RLock()
return v
}
func (v *BorView) Close() {
if v.closed {
return
}
v.closed = true
v.s.Events.lock.RUnlock()
v.s.Spans.lock.RUnlock()
}
func (v *BorView) Events() []*BorEventSegment { return v.s.Events.segments }
func (v *BorView) Spans() []*BorSpanSegment { return v.s.Spans.segments }
func (v *BorView) EventsSegment(blockNum uint64) (*BorEventSegment, bool) {
for _, seg := range v.Events() {
if !(blockNum >= seg.ranges.from && blockNum < seg.ranges.to) {
continue
}
return seg, true
}
return nil, false
}
func (v *BorView) SpansSegment(blockNum uint64) (*BorSpanSegment, bool) {
for _, seg := range v.Spans() {
if !(blockNum >= seg.ranges.from && blockNum < seg.ranges.to) {
continue
}
return seg, true
}
return nil, false
}
type BorMerger struct {
lvl log.Lvl
compressWorkers int
tmpDir string
chainConfig *chain.Config
chainDB kv.RoDB
notifier services.DBEventNotifier
logger log.Logger
}
func NewBorMerger(tmpDir string, compressWorkers int, lvl log.Lvl, chainDB kv.RoDB, chainConfig *chain.Config, notifier services.DBEventNotifier, logger log.Logger) *BorMerger {
return &BorMerger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, chainDB: chainDB, chainConfig: chainConfig, notifier: notifier, logger: logger}
}
func (m *BorMerger) FindMergeRanges(currentRanges []Range, maxBlockNum uint64) (toMerge []Range) {
for i := len(currentRanges) - 1; i > 0; i-- {
r := currentRanges[i]
isRecent := r.IsRecent(maxBlockNum)
mergeLimit, mergeSteps := uint64(snaptype.Erigon2RecentMergeLimit), MergeSteps
if isRecent {
mergeLimit, mergeSteps = snaptype.Erigon2MergeLimit, RecentMergeSteps
}
if r.to-r.from >= mergeLimit {
continue
}
for _, span := range mergeSteps {
if r.to%span != 0 {
continue
}
if r.to-r.from == span {
break
}
aggFrom := r.to - span
toMerge = append(toMerge, Range{from: aggFrom, to: r.to})
for currentRanges[i].from > aggFrom {
i--
}
break
}
}
slices.SortFunc(toMerge, func(i, j Range) int { return cmp.Compare(i.from, j.from) })
return toMerge
}
func (m *BorMerger) filesByRange(snapshots *BorRoSnapshots, from, to uint64) (map[snaptype.Type][]string, error) {
toMerge := map[snaptype.Type][]string{}
view := snapshots.View()
defer view.Close()
eSegments := view.Events()
sSegments := view.Spans()
for i, sn := range eSegments {
if sn.ranges.from < from {
continue
}
if sn.ranges.to > to {
break
}
toMerge[snaptype.BorEvents] = append(toMerge[snaptype.BorEvents], eSegments[i].seg.FilePath())
toMerge[snaptype.BorSpans] = append(toMerge[snaptype.BorSpans], sSegments[i].seg.FilePath())
}
return toMerge, nil
}
// Merge does merge segments in given ranges
func (m *BorMerger) Merge(ctx context.Context, snapshots *BorRoSnapshots, mergeRanges []Range, snapDir string, doIndex bool, onMerge func(r Range) error, onDelete func(l []string) error) error {
if len(mergeRanges) == 0 {
return nil
}
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
for _, r := range mergeRanges {
toMerge, err := m.filesByRange(snapshots, r.from, r.to)
if err != nil {
return err
}
for _, t := range []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans} {
segName := snaptype.SegmentFileName(r.from, r.to, t)
f, ok := snaptype.ParseFileName(snapDir, segName)
if !ok {
continue
}
if err := m.merge(ctx, toMerge[t], f.Path, logEvery); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if doIndex {
p := &background.Progress{}
if err := buildIdx(ctx, f, m.chainConfig, m.tmpDir, p, m.lvl, m.logger); err != nil {
return err
}
}
}
if err := snapshots.ReopenFolder(); err != nil {
return fmt.Errorf("ReopenSegments: %w", err)
}
snapshots.LogStat()
if err := onMerge(r); err != nil {
return err
}
for _, t := range snaptype.BlockSnapshotTypes {
if len(toMerge[t]) == 0 {
continue
}
if err := onDelete(toMerge[t]); err != nil {
return err
}
}
for _, t := range []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans} {
m.removeOldFiles(toMerge[t], snapDir)
}
}
m.logger.Log(m.lvl, "[bor snapshots] Merge done", "from", mergeRanges[0].from, "to", mergeRanges[0].to)
return nil
}
func (m *BorMerger) merge(ctx context.Context, toMerge []string, targetFile string, logEvery *time.Ticker) error {
var word = make([]byte, 0, 4096)
var expectedTotal int
cList := make([]*compress.Decompressor, len(toMerge))
for i, cFile := range toMerge {
d, err := compress.NewDecompressor(cFile)
if err != nil {
return err
}
defer d.Close()
cList[i] = d
expectedTotal += d.Count()
}
f, err := compress.NewCompressor(ctx, "Bor Snapshots merge", targetFile, m.tmpDir, compress.MinPatternScore, m.compressWorkers, log.LvlTrace, m.logger)
if err != nil {
return err
}
defer f.Close()
for _, d := range cList {
if err := d.WithReadAhead(func() error {
g := d.MakeGetter()
for g.HasNext() {
word, _ = g.Next(word[:0])
if err := f.AddWord(word); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
}
if f.Count() != expectedTotal {
return fmt.Errorf("unexpected amount after bor segments merge. got: %d, expected: %d", f.Count(), expectedTotal)
}
if err = f.Compress(); err != nil {
return err
}
return nil
}
func (m *BorMerger) removeOldFiles(toDel []string, snapDir string) {
for _, f := range toDel {
_ = os.Remove(f)
ext := filepath.Ext(f)
withoutExt := f[:len(f)-len(ext)]
_ = os.Remove(withoutExt + ".idx")
}
tmpFiles, err := snaptype.TmpFiles(snapDir)
if err != nil {
return
}
for _, f := range tmpFiles {
_ = os.Remove(f)
}
}