diff --git a/beacon-chain/archiver/service.go b/beacon-chain/archiver/service.go index 8f3cec865..88a021964 100644 --- a/beacon-chain/archiver/service.go +++ b/beacon-chain/archiver/service.go @@ -6,7 +6,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" - "github.com/prysmaticlabs/prysm/beacon-chain/core/epoch" + epochProcessing "github.com/prysmaticlabs/prysm/beacon-chain/core/epoch" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/statefeed" "github.com/prysmaticlabs/prysm/beacon-chain/core/validators" @@ -22,11 +22,12 @@ var log = logrus.WithField("prefix", "archiver") // Service defining archiver functionality for persisting checkpointed // beacon chain information to a database backend for historical purposes. type Service struct { - ctx context.Context - cancel context.CancelFunc - beaconDB db.Database - headFetcher blockchain.HeadFetcher - stateNotifier statefeed.Notifier + ctx context.Context + cancel context.CancelFunc + beaconDB db.Database + headFetcher blockchain.HeadFetcher + stateNotifier statefeed.Notifier + lastArchivedEpoch uint64 } // Config options for the archiver service. @@ -66,13 +67,12 @@ func (s *Service) Status() error { } // We archive committee information pertaining to the head state's epoch. -func (s *Service) archiveCommitteeInfo(ctx context.Context, headState *pb.BeaconState) error { - currentEpoch := helpers.SlotToEpoch(headState.Slot) - proposerSeed, err := helpers.Seed(headState, currentEpoch, params.BeaconConfig().DomainBeaconProposer) +func (s *Service) archiveCommitteeInfo(ctx context.Context, headState *pb.BeaconState, epoch uint64) error { + proposerSeed, err := helpers.Seed(headState, epoch, params.BeaconConfig().DomainBeaconProposer) if err != nil { return errors.Wrap(err, "could not generate seed") } - attesterSeed, err := helpers.Seed(headState, currentEpoch, params.BeaconConfig().DomainBeaconAttester) + attesterSeed, err := helpers.Seed(headState, epoch, params.BeaconConfig().DomainBeaconAttester) if err != nil { return errors.Wrap(err, "could not generate seed") } @@ -81,17 +81,18 @@ func (s *Service) archiveCommitteeInfo(ctx context.Context, headState *pb.Beacon ProposerSeed: proposerSeed[:], AttesterSeed: attesterSeed[:], } - if err := s.beaconDB.SaveArchivedCommitteeInfo(ctx, currentEpoch, info); err != nil { + if err := s.beaconDB.SaveArchivedCommitteeInfo(ctx, epoch, info); err != nil { return errors.Wrap(err, "could not archive committee info") } return nil } // We archive active validator set changes that happened during the previous epoch. -func (s *Service) archiveActiveSetChanges(ctx context.Context, headState *pb.BeaconState) error { - activations := validators.ActivatedValidatorIndices(helpers.PrevEpoch(headState), headState.Validators) - slashings := validators.SlashedValidatorIndices(helpers.PrevEpoch(headState), headState.Validators) - activeValidatorCount, err := helpers.ActiveValidatorCount(headState, helpers.PrevEpoch(headState)) +func (s *Service) archiveActiveSetChanges(ctx context.Context, headState *pb.BeaconState, epoch uint64) error { + prevEpoch := epoch - 1 + activations := validators.ActivatedValidatorIndices(prevEpoch, headState.Validators) + slashings := validators.SlashedValidatorIndices(prevEpoch, headState.Validators) + activeValidatorCount, err := helpers.ActiveValidatorCount(headState, prevEpoch) if err != nil { return errors.Wrap(err, "could not get active validator count") } @@ -104,7 +105,7 @@ func (s *Service) archiveActiveSetChanges(ctx context.Context, headState *pb.Bea Exited: exited, Slashed: slashings, } - if err := s.beaconDB.SaveArchivedActiveValidatorChanges(ctx, helpers.PrevEpoch(headState), activeSetChanges); err != nil { + if err := s.beaconDB.SaveArchivedActiveValidatorChanges(ctx, prevEpoch, activeSetChanges); err != nil { return errors.Wrap(err, "could not archive active validator set changes") } return nil @@ -112,19 +113,18 @@ func (s *Service) archiveActiveSetChanges(ctx context.Context, headState *pb.Bea // We compute participation metrics by first retrieving the head state and // matching validator attestations during the epoch. -func (s *Service) archiveParticipation(ctx context.Context, headState *pb.BeaconState) error { - participation, err := epoch.ComputeValidatorParticipation(headState, helpers.SlotToEpoch(headState.Slot)) +func (s *Service) archiveParticipation(ctx context.Context, headState *pb.BeaconState, epoch uint64) error { + participation, err := epochProcessing.ComputeValidatorParticipation(headState, epoch) if err != nil { return errors.Wrap(err, "could not compute participation") } - return s.beaconDB.SaveArchivedValidatorParticipation(ctx, helpers.SlotToEpoch(headState.Slot), participation) + return s.beaconDB.SaveArchivedValidatorParticipation(ctx, epoch, participation) } // We archive validator balances and active indices. -func (s *Service) archiveBalances(ctx context.Context, headState *pb.BeaconState) error { +func (s *Service) archiveBalances(ctx context.Context, headState *pb.BeaconState, epoch uint64) error { balances := headState.Balances - currentEpoch := helpers.CurrentEpoch(headState) - if err := s.beaconDB.SaveArchivedBalances(ctx, currentEpoch, balances); err != nil { + if err := s.beaconDB.SaveArchivedBalances(ctx, epoch, balances); err != nil { return errors.Wrap(err, "could not archive balances") } return nil @@ -146,29 +146,35 @@ func (s *Service) run(ctx context.Context) { log.WithError(err).Error("Head state is not available") continue } - if !helpers.IsEpochEnd(headState.Slot) { + currentEpoch := helpers.CurrentEpoch(headState) + if !helpers.IsEpochEnd(headState.Slot) && currentEpoch <= s.lastArchivedEpoch { continue } - if err := s.archiveCommitteeInfo(ctx, headState); err != nil { + epochToArchive := currentEpoch + if !helpers.IsEpochEnd(headState.Slot) { + epochToArchive-- + } + if err := s.archiveCommitteeInfo(ctx, headState, epochToArchive); err != nil { log.WithError(err).Error("Could not archive committee info") continue } - if err := s.archiveActiveSetChanges(ctx, headState); err != nil { + if err := s.archiveActiveSetChanges(ctx, headState, epochToArchive); err != nil { log.WithError(err).Error("Could not archive active validator set changes") continue } - if err := s.archiveParticipation(ctx, headState); err != nil { + if err := s.archiveParticipation(ctx, headState, epochToArchive); err != nil { log.WithError(err).Error("Could not archive validator participation") continue } - if err := s.archiveBalances(ctx, headState); err != nil { + if err := s.archiveBalances(ctx, headState, epochToArchive); err != nil { log.WithError(err).Error("Could not archive validator balances and active indices") continue } log.WithField( "epoch", - helpers.CurrentEpoch(headState), + epochToArchive, ).Debug("Successfully archived beacon chain data during epoch") + s.lastArchivedEpoch = epochToArchive } case <-s.ctx.Done(): log.Debug("Context closed, exiting goroutine") diff --git a/beacon-chain/archiver/service_test.go b/beacon-chain/archiver/service_test.go index 0c951589c..dbd781c27 100644 --- a/beacon-chain/archiver/service_test.go +++ b/beacon-chain/archiver/service_test.go @@ -74,6 +74,56 @@ func TestArchiverService_OnlyArchiveAtEpochEnd(t *testing.T) { testutil.AssertLogsDoNotContain(t, hook, "Successfully archived") } +func TestArchiverService_ArchivesEvenThroughSkipSlot(t *testing.T) { + hook := logTest.NewGlobal() + svc, beaconDB := setupService(t) + validatorCount := uint64(100) + headState := setupState(t, validatorCount) + defer dbutil.TeardownDB(t, beaconDB) + event := &statefeed.Event{ + Type: statefeed.BlockProcessed, + Data: &statefeed.BlockProcessedData{ + BlockRoot: [32]byte{1, 2, 3}, + Verified: true, + }, + } + + exitRoutine := make(chan bool) + go func() { + svc.run(svc.ctx) + <-exitRoutine + }() + + // Send out an event every slot, skipping the end slot of the epoch. + for i := uint64(0); i < params.BeaconConfig().SlotsPerEpoch+1; i++ { + headState.Slot = i + svc.headFetcher = &mock.ChainService{ + State: headState, + } + if helpers.IsEpochEnd(i) { + continue + } + // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). + for sent := 0; sent == 0; { + sent = svc.stateNotifier.StateFeed().Send(event) + } + } + if err := svc.Stop(); err != nil { + t.Fatal(err) + } + exitRoutine <- true + + // The context should have been canceled. + if svc.ctx.Err() != context.Canceled { + t.Error("context was not canceled") + } + + testutil.AssertLogsContain(t, hook, "Received block processed event") + // Even though there was a skip slot, we should still be able to archive + // upon the next block event afterwards. + testutil.AssertLogsContain(t, hook, "Successfully archived") +} + func TestArchiverService_ComputesAndSavesParticipation(t *testing.T) { hook := logTest.NewGlobal() validatorCount := uint64(100) @@ -106,7 +156,7 @@ func TestArchiverService_ComputesAndSavesParticipation(t *testing.T) { } if !proto.Equal(wanted, retrieved) { - t.Errorf("Wanted participation for epoch %d %v, retrieved %v", currentEpoch, wanted, retrieved) + t.Errorf("Wanted participation for epoch %d %v, retrieved %v", currentEpoch-1, wanted, retrieved) } testutil.AssertLogsContain(t, hook, "Successfully archived") } @@ -280,7 +330,7 @@ func TestArchiverService_SavesExitedValidatorChanges(t *testing.T) { }, } triggerStateEvent(t, svc, event) - + testutil.AssertLogsContain(t, hook, "Successfully archived") retrieved, err := beaconDB.ArchivedActiveValidatorChanges(svc.ctx, prevEpoch) if err != nil { t.Fatal(err) @@ -291,7 +341,6 @@ func TestArchiverService_SavesExitedValidatorChanges(t *testing.T) { if !reflect.DeepEqual(retrieved.Exited, []uint64{95}) { t.Errorf("Wanted indices 95 exited, received %v", retrieved.Exited) } - testutil.AssertLogsContain(t, hook, "Successfully archived") } func setupState(t *testing.T, validatorCount uint64) *pb.BeaconState {