diff --git a/cl/antiquary/state_antiquary.go b/cl/antiquary/state_antiquary.go index 3ae2a1dd3..3556cd124 100644 --- a/cl/antiquary/state_antiquary.go +++ b/cl/antiquary/state_antiquary.go @@ -214,10 +214,9 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error { return err } // Collect genesis state if we are at genesis - if err := s.collectGenesisState(ctx, compressedWriter, s.currentState, slashings, inactivityScoresC, proposers, minimalBeaconStates, stateEvents, changedValidators); err != nil { + if err := s.collectGenesisState(ctx, compressedWriter, s.currentState, slashings, checkpoints, inactivityScoresC, proposers, minimalBeaconStates, stateEvents, changedValidators); err != nil { return err } - s.balances32 = append(s.balances32, s.currentState.RawBalances()...) } else { start := time.Now() // progress not 0? we need to load the state from the DB @@ -232,8 +231,10 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error { return err } log.Info("Recovered Beacon State", "slot", s.currentState.Slot(), "elapsed", end, "root", libcommon.Hash(hashRoot).String()) - s.balances32 = append(s.balances32, s.currentState.RawBalances()...) + } + s.balances32 = s.balances32[:0] + s.balances32 = append(s.balances32, s.currentState.RawBalances()...) } logLvl := log.LvlInfo @@ -379,6 +380,8 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error { if err := s.antiquateEffectiveBalances(ctx, slot, s.currentState.RawValidatorSet(), compressedWriter); err != nil { return err } + s.balances32 = s.balances32[:0] + s.balances32 = append(s.balances32, s.currentState.RawBalances()...) } else if slot%s.cfg.SlotsPerEpoch == 0 { if err := s.antiquateBytesListDiff(ctx, key, s.balances32, s.currentState.RawBalances(), balances, base_encoding.ComputeCompressedSerializedUint64ListDiff); err != nil { return err @@ -400,9 +403,10 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error { if err := transition.TransitionState(s.currentState, block, fullValidation); err != nil { return err } + first = false - // dump the whole sla + // dump the whole slashings vector. if slashingOccured { if err := s.antiquateFullUint64List(slashings, slot, s.currentState.RawSlashings(), commonBuffer, compressedWriter); err != nil { return err @@ -424,6 +428,9 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error { if err := s.antiquateEffectiveBalances(ctx, slot, s.currentState.RawValidatorSet(), compressedWriter); err != nil { return err } + // Reset it as we antiquated it. + s.balances32 = s.balances32[:0] + s.balances32 = append(s.balances32, s.currentState.RawBalances()...) continue } @@ -679,7 +686,7 @@ func getProposerDutiesValue(s *state.CachingBeaconState) []byte { return list } -func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.Encoder, state *state.CachingBeaconState, slashings, inactivities, proposersCollector, minimalBeaconStateCollector, stateEvents *etl.Collector, changedValidators map[uint64]struct{}) error { +func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.Encoder, state *state.CachingBeaconState, slashings, checkpoints, inactivities, proposersCollector, minimalBeaconStateCollector, stateEvents *etl.Collector, changedValidators map[uint64]struct{}) error { var err error slot := state.Slot() epoch := slot / s.cfg.SlotsPerEpoch @@ -713,6 +720,15 @@ func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.En return err } + k := base_encoding.Encode64ToBytes4(s.cfg.RoundSlotToEpoch(slot)) + v := make([]byte, solid.CheckpointSize*3) + copy(v, state.CurrentJustifiedCheckpoint()) + copy(v[solid.CheckpointSize:], state.PreviousJustifiedCheckpoint()) + copy(v[solid.CheckpointSize*2:], state.FinalizedCheckpoint()) + if err := checkpoints.Collect(k, v); err != nil { + return err + } + if state.Version() >= clparams.AltairVersion { // dump inactivity scores if err := s.antiquateFullUint64List(inactivities, slot, state.RawInactivityScores(), &commonBuffer, compressor); err != nil { @@ -764,7 +780,6 @@ func (s *Antiquary) dumpPayload(k []byte, v []byte, c *etl.Collector, b *bytes.B // if err := os.WriteFile("b.txt", b, 0644); err != nil { // s.logger.Error("Failed to write full beacon state", "err", err) // } - // } func flattenRandaoMixes(hashes []libcommon.Hash) []byte { diff --git a/cl/beacon/beaconhttp/api.go b/cl/beacon/beaconhttp/api.go index b0c3d94c3..7c649d579 100644 --- a/cl/beacon/beaconhttp/api.go +++ b/cl/beacon/beaconhttp/api.go @@ -6,10 +6,12 @@ import ( "fmt" "net/http" "reflect" + "strings" "github.com/ledgerwatch/erigon-lib/types/ssz" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph" "github.com/ledgerwatch/log/v3" + "golang.org/x/exp/slices" ) var _ error = EndpointError{} @@ -76,8 +78,9 @@ func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc { // TODO: ssz handler // TODO: potentially add a context option to buffer these contentType := r.Header.Get("Accept") - switch contentType { - case "application/octet-stream": + contentTypes := strings.Split(contentType, ",") + switch { + case slices.Contains(contentTypes, "application/octet-stream"): sszMarshaler, ok := any(ans).(ssz.Marshaler) if !ok { NewEndpointError(http.StatusBadRequest, "This endpoint does not support SSZ response").WriteTo(w) @@ -90,7 +93,7 @@ func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc { return } w.Write(encoded) - case "application/json", "": + case contentType == "*/*", contentType == "", slices.Contains(contentTypes, "text/html"), slices.Contains(contentTypes, "application/json"): w.Header().Add("content-type", "application/json") err := json.NewEncoder(w).Encode(ans) if err != nil { diff --git a/cl/beacon/handler/handler.go b/cl/beacon/handler/handler.go index b6703bb7b..05c5ef1c1 100644 --- a/cl/beacon/handler/handler.go +++ b/cl/beacon/handler/handler.go @@ -10,6 +10,7 @@ import ( "github.com/ledgerwatch/erigon/cl/beacon/synced_data" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/persistence/state/historical_states_reader" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" "github.com/ledgerwatch/erigon/cl/pool" "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" @@ -26,10 +27,11 @@ type ApiHandler struct { forkchoiceStore forkchoice.ForkChoiceStorage operationsPool pool.OperationsPool syncedData *synced_data.SyncedDataManager + stateReader *historical_states_reader.HistoricalStatesReader } -func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool, rcsn freezeblocks.BeaconSnapshotReader, syncedData *synced_data.SyncedDataManager) *ApiHandler { - return &ApiHandler{o: sync.Once{}, genesisCfg: genesisConfig, beaconChainCfg: beaconChainConfig, indiciesDB: indiciesDB, forkchoiceStore: forkchoiceStore, operationsPool: operationsPool, blockReader: rcsn, syncedData: syncedData} +func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool, rcsn freezeblocks.BeaconSnapshotReader, syncedData *synced_data.SyncedDataManager, stateReader *historical_states_reader.HistoricalStatesReader) *ApiHandler { + return &ApiHandler{o: sync.Once{}, genesisCfg: genesisConfig, beaconChainCfg: beaconChainConfig, indiciesDB: indiciesDB, forkchoiceStore: forkchoiceStore, operationsPool: operationsPool, blockReader: rcsn, syncedData: syncedData, stateReader: stateReader} } func (a *ApiHandler) init() { @@ -46,10 +48,11 @@ func (a *ApiHandler) init() { r.Get("/fork_schedule", beaconhttp.HandleEndpointFunc(a.getForkSchedule)) }) r.Route("/beacon", func(r chi.Router) { - r.Route("/headers", func(r chi.Router) { - r.Get("/", beaconhttp.HandleEndpointFunc(a.getHeaders)) - r.Get("/{block_id}", beaconhttp.HandleEndpointFunc(a.getHeader)) - }) + // r.Route("/headers", func(r chi.Router) { + // r.Get("/", beaconhttp.HandleEndpointFunc(a.getHeaders)) + // r.Get("/{block_id}", beaconhttp.HandleEndpointFunc(a.getHeader)) + // }) + r.Get("/headers", beaconhttp.HandleEndpointFunc(a.getHeaders)) r.Route("/blocks", func(r chi.Router) { r.Post("/", http.NotFound) r.Get("/{block_id}", beaconhttp.HandleEndpointFunc(a.getBlock)) @@ -72,6 +75,7 @@ func (a *ApiHandler) init() { r.Get("/head/validators/{index}", http.NotFound) // otterscan r.Get("/head/committees", http.NotFound) // otterscan r.Route("/{state_id}", func(r chi.Router) { + r.Get("/finality_checkpoints", beaconhttp.HandleEndpointFunc(a.getFinalityCheckpoints)) r.Get("/validators", http.NotFound) r.Get("/root", beaconhttp.HandleEndpointFunc(a.getStateRoot)) r.Get("/fork", beaconhttp.HandleEndpointFunc(a.getStateFork)) diff --git a/cl/beacon/handler/states.go b/cl/beacon/handler/states.go index 0d0c75d95..01e5c0f26 100644 --- a/cl/beacon/handler/states.go +++ b/cl/beacon/handler/states.go @@ -10,7 +10,9 @@ import ( "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/cltypes/solid" "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" + state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state" "github.com/ledgerwatch/erigon/cl/utils" ) @@ -180,3 +182,66 @@ func (a *ApiHandler) getFullState(r *http.Request) (*beaconResponse, error) { return newBeaconResponse(state).withFinalized(false).withVersion(state.Version()), nil } + +type finalityCheckpointsResponse struct { + FinalizedCheckpoint solid.Checkpoint `json:"finalized_checkpoint"` + CurrentJustifiedCheckpoint solid.Checkpoint `json:"current_justified_checkpoint"` + PreviousJustifiedCheckpoint solid.Checkpoint `json:"previous_justified_checkpoint"` +} + +func (a *ApiHandler) getFinalityCheckpoints(r *http.Request) (*beaconResponse, error) { + ctx := r.Context() + + tx, err := a.indiciesDB.BeginRo(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback() + blockId, err := stateIdFromRequest(r) + if err != nil { + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) + } + + root, httpStatus, err := a.rootFromStateId(ctx, tx, blockId) + if err != nil { + return nil, beaconhttp.NewEndpointError(httpStatus, err.Error()) + } + + blockRoot, err := beacon_indicies.ReadBlockRootByStateRoot(tx, root) + if err != nil { + return nil, err + } + + slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, blockRoot) + if err != nil { + return nil, err + } + if slot == nil { + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("could not read block slot: %x", blockRoot)) + } + + ok, finalizedCheckpoint, currentJustifiedCheckpoint, previousJustifiedCheckpoint := a.forkchoiceStore.GetFinalityCheckpoints(blockRoot) + if err != nil { + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) + } + if !ok { + currentJustifiedCheckpoint, previousJustifiedCheckpoint, finalizedCheckpoint, err = state_accessors.ReadCheckpoints(tx, a.beaconChainCfg.RoundSlotToEpoch(*slot)) + if err != nil { + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) + } + if currentJustifiedCheckpoint == nil { + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("could not read checkpoints: %x, %d", blockRoot, a.beaconChainCfg.RoundSlotToEpoch(*slot))) + } + } + version := a.beaconChainCfg.GetCurrentStateVersion(*slot / a.beaconChainCfg.SlotsPerEpoch) + canonicalRoot, err := beacon_indicies.ReadCanonicalBlockRoot(tx, *slot) + if err != nil { + return nil, err + } + + return newBeaconResponse(finalityCheckpointsResponse{ + FinalizedCheckpoint: finalizedCheckpoint, + CurrentJustifiedCheckpoint: currentJustifiedCheckpoint, + PreviousJustifiedCheckpoint: previousJustifiedCheckpoint, + }).withFinalized(canonicalRoot == root && *slot <= a.forkchoiceStore.FinalizedSlot()).withVersion(version), nil +} diff --git a/cl/persistence/state/historical_states_reader/historical_states_reader.go b/cl/persistence/state/historical_states_reader/historical_states_reader.go index fe804024e..738e74fa7 100644 --- a/cl/persistence/state/historical_states_reader/historical_states_reader.go +++ b/cl/persistence/state/historical_states_reader/historical_states_reader.go @@ -390,7 +390,6 @@ func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, slot uint return nil, err } - // now start diffing diffCursor, err := tx.Cursor(diffBucket) if err != nil { return nil, err @@ -407,12 +406,10 @@ func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, slot uint if base_encoding.Decode64FromBytes4(k) > slot { return nil, fmt.Errorf("diff not found for slot %d", slot) } - s := time.Now() currentList, err = base_encoding.ApplyCompressedSerializedUint64ListDiff(currentList, currentList, v) if err != nil { return nil, err } - fmt.Println("diffing", time.Since(s)) } return currentList, err @@ -445,7 +442,6 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, slot uint64, diff return nil, err } roundedSlot := r.cfg.RoundSlotToEpoch(slot) - fmt.Println(roundedSlot, freshDumpSlot) for i := freshDumpSlot; i < roundedSlot; i += r.cfg.SlotsPerEpoch { diff, err := tx.GetOne(diffBucket, base_encoding.Encode64ToBytes4(i)) if err != nil { @@ -454,14 +450,12 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, slot uint64, diff if len(diff) == 0 { continue } - fmt.Println(i) currentList, err = base_encoding.ApplyCompressedSerializedUint64ListDiff(currentList, currentList, diff) if err != nil { return nil, err } } - // now start diffing diffCursor, err := tx.Cursor(diffBucket) if err != nil { return nil, err @@ -478,12 +472,10 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, slot uint64, diff if base_encoding.Decode64FromBytes4(k) > slot { return nil, fmt.Errorf("diff not found for slot %d", slot) } - s := time.Now() currentList, err = base_encoding.ApplyCompressedSerializedUint64ListDiff(currentList, currentList, v) if err != nil { return nil, err } - fmt.Println("diffing", time.Since(s)) } return currentList, err diff --git a/cl/phase1/forkchoice/forkchoice.go b/cl/phase1/forkchoice/forkchoice.go index bbe79bc45..4c0a97536 100644 --- a/cl/phase1/forkchoice/forkchoice.go +++ b/cl/phase1/forkchoice/forkchoice.go @@ -26,6 +26,12 @@ const ( allowedCachedStates = 8 ) +type finalityCheckpoints struct { + finalizedCheckpoint solid.Checkpoint + currentJustifiedCheckpoint solid.Checkpoint + previousJustifiedCheckpoint solid.Checkpoint +} + type preverifiedAppendListsSizes struct { validatorLength uint64 historicalRootsLength uint64 @@ -56,7 +62,8 @@ type ForkChoiceStore struct { // We keep track of them so that we can forkchoice with EL. eth2Roots *lru.Cache[libcommon.Hash, libcommon.Hash] // ETH2 root -> ETH1 hash // preverifid sizes - preverifiedSizes *lru.Cache[libcommon.Hash, preverifiedAppendListsSizes] + preverifiedSizes *lru.Cache[libcommon.Hash, preverifiedAppendListsSizes] + finalityCheckpoints *lru.Cache[libcommon.Hash, finalityCheckpoints] mu sync.Mutex // EL @@ -93,6 +100,12 @@ func NewForkChoiceStore(ctx context.Context, anchorState *state2.CachingBeaconSt if err != nil { return nil, err } + + finalityCheckpoints, err := lru.New[libcommon.Hash, finalityCheckpoints](checkpointsPerCache) + if err != nil { + return nil, err + } + anchorPublicKeys := make([]byte, anchorState.ValidatorLength()*length.Bytes48) for idx := 0; idx < anchorState.ValidatorLength(); idx++ { pk, err := anchorState.ValidatorPublicKey(idx) @@ -133,6 +146,7 @@ func NewForkChoiceStore(ctx context.Context, anchorState *state2.CachingBeaconSt beaconCfg: anchorState.BeaconConfig(), childrens: make(map[libcommon.Hash]childrens), preverifiedSizes: preverifiedSizes, + finalityCheckpoints: finalityCheckpoints, }, nil } @@ -273,3 +287,12 @@ func (f *ForkChoiceStore) PreverifiedHistoricalSummaries(blockRoot libcommon.Has } return 0 } + +func (f *ForkChoiceStore) GetFinalityCheckpoints(blockRoot libcommon.Hash) (bool, solid.Checkpoint, solid.Checkpoint, solid.Checkpoint) { + f.mu.Lock() + defer f.mu.Unlock() + if ret, ok := f.finalityCheckpoints.Get(blockRoot); ok { + return true, ret.finalizedCheckpoint, ret.currentJustifiedCheckpoint, ret.previousJustifiedCheckpoint + } + return false, solid.Checkpoint{}, solid.Checkpoint{}, solid.Checkpoint{} +} diff --git a/cl/phase1/forkchoice/interface.go b/cl/phase1/forkchoice/interface.go index 96d34abd5..79758eb2e 100644 --- a/cl/phase1/forkchoice/interface.go +++ b/cl/phase1/forkchoice/interface.go @@ -27,6 +27,7 @@ type ForkChoiceStorageReader interface { JustifiedSlot() uint64 ProposerBoostRoot() common.Hash GetStateAtBlockRoot(blockRoot libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) + GetFinalityCheckpoints(blockRoot libcommon.Hash) (bool, solid.Checkpoint, solid.Checkpoint, solid.Checkpoint) Slot() uint64 Time() uint64 diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index 2e709f7b0..5cd183c8a 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -80,6 +80,11 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload, historicalRootsLength: lastProcessedState.HistoricalRootsLength(), historicalSummariesLength: lastProcessedState.HistoricalSummariesLength(), }) + f.finalityCheckpoints.Add(blockRoot, finalityCheckpoints{ + finalizedCheckpoint: lastProcessedState.FinalizedCheckpoint().Copy(), + currentJustifiedCheckpoint: lastProcessedState.CurrentJustifiedCheckpoint().Copy(), + previousJustifiedCheckpoint: lastProcessedState.PreviousJustifiedCheckpoint().Copy(), + }) // Update checkpoints f.updateCheckpoints(lastProcessedState.CurrentJustifiedCheckpoint().Copy(), lastProcessedState.FinalizedCheckpoint().Copy()) // First thing save previous values of the checkpoints (avoid memory copy of all states and ensure easy revert) diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index 0aba101d0..0eccd46bf 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -26,6 +26,7 @@ import ( "github.com/ledgerwatch/erigon/cl/persistence/db_config" "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state" + "github.com/ledgerwatch/erigon/cl/persistence/state/historical_states_reader" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" @@ -148,21 +149,6 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi }() } - syncedDataManager := synced_data.NewSyncedDataManager(cfg.Active, beaconConfig) - if cfg.Active { - apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, db, forkChoice, pool, rcsn, syncedDataManager) - headApiHandler := &validatorapi.ValidatorApiHandler{ - FC: forkChoice, - BeaconChainCfg: beaconConfig, - GenesisCfg: genesisConfig, - } - go beacon.ListenAndServe(&beacon.LayeredBeaconHandler{ - ValidatorApi: headApiHandler, - ArchiveApi: apiHandler, - }, cfg) - log.Info("Beacon API started", "addr", cfg.Address) - } - { // start the gossip manager go gossipManager.Start(ctx) logger.Info("Started Ethereum 2.0 Gossip Service") @@ -226,6 +212,22 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi return err } + statesReader := historical_states_reader.NewHistoricalStatesReader(beaconConfig, rcsn, vTables, af, genesisState) + syncedDataManager := synced_data.NewSyncedDataManager(cfg.Active, beaconConfig) + if cfg.Active { + apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, db, forkChoice, pool, rcsn, syncedDataManager, statesReader) + headApiHandler := &validatorapi.ValidatorApiHandler{ + FC: forkChoice, + BeaconChainCfg: beaconConfig, + GenesisCfg: genesisConfig, + } + go beacon.ListenAndServe(&beacon.LayeredBeaconHandler{ + ValidatorApi: headApiHandler, + ArchiveApi: apiHandler, + }, cfg) + log.Info("Beacon API started", "addr", cfg.Address) + } + stageCfg := stages.ClStagesCfg(beaconRpc, antiq, genesisConfig, beaconConfig, state, engine, gossipManager, forkChoice, beaconDB, db, csn, dirs.Tmp, dbConfig, backfilling, syncedDataManager) sync := stages.ConsensusClStages(ctx, stageCfg)