diff --git a/beacon-chain/db/slasherkv/slasher.go b/beacon-chain/db/slasherkv/slasher.go index 8a73f2b39..9a523a93e 100644 --- a/beacon-chain/db/slasherkv/slasher.go +++ b/beacon-chain/db/slasherkv/slasher.go @@ -34,26 +34,26 @@ func (s *Store) LastEpochWrittenForValidators( defer span.End() attestedEpochs := make([]*slashertypes.AttestedEpochForValidator, 0) - encodedIndexes := make([][]byte, len(validatorIndexes)) - - for i, validatorIndex := range validatorIndexes { - encodedIndexes[i] = encodeValidatorIndex(validatorIndex) - } err := s.db.View(func(tx *bolt.Tx) error { bkt := tx.Bucket(attestedEpochsByValidator) - for i, encodedIndex := range encodedIndexes { - var epoch primitives.Epoch + for _, validatorIndex := range validatorIndexes { + encodedIndex := encodeValidatorIndex(validatorIndex) - if epochBytes := bkt.Get(encodedIndex); epochBytes != nil { - if err := epoch.UnmarshalSSZ(epochBytes); err != nil { - return err - } + epochBytes := bkt.Get(encodedIndex) + if epochBytes == nil { + // If there is no epoch for this validator, skip to the next validator. + continue + } + + var epoch primitives.Epoch + if err := epoch.UnmarshalSSZ(epochBytes); err != nil { + return err } attestedEpoch := &slashertypes.AttestedEpochForValidator{ - ValidatorIndex: validatorIndexes[i], + ValidatorIndex: validatorIndex, Epoch: epoch, } diff --git a/beacon-chain/db/slasherkv/slasher_test.go b/beacon-chain/db/slasherkv/slasher_test.go index ad1282e0a..aaa2dfa39 100644 --- a/beacon-chain/db/slasherkv/slasher_test.go +++ b/beacon-chain/db/slasherkv/slasher_test.go @@ -56,19 +56,16 @@ func TestStore_LastEpochWrittenForValidators(t *testing.T) { epochs[i] = primitives.Epoch(i) } - attestedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices) - require.NoError(t, err) - require.Equal(t, true, len(attestedEpochs) == len(indices)) - - for _, item := range attestedEpochs { - require.Equal(t, primitives.Epoch(0), item.Epoch) - } - epochsByValidator := make(map[primitives.ValidatorIndex]primitives.Epoch, validatorsCount) for i := 0; i < validatorsCount; i++ { epochsByValidator[indices[i]] = epochs[i] } + // No epochs written for any validators, should return empty list. + attestedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices) + require.NoError(t, err) + require.Equal(t, 0, len(attestedEpochs)) + err = beaconDB.SaveLastEpochsWrittenForValidators(ctx, epochsByValidator) require.NoError(t, err) diff --git a/beacon-chain/slasher/chunks.go b/beacon-chain/slasher/chunks.go index 7639ba14e..32fc9a858 100644 --- a/beacon-chain/slasher/chunks.go +++ b/beacon-chain/slasher/chunks.go @@ -14,14 +14,6 @@ import ( "github.com/sirupsen/logrus" ) -// A struct encapsulating input arguments to -// functions used for attester slashing detection and -// loading, saving, and updating min/max span chunks. -type chunkUpdateArgs struct { - chunkIndex uint64 - currentEpoch primitives.Epoch -} - // Chunker defines a struct which represents a slice containing a chunk for K different validator's // min/max spans used for surround vote detection in slasher. The interface defines methods used to check // if an attestation is slashable for a validator index based on the contents of @@ -36,7 +28,8 @@ type Chunker interface { attestation *slashertypes.IndexedAttestationWrapper, ) (*ethpb.AttesterSlashing, error) Update( - args *chunkUpdateArgs, + chunkIndex uint64, + currentEpoch primitives.Epoch, validatorIndex primitives.ValidatorIndex, startEpoch, newTargetEpoch primitives.Epoch, @@ -88,6 +81,8 @@ type MinSpanChunksSlice struct { data []uint16 } +var _ Chunker = (*MinSpanChunksSlice)(nil) + // MaxSpanChunksSlice represents the same data structure as MinSpanChunksSlice however // keeps track of validator max spans for slashing detection instead. type MaxSpanChunksSlice struct { @@ -95,6 +90,8 @@ type MaxSpanChunksSlice struct { data []uint16 } +var _ Chunker = (*MaxSpanChunksSlice)(nil) + // EmptyMinSpanChunksSlice initializes a min span chunk of length C*K for // C = chunkSize and K = validatorChunkSize filled with neutral elements. // For min spans, the neutral element is `undefined`, represented by MaxUint16. @@ -384,21 +381,22 @@ func (m *MaxSpanChunksSlice) CheckSlashable( // to update. In our example, we stop at 2, which is still part of chunk 0, so no need // to jump to another min span chunks slice to perform updates. func (m *MinSpanChunksSlice) Update( - args *chunkUpdateArgs, + chunkIndex uint64, + currentEpoch primitives.Epoch, validatorIndex primitives.ValidatorIndex, startEpoch, newTargetEpoch primitives.Epoch, ) (keepGoing bool, err error) { // The lowest epoch we need to update. minEpoch := primitives.Epoch(0) - if args.currentEpoch > (m.params.historyLength - 1) { - minEpoch = args.currentEpoch - (m.params.historyLength - 1) + if currentEpoch > (m.params.historyLength - 1) { + minEpoch = currentEpoch - (m.params.historyLength - 1) } epochInChunk := startEpoch // We go down the chunk for the validator, updating every value starting at startEpoch down to minEpoch. // As long as the epoch, e, in the same chunk index and e >= minEpoch, we proceed with // a for loop. - for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk >= minEpoch { + for m.params.chunkIndex(epochInChunk) == chunkIndex && epochInChunk >= minEpoch { var chunkTarget primitives.Epoch chunkTarget, err = chunkDataAtEpoch(m.params, m.data, validatorIndex, epochInChunk) if err != nil { @@ -433,7 +431,8 @@ func (m *MinSpanChunksSlice) Update( // more about how update exactly works, refer to the detailed documentation for the Update function for // MinSpanChunksSlice. func (m *MaxSpanChunksSlice) Update( - args *chunkUpdateArgs, + chunkIndex uint64, + currentEpoch primitives.Epoch, validatorIndex primitives.ValidatorIndex, startEpoch, newTargetEpoch primitives.Epoch, @@ -442,7 +441,7 @@ func (m *MaxSpanChunksSlice) Update( // We go down the chunk for the validator, updating every value starting at startEpoch up to // and including the current epoch. As long as the epoch, e, is in the same chunk index and e <= currentEpoch, // we proceed with a for loop. - for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk <= args.currentEpoch { + for m.params.chunkIndex(epochInChunk) == chunkIndex && epochInChunk <= currentEpoch { var chunkTarget primitives.Epoch chunkTarget, err = chunkDataAtEpoch(m.params, m.data, validatorIndex, epochInChunk) if err != nil { @@ -465,7 +464,7 @@ func (m *MaxSpanChunksSlice) Update( } // If the epoch to update now lies beyond the current chunk, then // continue to the next chunk to update it. - keepGoing = epochInChunk <= args.currentEpoch + keepGoing = epochInChunk <= currentEpoch return } diff --git a/beacon-chain/slasher/chunks_test.go b/beacon-chain/slasher/chunks_test.go index 2385ae27d..a9519b8a2 100644 --- a/beacon-chain/slasher/chunks_test.go +++ b/beacon-chain/slasher/chunks_test.go @@ -127,14 +127,10 @@ func TestMinSpanChunksSlice_CheckSlashable(t *testing.T) { source = primitives.Epoch(1) target = primitives.Epoch(2) att = createAttestationWrapperEmptySig(t, source, target, nil, nil) - chunkIdx := uint64(0) + chunkIndex := uint64(0) startEpoch := target currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - _, err = chunk.Update(args, validatorIdx, startEpoch, target) + _, err = chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) // Next up, we create a surrounding vote, but it should NOT be slashable @@ -209,14 +205,10 @@ func TestMaxSpanChunksSlice_CheckSlashable(t *testing.T) { source = primitives.Epoch(0) target = primitives.Epoch(3) att = createAttestationWrapperEmptySig(t, source, target, nil, nil) - chunkIdx := uint64(0) + chunkIndex := uint64(0) startEpoch := source currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - _, err = chunk.Update(args, validatorIdx, startEpoch, target) + _, err = chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) // Next up, we create a surrounded vote, but it should NOT be slashable @@ -288,15 +280,11 @@ func TestMinSpanChunksSlice_Update_MultipleChunks(t *testing.T) { } chunk := EmptyMinSpanChunksSlice(params) target := primitives.Epoch(3) - chunkIdx := uint64(1) - validatorIdx := primitives.ValidatorIndex(0) + chunkIndex := uint64(1) + validatorIndex := primitives.ValidatorIndex(0) startEpoch := target currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIndex, startEpoch, target) require.NoError(t, err) // We should keep going! We still have to update the data for chunk index 0. @@ -306,15 +294,11 @@ func TestMinSpanChunksSlice_Update_MultipleChunks(t *testing.T) { // Now we update for chunk index 0. chunk = EmptyMinSpanChunksSlice(params) - chunkIdx = uint64(0) - validatorIdx = primitives.ValidatorIndex(0) + chunkIndex = uint64(0) + validatorIndex = primitives.ValidatorIndex(0) startEpoch = primitives.Epoch(1) currentEpoch = target - args = &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err = chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err = chunk.Update(chunkIndex, currentEpoch, validatorIndex, startEpoch, target) require.NoError(t, err) require.Equal(t, false, keepGoing) want = []uint16{3, 2, math.MaxUint16, math.MaxUint16, math.MaxUint16, math.MaxUint16} @@ -329,15 +313,11 @@ func TestMaxSpanChunksSlice_Update_MultipleChunks(t *testing.T) { } chunk := EmptyMaxSpanChunksSlice(params) target := primitives.Epoch(3) - chunkIdx := uint64(0) + chunkIndex := uint64(0) validatorIdx := primitives.ValidatorIndex(0) startEpoch := primitives.Epoch(0) currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) // We should keep going! We still have to update the data for chunk index 1. @@ -347,15 +327,11 @@ func TestMaxSpanChunksSlice_Update_MultipleChunks(t *testing.T) { // Now we update for chunk index 1. chunk = EmptyMaxSpanChunksSlice(params) - chunkIdx = uint64(1) + chunkIndex = uint64(1) validatorIdx = primitives.ValidatorIndex(0) startEpoch = primitives.Epoch(2) currentEpoch = target - args = &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err = chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err = chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) require.Equal(t, false, keepGoing) want = []uint16{1, 0, 0, 0, 0, 0} @@ -393,15 +369,11 @@ func TestMinSpanChunksSlice_Update_SingleChunk(t *testing.T) { } chunk := EmptyMinSpanChunksSlice(params) target := primitives.Epoch(1) - chunkIdx := uint64(0) + chunkIndex := uint64(0) validatorIdx := primitives.ValidatorIndex(0) startEpoch := target currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) require.Equal(t, false, keepGoing) want := []uint16{1, 0, math.MaxUint16, math.MaxUint16, math.MaxUint16, math.MaxUint16} @@ -416,15 +388,11 @@ func TestMaxSpanChunksSlice_Update_SingleChunk(t *testing.T) { } chunk := EmptyMaxSpanChunksSlice(params) target := primitives.Epoch(3) - chunkIdx := uint64(0) + chunkIndex := uint64(0) validatorIdx := primitives.ValidatorIndex(0) startEpoch := primitives.Epoch(0) currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) require.Equal(t, false, keepGoing) want := []uint16{3, 2, 1, 0, 0, 0, 0, 0} diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index a96d17d2a..8f4643d71 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -20,14 +20,11 @@ import ( func (s *Service) checkSlashableAttestations( ctx context.Context, currentEpoch primitives.Epoch, atts []*slashertypes.IndexedAttestationWrapper, ) (map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing, error) { - totalStart := time.Now() + start := time.Now() slashings := map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing{} // Double votes - log.Debug("Checking for double votes") - start := time.Now() - doubleVoteSlashings, err := s.checkDoubleVotes(ctx, atts) if err != nil { return nil, errors.Wrap(err, "could not check slashable double votes") @@ -47,40 +44,20 @@ func (s *Service) checkSlashableAttestations( } // Surrounding / surrounded votes - groupedByValidatorChunkIndexAtts := s.groupByValidatorChunkIndex(atts) - log.WithField("numBatches", len(groupedByValidatorChunkIndexAtts)).Debug("Batching attestations by validator chunk index") - groupsCount := len(groupedByValidatorChunkIndexAtts) - - surroundStart := time.Now() - - for validatorChunkIndex, attestations := range groupedByValidatorChunkIndexAtts { - surroundSlashings, err := s.checkSurrounds(ctx, attestations, currentEpoch, validatorChunkIndex) - if err != nil { - return nil, err - } - - for root, slashing := range surroundSlashings { - slashings[root] = slashing - } - - indices := s.params.validatorIndexesInChunk(validatorChunkIndex) - for _, idx := range indices { - s.latestEpochWrittenForValidator[idx] = currentEpoch - } + surroundSlashings, err := s.checkSurroundVotes(ctx, atts, currentEpoch) + if err != nil { + return nil, errors.Wrap(err, "could not check slashable surround votes") } - surroundElapsed := time.Since(surroundStart) - totalElapsed := time.Since(totalStart) + for root, slashing := range surroundSlashings { + slashings[root] = slashing + } + + elapsed := time.Since(start) fields := logrus.Fields{ - "numAttestations": len(atts), - "numBatchesByValidatorChunkIndex": groupsCount, - "elapsed": totalElapsed, - } - - if groupsCount > 0 { - avgProcessingTimePerBatch := surroundElapsed / time.Duration(groupsCount) - fields["avgBatchProcessingTime"] = avgProcessingTimePerBatch + "numAttestations": len(atts), + "elapsed": elapsed, } log.WithFields(fields).Info("Done checking slashable attestations") @@ -92,70 +69,65 @@ func (s *Service) checkSlashableAttestations( return slashings, nil } -// Given a list of attestations all corresponding to a validator chunk index as well -// as the current epoch in time, we perform slashing detection. -// The process is as follows given a list of attestations: -// -// 1. Group the attestations by chunk index. -// 2. Update the min and max spans for those grouped attestations, check if any slashings are -// found in the process -// 3. Update the latest written epoch for all validators involved to the current epoch. -// -// This function performs a lot of critical actions and is split into smaller helpers for cleanliness. -func (s *Service) checkSurrounds( +// Check for surrounding and surrounded votes in our database given a list of incoming attestations. +func (s *Service) checkSurroundVotes( ctx context.Context, - attestations []*slashertypes.IndexedAttestationWrapper, + attWrappers []*slashertypes.IndexedAttestationWrapper, currentEpoch primitives.Epoch, - validatorChunkIndex uint64, ) (map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing, error) { - // Map of updated chunks by chunk index, which will be saved at the end. - updatedMinChunks, updatedMaxChunks := map[uint64]Chunker{}, map[uint64]Chunker{} - - groupedByChunkIndexAtts := s.groupByChunkIndex(attestations) - validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex) - slashings := map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing{} - // Update epoch for validators. - for _, validatorIndex := range validatorIndexes { - // This function modifies `updatedMinChunks` in place. - if err := s.epochUpdateForValidator(ctx, updatedMinChunks, validatorChunkIndex, slashertypes.MinSpan, currentEpoch, validatorIndex); err != nil { - return nil, errors.Wrapf(err, "could not update validator index for min chunks %d", validatorIndex) + // Group attestation wrappers by validator chunk index. + attWrappersByValidatorChunkIndex := s.groupByValidatorChunkIndex(attWrappers) + + for validatorChunkIndex, attWrappers := range attWrappersByValidatorChunkIndex { + minChunkByChunkIndex, err := s.updatedChunkByChunkIndex(ctx, slashertypes.MinSpan, currentEpoch, validatorChunkIndex) + if err != nil { + return nil, errors.Wrap(err, "could not update updatedMinChunks") } - // This function modifies `updatedMaxChunks` in place. - if err := s.epochUpdateForValidator(ctx, updatedMaxChunks, validatorChunkIndex, slashertypes.MaxSpan, currentEpoch, validatorIndex); err != nil { - return nil, errors.Wrapf(err, "could not update validator index for max chunks %d", validatorIndex) + maxChunkByChunkIndex, err := s.updatedChunkByChunkIndex(ctx, slashertypes.MaxSpan, currentEpoch, validatorChunkIndex) + if err != nil { + return nil, errors.Wrap(err, "could not update updatedMaxChunks") } - } - // Check for surrounding votes. - surroundingSlashings, err := s.updateSpans(ctx, updatedMinChunks, groupedByChunkIndexAtts, slashertypes.MinSpan, validatorChunkIndex, currentEpoch) - if err != nil { - return nil, errors.Wrapf(err, "could not update min attestation spans for validator chunk index %d", validatorChunkIndex) - } + // Group (already grouped by validator chunk index) attestation wrappers by chunk index. + attWrappersByChunkIndex := s.groupByChunkIndex(attWrappers) - for root, slashing := range surroundingSlashings { - slashings[root] = slashing - } + // Check for surrounding votes. + surroundingSlashings, err := s.updateSpans(ctx, minChunkByChunkIndex, attWrappersByChunkIndex, slashertypes.MinSpan, validatorChunkIndex, currentEpoch) + if err != nil { + return nil, errors.Wrapf(err, "could not update min attestation spans for validator chunk index %d", validatorChunkIndex) + } - // Check for surrounded votes. - surroundedSlashings, err := s.updateSpans(ctx, updatedMaxChunks, groupedByChunkIndexAtts, slashertypes.MaxSpan, validatorChunkIndex, currentEpoch) - if err != nil { - return nil, errors.Wrapf(err, "could not update max attestation spans for validator chunk index %d", validatorChunkIndex) - } + for root, slashing := range surroundingSlashings { + slashings[root] = slashing + } - for root, slashing := range surroundedSlashings { - slashings[root] = slashing - } + // Check for surrounded votes. + surroundedSlashings, err := s.updateSpans(ctx, maxChunkByChunkIndex, attWrappersByChunkIndex, slashertypes.MaxSpan, validatorChunkIndex, currentEpoch) + if err != nil { + return nil, errors.Wrapf(err, "could not update max attestation spans for validator chunk index %d", validatorChunkIndex) + } - // Save updated chunks into the database. - if err := s.saveUpdatedChunks(ctx, updatedMinChunks, slashertypes.MinSpan, validatorChunkIndex); err != nil { - return nil, errors.Wrap(err, "could not save chunks for min spans") - } + for root, slashing := range surroundedSlashings { + slashings[root] = slashing + } - if err := s.saveUpdatedChunks(ctx, updatedMaxChunks, slashertypes.MaxSpan, validatorChunkIndex); err != nil { - return nil, errors.Wrap(err, "could not save chunks for max spans") + // Save updated chunks into the database. + if err := s.saveUpdatedChunks(ctx, minChunkByChunkIndex, slashertypes.MinSpan, validatorChunkIndex); err != nil { + return nil, errors.Wrap(err, "could not save chunks for min spans") + } + + if err := s.saveUpdatedChunks(ctx, maxChunkByChunkIndex, slashertypes.MaxSpan, validatorChunkIndex); err != nil { + return nil, errors.Wrap(err, "could not save chunks for max spans") + } + + // Update the latest written epoch for all validators involved to the current chunk. + indices := s.params.validatorIndexesInChunk(validatorChunkIndex) + for _, idx := range indices { + s.latestEpochWrittenForValidator[idx] = currentEpoch + } } return slashings, nil @@ -264,55 +236,74 @@ func (s *Service) checkDoubleVotes( return slashings, nil } -// This function updates `updatedChunks`, representing the slashing spans for a given validator for -// a change in epoch since the last epoch we have recorded for the validator. -// For example, if the last epoch a validator has written is N, and the current epoch is N+5, -// we update entries in the slashing spans with their neutral element for epochs N+1 to N+4. -// This also puts any loaded chunks in a map used as a cache for further processing and minimizing -// database reads later on. -func (s *Service) epochUpdateForValidator( +// updatedChunkByChunkIndex loads the chunks from the database for validators corresponding to +// the `validatorChunkIndex`. +// It then updates the chunks with the neutral element for corresponding validators from +// the epoch just after the latest epoch written to the current epoch. +// A mapping between chunk index and chunk is returned to the caller. +func (s *Service) updatedChunkByChunkIndex( ctx context.Context, - updatedChunks map[uint64]Chunker, - validatorChunkIndex uint64, chunkKind slashertypes.ChunkKind, currentEpoch primitives.Epoch, - validatorIndex primitives.ValidatorIndex, -) error { - var err error + validatorChunkIndex uint64, +) (map[uint64]Chunker, error) { + chunkByChunkIndex := map[uint64]Chunker{} - latestEpochWritten, ok := s.latestEpochWrittenForValidator[validatorIndex] - if !ok { - return nil - } + validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex) + for _, validatorIndex := range validatorIndexes { + // Retrieve the latest epoch written for the validator. + latestEpochWritten, ok := s.latestEpochWrittenForValidator[validatorIndex] - for latestEpochWritten <= currentEpoch { - chunkIndex := s.params.chunkIndex(latestEpochWritten) + // Start from the epoch just after the latest epoch written. + epochToWrite, err := latestEpochWritten.SafeAdd(1) + if err != nil { + return nil, errors.Wrap(err, "could not add 1 to latest epoch written") + } - currentChunk, ok := updatedChunks[chunkIndex] if !ok { - currentChunk, err = s.getChunk(ctx, chunkKind, validatorChunkIndex, chunkIndex) - if err != nil { - return errors.Wrap(err, "could not get chunk") - } + epochToWrite = 0 } - for s.params.chunkIndex(latestEpochWritten) == chunkIndex && latestEpochWritten <= currentEpoch { - if err := setChunkRawDistance( - s.params, - currentChunk.Chunk(), - validatorIndex, - latestEpochWritten, - currentChunk.NeutralElement(), - ); err != nil { - return err + // It is useless to update more than `historyLength` epochs, since + // the chunks are circular and we will be overwritten at least one. + if currentEpoch-epochToWrite >= s.params.historyLength { + epochToWrite = currentEpoch + 1 - s.params.historyLength + } + + for epochToWrite <= currentEpoch { + // Get the chunk index for the latest epoch written. + chunkIndex := s.params.chunkIndex(epochToWrite) + + // Get the chunk corresponding to the chunk index from the `chunkByChunkIndex` map. + currentChunk, ok := chunkByChunkIndex[chunkIndex] + if !ok { + // If the chunk is not in the map, retrieve it from the database. + currentChunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex) + if err != nil { + return nil, errors.Wrap(err, "could not get chunk") + } } - updatedChunks[chunkIndex] = currentChunk - latestEpochWritten++ + // Update the current chunk with the neutral element for the validator index for the latest epoch written. + for s.params.chunkIndex(epochToWrite) == chunkIndex && epochToWrite <= currentEpoch { + if err := setChunkRawDistance( + s.params, + currentChunk.Chunk(), + validatorIndex, + epochToWrite, + currentChunk.NeutralElement(), + ); err != nil { + return nil, err + } + + epochToWrite++ + } + + chunkByChunkIndex[chunkIndex] = currentChunk } } - return nil + return chunkByChunkIndex, nil } // Updates spans and detects any slashable attester offenses along the way. @@ -409,7 +400,7 @@ func (s *Service) applyAttestationForValidator( chunk, ok := chunksByChunkIdx[chunkIndex] if !ok { - chunk, err = s.getChunk(ctx, chunkKind, validatorChunkIndex, chunkIndex) + chunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex) if err != nil { return nil, errors.Wrapf(err, "could not get chunk at index %d", chunkIndex) } @@ -451,17 +442,15 @@ func (s *Service) applyAttestationForValidator( chunk, ok := chunksByChunkIdx[chunkIndex] if !ok { - chunk, err = s.getChunk(ctx, chunkKind, validatorChunkIndex, chunkIndex) + chunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex) if err != nil { return nil, errors.Wrapf(err, "could not get chunk at index %d", chunkIndex) } } keepGoing, err := chunk.Update( - &chunkUpdateArgs{ - chunkIndex: chunkIndex, - currentEpoch: currentEpoch, - }, + chunkIndex, + currentEpoch, validatorIndex, startEpoch, targetEpoch, @@ -490,8 +479,8 @@ func (s *Service) applyAttestationForValidator( return nil, nil } -// Retrieve a chunk from database from database. -func (s *Service) getChunk( +// Retrieve a chunk from database. +func (s *Service) getChunkFromDatabase( ctx context.Context, chunkKind slashertypes.ChunkKind, validatorChunkIndex uint64, @@ -517,14 +506,14 @@ func (s *Service) loadChunks( ctx context.Context, validatorChunkIndex uint64, chunkKind slashertypes.ChunkKind, - chunkIndices []uint64, + chunkIndexes []uint64, ) (map[uint64]Chunker, error) { ctx, span := trace.StartSpan(ctx, "Slasher.loadChunks") defer span.End() - chunkKeys := make([][]byte, 0, len(chunkIndices)) - for _, chunkIdx := range chunkIndices { - chunkKeys = append(chunkKeys, s.params.flatSliceID(validatorChunkIndex, chunkIdx)) + chunkKeys := make([][]byte, 0, len(chunkIndexes)) + for _, chunkIndex := range chunkIndexes { + chunkKeys = append(chunkKeys, s.params.flatSliceID(validatorChunkIndex, chunkIndex)) } rawChunks, chunksExist, err := s.serviceCfg.Database.LoadSlasherChunks(ctx, chunkKind, chunkKeys) @@ -563,7 +552,7 @@ func (s *Service) loadChunks( return nil, errors.Wrap(err, "could not initialize chunk") } - chunksByChunkIdx[chunkIndices[i]] = chunk + chunksByChunkIdx[chunkIndexes[i]] = chunk } return chunksByChunkIdx, nil diff --git a/beacon-chain/slasher/detect_attestations_test.go b/beacon-chain/slasher/detect_attestations_test.go index 34782d93e..1d87e776d 100644 --- a/beacon-chain/slasher/detect_attestations_test.go +++ b/beacon-chain/slasher/detect_attestations_test.go @@ -834,77 +834,293 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) { } func Test_epochUpdateForValidators(t *testing.T) { - ctx := context.Background() - slasherDB := dbtest.SetupSlasherDB(t) + neutralMin, neutralMax := uint16(65535), uint16(0) - // Check if the chunk at chunk index already exists in-memory. - s := &Service{ - params: &Parameters{ - chunkSize: 2, // 2 epochs in a chunk. - validatorChunkSize: 2, // 2 validators in a chunk. - historyLength: 4, + testCases := []struct { + name string + chunkSize uint64 + validatorChunkSize uint64 + historyLength primitives.Epoch + currentEpoch primitives.Epoch + validatorChunkIndex uint64 + latestUpdatedEpochByValidatorIndex map[primitives.ValidatorIndex]primitives.Epoch + initialMinChunkByChunkIndex map[uint64][]uint16 + expectedMinChunkByChunkIndex map[uint64][]uint16 + initialMaxChunkByChunkIndex map[uint64][]uint16 + expectedMaxChunkByChunkIndex map[uint64][]uint16 + }{ + { + name: "start with no data - first chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 8, + currentEpoch: 2, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: nil, + initialMinChunkByChunkIndex: nil, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: nil, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, + }, + { + name: "start with no data - second chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 8, + currentEpoch: 5, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: nil, + initialMinChunkByChunkIndex: nil, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + 1: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: nil, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + 1: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, + }, + { + name: "start with some data - first chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 8, + currentEpoch: 2, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 0, 43: 1}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {14, 9999, 9999, 9999, 15, 16, 9999, 9999}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {14, neutralMin, neutralMin, 9999, 15, 16, neutralMin, 9999}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {70, 9999, 9999, 9999, 71, 72, 9999, 9999}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {70, neutralMax, neutralMax, 9999, 71, 72, neutralMax, 9999}, + }, + }, + { + name: "start with some data - second chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 8, + currentEpoch: 5, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 1, 43: 2}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {14, 13, 9999, 9999, 15, 16, 17, 9999}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {14, 13, neutralMin, neutralMin, 15, 16, 17, neutralMin}, + + // | validator 42 | validator 43 | + 1: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {70, 69, 9999, 9999, 71, 72, 73, 9999}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {70, 69, neutralMax, neutralMax, 71, 72, 73, neutralMax}, + + // | validator 42 | validator 43 | + 1: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, + }, + { + name: "start with some data - third chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 12, + currentEpoch: 9, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 5, 43: 6}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 1: {14, 13, 9999, 9999, 15, 16, 17, 9999}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 1: {14, 13, neutralMin, neutralMin, 15, 16, 17, neutralMin}, + + // | validator 42 | validator 43 | + 2: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 1: {70, 69, 9999, 9999, 71, 72, 73, 9999}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 1: {70, 69, neutralMax, neutralMax, 71, 72, 73, neutralMax}, + + // | validator 42 | validator 43 | + 2: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, + }, + { + name: "start with some data - third chunk - wrap to first chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 12, + currentEpoch: 14, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 9, 43: 10}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {55, 55, 55, 55, 55, 55, 55, 55}, + 1: {66, 66, 66, 66, 66, 66, 66, 66}, + + // | validator 42 | validator 43 | + 2: {77, 77, 9999, 9999, 77, 77, 77, 9999}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 2: {77, 77, neutralMin, neutralMin, 77, 77, 77, neutralMin}, + + // | validator 42 | validator 43 | + 0: {neutralMin, neutralMin, neutralMin, 55, neutralMin, neutralMin, neutralMin, 55}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {55, 55, 55, 55, 55, 55, 55, 55}, + 1: {66, 66, 66, 66, 66, 66, 66, 66}, + + // | validator 42 | validator 43 | + 2: {77, 77, 9999, 9999, 77, 77, 77, 9999}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 2: {77, 77, neutralMax, neutralMax, 77, 77, 77, neutralMax}, + + // | validator 42 | validator 43 | + 0: {neutralMax, neutralMax, neutralMax, 55, neutralMax, neutralMax, neutralMax, 55}, + }, + }, + { + name: "start with some data - high latest updated epoch", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 12, + currentEpoch: 16, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 2, 43: 3}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {55, 55, 55, 55, 55, 55, 55, 55}, + 1: {66, 66, 66, 66, 66, 66, 66, 66}, + 2: {77, 77, 77, 77, 77, 77, 77, 77}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + 1: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + 2: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {55, 55, 55, 55, 55, 55, 55, 55}, + 1: {66, 66, 66, 66, 66, 66, 66, 66}, + 2: {77, 77, 77, 77, 77, 77, 77, 77}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + 1: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + 2: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, }, - serviceCfg: &ServiceConfig{Database: slasherDB}, - latestEpochWrittenForValidator: map[primitives.ValidatorIndex]primitives.Epoch{}, } - t.Run("no update if no latest written epoch", func(t *testing.T) { - validators := []primitives.ValidatorIndex{ - 1, 2, - } - currentEpoch := primitives.Epoch(3) - // No last written epoch for both validators. - s.latestEpochWrittenForValidator = map[primitives.ValidatorIndex]primitives.Epoch{} + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + // Create context. + ctx := context.Background() - // Because the validators have no recorded latest epoch written, we expect - // no chunks to be loaded nor updated to. - updatedChunks := make(map[uint64]Chunker) - for _, valIdx := range validators { - err := s.epochUpdateForValidator( - ctx, - updatedChunks, - 0, // validatorChunkIndex - slashertypes.MinSpan, - currentEpoch, - valIdx, + // Initialize the slasher database. + slasherDB := dbtest.SetupSlasherDB(t) + + // Intialize the slasher service. + service := &Service{ + params: &Parameters{ + chunkSize: tt.chunkSize, + validatorChunkSize: tt.validatorChunkSize, + historyLength: tt.historyLength, + }, + serviceCfg: &ServiceConfig{Database: slasherDB}, + latestEpochWrittenForValidator: tt.latestUpdatedEpochByValidatorIndex, + } + + // Save min initial chunks if they exist. + if tt.initialMinChunkByChunkIndex != nil { + minChunkerByChunkerIndex := map[uint64]Chunker{} + for chunkIndex, minChunk := range tt.initialMinChunkByChunkIndex { + minChunkerByChunkerIndex[chunkIndex] = &MinSpanChunksSlice{data: minChunk} + } + + err := service.saveUpdatedChunks(ctx, minChunkerByChunkerIndex, slashertypes.MinSpan, tt.validatorChunkIndex) + require.NoError(t, err) + } + + // Save max initial chunks if they exist. + if tt.initialMaxChunkByChunkIndex != nil { + maxChunkerByChunkerIndex := map[uint64]Chunker{} + for chunkIndex, maxChunk := range tt.initialMaxChunkByChunkIndex { + maxChunkerByChunkerIndex[chunkIndex] = &MaxSpanChunksSlice{data: maxChunk} + } + + err := service.saveUpdatedChunks(ctx, maxChunkerByChunkerIndex, slashertypes.MaxSpan, tt.validatorChunkIndex) + require.NoError(t, err) + } + + // Get chunks. + actualMinChunkByChunkIndex, err := service.updatedChunkByChunkIndex( + ctx, slashertypes.MinSpan, tt.currentEpoch, tt.validatorChunkIndex, ) + + // Compare the actual and expected chunks. require.NoError(t, err) - } - require.Equal(t, 0, len(updatedChunks)) - }) + require.Equal(t, len(tt.expectedMinChunkByChunkIndex), len(actualMinChunkByChunkIndex)) + for chunkIndex, expectedMinChunk := range tt.expectedMinChunkByChunkIndex { + actualMinChunk, ok := actualMinChunkByChunkIndex[chunkIndex] + require.Equal(t, true, ok) + require.Equal(t, len(expectedMinChunk), len(actualMinChunk.Chunk())) + require.DeepSSZEqual(t, expectedMinChunk, actualMinChunk.Chunk()) + } - t.Run("update from latest written epoch", func(t *testing.T) { - validators := []primitives.ValidatorIndex{ - 1, 2, - } - currentEpoch := primitives.Epoch(3) - - // Set the latest written epoch for validators to current epoch - 1. - latestWrittenEpoch := currentEpoch - 1 - s.latestEpochWrittenForValidator = map[primitives.ValidatorIndex]primitives.Epoch{ - 1: latestWrittenEpoch, - 2: latestWrittenEpoch, - } - - // Because the latest written epoch for the input validators is == 2, we expect - // that we will update all epochs from 2 up to 3 (the current epoch). This is all - // safe contained in chunk index 1. - updatedChunks := make(map[uint64]Chunker) - for _, valIdx := range validators { - err := s.epochUpdateForValidator( - ctx, - updatedChunks, - 0, // validatorChunkIndex, - slashertypes.MinSpan, - currentEpoch, - valIdx, + actualMaxChunkByChunkIndex, err := service.updatedChunkByChunkIndex( + ctx, slashertypes.MaxSpan, tt.currentEpoch, tt.validatorChunkIndex, ) + require.NoError(t, err) - } - require.Equal(t, 1, len(updatedChunks)) - _, ok := updatedChunks[1] - require.Equal(t, true, ok) - }) + require.Equal(t, len(tt.expectedMaxChunkByChunkIndex), len(actualMaxChunkByChunkIndex)) + for chunkIndex, expectedMaxChunk := range tt.expectedMaxChunkByChunkIndex { + actualMaxChunk, ok := actualMaxChunkByChunkIndex[chunkIndex] + require.Equal(t, true, ok) + require.Equal(t, len(expectedMaxChunk), len(actualMaxChunk.Chunk())) + require.DeepSSZEqual(t, expectedMaxChunk, actualMaxChunk.Chunk()) + } + + }) + } } func Test_applyAttestationForValidator_MinSpanChunk(t *testing.T) {