Added Finality_Checkpoints endpoint (#8979)

This commit is contained in:
Giulio rebuffo 2023-12-15 02:27:27 +01:00 committed by GitHub
parent e018bb062a
commit eeb471d800
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 149 additions and 39 deletions

View File

@ -214,10 +214,9 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}
// Collect genesis state if we are at genesis
if err := s.collectGenesisState(ctx, compressedWriter, s.currentState, slashings, inactivityScoresC, proposers, minimalBeaconStates, stateEvents, changedValidators); err != nil {
if err := s.collectGenesisState(ctx, compressedWriter, s.currentState, slashings, checkpoints, inactivityScoresC, proposers, minimalBeaconStates, stateEvents, changedValidators); err != nil {
return err
}
s.balances32 = append(s.balances32, s.currentState.RawBalances()...)
} else {
start := time.Now()
// progress not 0? we need to load the state from the DB
@ -232,8 +231,10 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}
log.Info("Recovered Beacon State", "slot", s.currentState.Slot(), "elapsed", end, "root", libcommon.Hash(hashRoot).String())
s.balances32 = append(s.balances32, s.currentState.RawBalances()...)
}
s.balances32 = s.balances32[:0]
s.balances32 = append(s.balances32, s.currentState.RawBalances()...)
}
logLvl := log.LvlInfo
@ -379,6 +380,8 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
if err := s.antiquateEffectiveBalances(ctx, slot, s.currentState.RawValidatorSet(), compressedWriter); err != nil {
return err
}
s.balances32 = s.balances32[:0]
s.balances32 = append(s.balances32, s.currentState.RawBalances()...)
} else if slot%s.cfg.SlotsPerEpoch == 0 {
if err := s.antiquateBytesListDiff(ctx, key, s.balances32, s.currentState.RawBalances(), balances, base_encoding.ComputeCompressedSerializedUint64ListDiff); err != nil {
return err
@ -400,9 +403,10 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
if err := transition.TransitionState(s.currentState, block, fullValidation); err != nil {
return err
}
first = false
// dump the whole sla
// dump the whole slashings vector.
if slashingOccured {
if err := s.antiquateFullUint64List(slashings, slot, s.currentState.RawSlashings(), commonBuffer, compressedWriter); err != nil {
return err
@ -424,6 +428,9 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
if err := s.antiquateEffectiveBalances(ctx, slot, s.currentState.RawValidatorSet(), compressedWriter); err != nil {
return err
}
// Reset it as we antiquated it.
s.balances32 = s.balances32[:0]
s.balances32 = append(s.balances32, s.currentState.RawBalances()...)
continue
}
@ -679,7 +686,7 @@ func getProposerDutiesValue(s *state.CachingBeaconState) []byte {
return list
}
func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.Encoder, state *state.CachingBeaconState, slashings, inactivities, proposersCollector, minimalBeaconStateCollector, stateEvents *etl.Collector, changedValidators map[uint64]struct{}) error {
func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.Encoder, state *state.CachingBeaconState, slashings, checkpoints, inactivities, proposersCollector, minimalBeaconStateCollector, stateEvents *etl.Collector, changedValidators map[uint64]struct{}) error {
var err error
slot := state.Slot()
epoch := slot / s.cfg.SlotsPerEpoch
@ -713,6 +720,15 @@ func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.En
return err
}
k := base_encoding.Encode64ToBytes4(s.cfg.RoundSlotToEpoch(slot))
v := make([]byte, solid.CheckpointSize*3)
copy(v, state.CurrentJustifiedCheckpoint())
copy(v[solid.CheckpointSize:], state.PreviousJustifiedCheckpoint())
copy(v[solid.CheckpointSize*2:], state.FinalizedCheckpoint())
if err := checkpoints.Collect(k, v); err != nil {
return err
}
if state.Version() >= clparams.AltairVersion {
// dump inactivity scores
if err := s.antiquateFullUint64List(inactivities, slot, state.RawInactivityScores(), &commonBuffer, compressor); err != nil {
@ -764,7 +780,6 @@ func (s *Antiquary) dumpPayload(k []byte, v []byte, c *etl.Collector, b *bytes.B
// if err := os.WriteFile("b.txt", b, 0644); err != nil {
// s.logger.Error("Failed to write full beacon state", "err", err)
// }
// }
func flattenRandaoMixes(hashes []libcommon.Hash) []byte {

View File

@ -6,10 +6,12 @@ import (
"fmt"
"net/http"
"reflect"
"strings"
"github.com/ledgerwatch/erigon-lib/types/ssz"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
)
var _ error = EndpointError{}
@ -76,8 +78,9 @@ func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc {
// TODO: ssz handler
// TODO: potentially add a context option to buffer these
contentType := r.Header.Get("Accept")
switch contentType {
case "application/octet-stream":
contentTypes := strings.Split(contentType, ",")
switch {
case slices.Contains(contentTypes, "application/octet-stream"):
sszMarshaler, ok := any(ans).(ssz.Marshaler)
if !ok {
NewEndpointError(http.StatusBadRequest, "This endpoint does not support SSZ response").WriteTo(w)
@ -90,7 +93,7 @@ func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc {
return
}
w.Write(encoded)
case "application/json", "":
case contentType == "*/*", contentType == "", slices.Contains(contentTypes, "text/html"), slices.Contains(contentTypes, "application/json"):
w.Header().Add("content-type", "application/json")
err := json.NewEncoder(w).Encode(ans)
if err != nil {

View File

@ -10,6 +10,7 @@ import (
"github.com/ledgerwatch/erigon/cl/beacon/synced_data"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/persistence"
"github.com/ledgerwatch/erigon/cl/persistence/state/historical_states_reader"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
"github.com/ledgerwatch/erigon/cl/pool"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
@ -26,10 +27,11 @@ type ApiHandler struct {
forkchoiceStore forkchoice.ForkChoiceStorage
operationsPool pool.OperationsPool
syncedData *synced_data.SyncedDataManager
stateReader *historical_states_reader.HistoricalStatesReader
}
func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool, rcsn freezeblocks.BeaconSnapshotReader, syncedData *synced_data.SyncedDataManager) *ApiHandler {
return &ApiHandler{o: sync.Once{}, genesisCfg: genesisConfig, beaconChainCfg: beaconChainConfig, indiciesDB: indiciesDB, forkchoiceStore: forkchoiceStore, operationsPool: operationsPool, blockReader: rcsn, syncedData: syncedData}
func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool, rcsn freezeblocks.BeaconSnapshotReader, syncedData *synced_data.SyncedDataManager, stateReader *historical_states_reader.HistoricalStatesReader) *ApiHandler {
return &ApiHandler{o: sync.Once{}, genesisCfg: genesisConfig, beaconChainCfg: beaconChainConfig, indiciesDB: indiciesDB, forkchoiceStore: forkchoiceStore, operationsPool: operationsPool, blockReader: rcsn, syncedData: syncedData, stateReader: stateReader}
}
func (a *ApiHandler) init() {
@ -46,10 +48,11 @@ func (a *ApiHandler) init() {
r.Get("/fork_schedule", beaconhttp.HandleEndpointFunc(a.getForkSchedule))
})
r.Route("/beacon", func(r chi.Router) {
r.Route("/headers", func(r chi.Router) {
r.Get("/", beaconhttp.HandleEndpointFunc(a.getHeaders))
r.Get("/{block_id}", beaconhttp.HandleEndpointFunc(a.getHeader))
})
// r.Route("/headers", func(r chi.Router) {
// r.Get("/", beaconhttp.HandleEndpointFunc(a.getHeaders))
// r.Get("/{block_id}", beaconhttp.HandleEndpointFunc(a.getHeader))
// })
r.Get("/headers", beaconhttp.HandleEndpointFunc(a.getHeaders))
r.Route("/blocks", func(r chi.Router) {
r.Post("/", http.NotFound)
r.Get("/{block_id}", beaconhttp.HandleEndpointFunc(a.getBlock))
@ -72,6 +75,7 @@ func (a *ApiHandler) init() {
r.Get("/head/validators/{index}", http.NotFound) // otterscan
r.Get("/head/committees", http.NotFound) // otterscan
r.Route("/{state_id}", func(r chi.Router) {
r.Get("/finality_checkpoints", beaconhttp.HandleEndpointFunc(a.getFinalityCheckpoints))
r.Get("/validators", http.NotFound)
r.Get("/root", beaconhttp.HandleEndpointFunc(a.getStateRoot))
r.Get("/fork", beaconhttp.HandleEndpointFunc(a.getStateFork))

View File

@ -10,7 +10,9 @@ import (
"github.com/ledgerwatch/erigon/cl/beacon/beaconhttp"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state"
"github.com/ledgerwatch/erigon/cl/utils"
)
@ -180,3 +182,66 @@ func (a *ApiHandler) getFullState(r *http.Request) (*beaconResponse, error) {
return newBeaconResponse(state).withFinalized(false).withVersion(state.Version()), nil
}
type finalityCheckpointsResponse struct {
FinalizedCheckpoint solid.Checkpoint `json:"finalized_checkpoint"`
CurrentJustifiedCheckpoint solid.Checkpoint `json:"current_justified_checkpoint"`
PreviousJustifiedCheckpoint solid.Checkpoint `json:"previous_justified_checkpoint"`
}
func (a *ApiHandler) getFinalityCheckpoints(r *http.Request) (*beaconResponse, error) {
ctx := r.Context()
tx, err := a.indiciesDB.BeginRo(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()
blockId, err := stateIdFromRequest(r)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error())
}
root, httpStatus, err := a.rootFromStateId(ctx, tx, blockId)
if err != nil {
return nil, beaconhttp.NewEndpointError(httpStatus, err.Error())
}
blockRoot, err := beacon_indicies.ReadBlockRootByStateRoot(tx, root)
if err != nil {
return nil, err
}
slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, blockRoot)
if err != nil {
return nil, err
}
if slot == nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("could not read block slot: %x", blockRoot))
}
ok, finalizedCheckpoint, currentJustifiedCheckpoint, previousJustifiedCheckpoint := a.forkchoiceStore.GetFinalityCheckpoints(blockRoot)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error())
}
if !ok {
currentJustifiedCheckpoint, previousJustifiedCheckpoint, finalizedCheckpoint, err = state_accessors.ReadCheckpoints(tx, a.beaconChainCfg.RoundSlotToEpoch(*slot))
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error())
}
if currentJustifiedCheckpoint == nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("could not read checkpoints: %x, %d", blockRoot, a.beaconChainCfg.RoundSlotToEpoch(*slot)))
}
}
version := a.beaconChainCfg.GetCurrentStateVersion(*slot / a.beaconChainCfg.SlotsPerEpoch)
canonicalRoot, err := beacon_indicies.ReadCanonicalBlockRoot(tx, *slot)
if err != nil {
return nil, err
}
return newBeaconResponse(finalityCheckpointsResponse{
FinalizedCheckpoint: finalizedCheckpoint,
CurrentJustifiedCheckpoint: currentJustifiedCheckpoint,
PreviousJustifiedCheckpoint: previousJustifiedCheckpoint,
}).withFinalized(canonicalRoot == root && *slot <= a.forkchoiceStore.FinalizedSlot()).withVersion(version), nil
}

View File

@ -390,7 +390,6 @@ func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, slot uint
return nil, err
}
// now start diffing
diffCursor, err := tx.Cursor(diffBucket)
if err != nil {
return nil, err
@ -407,12 +406,10 @@ func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, slot uint
if base_encoding.Decode64FromBytes4(k) > slot {
return nil, fmt.Errorf("diff not found for slot %d", slot)
}
s := time.Now()
currentList, err = base_encoding.ApplyCompressedSerializedUint64ListDiff(currentList, currentList, v)
if err != nil {
return nil, err
}
fmt.Println("diffing", time.Since(s))
}
return currentList, err
@ -445,7 +442,6 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, slot uint64, diff
return nil, err
}
roundedSlot := r.cfg.RoundSlotToEpoch(slot)
fmt.Println(roundedSlot, freshDumpSlot)
for i := freshDumpSlot; i < roundedSlot; i += r.cfg.SlotsPerEpoch {
diff, err := tx.GetOne(diffBucket, base_encoding.Encode64ToBytes4(i))
if err != nil {
@ -454,14 +450,12 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, slot uint64, diff
if len(diff) == 0 {
continue
}
fmt.Println(i)
currentList, err = base_encoding.ApplyCompressedSerializedUint64ListDiff(currentList, currentList, diff)
if err != nil {
return nil, err
}
}
// now start diffing
diffCursor, err := tx.Cursor(diffBucket)
if err != nil {
return nil, err
@ -478,12 +472,10 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, slot uint64, diff
if base_encoding.Decode64FromBytes4(k) > slot {
return nil, fmt.Errorf("diff not found for slot %d", slot)
}
s := time.Now()
currentList, err = base_encoding.ApplyCompressedSerializedUint64ListDiff(currentList, currentList, v)
if err != nil {
return nil, err
}
fmt.Println("diffing", time.Since(s))
}
return currentList, err

View File

@ -26,6 +26,12 @@ const (
allowedCachedStates = 8
)
type finalityCheckpoints struct {
finalizedCheckpoint solid.Checkpoint
currentJustifiedCheckpoint solid.Checkpoint
previousJustifiedCheckpoint solid.Checkpoint
}
type preverifiedAppendListsSizes struct {
validatorLength uint64
historicalRootsLength uint64
@ -56,7 +62,8 @@ type ForkChoiceStore struct {
// We keep track of them so that we can forkchoice with EL.
eth2Roots *lru.Cache[libcommon.Hash, libcommon.Hash] // ETH2 root -> ETH1 hash
// preverifid sizes
preverifiedSizes *lru.Cache[libcommon.Hash, preverifiedAppendListsSizes]
preverifiedSizes *lru.Cache[libcommon.Hash, preverifiedAppendListsSizes]
finalityCheckpoints *lru.Cache[libcommon.Hash, finalityCheckpoints]
mu sync.Mutex
// EL
@ -93,6 +100,12 @@ func NewForkChoiceStore(ctx context.Context, anchorState *state2.CachingBeaconSt
if err != nil {
return nil, err
}
finalityCheckpoints, err := lru.New[libcommon.Hash, finalityCheckpoints](checkpointsPerCache)
if err != nil {
return nil, err
}
anchorPublicKeys := make([]byte, anchorState.ValidatorLength()*length.Bytes48)
for idx := 0; idx < anchorState.ValidatorLength(); idx++ {
pk, err := anchorState.ValidatorPublicKey(idx)
@ -133,6 +146,7 @@ func NewForkChoiceStore(ctx context.Context, anchorState *state2.CachingBeaconSt
beaconCfg: anchorState.BeaconConfig(),
childrens: make(map[libcommon.Hash]childrens),
preverifiedSizes: preverifiedSizes,
finalityCheckpoints: finalityCheckpoints,
}, nil
}
@ -273,3 +287,12 @@ func (f *ForkChoiceStore) PreverifiedHistoricalSummaries(blockRoot libcommon.Has
}
return 0
}
func (f *ForkChoiceStore) GetFinalityCheckpoints(blockRoot libcommon.Hash) (bool, solid.Checkpoint, solid.Checkpoint, solid.Checkpoint) {
f.mu.Lock()
defer f.mu.Unlock()
if ret, ok := f.finalityCheckpoints.Get(blockRoot); ok {
return true, ret.finalizedCheckpoint, ret.currentJustifiedCheckpoint, ret.previousJustifiedCheckpoint
}
return false, solid.Checkpoint{}, solid.Checkpoint{}, solid.Checkpoint{}
}

View File

@ -27,6 +27,7 @@ type ForkChoiceStorageReader interface {
JustifiedSlot() uint64
ProposerBoostRoot() common.Hash
GetStateAtBlockRoot(blockRoot libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error)
GetFinalityCheckpoints(blockRoot libcommon.Hash) (bool, solid.Checkpoint, solid.Checkpoint, solid.Checkpoint)
Slot() uint64
Time() uint64

View File

@ -80,6 +80,11 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload,
historicalRootsLength: lastProcessedState.HistoricalRootsLength(),
historicalSummariesLength: lastProcessedState.HistoricalSummariesLength(),
})
f.finalityCheckpoints.Add(blockRoot, finalityCheckpoints{
finalizedCheckpoint: lastProcessedState.FinalizedCheckpoint().Copy(),
currentJustifiedCheckpoint: lastProcessedState.CurrentJustifiedCheckpoint().Copy(),
previousJustifiedCheckpoint: lastProcessedState.PreviousJustifiedCheckpoint().Copy(),
})
// Update checkpoints
f.updateCheckpoints(lastProcessedState.CurrentJustifiedCheckpoint().Copy(), lastProcessedState.FinalizedCheckpoint().Copy())
// First thing save previous values of the checkpoints (avoid memory copy of all states and ensure easy revert)

View File

@ -26,6 +26,7 @@ import (
"github.com/ledgerwatch/erigon/cl/persistence/db_config"
"github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format"
state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state"
"github.com/ledgerwatch/erigon/cl/persistence/state/historical_states_reader"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
@ -148,21 +149,6 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi
}()
}
syncedDataManager := synced_data.NewSyncedDataManager(cfg.Active, beaconConfig)
if cfg.Active {
apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, db, forkChoice, pool, rcsn, syncedDataManager)
headApiHandler := &validatorapi.ValidatorApiHandler{
FC: forkChoice,
BeaconChainCfg: beaconConfig,
GenesisCfg: genesisConfig,
}
go beacon.ListenAndServe(&beacon.LayeredBeaconHandler{
ValidatorApi: headApiHandler,
ArchiveApi: apiHandler,
}, cfg)
log.Info("Beacon API started", "addr", cfg.Address)
}
{ // start the gossip manager
go gossipManager.Start(ctx)
logger.Info("Started Ethereum 2.0 Gossip Service")
@ -226,6 +212,22 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi
return err
}
statesReader := historical_states_reader.NewHistoricalStatesReader(beaconConfig, rcsn, vTables, af, genesisState)
syncedDataManager := synced_data.NewSyncedDataManager(cfg.Active, beaconConfig)
if cfg.Active {
apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, db, forkChoice, pool, rcsn, syncedDataManager, statesReader)
headApiHandler := &validatorapi.ValidatorApiHandler{
FC: forkChoice,
BeaconChainCfg: beaconConfig,
GenesisCfg: genesisConfig,
}
go beacon.ListenAndServe(&beacon.LayeredBeaconHandler{
ValidatorApi: headApiHandler,
ArchiveApi: apiHandler,
}, cfg)
log.Info("Beacon API started", "addr", cfg.Address)
}
stageCfg := stages.ClStagesCfg(beaconRpc, antiq, genesisConfig, beaconConfig, state, engine, gossipManager, forkChoice, beaconDB, db, csn, dirs.Tmp, dbConfig, backfilling, syncedDataManager)
sync := stages.ConsensusClStages(ctx, stageCfg)