// erigon-pulse/turbo/snapshotsync/freezeblocks/caplin_snapshots.go

package freezeblocks

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"

	"github.com/klauspost/compress/zstd"
	libcommon "github.com/ledgerwatch/erigon-lib/common"
	"github.com/ledgerwatch/erigon-lib/common/background"
	"github.com/ledgerwatch/erigon-lib/common/cmp"
	"github.com/ledgerwatch/erigon-lib/common/dbg"
	"github.com/ledgerwatch/erigon-lib/compress"
	"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon-lib/recsplit"
	"github.com/ledgerwatch/erigon/cl/clparams"
	"github.com/ledgerwatch/erigon/cl/cltypes"
	"github.com/ledgerwatch/erigon/cl/persistence"
	"github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format"
	"github.com/ledgerwatch/erigon/eth/ethconfig"
	"github.com/ledgerwatch/log/v3"
)
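
// BeaconBlockSegment is one immutable [from:to) slice of beacon-block history:
// a compressed .seg file of chunked ssz-encoded SignedBeaconBlocks, plus the
// recsplit index that maps a slot to the block's offset inside the segment.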
type BeaconBlockSegment struct {
	seg     *compress.Decompressor // value: chunked(ssz(SignedBeaconBlocks))
	idxSlot *recsplit.Index        // slot -> beacon_slot_segment_offset
	ranges  Range
	version uint8
}

func (sn *BeaconBlockSegment) closeIdx() {
	if sn.idxSlot != nil {
		sn.idxSlot.Close()
		sn.idxSlot = nil
	}
}

func (sn *BeaconBlockSegment) closeSeg() {
	if sn.seg != nil {
		sn.seg.Close()
		sn.seg = nil
	}
}

func (sn *BeaconBlockSegment) close() {
	sn.closeSeg()
	sn.closeIdx()
}

func (sn *BeaconBlockSegment) reopenSeg(dir string) (err error) {
	sn.closeSeg()
	fileName := snaptype.SegmentFileName(sn.version, sn.ranges.from, sn.ranges.to, snaptype.BeaconBlocks)
	sn.seg, err = compress.NewDecompressor(filepath.Join(dir, fileName))
	if err != nil {
		return fmt.Errorf("%w, fileName: %s", err, fileName)
	}
	return nil
}
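
// reopenIdxIfNeed opens the slot index lazily. A missing .idx file is not an
// error (it can be produced later, e.g. by BuildMissingIndices); any other
// failure is logged when optimistic and returned otherwise.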
func (sn *BeaconBlockSegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) {
	if sn.idxSlot != nil {
		return nil
	}
	err = sn.reopenIdx(dir)
	if err != nil {
		if !errors.Is(err, os.ErrNotExist) {
			if optimistic {
				log.Warn("[snapshots] open index", "err", err)
			} else {
				return err
			}
		}
	}
	return nil
}

func (sn *BeaconBlockSegment) reopenIdx(dir string) (err error) {
	sn.closeIdx()
	if sn.seg == nil {
		return nil
	}
	fileName := snaptype.IdxFileName(sn.version, sn.ranges.from, sn.ranges.to, snaptype.BeaconBlocks.String())
	sn.idxSlot, err = recsplit.OpenIndex(filepath.Join(dir, fileName))
	if err != nil {
		return fmt.Errorf("%w, fileName: %s", err, fileName)
	}
	return nil
}
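
// beaconBlockSegments is the ordered list of open segments, guarded by an
// RWMutex; readers access it through View or a CaplinView.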
type beaconBlockSegments struct {
	lock     sync.RWMutex
	segments []*BeaconBlockSegment
}

func (s *beaconBlockSegments) View(f func(segments []*BeaconBlockSegment) error) error {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return f(s.segments)
}
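
// BeaconBlocksIdx builds the .idx file for one beacon-blocks segment: every
// word i in the .seg file gets a key (the uvarint-encoded record number) that
// maps to the word's offset, so slots can be resolved by ordinal lookup.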
func BeaconBlocksIdx(ctx context.Context, sn snaptype.FileInfo, segmentFilePath string, blockFrom, blockTo uint64, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("BeaconBlocksIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack())
		}
	}()
	// Calculate how many records there will be in the index
	d, err := compress.NewDecompressor(segmentFilePath)
	if err != nil {
		return err
	}
	defer d.Close()

	_, fname := filepath.Split(segmentFilePath)
	p.Name.Store(&fname)
	p.Total.Store(uint64(d.Count()))

	if err := Idx(ctx, d, sn.From, tmpDir, log.LvlDebug, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error {
		if i%20_000 == 0 {
			logger.Log(lvl, "Generating idx for beacon blocks", "progress", i)
		}
		p.Processed.Add(1)
		num := make([]byte, 8)
		n := binary.PutUvarint(num, i)
		if err := idx.AddKey(num[:n], offset); err != nil {
			return err
		}
		return nil
	}, logger); err != nil {
		return fmt.Errorf("BeaconBlocksIdx: %w", err)
	}
	return nil
}
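
// CaplinSnapshots tracks the set of open beacon-block segments together with
// the high-water marks for segment (.seg) and index (.idx) availability.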
type CaplinSnapshots struct {
	indicesReady  atomic.Bool
	segmentsReady atomic.Bool

	BeaconBlocks *beaconBlockSegments
	dir          string
	segmentsMax  atomic.Uint64 // all types of .seg files are available - up to this number
	idxMax       atomic.Uint64 // all types of .idx files are available - up to this number
	cfg          ethconfig.BlocksFreezing
	logger       log.Logger
	// allows for pruning segments - this is the min available segment
	segmentsMin atomic.Uint64
	version     uint8
	// chain cfg
	beaconCfg *clparams.BeaconChainConfig
}

// NewCaplinSnapshots - opens all snapshots. But to simplify everything:
//   - it opens snapshots only on app start and treats them as immutable afterwards
//   - all snapshots in a given block range must exist for that range to be available
//   - gaps are not allowed
//   - segments have [from:to) semantics
func NewCaplinSnapshots(cfg ethconfig.BlocksFreezing, beaconCfg *clparams.BeaconChainConfig, snapDir string, version uint8, logger log.Logger) *CaplinSnapshots {
	return &CaplinSnapshots{dir: snapDir, version: version, cfg: cfg, BeaconBlocks: &beaconBlockSegments{}, logger: logger, beaconCfg: beaconCfg}
}
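
// A minimal usage sketch (hypothetical wiring; in practice Erigon builds these
// values from its configuration):
//
//	sn := NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconCfg, "/snapshots", 1, log.New())
//	if err := sn.ReopenFolder(); err != nil {
//		// handle error
//	}
//	view := sn.View()
//	defer view.Close()
//	seg, ok := view.BeaconBlocksSegment(slot)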

func (s *CaplinSnapshots) Version() uint8      { return s.version }
func (s *CaplinSnapshots) IndicesMax() uint64  { return s.idxMax.Load() }
func (s *CaplinSnapshots) SegmentsMax() uint64 { return s.segmentsMax.Load() }

func (s *CaplinSnapshots) SegFilePaths(from, to uint64) []string {
	var res []string
	for _, seg := range s.BeaconBlocks.segments {
		if seg.ranges.from >= from && seg.ranges.to <= to {
			res = append(res, seg.seg.FilePath())
		}
	}
	return res
}

func (s *CaplinSnapshots) BlocksAvailable() uint64 {
	return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load())
}

// ReopenList opens the given files. With optimistic=false it stops at the first
// error; with optimistic=true it logs the error and keeps opening the remaining files.
func (s *CaplinSnapshots) ReopenList(fileNames []string, optimistic bool) error {
	s.BeaconBlocks.lock.Lock()
	defer s.BeaconBlocks.lock.Unlock()

	s.closeWhatNotInList(fileNames)
	var segmentsMax uint64
	var segmentsMaxSet bool
Loop:
	for _, fName := range fileNames {
		f, ok := snaptype.ParseFileName(s.dir, fName)
		if !ok {
			continue
		}
		processed := true

		switch f.T {
		case snaptype.BeaconBlocks:
			var sn *BeaconBlockSegment
			var exists bool
			for _, sn2 := range s.BeaconBlocks.segments {
				if sn2.seg == nil { // it's ok if some segment was not able to open
					continue
				}
				if fName == sn2.seg.FileName() {
					sn = sn2
					exists = true
					break
				}
			}
			if !exists {
				sn = &BeaconBlockSegment{version: s.version, ranges: Range{f.From, f.To}}
			}
			if err := sn.reopenSeg(s.dir); err != nil {
				if errors.Is(err, os.ErrNotExist) {
					if optimistic {
						continue Loop
					} else {
						break Loop
					}
				}
				if optimistic {
					s.logger.Warn("[snapshots] open segment", "err", err)
					continue Loop
				} else {
					return err
				}
			}
			if !exists {
				// it's possible to iterate over a .seg file even without its index,
				// so make the segment available even if opening the index fails below
				s.BeaconBlocks.segments = append(s.BeaconBlocks.segments, sn)
			}
			if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil {
				return err
			}
		}
		if processed {
			if f.To > 0 {
				segmentsMax = f.To - 1
			} else {
				segmentsMax = 0
			}
			segmentsMaxSet = true
		}
	}
	if segmentsMaxSet {
		s.segmentsMax.Store(segmentsMax)
	}
	s.segmentsReady.Store(true)
	s.idxMax.Store(s.idxAvailability())
	s.indicesReady.Store(true)
	return nil
}
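
// idxAvailability reports the last slot covered by a contiguous run of
// indexed segments: the scan stops at the first segment with no open index.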
func (s *CaplinSnapshots) idxAvailability() uint64 {
	var beaconBlocks uint64
	for _, seg := range s.BeaconBlocks.segments {
		if seg.idxSlot == nil {
			break
		}
		beaconBlocks = seg.ranges.to - 1
	}
	return beaconBlocks
}

func (s *CaplinSnapshots) ReopenFolder() error {
	files, _, err := SegmentsCaplin(s.dir, s.version, s.segmentsMin.Load())
	if err != nil {
		return err
	}
	list := make([]string, 0, len(files))
	for _, f := range files {
		_, fName := filepath.Split(f.Path)
		list = append(list, fName)
	}
	return s.ReopenList(list, false)
}
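
// closeWhatNotInList closes every open segment whose file name is absent from
// l, then truncates the segment list at the first gap so it stays contiguous.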
func (s *CaplinSnapshots) closeWhatNotInList(l []string) {
Loop1:
	for i, sn := range s.BeaconBlocks.segments {
		if sn.seg == nil {
			continue Loop1
		}
		_, name := filepath.Split(sn.seg.FilePath())
		for _, fName := range l {
			if fName == name {
				continue Loop1
			}
		}
		sn.close()
		s.BeaconBlocks.segments[i] = nil
	}

	var i int
	for i = 0; i < len(s.BeaconBlocks.segments) && s.BeaconBlocks.segments[i] != nil && s.BeaconBlocks.segments[i].seg != nil; i++ {
	}
	tail := s.BeaconBlocks.segments[i:]
	s.BeaconBlocks.segments = s.BeaconBlocks.segments[:i]
	for i = 0; i < len(tail); i++ {
		if tail[i] != nil {
			tail[i].close()
			tail[i] = nil
		}
	}
}
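
// CaplinView is a consistent, read-locked view over the segment list. It
// holds the read lock from View() until Close(), so keep views short-lived.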
type CaplinView struct {
	s      *CaplinSnapshots
	closed bool
}

func (s *CaplinSnapshots) View() *CaplinView {
	v := &CaplinView{s: s}
	v.s.BeaconBlocks.lock.RLock()
	return v
}

func (v *CaplinView) Close() {
	if v.closed {
		return
	}
	v.closed = true
	v.s.BeaconBlocks.lock.RUnlock()
}

func (v *CaplinView) BeaconBlocks() []*BeaconBlockSegment { return v.s.BeaconBlocks.segments }

func (v *CaplinView) BeaconBlocksSegment(slot uint64) (*BeaconBlockSegment, bool) {
	for _, seg := range v.BeaconBlocks() {
		if !(slot >= seg.ranges.from && slot < seg.ranges.to) {
			continue
		}
		return seg, true
	}
	return nil, false
}
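
// dumpBeaconBlocksRange writes one [fromSlot:toSlot) segment: each word of the
// .seg file is a zstd-compressed, snapshot-format-encoded block (an empty word
// marks a missed slot, keeping words slot-aligned), then builds the slot index.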
func dumpBeaconBlocksRange(ctx context.Context, db kv.RoDB, b persistence.BlockSource, version uint8, fromSlot uint64, toSlot uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error {
	segName := snaptype.SegmentFileName(version, fromSlot, toSlot, snaptype.BeaconBlocks)
	f, _ := snaptype.ParseFileName(snapDir, segName)

	sn, err := compress.NewCompressor(ctx, "Snapshot BeaconBlocks", f.Path, tmpDir, compress.MinPatternScore, workers, lvl, logger)
	if err != nil {
		return err
	}
	defer sn.Close()

	tx, err := db.BeginRo(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	var w bytes.Buffer
	compressor, err := zstd.NewWriter(&w, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
	if err != nil {
		return err
	}
	defer compressor.Close()
	// Just make a reusable buffer
	buf := make([]byte, 2048)
	// Generate .seg file, which is just the list of beacon blocks.
	for i := fromSlot; i < toSlot; i++ {
		obj, err := b.GetBlock(ctx, tx, i)
		if err != nil {
			return err
		}
		if i%20_000 == 0 {
			logger.Log(lvl, "Dumping beacon blocks", "progress", i)
		}
		if obj == nil {
			// A missed slot is stored as an empty word so that ordinal lookups stay slot-aligned.
			if err := sn.AddWord(nil); err != nil {
				return err
			}
			continue
		}
		if buf, err = snapshot_format.WriteBlockForSnapshot(compressor, obj.Data, buf); err != nil {
			return err
		}
		if err := compressor.Close(); err != nil {
			return err
		}
		word := w.Bytes()
		if err := sn.AddWord(word); err != nil {
			return err
		}
		w.Reset()
		compressor.Reset(&w)
	}
	if err := sn.Compress(); err != nil {
		return fmt.Errorf("compress: %w", err)
	}
	// Generate .idx file, which is the slot => offset mapping.
	p := &background.Progress{}
	return BeaconBlocksIdx(ctx, f, filepath.Join(snapDir, segName), fromSlot, toSlot, tmpDir, p, lvl, logger)
}
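
// DumpBeaconBlocks cuts [fromSlot:toSlot) into blocksPerFile-aligned segments
// and dumps each complete one; a trailing partial range is skipped.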
func DumpBeaconBlocks(ctx context.Context, db kv.RoDB, b persistence.BlockSource, version uint8, fromSlot, toSlot, blocksPerFile uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error {
	if blocksPerFile == 0 {
		return nil
	}

	for i := fromSlot; i < toSlot; i = chooseSegmentEnd(i, toSlot, blocksPerFile) {
		if toSlot-i < blocksPerFile {
			break
		}
		to := chooseSegmentEnd(i, toSlot, blocksPerFile)
		logger.Log(lvl, "Dumping beacon blocks", "from", i, "to", to)
		if err := dumpBeaconBlocksRange(ctx, db, b, version, i, to, tmpDir, snapDir, workers, lvl, logger); err != nil {
			return err
		}
	}
	return nil
}
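
// BuildMissingIndices scans the snapshot dir and generates an index for every
// beacon-blocks segment that does not have one yet, then reopens the folder.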
func (s *CaplinSnapshots) BuildMissingIndices(ctx context.Context, logger log.Logger, lvl log.Lvl) error {
	// if !s.segmentsReady.Load() {
	// 	return fmt.Errorf("not all snapshot segments are available")
	// }

	// wait for Downloader service to download all expected snapshots
	segments, _, err := SegmentsCaplin(s.dir, s.version, 0)
	if err != nil {
		return err
	}
	for index := range segments {
		segment := segments[index]
		if segment.T != snaptype.BeaconBlocks {
			continue
		}
		if hasIdxFile(segment, logger) {
			continue
		}
		p := &background.Progress{}
		if err := BeaconBlocksIdx(ctx, segment, segment.Path, segment.From, segment.To, s.dir, p, log.LvlDebug, logger); err != nil {
			return err
		}
	}
	return s.ReopenFolder()
}
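
// ReadHeader returns the signed block header stored at slot, together with the
// execution-payload data produced by the snapshot format reader. All-zero
// results with a nil error mean the slot is not indexed here or was missed.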
func (s *CaplinSnapshots) ReadHeader(slot uint64) (*cltypes.SignedBeaconBlockHeader, uint64, libcommon.Hash, error) {
	view := s.View()
	defer view.Close()

	var buf []byte

	seg, ok := view.BeaconBlocksSegment(slot)
	if !ok {
		return nil, 0, libcommon.Hash{}, nil
	}
	if seg.idxSlot == nil {
		return nil, 0, libcommon.Hash{}, nil
	}
	blockOffset := seg.idxSlot.OrdinalLookup(slot - seg.idxSlot.BaseDataID())

	gg := seg.seg.MakeGetter()
	gg.Reset(blockOffset)
	if !gg.HasNext() {
		return nil, 0, libcommon.Hash{}, nil
	}
	buf, _ = gg.Next(buf)
	if len(buf) == 0 {
		return nil, 0, libcommon.Hash{}, nil
	}
	// Decompress this thing: use pooled buffers and readers to avoid allocations.
	buffer := buffersPool.Get().(*bytes.Buffer)
	defer buffersPool.Put(buffer)
	buffer.Reset()
	buffer.Write(buf)
	reader := decompressorPool.Get().(*zstd.Decoder)
	defer decompressorPool.Put(reader)
	reader.Reset(buffer)

	return snapshot_format.ReadBlockHeaderFromSnapshotWithExecutionData(reader, s.beaconCfg)
}
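
// A minimal read-path sketch (hypothetical variable names; error handling elided):
//
//	header, execBlockNum, execBlockHash, err := sn.ReadHeader(slot)
//	if err == nil && header != nil {
//		// the slot is backed by a frozen, indexed segment
//	}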