Resumable beacon state reconstruction (#8918)

* Most of the files changed in this PR are additional, slightly more
complicated unit tests.
* Fixed Eth1DataVotes not inheriting genesis
* Fixed Attestations simulation using wrong slot when reconstructing
participation
* Fixed Copy() operation on BeaconState on Eth1DataVotes
* Used correct ListSSZ type for Eth1DataVotes and HistoricalSummaries
* Fixed wrong []uint64 deltas on empty slots
This commit is contained in:
Giulio rebuffo 2023-12-11 14:07:57 +01:00 committed by GitHub
parent 7fb8f9db59
commit 24987878e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
126 changed files with 466 additions and 114 deletions

View File

@ -23,6 +23,7 @@ import (
"github.com/ledgerwatch/erigon/cl/persistence/base_encoding"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state"
"github.com/ledgerwatch/erigon/cl/persistence/state/historical_states_reader"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/core/state/raw"
"github.com/ledgerwatch/erigon/cl/phase1/core/state/shuffling"
@ -134,6 +135,7 @@ func uint64BalancesList(s *state.CachingBeaconState, out []uint64) []uint64 {
func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
var tx kv.Tx
tx, err := s.mainDB.BeginRo(ctx)
if err != nil {
return err
@ -182,6 +184,20 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
stateEvents := etl.NewCollector(kv.StateEvents, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger)
defer stateEvents.Close()
progress, err := state_accessors.GetStateProcessingProgress(tx)
if err != nil {
return err
}
// Go back a little bit
if progress > s.cfg.SlotsPerEpoch*2 {
progress -= s.cfg.SlotsPerEpoch * 2
} else {
progress = 0
}
progress, err = findNearestSlotBackwards(tx, progress) // Maybe the guess was a missed slot.
if err != nil {
return err
}
// buffers
commonBuffer := &bytes.Buffer{}
compressedWriter, err := zstd.NewWriter(commonBuffer, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
@ -189,18 +205,35 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}
defer compressedWriter.Close()
// TODO(Giulio2002): also store genesis information and resume from state.
if s.currentState == nil {
s.currentState, err = s.genesisState.Copy()
if err != nil {
return err
}
// Collect genesis state if we are at genesis
if err := s.collectGenesisState(ctx, compressedWriter, s.currentState, slashings, proposers, minimalBeaconStates, stateEvents, changedValidators); err != nil {
return err
// progress is 0 when we are at genesis
if progress == 0 {
s.currentState, err = s.genesisState.Copy()
if err != nil {
return err
}
// Collect genesis state if we are at genesis
if err := s.collectGenesisState(ctx, compressedWriter, s.currentState, slashings, proposers, minimalBeaconStates, stateEvents, changedValidators); err != nil {
return err
}
} else {
start := time.Now()
// progress not 0? we need to load the state from the DB
historicalReader := historical_states_reader.NewHistoricalStatesReader(s.cfg, s.snReader, s.validatorsTable, s.fs, s.genesisState)
s.currentState, err = historicalReader.ReadHistoricalState(ctx, tx, progress)
if err != nil {
return fmt.Errorf("failed to read historical state at slot %d: %w", progress, err)
}
end := time.Since(start)
hashRoot, err := s.currentState.HashSSZ()
if err != nil {
return err
}
log.Info("Recovered Beacon State", "slot", s.currentState.Slot(), "elapsed", end, "root", libcommon.Hash(hashRoot).String())
}
}
logLvl := log.LvlInfo
if to-s.currentState.Slot() < 96 {
logLvl = log.LvlDebug
@ -256,8 +289,11 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return s.validatorsTable.AddWithdrawalCredentials(uint64(index), slot, libcommon.BytesToHash(wc))
},
OnEpochBoundary: func(epoch uint64) error {
v := append(s.currentState.CurrentJustifiedCheckpoint(), append(s.currentState.PreviousJustifiedCheckpoint(), s.currentState.FinalizedCheckpoint()...)...)
k := base_encoding.Encode64ToBytes4(s.cfg.RoundSlotToEpoch(slot))
v := make([]byte, solid.CheckpointSize*3)
copy(v, s.currentState.CurrentJustifiedCheckpoint())
copy(v[solid.CheckpointSize:], s.currentState.PreviousJustifiedCheckpoint())
copy(v[solid.CheckpointSize*2:], s.currentState.FinalizedCheckpoint())
if err := checkpoints.Collect(k, v); err != nil {
return err
}
@ -302,9 +338,11 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
progressTimer := time.NewTicker(1 * time.Minute)
defer progressTimer.Stop()
prevSlot := slot
first := false
// This tells us that transition and operations do not happen concurrently and access is safe, so we can optimize for GC.
// there is optimized custom cache to recycle big GC overhead.
for ; slot < to; slot++ {
isDumpSlot := slot%clparams.SlotsPerDump == 0
block, err := s.snReader.ReadBlockBySlot(ctx, tx, slot)
if err != nil {
return err
@ -329,11 +367,11 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
}
}
if slot%clparams.SlotsPerDump == 0 {
if isDumpSlot && block == nil {
if err := s.antiquateField(ctx, slot, s.currentState.RawBalances(), compressedWriter, "balances"); err != nil {
return err
}
if err := s.antiquateEffectiveBalances(ctx, slot, s.currentState.RawBalances(), compressedWriter); err != nil {
if err := s.antiquateEffectiveBalances(ctx, slot, s.currentState.RawValidatorSet(), compressedWriter); err != nil {
return err
}
if s.currentState.Version() >= clparams.AltairVersion {
@ -360,11 +398,15 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
slashingsBytes = slashingsBytes[:0]
slashingsBytes = append(slashingsBytes, s.currentState.RawSlashings()...)
// We sanity check the state every 100k slots.
if err := transition.TransitionState(s.currentState, block, slot%100_000 == 0); err != nil {
fullValidation := slot%100_000 == 0 || first
// We sanity check the state every 100k slots or when we start.
if err := transition.TransitionState(s.currentState, block, fullValidation); err != nil {
return err
}
first = false
// if s.currentState.Slot() == 3868670 {
// s.dumpFullBeaconState()
// }
if err := s.storeMinimalState(commonBuffer, s.currentState, minimalBeaconStates); err != nil {
return err
}
@ -372,7 +414,22 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}
events.Reset()
if slot%clparams.SlotsPerDump == 0 {
if isDumpSlot {
if err := s.antiquateField(ctx, slot, s.currentState.RawBalances(), compressedWriter, "balances"); err != nil {
return err
}
if err := s.antiquateEffectiveBalances(ctx, slot, s.currentState.RawValidatorSet(), compressedWriter); err != nil {
return err
}
if s.currentState.Version() >= clparams.AltairVersion {
if err := s.antiquateField(ctx, slot, s.currentState.RawInactivityScores(), compressedWriter, "inactivity_scores"); err != nil {
return err
}
}
if err := s.antiquateFullSlashings(slashings, slot, s.currentState.RawSlashings(), commonBuffer, compressedWriter); err != nil {
return err
}
continue
}
@ -503,9 +560,16 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
if err != nil {
return err
}
log.Info("Historical antiquated", "slot", s.currentState.Slot(), "latency", time.Since(start))
return rwTx.Commit()
if err := rwTx.Commit(); err != nil {
return err
}
endTime := time.Since(start)
stateRoot, err := s.currentState.HashSSZ()
if err != nil {
return err
}
log.Info("Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
return nil
}
func (s *Antiquary) antiquateField(ctx context.Context, slot uint64, uncompressed []byte, compressor *zstd.Encoder, name string) error {
@ -700,6 +764,19 @@ func (s *Antiquary) dumpPayload(k []byte, v []byte, c *etl.Collector, b *bytes.B
return c.Collect(k, common.Copy(b.Bytes()))
}
// func (s *Antiquary) dumpFullBeaconState() {
// b, err := s.currentState.EncodeSSZ(nil)
// if err != nil {
// s.logger.Error("Failed to encode full beacon state", "err", err)
// return
// }
// // write it straight to b.txt with os.WriteFile, bypassing afero
// if err := os.WriteFile("b.txt", b, 0644); err != nil {
// s.logger.Error("Failed to write full beacon state", "err", err)
// }
// }
func flattenRandaoMixes(hashes []libcommon.Hash) []byte {
out := make([]byte, len(hashes)*32)
for i, h := range hashes {
@ -720,3 +797,18 @@ func (s *Antiquary) antiquateFullSlashings(collector *etl.Collector, slot uint64
}
return collector.Collect(base_encoding.Encode64ToBytes4(slot), common.Copy(buffer.Bytes()))
}
// findNearestSlotBackwards walks backwards from slot, one slot at a time,
// until beacon_indicies.ReadCanonicalBlockRoot yields a non-zero root or
// slot 0 is reached, and returns the slot at which it stopped.
//
// Slot 0 is returned as-is even when its stored root is the zero hash.
// Any read error aborts the search and is returned together with slot 0.
func findNearestSlotBackwards(tx kv.Tx, slot uint64) (uint64, error) {
	for {
		root, err := beacon_indicies.ReadCanonicalBlockRoot(tx, slot)
		if err != nil {
			return 0, err
		}
		// Stop on the first slot that has a canonical root, or at the floor.
		if root != (common.Hash{}) || slot == 0 {
			return slot, nil
		}
		slot--
	}
}

View File

@ -2,6 +2,7 @@ package antiquary
import (
"context"
"fmt"
"testing"
_ "embed"
@ -36,6 +37,13 @@ func TestStateAntiquaryCapella(t *testing.T) {
runTest(t, blocks, preState, postState)
}
// TestStateAntiquaryBellatrix feeds the Bellatrix fixture returned by
// tests.GetBellatrixRandom through runTest.
//
// The test is currently skipped unconditionally: t.Skip stops execution of
// the test function, so the statements below it do not run until the skip is
// removed.
func TestStateAntiquaryBellatrix(t *testing.T) {
	t.Skip()
	bellatrixBlocks, pre, post := tests.GetBellatrixRandom()
	// Prints how many blocks the fixture contains.
	fmt.Println(len(bellatrixBlocks))
	runTest(t, bellatrixBlocks, pre, post)
}
func TestStateAntiquaryPhase0(t *testing.T) {
t.Skip()
blocks, preState, postState := tests.GetPhase0Random()

View File

@ -0,0 +1 @@
{blocks_count: 96}

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More