From 995009ac7b98e7c149e71f34556f9b94243783bd Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Sun, 22 Oct 2023 19:21:37 +0200 Subject: [PATCH] Added cli too for Snapshots Generations for Caplin (#8543) --- cl/persistence/beacon_indicies/indicies.go | 15 ++ cl/persistence/block_saver.go | 24 +++ cl/persistence/block_store.go | 8 + .../format/snapshot_format/attestations.go | 194 ------------------ .../snapshot_format/attestations_test.go | 40 ---- .../format/snapshot_format/blocks.go | 9 +- .../format/snapshot_format/snapshots.go | 87 ++++++++ cl/persistence/interface.go | 1 + .../network/backward_beacon_downloader.go | 6 +- cmd/capcli/cli.go | 58 +++++- erigon-lib/downloader/snaptype/files.go | 5 + erigon-lib/kv/tables.go | 4 + .../freezeblocks/block_snapshots.go | 65 ++++++ 13 files changed, 271 insertions(+), 245 deletions(-) delete mode 100644 cl/persistence/format/snapshot_format/attestations.go delete mode 100644 cl/persistence/format/snapshot_format/attestations_test.go create mode 100644 cl/persistence/format/snapshot_format/snapshots.go diff --git a/cl/persistence/beacon_indicies/indicies.go b/cl/persistence/beacon_indicies/indicies.go index cc5ee32e9..da08b445b 100644 --- a/cl/persistence/beacon_indicies/indicies.go +++ b/cl/persistence/beacon_indicies/indicies.go @@ -11,6 +11,21 @@ import ( _ "modernc.org/sqlite" ) +func WriteHighestFinalized(tx kv.RwTx, slot uint64) error { + return tx.Put(kv.HighestFinalized, kv.HighestFinalizedKey, base_encoding.Encode64(slot)) +} + +func ReadHighestFinalized(tx kv.Tx) (uint64, error) { + val, err := tx.GetOne(kv.HighestFinalized, kv.HighestFinalizedKey) + if err != nil { + return 0, err + } + if len(val) == 0 { + return 0, nil + } + return base_encoding.Decode64(val), nil +} + // WriteBlockRootSlot writes the slot associated with a block root. func WriteHeaderSlot(tx kv.RwTx, blockRoot libcommon.Hash, slot uint64) error { return tx.Put(kv.BlockRootToSlot, blockRoot[:], base_encoding.Encode64(slot)) diff --git a/cl/persistence/block_saver.go b/cl/persistence/block_saver.go index 3e190fa21..6bc35ae3c 100644 --- a/cl/persistence/block_saver.go +++ b/cl/persistence/block_saver.go @@ -33,6 +33,30 @@ func NewBeaconChainDatabaseFilesystem(rawDB RawBeaconBlockChain, executionEngine } } +func (b beaconChainDatabaseFilesystem) GetBlock(ctx context.Context, tx kv.Tx, slot uint64) (*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) { + blockRoot, err := beacon_indicies.ReadCanonicalBlockRoot(tx, slot) + if err != nil { + return nil, err + } + if blockRoot == (libcommon.Hash{}) { + return nil, nil + } + + r, err := b.rawDB.BlockReader(ctx, slot, blockRoot) + if err != nil { + return nil, err + } + defer r.Close() + + block := cltypes.NewSignedBeaconBlock(b.cfg) + version := b.cfg.GetCurrentStateVersion(slot / b.cfg.SlotsPerEpoch) + if err := ssz_snappy.DecodeAndReadNoForkDigest(r, block, version); err != nil { + return nil, err + } + + return &peers.PeeredObject[*cltypes.SignedBeaconBlock]{Data: block}, nil +} + func (b beaconChainDatabaseFilesystem) GetRange(ctx context.Context, tx kv.Tx, from uint64, count uint64) (*peers.PeeredObject[[]*cltypes.SignedBeaconBlock], error) { // Retrieve block roots for each ranged slot beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(ctx, tx, from, count) diff --git a/cl/persistence/block_store.go b/cl/persistence/block_store.go index e20552c8f..34d231921 100644 --- a/cl/persistence/block_store.go +++ b/cl/persistence/block_store.go @@ -30,6 +30,10 @@ func NewBeaconRpcSource(rpc *rpc.BeaconRpcP2P) *BeaconRpcSource { } } +func (*BeaconRpcSource) GetBlock(ctx context.Context, tx kv.Tx, slot uint64) (*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) { + panic("unimplemented") +} + func (b *BeaconRpcSource) GetRange(ctx context.Context, _ kv.Tx, from uint64, count uint64) (*peers.PeeredObject[[]*cltypes.SignedBeaconBlock], error) { if count == 0 { return nil, nil @@ -75,6 +79,10 @@ type GossipSource struct { blocks *btree.Map[uint64, chan *peers.PeeredObject[*cltypes.SignedBeaconBlock]] } +func (*GossipSource) GetBlock(ctx context.Context, tx kv.Tx, slot uint64) (*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) { + panic("unimplemented") +} + func NewGossipSource(ctx context.Context, gossip *network.GossipManager) *GossipSource { g := &GossipSource{ gossip: gossip, diff --git a/cl/persistence/format/snapshot_format/attestations.go b/cl/persistence/format/snapshot_format/attestations.go deleted file mode 100644 index f53051956..000000000 --- a/cl/persistence/format/snapshot_format/attestations.go +++ /dev/null @@ -1,194 +0,0 @@ -package snapshot_format - -// TODO: Make this actually usable. -// func EncodeAttestationsForStorage(attestations *solid.ListSSZ[*solid.Attestation], buf []byte) []byte { -// if attestations.Len() == 0 { -// return nil -// } -// encoded := buf - -// referencedAttestations := []solid.AttestationData{ -// nil, // Full diff -// } -// // Pre-allocate some memory. -// attestations.Range(func(_ int, attestation *solid.Attestation, _ int) bool { -// data := attestation.AttestantionData() -// sig := attestation.Signature() -// // Encode attestation metadata -// // Also we need to keep track of aggregation bits size manually. -// encoded = append(encoded, byte(len(attestation.AggregationBits()))) -// encoded = append(encoded, attestation.AggregationBits()...) -// // Encode signature -// encoded = append(encoded, sig[:]...) -// // Encode attestation body -// var bestEncoding []byte -// bestEncodingIndex := 0 -// // try all non-repeating attestations. -// for i, att := range referencedAttestations { -// currentEncoding := encodeAttestationDataForStorage(attestation.AttestantionData(), att) -// // check if we find a better fit. -// if len(bestEncoding) == 0 || len(bestEncoding) > len(currentEncoding) { -// bestEncodingIndex = i -// bestEncoding = currentEncoding -// // cannot get lower than 1, so accept it as best. -// if len(bestEncoding) == 1 { -// break -// } -// } -// } -// // If it is not repeated then save it. -// if len(bestEncoding) != 1 { -// referencedAttestations = append(referencedAttestations, data) -// } -// encoded = append(encoded, byte(bestEncodingIndex)) -// encoded = append(encoded, bestEncoding...) -// // Encode attester index -// encoded = append(encoded, data.RawValidatorIndex()...) -// return true -// }) -// return encoded -// } - -// // EncodeAttestationsDataForStorage encodes attestation data and compress everything by defaultData. -// func encodeAttestationDataForStorage(data solid.AttestationData, defaultData solid.AttestationData) []byte { -// fieldSet := byte(0) -// var ret []byte - -// numBuffer := make([]byte, 4) - -// // Encode in slot -// if defaultData == nil || data.Slot() != defaultData.Slot() { -// slotBytes := make([]byte, 4) -// binary.LittleEndian.PutUint32(slotBytes, uint32(data.Slot())) -// ret = append(ret, slotBytes...) -// } else { -// fieldSet = 1 -// } - -// if defaultData == nil || !bytes.Equal(data.RawBeaconBlockRoot(), defaultData.RawBeaconBlockRoot()) { -// root := data.BeaconBlockRoot() -// ret = append(ret, root[:]...) -// } else { -// fieldSet |= 2 -// } - -// if defaultData == nil || data.Source().Epoch() != defaultData.Source().Epoch() { -// binary.LittleEndian.PutUint32(numBuffer, uint32(data.Source().Epoch())) -// ret = append(ret, numBuffer...) -// } else { -// fieldSet |= 4 -// } - -// if defaultData == nil || !bytes.Equal(data.Source().RawBlockRoot(), defaultData.Source().RawBlockRoot()) { -// ret = append(ret, data.Source().RawBlockRoot()...) -// } else { -// fieldSet |= 8 -// } - -// if defaultData == nil || data.Target().Epoch() != defaultData.Target().Epoch() { -// binary.LittleEndian.PutUint32(numBuffer, uint32(data.Target().Epoch())) - -// ret = append(ret, numBuffer...) -// } else { -// fieldSet |= 16 -// } - -// if defaultData == nil || !bytes.Equal(data.Target().RawBlockRoot(), defaultData.Target().RawBlockRoot()) { -// root := data.Target().BlockRoot() -// ret = append(ret, root[:]...) -// } else { -// fieldSet |= 32 -// } -// return append([]byte{fieldSet}, ret...) -// } - -// func DecodeAttestationsForStorage(buf []byte, out []byte) error { -// var signature libcommon.Bytes96 - -// if len(buf) == 0 { -// return nil -// } - -// referencedAttestations := []solid.AttestationData{ -// nil, // Full diff -// } -// // current position is how much we read. -// pos := 0 -// for pos != len(buf) { -// attestationData := solid.NewAttestationData() -// // Decode aggregations bits -// aggrBitsLength := int(buf[pos]) -// pos++ -// aggrBits := buf[pos : pos+aggrBitsLength] -// pos += aggrBitsLength -// // Decode signature -// copy(signature[:], buf[pos:]) -// pos += 96 -// // decode attestation body -// // 1) read comparison index -// comparisonIndex := int(buf[pos]) -// pos++ -// n := decodeAttestationDataForStorage(buf[pos:], referencedAttestations[comparisonIndex], attestationData) -// // field set is not null, so we need to remember it. -// if n != 1 { -// referencedAttestations = append(referencedAttestations, attestationData) -// } -// pos += n -// // decode attester index -// attestationData.SetValidatorIndexWithRawBytes(buf[pos:]) -// pos += 8 -// attestations.Append(solid.NewAttestionFromParameters(aggrBits, attestationData, signature)) -// } -// return nil -// } - -// // DecodeAttestationDataForStorage decodes attestation data and decompress everything by defaultData. -// func decodeAttestationDataForStorage(buf []byte, defaultData solid.AttestationData, target solid.AttestationData) (n int) { -// if len(buf) == 0 { -// return -// } -// fieldSet := buf[0] -// n++ -// if fieldSet&1 > 0 { -// target.SetSlotWithRawBytes(defaultData.RawSlot()) -// } else { -// target.SetSlot(uint64(binary.LittleEndian.Uint32(buf[n:]))) -// n += 4 -// } - -// if fieldSet&2 > 0 { -// target.SetBeaconBlockRootWithRawBytes(defaultData.RawBeaconBlockRoot()) -// } else { -// target.SetBeaconBlockRootWithRawBytes(buf[n : n+32]) -// n += 32 -// } - -// if fieldSet&4 > 0 { -// target.Source().SetRawEpoch(defaultData.Source().RawEpoch()) -// } else { -// target.Source().SetEpoch(uint64(binary.LittleEndian.Uint32(buf[n:]))) -// n += 4 -// } - -// if fieldSet&8 > 0 { -// target.Source().SetRawBlockRoot(defaultData.Source().RawBlockRoot()) -// } else { -// target.Source().SetRawBlockRoot(buf[n : n+32]) -// n += 32 -// } - -// if fieldSet&16 > 0 { -// target.Target().SetRawEpoch(defaultData.Target().RawEpoch()) -// } else { -// target.Target().SetEpoch(uint64(binary.LittleEndian.Uint32(buf[n:]))) -// n += 4 -// } - -// if fieldSet&32 > 0 { -// target.Target().SetRawBlockRoot(defaultData.Target().RawBlockRoot()) -// } else { -// target.Target().SetRawBlockRoot(buf[n : n+32]) -// n += 32 -// } -// return -// } diff --git a/cl/persistence/format/snapshot_format/attestations_test.go b/cl/persistence/format/snapshot_format/attestations_test.go deleted file mode 100644 index abcafdb71..000000000 --- a/cl/persistence/format/snapshot_format/attestations_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package snapshot_format_test - -// func TestAttestationsEncoding(t *testing.T) { -// attVec := solid.NewDynamicListSSZ[*solid.Attestation](256) -// for i := 0; i < 256; i++ { -// attVec.Append(solid.NewAttestionFromParameters( -// []byte{byte(i)}, -// solid.NewAttestionDataFromParameters( -// uint64(i*i*i), -// uint64(i*i*i), -// [32]byte{}, -// solid.NewCheckpointFromParameters([32]byte{45, 67}, 219), -// solid.NewCheckpointFromParameters([32]byte{67, 98}, 219), -// ), libcommon.Bytes96{byte(i)})) -// } -// plain, err := attVec.EncodeSSZ(nil) -// require.NoError(t, err) - -// compacted := format.EncodeAttestationsForStorage(attVec, nil) -// require.Less(t, len(compacted), len(plain)) - -// // Now-decode it back. -// resAttVec := solid.NewDynamicListSSZ[*solid.Attestation](256) -// require.NoError(t, format.DecodeAttestationsForStorage(compacted, resAttVec)) - -// require.Equal(t, attVec.Len(), resAttVec.Len()) - -// for i := 0; i < 256; i++ { -// require.Equal(t, attVec.Get(i).Signature(), resAttVec.Get(i).Signature()) -// require.Equal(t, attVec.Get(i).AggregationBits(), resAttVec.Get(i).AggregationBits()) - -// require.Equal(t, attVec.Get(i).AttestantionData().Slot(), resAttVec.Get(i).AttestantionData().Slot()) -// require.Equal(t, attVec.Get(i).AttestantionData().ValidatorIndex(), resAttVec.Get(i).AttestantionData().ValidatorIndex()) -// require.Equal(t, attVec.Get(i).AttestantionData().BeaconBlockRoot(), resAttVec.Get(i).AttestantionData().BeaconBlockRoot()) -// require.Equal(t, attVec.Get(i).AttestantionData().Source(), resAttVec.Get(i).AttestantionData().Source()) -// require.Equal(t, attVec.Get(i).AttestantionData().Target(), resAttVec.Get(i).AttestantionData().Target()) - -// require.Equal(t, attVec.Get(i), resAttVec.Get(i)) -// } -// } diff --git a/cl/persistence/format/snapshot_format/blocks.go b/cl/persistence/format/snapshot_format/blocks.go index 5ec26d63c..e1f7abac3 100644 --- a/cl/persistence/format/snapshot_format/blocks.go +++ b/cl/persistence/format/snapshot_format/blocks.go @@ -64,6 +64,10 @@ func computeInitialOffset(version clparams.StateVersion) uint64 { // WriteBlockForSnapshot writes a block to the given writer in the format expected by the snapshot. func WriteBlockForSnapshot(block *cltypes.SignedBeaconBlock, w io.Writer) error { + bodyRoot, err := block.Block.Body.HashSSZ() + if err != nil { + return err + } // Maybe reuse the buffer? encoded, err := block.EncodeSSZ(nil) if err != nil { @@ -73,6 +77,9 @@ func WriteBlockForSnapshot(block *cltypes.SignedBeaconBlock, w io.Writer) error if _, err := w.Write([]byte{byte(version)}); err != nil { return err } + if _, err := w.Write(bodyRoot[:]); err != nil { + return err + } currentChunkLength := computeInitialOffset(version) body := block.Block.Body @@ -116,7 +123,7 @@ func WriteBlockForSnapshot(block *cltypes.SignedBeaconBlock, w io.Writer) error } func readMetadataForBlock(r io.Reader) (clparams.StateVersion, error) { - b := []byte{0} + b := make([]byte, 33) // version + body root if _, err := r.Read(b); err != nil { return 0, err } diff --git a/cl/persistence/format/snapshot_format/snapshots.go b/cl/persistence/format/snapshot_format/snapshots.go new file mode 100644 index 000000000..8cecbc8f1 --- /dev/null +++ b/cl/persistence/format/snapshot_format/snapshots.go @@ -0,0 +1,87 @@ +package snapshot_format + +import ( + "bytes" + "context" + "fmt" + + "github.com/ledgerwatch/erigon-lib/common/background" + "github.com/ledgerwatch/erigon-lib/common/cmp" + "github.com/ledgerwatch/erigon-lib/compress" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" + "github.com/ledgerwatch/log/v3" +) + +func dumpBeaconBlocksRange(ctx context.Context, db kv.RoDB, b persistence.BlockSource, fromSlot uint64, toSlot uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error { + segName := snaptype.SegmentFileName(fromSlot, toSlot, snaptype.BeaconBlocks) + f, _ := snaptype.ParseFileName(snapDir, segName) + + sn, err := compress.NewCompressor(ctx, "Snapshot BeaconBlocks", f.Path, tmpDir, compress.MinPatternScore, workers, lvl, logger) + if err != nil { + return err + } + defer sn.Close() + + tx, err := db.BeginRo(ctx) + if err != nil { + return err + } + defer tx.Rollback() + // Generate .seg file, which is just the list of beacon blocks. + var buf bytes.Buffer + for i := fromSlot; i <= toSlot; i++ { + obj, err := b.GetBlock(ctx, tx, i) + if err != nil { + return err + } + if obj == nil { + if err := sn.AddWord(nil); err != nil { + return err + } + continue + } + if err := WriteBlockForSnapshot(obj.Data, &buf); err != nil { + return err + } + if err := sn.AddWord(buf.Bytes()); err != nil { + return err + } + buf.Reset() + } + if err := sn.Compress(); err != nil { + return fmt.Errorf("compress: %w", err) + } + // Generate .idx file, which is the slot => offset mapping. + p := &background.Progress{} + + return freezeblocks.BeaconBlocksIdx(ctx, segName, fromSlot, toSlot, snapDir, tmpDir, p, lvl, logger) +} + +func chooseSegmentEnd(from, to, blocksPerFile uint64) uint64 { + next := (from/blocksPerFile + 1) * blocksPerFile + to = cmp.Min(next, to) + + if to < snaptype.Erigon2MinSegmentSize { + return to + } + + return to - (to % snaptype.Erigon2MinSegmentSize) // round down to the nearest 1k +} + +func DumpBeaconBlocks(ctx context.Context, db kv.RoDB, b persistence.BlockSource, fromSlot, toSlot, blocksPerFile uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error { + if blocksPerFile == 0 { + return nil + } + + for i := fromSlot; i < toSlot; i = chooseSegmentEnd(i, toSlot, blocksPerFile) { + to := chooseSegmentEnd(i, toSlot, blocksPerFile) + logger.Log(lvl, "Dumping beacon blocks", "from", i, "to", to) + if err := dumpBeaconBlocksRange(ctx, db, b, i, to, tmpDir, snapDir, workers, lvl, logger); err != nil { + return err + } + } + return nil +} diff --git a/cl/persistence/interface.go b/cl/persistence/interface.go index 158bcb787..56b7be806 100644 --- a/cl/persistence/interface.go +++ b/cl/persistence/interface.go @@ -13,6 +13,7 @@ import ( type BlockSource interface { GetRange(ctx context.Context, tx kv.Tx, from uint64, count uint64) (*peers.PeeredObject[[]*cltypes.SignedBeaconBlock], error) PurgeRange(ctx context.Context, tx kv.RwTx, from uint64, count uint64) error + GetBlock(ctx context.Context, tx kv.Tx, slot uint64) (*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) } type BeaconChainWriter interface { diff --git a/cl/phase1/network/backward_beacon_downloader.go b/cl/phase1/network/backward_beacon_downloader.go index a33403f05..2f5abf9b8 100644 --- a/cl/phase1/network/backward_beacon_downloader.go +++ b/cl/phase1/network/backward_beacon_downloader.go @@ -87,7 +87,7 @@ func (b *BackwardBeaconDownloader) RequestMore(ctx context.Context) { start = 0 } - reqInterval := time.NewTicker(100 * time.Millisecond) + reqInterval := time.NewTicker(300 * time.Millisecond) doneRespCh := make(chan []*cltypes.SignedBeaconBlock, 1) var responses []*cltypes.SignedBeaconBlock Loop: @@ -99,6 +99,9 @@ Loop: if err != nil { return } + if responses == nil { + return + } if len(responses) == 0 { b.rpc.BanPeer(peerId) return @@ -128,6 +131,7 @@ Loop: } // No? Reject. if blockRoot != b.expectedRoot { + log.Debug("Gotten unexpected root", "got", blockRoot, "expected", b.expectedRoot) continue } // Yes? then go for the callback. diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index 2764f6b86..e9c70f062 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -11,10 +11,16 @@ import ( "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" persistence2 "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cmd/caplin/caplin1" + "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" "github.com/ledgerwatch/erigon/cl/persistence/db_config" + "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" "github.com/ledgerwatch/erigon/cl/phase1/core" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/network" @@ -39,7 +45,8 @@ var CLI struct { Blocks Blocks `cmd:"" help:"download blocks from reqresp network"` Epochs Epochs `cmd:"" help:"download epochs from reqresp network"` - Chain Chain `cmd:"" help:"download the entire chain from reqresp network"` + Chain Chain `cmd:"" help:"download the entire chain from reqresp network"` + DumpSnapshots DumpSnapshots `cmd:"" help:"generate caplin snapshots"` } type chainCfg struct { @@ -60,7 +67,8 @@ type withSentinel struct { } func (w *withSentinel) connectSentinel() (sentinel.SentinelClient, error) { - gconn, err := grpc.Dial(w.Sentinel, grpc.WithInsecure()) + // YOLO message size + gconn, err := grpc.Dial(w.Sentinel, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt))) if err != nil { return nil, err } @@ -348,19 +356,15 @@ func (c *Chain) Run(ctx *Context) error { log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) log.Info("Started chain download", "chain", c.Chain) - aferoFS, err := openFs(c.Datadir, "caplin/beacon") - if err != nil { - return err - } + dirs := datadir.New(c.Datadir) - db := mdbx.MustOpen("caplin/db") + rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory) + beaconDB, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false) if err != nil { return err } defer db.Close() - beaconDB := persistence.NewBeaconChainDatabaseFilesystem(persistence.NewAferoRawBlockSaver(aferoFS, beaconConfig), nil, beaconConfig) - beacon := rpc.NewBeaconRpcP2P(ctx, s, beaconConfig, genesisConfig) bs, err := core.RetrieveBeaconState(ctx, beaconConfig, genesisConfig, clparams.GetCheckpointSyncEndpoint(networkType)) @@ -373,6 +377,12 @@ func (c *Chain) Run(ctx *Context) error { return err } + if err := db.Update(ctx, func(tx kv.RwTx) error { + return beacon_indicies.WriteHighestFinalized(tx, bs.Slot()) + }); err != nil { + return err + } + err = beacon.SetStatus( genesisConfig.GenesisValidatorRoot, beaconConfig.GenesisEpoch, @@ -387,3 +397,33 @@ func (c *Chain) Run(ctx *Context) error { }, bRoot, bs.Slot(), "/tmp", log.Root()) return stages.SpawnStageHistoryDownload(cfg, ctx, log.Root()) } + +type DumpSnapshots struct { + chainCfg + outputFolder +} + +func (c *DumpSnapshots) Run(ctx *Context) error { + _, _, beaconConfig, _, err := clparams.GetConfigsByNetworkName(c.Chain) + if err != nil { + return err + } + log.Root().SetHandler(log.LvlFilterHandler(log.LvlDebug, log.StderrHandler)) + log.Info("Started chain download", "chain", c.Chain) + + dirs := datadir.New(c.Datadir) + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) + + rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory) + beaconDB, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false) + if err != nil { + return err + } + var to uint64 + db.View(ctx, func(tx kv.Tx) (err error) { + to, err = beacon_indicies.ReadHighestFinalized(tx) + return + }) + + return snapshot_format.DumpBeaconBlocks(ctx, db, beaconDB, 0, to, snaptype.Erigon2SegmentSize, dirs.Tmp, dirs.Snap, 8, log.LvlInfo, log.Root()) +} diff --git a/erigon-lib/downloader/snaptype/files.go b/erigon-lib/downloader/snaptype/files.go index e45ecd598..102c5a2fc 100644 --- a/erigon-lib/downloader/snaptype/files.go +++ b/erigon-lib/downloader/snaptype/files.go @@ -41,6 +41,7 @@ const ( BorEvents BorSpans NumberOfTypes + BeaconBlocks ) func (ft Type) String() string { @@ -55,6 +56,8 @@ func (ft Type) String() string { return "borevents" case BorSpans: return "borspans" + case BeaconBlocks: + return "beaconblocks" default: panic(fmt.Sprintf("unknown file type: %d", ft)) } @@ -72,6 +75,8 @@ func ParseFileType(s string) (Type, bool) { return BorEvents, true case "borspans": return BorSpans, true + case "beaconblocks": + return BeaconBlocks, true default: return NumberOfTypes, false } diff --git a/erigon-lib/kv/tables.go b/erigon-lib/kv/tables.go index 9afe5a63d..995603bfa 100644 --- a/erigon-lib/kv/tables.go +++ b/erigon-lib/kv/tables.go @@ -441,6 +441,8 @@ const ( // [Block Root] => [Parent Root] BlockRootToParentRoot = "BlockRootToParentRoot" + HighestFinalized = "HighestFinalized" // hash -> transaction/receipt lookup metadata + // BlockRoot => Beacon Block Header BeaconBlockHeaders = "BeaconBlockHeaders" @@ -478,6 +480,7 @@ var ( CurrentBodiesSnapshotBlock = []byte("CurrentBodiesSnapshotBlock") PlainStateVersion = []byte("PlainStateVersion") + HighestFinalizedKey = []byte("HighestFinalized") LightClientStore = []byte("LightClientStore") LightClientFinalityUpdate = []byte("LightClientFinalityUpdate") LightClientOptimisticUpdate = []byte("LightClientOptimisticUpdate") @@ -601,6 +604,7 @@ var ChaindataTables = []string{ StateRootToBlockRoot, BlockRootToParentRoot, BeaconBlockHeaders, + HighestFinalized, Attestetations, LightClient, LightClientUpdates, diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 46e3dd32c..cb1bf9f5a 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -2014,6 +2014,71 @@ RETRY: return nil } +func BeaconBlocksIdx(ctx context.Context, segmentFilePath string, blockFrom, blockTo uint64, snapDir string, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) { + defer func() { + if rec := recover(); rec != nil { + err = fmt.Errorf("BeaconBlocksIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack()) + } + }() + // Calculate how many records there will be in the index + d, err := compress.NewDecompressor(segmentFilePath) + if err != nil { + return err + } + defer d.Close() + g := d.MakeGetter() + var idxFilePath = filepath.Join(snapDir, snaptype.IdxFileName(blockFrom, blockTo, snaptype.BeaconBlocks.String())) + + var baseSpanId uint64 + if blockFrom > zerothSpanEnd { + baseSpanId = 1 + (blockFrom-zerothSpanEnd-1)/spanLength + } + + rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ + KeyCount: d.Count(), + Enums: d.Count() > 0, + BucketSize: 2000, + LeafSize: 8, + TmpDir: tmpDir, + IndexFile: idxFilePath, + BaseDataID: baseSpanId, + }, logger) + if err != nil { + return err + } + rs.LogLvl(log.LvlDebug) + + defer d.EnableMadvNormal().DisableReadAhead() +RETRY: + g.Reset(0) + var i, offset, nextPos uint64 + var key [8]byte + for g.HasNext() { + nextPos, _ = g.Skip() + binary.BigEndian.PutUint64(key[:], i) + i++ + if err = rs.AddKey(key[:], offset); err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + offset = nextPos + } + if err = rs.Build(ctx); err != nil { + if errors.Is(err, recsplit.ErrCollision) { + logger.Info("Building recsplit. Collision happened. It's ok. Restarting with another salt...", "err", err) + rs.ResetNextSalt() + goto RETRY + } + return err + } + + return nil +} + // HeadersIdx - headerHash -> offset (analog of kv.HeaderNumber) func HeadersIdx(ctx context.Context, chainConfig *chain.Config, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) { defer func() {