mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-23 11:57:18 +00:00
Fix new state mgmt sync stuck in a loop (#5142)
This commit is contained in:
parent
2ab4b86f9b
commit
dc3fb018fe
@ -135,10 +135,18 @@ func (s *Service) Start() {
|
||||
if err != nil {
|
||||
log.Fatalf("Could not fetch finalized cp: %v", err)
|
||||
}
|
||||
|
||||
if beaconState == nil {
|
||||
beaconState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(cp.Root))
|
||||
if err != nil {
|
||||
log.Fatalf("Could not fetch beacon state: %v", err)
|
||||
if featureconfig.Get().NewStateMgmt {
|
||||
beaconState, err = s.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(cp.Root))
|
||||
if err != nil {
|
||||
log.Fatalf("Could not fetch beacon state: %v", err)
|
||||
}
|
||||
} else {
|
||||
beaconState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(cp.Root))
|
||||
if err != nil {
|
||||
log.Fatalf("Could not fetch beacon state: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -435,7 +443,6 @@ func (s *Service) initializeChainInfo(ctx context.Context) error {
|
||||
if finalizedState == nil || finalizedBlock == nil {
|
||||
return errors.New("finalized state and block can't be nil")
|
||||
}
|
||||
|
||||
s.setHead(finalizedRoot, finalizedBlock, finalizedState)
|
||||
|
||||
return nil
|
||||
|
@ -78,7 +78,10 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedState *state.BeaconS
|
||||
}).Info("Saved archived point during state migration")
|
||||
}
|
||||
|
||||
if s.beaconDB.HasState(ctx, r) {
|
||||
// Do not delete the current finalized state in case user wants to
|
||||
// switch back to old state service, deleting the recent finalized state
|
||||
// could cause issue switching back.
|
||||
if s.beaconDB.HasState(ctx, r) && r != finalizedRoot {
|
||||
if err := s.beaconDB.DeleteState(ctx, r); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
|
||||
rateLimiter := leakybucket.NewCollector(
|
||||
allowedBlocksPerSecond, /* rate */
|
||||
allowedBlocksPerSecond, /* capacity */
|
||||
false /* deleteEmptyBuckets */)
|
||||
false /* deleteEmptyBuckets */)
|
||||
|
||||
return &blocksFetcher{
|
||||
ctx: ctx,
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/shared/bls"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/runutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
||||
@ -61,7 +62,8 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
|
||||
attestations := s.blkRootToPendingAtts[bRoot]
|
||||
s.pendingAttsLock.RUnlock()
|
||||
// Has the pending attestation's missing block arrived and the node processed block yet?
|
||||
if s.db.HasBlock(ctx, bRoot) && s.db.HasState(ctx, bRoot) {
|
||||
hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bRoot)
|
||||
if s.db.HasBlock(ctx, bRoot) && (s.db.HasState(ctx, bRoot) || hasStateSummary) {
|
||||
numberOfBlocksRecoveredFromAtt.Inc()
|
||||
for _, att := range attestations {
|
||||
// The pending attestations can arrive in both aggregated and unaggregated forms,
|
||||
|
@ -45,7 +45,7 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handle block when the parent is unknown
|
||||
// Handle block when the parent is unknown.
|
||||
if !r.db.HasBlock(ctx, bytesutil.ToBytes32(block.ParentRoot)) {
|
||||
r.pendingQueueLock.Lock()
|
||||
r.slotToPendingBlocks[block.Slot] = signed
|
||||
|
@ -128,7 +128,8 @@ func (r *Service) validateAggregatedAtt(ctx context.Context, a *ethpb.AggregateA
|
||||
|
||||
func (r *Service) validateBlockInAttestation(ctx context.Context, a *ethpb.AggregateAttestationAndProof) bool {
|
||||
// Verify the block being voted and the processed state is in DB. The block should have passed validation if it's in the DB.
|
||||
hasState := r.db.HasState(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot))
|
||||
hasStateSummary := featureconfig.Get().NewStateMgmt && r.db.HasStateSummary(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot))
|
||||
hasState := r.db.HasState(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) || hasStateSummary
|
||||
hasBlock := r.db.HasBlock(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot))
|
||||
if !(hasState && hasBlock) {
|
||||
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
|
||||
|
@ -74,7 +74,8 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
}
|
||||
|
||||
// Verify the block being voted and the processed state is in DB and. The block should have passed validation if it's in the DB.
|
||||
hasState := s.db.HasState(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot))
|
||||
hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot))
|
||||
hasState := s.db.HasState(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) || hasStateSummary
|
||||
hasBlock := s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot))
|
||||
if !(hasState && hasBlock) {
|
||||
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
|
||||
|
@ -233,7 +233,6 @@ func ReverseBytes32Slice(arr [][32]byte) [][32]byte {
|
||||
return arr
|
||||
}
|
||||
|
||||
|
||||
// PadTo pads a byte slice to the given size. If the byte slice is larger than the given size, the
|
||||
// original slice is returned.
|
||||
func PadTo(b []byte, size int) []byte {
|
||||
@ -241,4 +240,4 @@ func PadTo(b []byte, size int) []byte {
|
||||
return b
|
||||
}
|
||||
return append(b, make([]byte, size-len(b))...)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user