mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-20 17:31:11 +00:00
51af060450
Make it so that erigon can the enable beacon api.
483 lines
12 KiB
Go
483 lines
12 KiB
Go
package freezeblocks
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
|
"github.com/ledgerwatch/erigon-lib/common/background"
|
|
"github.com/ledgerwatch/erigon-lib/common/cmp"
|
|
"github.com/ledgerwatch/erigon-lib/common/dbg"
|
|
"github.com/ledgerwatch/erigon-lib/compress"
|
|
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
|
|
"github.com/ledgerwatch/erigon-lib/kv"
|
|
"github.com/ledgerwatch/erigon-lib/recsplit"
|
|
"github.com/ledgerwatch/erigon/cl/cltypes"
|
|
"github.com/ledgerwatch/erigon/cl/persistence"
|
|
"github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format"
|
|
"github.com/ledgerwatch/erigon/eth/ethconfig"
|
|
"github.com/ledgerwatch/log/v3"
|
|
"github.com/pierrec/lz4"
|
|
)
|
|
|
|
type BeaconBlockSegment struct {
|
|
seg *compress.Decompressor // value: chunked(ssz(SignedBeaconBlocks))
|
|
idxSlot *recsplit.Index // slot -> beacon_slot_segment_offset
|
|
ranges Range
|
|
}
|
|
|
|
func (sn *BeaconBlockSegment) closeIdx() {
|
|
if sn.idxSlot != nil {
|
|
sn.idxSlot.Close()
|
|
sn.idxSlot = nil
|
|
}
|
|
}
|
|
func (sn *BeaconBlockSegment) closeSeg() {
|
|
if sn.seg != nil {
|
|
sn.seg.Close()
|
|
sn.seg = nil
|
|
}
|
|
}
|
|
func (sn *BeaconBlockSegment) close() {
|
|
sn.closeSeg()
|
|
sn.closeIdx()
|
|
}
|
|
func (sn *BeaconBlockSegment) reopenSeg(dir string) (err error) {
|
|
sn.closeSeg()
|
|
fileName := snaptype.SegmentFileName(sn.ranges.from, sn.ranges.to, snaptype.BeaconBlocks)
|
|
sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName))
|
|
if err != nil {
|
|
return fmt.Errorf("%w, fileName: %s", err, fileName)
|
|
}
|
|
return nil
|
|
}
|
|
func (sn *BeaconBlockSegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) {
|
|
if sn.idxSlot != nil {
|
|
return nil
|
|
}
|
|
err = sn.reopenIdx(dir)
|
|
if err != nil {
|
|
if !errors.Is(err, os.ErrNotExist) {
|
|
if optimistic {
|
|
log.Warn("[snapshots] open index", "err", err)
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (sn *BeaconBlockSegment) reopenIdx(dir string) (err error) {
|
|
sn.closeIdx()
|
|
if sn.seg == nil {
|
|
return nil
|
|
}
|
|
fileName := snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.BeaconBlocks.String())
|
|
sn.idxSlot, err = recsplit.OpenIndex(path.Join(dir, fileName))
|
|
if err != nil {
|
|
return fmt.Errorf("%w, fileName: %s", err, fileName)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type beaconBlockSegments struct {
|
|
lock sync.RWMutex
|
|
segments []*BeaconBlockSegment
|
|
}
|
|
|
|
func (s *beaconBlockSegments) View(f func(segments []*BeaconBlockSegment) error) error {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return f(s.segments)
|
|
}
|
|
|
|
func BeaconBlocksIdx(ctx context.Context, sn snaptype.FileInfo, segmentFilePath string, blockFrom, blockTo uint64, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) {
|
|
defer func() {
|
|
if rec := recover(); rec != nil {
|
|
err = fmt.Errorf("BeaconBlocksIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack())
|
|
}
|
|
}()
|
|
// Calculate how many records there will be in the index
|
|
d, err := compress.NewDecompressor(segmentFilePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer d.Close()
|
|
_, fname := filepath.Split(segmentFilePath)
|
|
p.Name.Store(&fname)
|
|
p.Total.Store(uint64(d.Count()))
|
|
|
|
if err := Idx(ctx, d, sn.From, tmpDir, log.LvlDebug, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error {
|
|
if i%20_000 == 0 {
|
|
logger.Log(lvl, "Generating idx for beacon blocks", "progress", i)
|
|
}
|
|
p.Processed.Add(1)
|
|
num := make([]byte, 8)
|
|
n := binary.PutUvarint(num, i)
|
|
if err := idx.AddKey(num[:n], offset); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}, logger); err != nil {
|
|
return fmt.Errorf("BodyNumberIdx: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type CaplinSnapshots struct {
|
|
indicesReady atomic.Bool
|
|
segmentsReady atomic.Bool
|
|
|
|
BeaconBlocks *beaconBlockSegments
|
|
|
|
dir string
|
|
segmentsMax atomic.Uint64 // all types of .seg files are available - up to this number
|
|
idxMax atomic.Uint64 // all types of .idx files are available - up to this number
|
|
cfg ethconfig.BlocksFreezing
|
|
logger log.Logger
|
|
}
|
|
|
|
// NewCaplinSnapshots - opens all snapshots. But to simplify everything:
|
|
// - it opens snapshots only on App start and immutable after
|
|
// - all snapshots of given blocks range must exist - to make this blocks range available
|
|
// - gaps are not allowed
|
|
// - segment have [from:to) semantic
|
|
func NewCaplinSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, logger log.Logger) *CaplinSnapshots {
|
|
return &CaplinSnapshots{dir: snapDir, cfg: cfg, BeaconBlocks: &beaconBlockSegments{}, logger: logger}
|
|
}
|
|
|
|
func (s *CaplinSnapshots) IndicesMax() uint64 { return s.idxMax.Load() }
|
|
func (s *CaplinSnapshots) SegmentsMax() uint64 { return s.segmentsMax.Load() }
|
|
|
|
func (s *CaplinSnapshots) SegFilePaths(from, to uint64) []string {
|
|
var res []string
|
|
for _, seg := range s.BeaconBlocks.segments {
|
|
if seg.ranges.from >= from && seg.ranges.to <= to {
|
|
res = append(res, seg.seg.FilePath())
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (s *CaplinSnapshots) BlocksAvailable() uint64 {
|
|
return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load())
|
|
}
|
|
|
|
// ReopenList stops on optimistic=false, continue opening files on optimistic=true
|
|
func (s *CaplinSnapshots) ReopenList(fileNames []string, optimistic bool) error {
|
|
s.BeaconBlocks.lock.Lock()
|
|
defer s.BeaconBlocks.lock.Unlock()
|
|
|
|
s.closeWhatNotInList(fileNames)
|
|
var segmentsMax uint64
|
|
var segmentsMaxSet bool
|
|
Loop:
|
|
for _, fName := range fileNames {
|
|
f, ok := snaptype.ParseFileName(s.dir, fName)
|
|
if !ok {
|
|
continue
|
|
}
|
|
var processed bool = true
|
|
|
|
switch f.T {
|
|
case snaptype.BeaconBlocks:
|
|
var sn *BeaconBlockSegment
|
|
var exists bool
|
|
for _, sn2 := range s.BeaconBlocks.segments {
|
|
if sn2.seg == nil { // it's ok if some segment was not able to open
|
|
continue
|
|
}
|
|
if fName == sn2.seg.FileName() {
|
|
sn = sn2
|
|
exists = true
|
|
break
|
|
}
|
|
}
|
|
if !exists {
|
|
sn = &BeaconBlockSegment{ranges: Range{f.From, f.To}}
|
|
}
|
|
if err := sn.reopenSeg(s.dir); err != nil {
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
if optimistic {
|
|
continue Loop
|
|
} else {
|
|
break Loop
|
|
}
|
|
}
|
|
if optimistic {
|
|
s.logger.Warn("[snapshots] open segment", "err", err)
|
|
continue Loop
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !exists {
|
|
// it's possible to iterate over .seg file even if you don't have index
|
|
// then make segment available even if index open may fail
|
|
s.BeaconBlocks.segments = append(s.BeaconBlocks.segments, sn)
|
|
}
|
|
if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if processed {
|
|
if f.To > 0 {
|
|
segmentsMax = f.To - 1
|
|
} else {
|
|
segmentsMax = 0
|
|
}
|
|
segmentsMaxSet = true
|
|
}
|
|
}
|
|
if segmentsMaxSet {
|
|
s.segmentsMax.Store(segmentsMax)
|
|
}
|
|
s.segmentsReady.Store(true)
|
|
s.idxMax.Store(s.idxAvailability())
|
|
s.indicesReady.Store(true)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *CaplinSnapshots) idxAvailability() uint64 {
|
|
var beaconBlocks uint64
|
|
for _, seg := range s.BeaconBlocks.segments {
|
|
if seg.idxSlot == nil {
|
|
break
|
|
}
|
|
beaconBlocks = seg.ranges.to - 1
|
|
}
|
|
return beaconBlocks
|
|
}
|
|
|
|
func (s *CaplinSnapshots) ReopenFolder() error {
|
|
files, _, err := SegmentsCaplin(s.dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
list := make([]string, 0, len(files))
|
|
for _, f := range files {
|
|
_, fName := filepath.Split(f.Path)
|
|
list = append(list, fName)
|
|
}
|
|
return s.ReopenList(list, false)
|
|
}
|
|
|
|
func (s *CaplinSnapshots) closeWhatNotInList(l []string) {
|
|
Loop1:
|
|
for i, sn := range s.BeaconBlocks.segments {
|
|
if sn.seg == nil {
|
|
continue Loop1
|
|
}
|
|
_, name := filepath.Split(sn.seg.FilePath())
|
|
for _, fName := range l {
|
|
if fName == name {
|
|
continue Loop1
|
|
}
|
|
}
|
|
sn.close()
|
|
s.BeaconBlocks.segments[i] = nil
|
|
}
|
|
var i int
|
|
for i = 0; i < len(s.BeaconBlocks.segments) && s.BeaconBlocks.segments[i] != nil && s.BeaconBlocks.segments[i].seg != nil; i++ {
|
|
}
|
|
tail := s.BeaconBlocks.segments[i:]
|
|
s.BeaconBlocks.segments = s.BeaconBlocks.segments[:i]
|
|
for i = 0; i < len(tail); i++ {
|
|
if tail[i] != nil {
|
|
tail[i].close()
|
|
tail[i] = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
type CaplinView struct {
|
|
s *CaplinSnapshots
|
|
closed bool
|
|
}
|
|
|
|
func (s *CaplinSnapshots) View() *CaplinView {
|
|
v := &CaplinView{s: s}
|
|
v.s.BeaconBlocks.lock.RLock()
|
|
return v
|
|
}
|
|
|
|
func (v *CaplinView) Close() {
|
|
if v.closed {
|
|
return
|
|
}
|
|
v.closed = true
|
|
v.s.BeaconBlocks.lock.RUnlock()
|
|
|
|
}
|
|
|
|
func (v *CaplinView) BeaconBlocks() []*BeaconBlockSegment { return v.s.BeaconBlocks.segments }
|
|
|
|
func (v *CaplinView) BeaconBlocksSegment(slot uint64) (*BeaconBlockSegment, bool) {
|
|
for _, seg := range v.BeaconBlocks() {
|
|
if !(slot >= seg.ranges.from && slot < seg.ranges.to) {
|
|
continue
|
|
}
|
|
return seg, true
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
func dumpBeaconBlocksRange(ctx context.Context, db kv.RoDB, b persistence.BlockSource, fromSlot uint64, toSlot uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error {
|
|
segName := snaptype.SegmentFileName(fromSlot, toSlot, snaptype.BeaconBlocks)
|
|
f, _ := snaptype.ParseFileName(snapDir, segName)
|
|
|
|
sn, err := compress.NewCompressor(ctx, "Snapshot BeaconBlocks", f.Path, tmpDir, compress.MinPatternScore, workers, lvl, logger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer sn.Close()
|
|
|
|
tx, err := db.BeginRo(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
var w bytes.Buffer
|
|
lzWriter := lz4.NewWriter(&w)
|
|
defer lzWriter.Close()
|
|
// Just make a reusable buffer
|
|
buf := make([]byte, 2048)
|
|
// Generate .seg file, which is just the list of beacon blocks.
|
|
for i := fromSlot; i < toSlot; i++ {
|
|
obj, err := b.GetBlock(ctx, tx, i)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if i%20_000 == 0 {
|
|
logger.Log(lvl, "Dumping beacon blocks", "progress", i)
|
|
}
|
|
if obj == nil {
|
|
|
|
if err := sn.AddWord(nil); err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
lzWriter.Reset(&w)
|
|
lzWriter.CompressionLevel = 1
|
|
if buf, err = snapshot_format.WriteBlockForSnapshot(lzWriter, obj.Data, buf); err != nil {
|
|
return err
|
|
}
|
|
if err := lzWriter.Flush(); err != nil {
|
|
return err
|
|
}
|
|
word := w.Bytes()
|
|
|
|
if err := sn.AddWord(word); err != nil {
|
|
return err
|
|
}
|
|
w.Reset()
|
|
}
|
|
if err := sn.Compress(); err != nil {
|
|
return fmt.Errorf("compress: %w", err)
|
|
}
|
|
// Generate .idx file, which is the slot => offset mapping.
|
|
p := &background.Progress{}
|
|
|
|
return BeaconBlocksIdx(ctx, f, path.Join(snapDir, segName), fromSlot, toSlot, tmpDir, p, lvl, logger)
|
|
}
|
|
|
|
func DumpBeaconBlocks(ctx context.Context, db kv.RoDB, b persistence.BlockSource, fromSlot, toSlot, blocksPerFile uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error {
|
|
if blocksPerFile == 0 {
|
|
return nil
|
|
}
|
|
|
|
for i := fromSlot; i < toSlot; i = chooseSegmentEnd(i, toSlot, blocksPerFile) {
|
|
if toSlot-i < blocksPerFile {
|
|
break
|
|
}
|
|
to := chooseSegmentEnd(i, toSlot, blocksPerFile)
|
|
logger.Log(lvl, "Dumping beacon blocks", "from", i, "to", to)
|
|
if err := dumpBeaconBlocksRange(ctx, db, b, i, to, tmpDir, snapDir, workers, lvl, logger); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *CaplinSnapshots) BuildMissingIndices(ctx context.Context, logger log.Logger, lvl log.Lvl) error {
|
|
// if !s.segmentsReady.Load() {
|
|
// return fmt.Errorf("not all snapshot segments are available")
|
|
// }
|
|
|
|
// wait for Downloader service to download all expected snapshots
|
|
segments, _, err := SegmentsCaplin(s.dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for index := range segments {
|
|
segment := segments[index]
|
|
if segment.T != snaptype.BeaconBlocks {
|
|
continue
|
|
}
|
|
if hasIdxFile(segment, logger) {
|
|
continue
|
|
}
|
|
p := &background.Progress{}
|
|
|
|
if err := BeaconBlocksIdx(ctx, segment, segment.Path, segment.From, segment.To, s.dir, p, log.LvlDebug, logger); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return s.ReopenFolder()
|
|
}
|
|
|
|
func (s *CaplinSnapshots) ReadHeader(slot uint64) (*cltypes.SignedBeaconBlockHeader, uint64, libcommon.Hash, error) {
|
|
view := s.View()
|
|
defer view.Close()
|
|
|
|
var buf []byte
|
|
|
|
seg, ok := view.BeaconBlocksSegment(slot)
|
|
if !ok {
|
|
return nil, 0, libcommon.Hash{}, nil
|
|
}
|
|
|
|
if seg.idxSlot == nil {
|
|
return nil, 0, libcommon.Hash{}, nil
|
|
}
|
|
blockOffset := seg.idxSlot.OrdinalLookup(slot - seg.idxSlot.BaseDataID())
|
|
|
|
gg := seg.seg.MakeGetter()
|
|
gg.Reset(blockOffset)
|
|
if !gg.HasNext() {
|
|
return nil, 0, libcommon.Hash{}, nil
|
|
}
|
|
|
|
buf, _ = gg.Next(buf)
|
|
if len(buf) == 0 {
|
|
return nil, 0, libcommon.Hash{}, nil
|
|
}
|
|
// Decompress this thing
|
|
buffer := buffersPool.Get().(*bytes.Buffer)
|
|
defer buffersPool.Put(buffer)
|
|
|
|
buffer.Reset()
|
|
buffer.Write(buf)
|
|
lzReader := lz4ReaderPool.Get().(*lz4.Reader)
|
|
defer lz4ReaderPool.Put(lzReader)
|
|
lzReader.Reset(buffer)
|
|
|
|
// Use pooled buffers and readers to avoid allocations.
|
|
return snapshot_format.ReadBlockHeaderFromSnapshotWithExecutionData(lzReader)
|
|
}
|