Slasher: Reduce cold start duration. (#13620)

* `getChunk` ==> `getChunkFromDatabase`.

* `loadChunks`: Rename variables.

* `Update`: Use explicit arguments.

* `detect_attestations.go`: Reduce abstraction layers.

* `loadAndUpdateChunks`: Change arguments order.

* `updatedChunkByChunkIndex`: Update all known validators in the chunk.

* `LastEpochWrittenForValidators`: Avoid an avoidable `for` loop.

* `chunks.go`: Ensure implementations respect the interface.

* `LastEpochWrittenForValidators`: Stop considering lack of epoch as genesis epoch.

* `updatedChunkByChunkIndex`: Don't update latest updated epoch.

And add a bunch of tests.

* Improve slasher cold boot duration.

Before this commit, on a slasher cold boot (aka, without any db),
the `updatedChunkByChunkIndex` function looped for all validators
AND for all epochs between the genesis epoch and the current epoch.

This could take several dozen minutes, and it is useless since the
min/max spans are actually a circular buffer with a limited length.
Cells of min/max spans can be overwritten (with the same value)
plenty of times.

After this commit, the `updatedChunkByChunkIndex` function loops
for all validators AND over AT MOST `historyLength` epochs.
Every cell of the min/max spans is written AT MOST once.

Time needed for slasher boot goes from `O(nm)` to "only" `O(m)`, where:
- `n` is the number of epochs since the genesis.
- `m` is the number of validators.
This commit is contained in:
Manu NALEPA 2024-02-16 11:10:26 +01:00 committed by GitHub
parent 1ec745b88e
commit 4030614df0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 448 additions and 279 deletions

View File

@ -34,26 +34,26 @@ func (s *Store) LastEpochWrittenForValidators(
defer span.End()
attestedEpochs := make([]*slashertypes.AttestedEpochForValidator, 0)
encodedIndexes := make([][]byte, len(validatorIndexes))
for i, validatorIndex := range validatorIndexes {
encodedIndexes[i] = encodeValidatorIndex(validatorIndex)
}
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestedEpochsByValidator)
for i, encodedIndex := range encodedIndexes {
var epoch primitives.Epoch
for _, validatorIndex := range validatorIndexes {
encodedIndex := encodeValidatorIndex(validatorIndex)
if epochBytes := bkt.Get(encodedIndex); epochBytes != nil {
if err := epoch.UnmarshalSSZ(epochBytes); err != nil {
return err
}
epochBytes := bkt.Get(encodedIndex)
if epochBytes == nil {
// If there is no epoch for this validator, skip to the next validator.
continue
}
var epoch primitives.Epoch
if err := epoch.UnmarshalSSZ(epochBytes); err != nil {
return err
}
attestedEpoch := &slashertypes.AttestedEpochForValidator{
ValidatorIndex: validatorIndexes[i],
ValidatorIndex: validatorIndex,
Epoch: epoch,
}

View File

@ -56,19 +56,16 @@ func TestStore_LastEpochWrittenForValidators(t *testing.T) {
epochs[i] = primitives.Epoch(i)
}
attestedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices)
require.NoError(t, err)
require.Equal(t, true, len(attestedEpochs) == len(indices))
for _, item := range attestedEpochs {
require.Equal(t, primitives.Epoch(0), item.Epoch)
}
epochsByValidator := make(map[primitives.ValidatorIndex]primitives.Epoch, validatorsCount)
for i := 0; i < validatorsCount; i++ {
epochsByValidator[indices[i]] = epochs[i]
}
// No epochs written for any validators, should return empty list.
attestedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices)
require.NoError(t, err)
require.Equal(t, 0, len(attestedEpochs))
err = beaconDB.SaveLastEpochsWrittenForValidators(ctx, epochsByValidator)
require.NoError(t, err)

View File

@ -14,14 +14,6 @@ import (
"github.com/sirupsen/logrus"
)
// A struct encapsulating input arguments to
// functions used for attester slashing detection and
// loading, saving, and updating min/max span chunks.
type chunkUpdateArgs struct {
chunkIndex uint64
currentEpoch primitives.Epoch
}
// Chunker defines a struct which represents a slice containing a chunk for K different validator's
// min/max spans used for surround vote detection in slasher. The interface defines methods used to check
// if an attestation is slashable for a validator index based on the contents of
@ -36,7 +28,8 @@ type Chunker interface {
attestation *slashertypes.IndexedAttestationWrapper,
) (*ethpb.AttesterSlashing, error)
Update(
args *chunkUpdateArgs,
chunkIndex uint64,
currentEpoch primitives.Epoch,
validatorIndex primitives.ValidatorIndex,
startEpoch,
newTargetEpoch primitives.Epoch,
@ -88,6 +81,8 @@ type MinSpanChunksSlice struct {
data []uint16
}
var _ Chunker = (*MinSpanChunksSlice)(nil)
// MaxSpanChunksSlice represents the same data structure as MinSpanChunksSlice however
// keeps track of validator max spans for slashing detection instead.
type MaxSpanChunksSlice struct {
@ -95,6 +90,8 @@ type MaxSpanChunksSlice struct {
data []uint16
}
var _ Chunker = (*MaxSpanChunksSlice)(nil)
// EmptyMinSpanChunksSlice initializes a min span chunk of length C*K for
// C = chunkSize and K = validatorChunkSize filled with neutral elements.
// For min spans, the neutral element is `undefined`, represented by MaxUint16.
@ -384,21 +381,22 @@ func (m *MaxSpanChunksSlice) CheckSlashable(
// to update. In our example, we stop at 2, which is still part of chunk 0, so no need
// to jump to another min span chunks slice to perform updates.
func (m *MinSpanChunksSlice) Update(
args *chunkUpdateArgs,
chunkIndex uint64,
currentEpoch primitives.Epoch,
validatorIndex primitives.ValidatorIndex,
startEpoch,
newTargetEpoch primitives.Epoch,
) (keepGoing bool, err error) {
// The lowest epoch we need to update.
minEpoch := primitives.Epoch(0)
if args.currentEpoch > (m.params.historyLength - 1) {
minEpoch = args.currentEpoch - (m.params.historyLength - 1)
if currentEpoch > (m.params.historyLength - 1) {
minEpoch = currentEpoch - (m.params.historyLength - 1)
}
epochInChunk := startEpoch
// We go down the chunk for the validator, updating every value starting at startEpoch down to minEpoch.
// As long as the epoch, e, in the same chunk index and e >= minEpoch, we proceed with
// a for loop.
for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk >= minEpoch {
for m.params.chunkIndex(epochInChunk) == chunkIndex && epochInChunk >= minEpoch {
var chunkTarget primitives.Epoch
chunkTarget, err = chunkDataAtEpoch(m.params, m.data, validatorIndex, epochInChunk)
if err != nil {
@ -433,7 +431,8 @@ func (m *MinSpanChunksSlice) Update(
// more about how update exactly works, refer to the detailed documentation for the Update function for
// MinSpanChunksSlice.
func (m *MaxSpanChunksSlice) Update(
args *chunkUpdateArgs,
chunkIndex uint64,
currentEpoch primitives.Epoch,
validatorIndex primitives.ValidatorIndex,
startEpoch,
newTargetEpoch primitives.Epoch,
@ -442,7 +441,7 @@ func (m *MaxSpanChunksSlice) Update(
// We go down the chunk for the validator, updating every value starting at startEpoch up to
// and including the current epoch. As long as the epoch, e, is in the same chunk index and e <= currentEpoch,
// we proceed with a for loop.
for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk <= args.currentEpoch {
for m.params.chunkIndex(epochInChunk) == chunkIndex && epochInChunk <= currentEpoch {
var chunkTarget primitives.Epoch
chunkTarget, err = chunkDataAtEpoch(m.params, m.data, validatorIndex, epochInChunk)
if err != nil {
@ -465,7 +464,7 @@ func (m *MaxSpanChunksSlice) Update(
}
// If the epoch to update now lies beyond the current chunk, then
// continue to the next chunk to update it.
keepGoing = epochInChunk <= args.currentEpoch
keepGoing = epochInChunk <= currentEpoch
return
}

View File

@ -127,14 +127,10 @@ func TestMinSpanChunksSlice_CheckSlashable(t *testing.T) {
source = primitives.Epoch(1)
target = primitives.Epoch(2)
att = createAttestationWrapperEmptySig(t, source, target, nil, nil)
chunkIdx := uint64(0)
chunkIndex := uint64(0)
startEpoch := target
currentEpoch := target
args := &chunkUpdateArgs{
chunkIndex: chunkIdx,
currentEpoch: currentEpoch,
}
_, err = chunk.Update(args, validatorIdx, startEpoch, target)
_, err = chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target)
require.NoError(t, err)
// Next up, we create a surrounding vote, but it should NOT be slashable
@ -209,14 +205,10 @@ func TestMaxSpanChunksSlice_CheckSlashable(t *testing.T) {
source = primitives.Epoch(0)
target = primitives.Epoch(3)
att = createAttestationWrapperEmptySig(t, source, target, nil, nil)
chunkIdx := uint64(0)
chunkIndex := uint64(0)
startEpoch := source
currentEpoch := target
args := &chunkUpdateArgs{
chunkIndex: chunkIdx,
currentEpoch: currentEpoch,
}
_, err = chunk.Update(args, validatorIdx, startEpoch, target)
_, err = chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target)
require.NoError(t, err)
// Next up, we create a surrounded vote, but it should NOT be slashable
@ -288,15 +280,11 @@ func TestMinSpanChunksSlice_Update_MultipleChunks(t *testing.T) {
}
chunk := EmptyMinSpanChunksSlice(params)
target := primitives.Epoch(3)
chunkIdx := uint64(1)
validatorIdx := primitives.ValidatorIndex(0)
chunkIndex := uint64(1)
validatorIndex := primitives.ValidatorIndex(0)
startEpoch := target
currentEpoch := target
args := &chunkUpdateArgs{
chunkIndex: chunkIdx,
currentEpoch: currentEpoch,
}
keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target)
keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIndex, startEpoch, target)
require.NoError(t, err)
// We should keep going! We still have to update the data for chunk index 0.
@ -306,15 +294,11 @@ func TestMinSpanChunksSlice_Update_MultipleChunks(t *testing.T) {
// Now we update for chunk index 0.
chunk = EmptyMinSpanChunksSlice(params)
chunkIdx = uint64(0)
validatorIdx = primitives.ValidatorIndex(0)
chunkIndex = uint64(0)
validatorIndex = primitives.ValidatorIndex(0)
startEpoch = primitives.Epoch(1)
currentEpoch = target
args = &chunkUpdateArgs{
chunkIndex: chunkIdx,
currentEpoch: currentEpoch,
}
keepGoing, err = chunk.Update(args, validatorIdx, startEpoch, target)
keepGoing, err = chunk.Update(chunkIndex, currentEpoch, validatorIndex, startEpoch, target)
require.NoError(t, err)
require.Equal(t, false, keepGoing)
want = []uint16{3, 2, math.MaxUint16, math.MaxUint16, math.MaxUint16, math.MaxUint16}
@ -329,15 +313,11 @@ func TestMaxSpanChunksSlice_Update_MultipleChunks(t *testing.T) {
}
chunk := EmptyMaxSpanChunksSlice(params)
target := primitives.Epoch(3)
chunkIdx := uint64(0)
chunkIndex := uint64(0)
validatorIdx := primitives.ValidatorIndex(0)
startEpoch := primitives.Epoch(0)
currentEpoch := target
args := &chunkUpdateArgs{
chunkIndex: chunkIdx,
currentEpoch: currentEpoch,
}
keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target)
keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target)
require.NoError(t, err)
// We should keep going! We still have to update the data for chunk index 1.
@ -347,15 +327,11 @@ func TestMaxSpanChunksSlice_Update_MultipleChunks(t *testing.T) {
// Now we update for chunk index 1.
chunk = EmptyMaxSpanChunksSlice(params)
chunkIdx = uint64(1)
chunkIndex = uint64(1)
validatorIdx = primitives.ValidatorIndex(0)
startEpoch = primitives.Epoch(2)
currentEpoch = target
args = &chunkUpdateArgs{
chunkIndex: chunkIdx,
currentEpoch: currentEpoch,
}
keepGoing, err = chunk.Update(args, validatorIdx, startEpoch, target)
keepGoing, err = chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target)
require.NoError(t, err)
require.Equal(t, false, keepGoing)
want = []uint16{1, 0, 0, 0, 0, 0}
@ -393,15 +369,11 @@ func TestMinSpanChunksSlice_Update_SingleChunk(t *testing.T) {
}
chunk := EmptyMinSpanChunksSlice(params)
target := primitives.Epoch(1)
chunkIdx := uint64(0)
chunkIndex := uint64(0)
validatorIdx := primitives.ValidatorIndex(0)
startEpoch := target
currentEpoch := target
args := &chunkUpdateArgs{
chunkIndex: chunkIdx,
currentEpoch: currentEpoch,
}
keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target)
keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target)
require.NoError(t, err)
require.Equal(t, false, keepGoing)
want := []uint16{1, 0, math.MaxUint16, math.MaxUint16, math.MaxUint16, math.MaxUint16}
@ -416,15 +388,11 @@ func TestMaxSpanChunksSlice_Update_SingleChunk(t *testing.T) {
}
chunk := EmptyMaxSpanChunksSlice(params)
target := primitives.Epoch(3)
chunkIdx := uint64(0)
chunkIndex := uint64(0)
validatorIdx := primitives.ValidatorIndex(0)
startEpoch := primitives.Epoch(0)
currentEpoch := target
args := &chunkUpdateArgs{
chunkIndex: chunkIdx,
currentEpoch: currentEpoch,
}
keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target)
keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target)
require.NoError(t, err)
require.Equal(t, false, keepGoing)
want := []uint16{3, 2, 1, 0, 0, 0, 0, 0}

View File

@ -20,14 +20,11 @@ import (
func (s *Service) checkSlashableAttestations(
ctx context.Context, currentEpoch primitives.Epoch, atts []*slashertypes.IndexedAttestationWrapper,
) (map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing, error) {
totalStart := time.Now()
start := time.Now()
slashings := map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing{}
// Double votes
log.Debug("Checking for double votes")
start := time.Now()
doubleVoteSlashings, err := s.checkDoubleVotes(ctx, atts)
if err != nil {
return nil, errors.Wrap(err, "could not check slashable double votes")
@ -47,40 +44,20 @@ func (s *Service) checkSlashableAttestations(
}
// Surrounding / surrounded votes
groupedByValidatorChunkIndexAtts := s.groupByValidatorChunkIndex(atts)
log.WithField("numBatches", len(groupedByValidatorChunkIndexAtts)).Debug("Batching attestations by validator chunk index")
groupsCount := len(groupedByValidatorChunkIndexAtts)
surroundStart := time.Now()
for validatorChunkIndex, attestations := range groupedByValidatorChunkIndexAtts {
surroundSlashings, err := s.checkSurrounds(ctx, attestations, currentEpoch, validatorChunkIndex)
if err != nil {
return nil, err
}
for root, slashing := range surroundSlashings {
slashings[root] = slashing
}
indices := s.params.validatorIndexesInChunk(validatorChunkIndex)
for _, idx := range indices {
s.latestEpochWrittenForValidator[idx] = currentEpoch
}
surroundSlashings, err := s.checkSurroundVotes(ctx, atts, currentEpoch)
if err != nil {
return nil, errors.Wrap(err, "could not check slashable surround votes")
}
surroundElapsed := time.Since(surroundStart)
totalElapsed := time.Since(totalStart)
for root, slashing := range surroundSlashings {
slashings[root] = slashing
}
elapsed := time.Since(start)
fields := logrus.Fields{
"numAttestations": len(atts),
"numBatchesByValidatorChunkIndex": groupsCount,
"elapsed": totalElapsed,
}
if groupsCount > 0 {
avgProcessingTimePerBatch := surroundElapsed / time.Duration(groupsCount)
fields["avgBatchProcessingTime"] = avgProcessingTimePerBatch
"numAttestations": len(atts),
"elapsed": elapsed,
}
log.WithFields(fields).Info("Done checking slashable attestations")
@ -92,70 +69,65 @@ func (s *Service) checkSlashableAttestations(
return slashings, nil
}
// Given a list of attestations all corresponding to a validator chunk index as well
// as the current epoch in time, we perform slashing detection.
// The process is as follows given a list of attestations:
//
// 1. Group the attestations by chunk index.
// 2. Update the min and max spans for those grouped attestations, check if any slashings are
// found in the process
// 3. Update the latest written epoch for all validators involved to the current epoch.
//
// This function performs a lot of critical actions and is split into smaller helpers for cleanliness.
func (s *Service) checkSurrounds(
// Check for surrounding and surrounded votes in our database given a list of incoming attestations.
func (s *Service) checkSurroundVotes(
ctx context.Context,
attestations []*slashertypes.IndexedAttestationWrapper,
attWrappers []*slashertypes.IndexedAttestationWrapper,
currentEpoch primitives.Epoch,
validatorChunkIndex uint64,
) (map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing, error) {
// Map of updated chunks by chunk index, which will be saved at the end.
updatedMinChunks, updatedMaxChunks := map[uint64]Chunker{}, map[uint64]Chunker{}
groupedByChunkIndexAtts := s.groupByChunkIndex(attestations)
validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex)
slashings := map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing{}
// Update epoch for validators.
for _, validatorIndex := range validatorIndexes {
// This function modifies `updatedMinChunks` in place.
if err := s.epochUpdateForValidator(ctx, updatedMinChunks, validatorChunkIndex, slashertypes.MinSpan, currentEpoch, validatorIndex); err != nil {
return nil, errors.Wrapf(err, "could not update validator index for min chunks %d", validatorIndex)
// Group attestation wrappers by validator chunk index.
attWrappersByValidatorChunkIndex := s.groupByValidatorChunkIndex(attWrappers)
for validatorChunkIndex, attWrappers := range attWrappersByValidatorChunkIndex {
minChunkByChunkIndex, err := s.updatedChunkByChunkIndex(ctx, slashertypes.MinSpan, currentEpoch, validatorChunkIndex)
if err != nil {
return nil, errors.Wrap(err, "could not update updatedMinChunks")
}
// This function modifies `updatedMaxChunks` in place.
if err := s.epochUpdateForValidator(ctx, updatedMaxChunks, validatorChunkIndex, slashertypes.MaxSpan, currentEpoch, validatorIndex); err != nil {
return nil, errors.Wrapf(err, "could not update validator index for max chunks %d", validatorIndex)
maxChunkByChunkIndex, err := s.updatedChunkByChunkIndex(ctx, slashertypes.MaxSpan, currentEpoch, validatorChunkIndex)
if err != nil {
return nil, errors.Wrap(err, "could not update updatedMaxChunks")
}
}
// Check for surrounding votes.
surroundingSlashings, err := s.updateSpans(ctx, updatedMinChunks, groupedByChunkIndexAtts, slashertypes.MinSpan, validatorChunkIndex, currentEpoch)
if err != nil {
return nil, errors.Wrapf(err, "could not update min attestation spans for validator chunk index %d", validatorChunkIndex)
}
// Group (already grouped by validator chunk index) attestation wrappers by chunk index.
attWrappersByChunkIndex := s.groupByChunkIndex(attWrappers)
for root, slashing := range surroundingSlashings {
slashings[root] = slashing
}
// Check for surrounding votes.
surroundingSlashings, err := s.updateSpans(ctx, minChunkByChunkIndex, attWrappersByChunkIndex, slashertypes.MinSpan, validatorChunkIndex, currentEpoch)
if err != nil {
return nil, errors.Wrapf(err, "could not update min attestation spans for validator chunk index %d", validatorChunkIndex)
}
// Check for surrounded votes.
surroundedSlashings, err := s.updateSpans(ctx, updatedMaxChunks, groupedByChunkIndexAtts, slashertypes.MaxSpan, validatorChunkIndex, currentEpoch)
if err != nil {
return nil, errors.Wrapf(err, "could not update max attestation spans for validator chunk index %d", validatorChunkIndex)
}
for root, slashing := range surroundingSlashings {
slashings[root] = slashing
}
for root, slashing := range surroundedSlashings {
slashings[root] = slashing
}
// Check for surrounded votes.
surroundedSlashings, err := s.updateSpans(ctx, maxChunkByChunkIndex, attWrappersByChunkIndex, slashertypes.MaxSpan, validatorChunkIndex, currentEpoch)
if err != nil {
return nil, errors.Wrapf(err, "could not update max attestation spans for validator chunk index %d", validatorChunkIndex)
}
// Save updated chunks into the database.
if err := s.saveUpdatedChunks(ctx, updatedMinChunks, slashertypes.MinSpan, validatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save chunks for min spans")
}
for root, slashing := range surroundedSlashings {
slashings[root] = slashing
}
if err := s.saveUpdatedChunks(ctx, updatedMaxChunks, slashertypes.MaxSpan, validatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save chunks for max spans")
// Save updated chunks into the database.
if err := s.saveUpdatedChunks(ctx, minChunkByChunkIndex, slashertypes.MinSpan, validatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save chunks for min spans")
}
if err := s.saveUpdatedChunks(ctx, maxChunkByChunkIndex, slashertypes.MaxSpan, validatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save chunks for max spans")
}
// Update the latest written epoch for all validators involved to the current chunk.
indices := s.params.validatorIndexesInChunk(validatorChunkIndex)
for _, idx := range indices {
s.latestEpochWrittenForValidator[idx] = currentEpoch
}
}
return slashings, nil
@ -264,55 +236,74 @@ func (s *Service) checkDoubleVotes(
return slashings, nil
}
// This function updates `updatedChunks`, representing the slashing spans for a given validator for
// a change in epoch since the last epoch we have recorded for the validator.
// For example, if the last epoch a validator has written is N, and the current epoch is N+5,
// we update entries in the slashing spans with their neutral element for epochs N+1 to N+4.
// This also puts any loaded chunks in a map used as a cache for further processing and minimizing
// database reads later on.
func (s *Service) epochUpdateForValidator(
// updatedChunkByChunkIndex loads the chunks from the database for validators corresponding to
// the `validatorChunkIndex`.
// It then updates the chunks with the neutral element for corresponding validators from
// the epoch just after the latest epoch written to the current epoch.
// A mapping between chunk index and chunk is returned to the caller.
func (s *Service) updatedChunkByChunkIndex(
ctx context.Context,
updatedChunks map[uint64]Chunker,
validatorChunkIndex uint64,
chunkKind slashertypes.ChunkKind,
currentEpoch primitives.Epoch,
validatorIndex primitives.ValidatorIndex,
) error {
var err error
validatorChunkIndex uint64,
) (map[uint64]Chunker, error) {
chunkByChunkIndex := map[uint64]Chunker{}
latestEpochWritten, ok := s.latestEpochWrittenForValidator[validatorIndex]
if !ok {
return nil
}
validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex)
for _, validatorIndex := range validatorIndexes {
// Retrieve the latest epoch written for the validator.
latestEpochWritten, ok := s.latestEpochWrittenForValidator[validatorIndex]
for latestEpochWritten <= currentEpoch {
chunkIndex := s.params.chunkIndex(latestEpochWritten)
// Start from the epoch just after the latest epoch written.
epochToWrite, err := latestEpochWritten.SafeAdd(1)
if err != nil {
return nil, errors.Wrap(err, "could not add 1 to latest epoch written")
}
currentChunk, ok := updatedChunks[chunkIndex]
if !ok {
currentChunk, err = s.getChunk(ctx, chunkKind, validatorChunkIndex, chunkIndex)
if err != nil {
return errors.Wrap(err, "could not get chunk")
}
epochToWrite = 0
}
for s.params.chunkIndex(latestEpochWritten) == chunkIndex && latestEpochWritten <= currentEpoch {
if err := setChunkRawDistance(
s.params,
currentChunk.Chunk(),
validatorIndex,
latestEpochWritten,
currentChunk.NeutralElement(),
); err != nil {
return err
// It is useless to update more than `historyLength` epochs, since
// the chunks are circular and we will be overwritten at least one.
if currentEpoch-epochToWrite >= s.params.historyLength {
epochToWrite = currentEpoch + 1 - s.params.historyLength
}
for epochToWrite <= currentEpoch {
// Get the chunk index for the latest epoch written.
chunkIndex := s.params.chunkIndex(epochToWrite)
// Get the chunk corresponding to the chunk index from the `chunkByChunkIndex` map.
currentChunk, ok := chunkByChunkIndex[chunkIndex]
if !ok {
// If the chunk is not in the map, retrieve it from the database.
currentChunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex)
if err != nil {
return nil, errors.Wrap(err, "could not get chunk")
}
}
updatedChunks[chunkIndex] = currentChunk
latestEpochWritten++
// Update the current chunk with the neutral element for the validator index for the latest epoch written.
for s.params.chunkIndex(epochToWrite) == chunkIndex && epochToWrite <= currentEpoch {
if err := setChunkRawDistance(
s.params,
currentChunk.Chunk(),
validatorIndex,
epochToWrite,
currentChunk.NeutralElement(),
); err != nil {
return nil, err
}
epochToWrite++
}
chunkByChunkIndex[chunkIndex] = currentChunk
}
}
return nil
return chunkByChunkIndex, nil
}
// Updates spans and detects any slashable attester offenses along the way.
@ -409,7 +400,7 @@ func (s *Service) applyAttestationForValidator(
chunk, ok := chunksByChunkIdx[chunkIndex]
if !ok {
chunk, err = s.getChunk(ctx, chunkKind, validatorChunkIndex, chunkIndex)
chunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex)
if err != nil {
return nil, errors.Wrapf(err, "could not get chunk at index %d", chunkIndex)
}
@ -451,17 +442,15 @@ func (s *Service) applyAttestationForValidator(
chunk, ok := chunksByChunkIdx[chunkIndex]
if !ok {
chunk, err = s.getChunk(ctx, chunkKind, validatorChunkIndex, chunkIndex)
chunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex)
if err != nil {
return nil, errors.Wrapf(err, "could not get chunk at index %d", chunkIndex)
}
}
keepGoing, err := chunk.Update(
&chunkUpdateArgs{
chunkIndex: chunkIndex,
currentEpoch: currentEpoch,
},
chunkIndex,
currentEpoch,
validatorIndex,
startEpoch,
targetEpoch,
@ -490,8 +479,8 @@ func (s *Service) applyAttestationForValidator(
return nil, nil
}
// Retrieve a chunk from database from database.
func (s *Service) getChunk(
// Retrieve a chunk from database.
func (s *Service) getChunkFromDatabase(
ctx context.Context,
chunkKind slashertypes.ChunkKind,
validatorChunkIndex uint64,
@ -517,14 +506,14 @@ func (s *Service) loadChunks(
ctx context.Context,
validatorChunkIndex uint64,
chunkKind slashertypes.ChunkKind,
chunkIndices []uint64,
chunkIndexes []uint64,
) (map[uint64]Chunker, error) {
ctx, span := trace.StartSpan(ctx, "Slasher.loadChunks")
defer span.End()
chunkKeys := make([][]byte, 0, len(chunkIndices))
for _, chunkIdx := range chunkIndices {
chunkKeys = append(chunkKeys, s.params.flatSliceID(validatorChunkIndex, chunkIdx))
chunkKeys := make([][]byte, 0, len(chunkIndexes))
for _, chunkIndex := range chunkIndexes {
chunkKeys = append(chunkKeys, s.params.flatSliceID(validatorChunkIndex, chunkIndex))
}
rawChunks, chunksExist, err := s.serviceCfg.Database.LoadSlasherChunks(ctx, chunkKind, chunkKeys)
@ -563,7 +552,7 @@ func (s *Service) loadChunks(
return nil, errors.Wrap(err, "could not initialize chunk")
}
chunksByChunkIdx[chunkIndices[i]] = chunk
chunksByChunkIdx[chunkIndexes[i]] = chunk
}
return chunksByChunkIdx, nil

View File

@ -834,77 +834,293 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) {
}
func Test_epochUpdateForValidators(t *testing.T) {
ctx := context.Background()
slasherDB := dbtest.SetupSlasherDB(t)
neutralMin, neutralMax := uint16(65535), uint16(0)
// Check if the chunk at chunk index already exists in-memory.
s := &Service{
params: &Parameters{
chunkSize: 2, // 2 epochs in a chunk.
validatorChunkSize: 2, // 2 validators in a chunk.
historyLength: 4,
testCases := []struct {
name string
chunkSize uint64
validatorChunkSize uint64
historyLength primitives.Epoch
currentEpoch primitives.Epoch
validatorChunkIndex uint64
latestUpdatedEpochByValidatorIndex map[primitives.ValidatorIndex]primitives.Epoch
initialMinChunkByChunkIndex map[uint64][]uint16
expectedMinChunkByChunkIndex map[uint64][]uint16
initialMaxChunkByChunkIndex map[uint64][]uint16
expectedMaxChunkByChunkIndex map[uint64][]uint16
}{
{
name: "start with no data - first chunk",
chunkSize: 4,
validatorChunkSize: 2,
historyLength: 8,
currentEpoch: 2,
validatorChunkIndex: 21,
latestUpdatedEpochByValidatorIndex: nil,
initialMinChunkByChunkIndex: nil,
expectedMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin},
},
initialMaxChunkByChunkIndex: nil,
expectedMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax},
},
},
{
name: "start with no data - second chunk",
chunkSize: 4,
validatorChunkSize: 2,
historyLength: 8,
currentEpoch: 5,
validatorChunkIndex: 21,
latestUpdatedEpochByValidatorIndex: nil,
initialMinChunkByChunkIndex: nil,
expectedMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin},
1: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin},
},
initialMaxChunkByChunkIndex: nil,
expectedMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax},
1: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax},
},
},
{
name: "start with some data - first chunk",
chunkSize: 4,
validatorChunkSize: 2,
historyLength: 8,
currentEpoch: 2,
validatorChunkIndex: 21,
latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 0, 43: 1},
initialMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {14, 9999, 9999, 9999, 15, 16, 9999, 9999},
},
expectedMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {14, neutralMin, neutralMin, 9999, 15, 16, neutralMin, 9999},
},
initialMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {70, 9999, 9999, 9999, 71, 72, 9999, 9999},
},
expectedMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {70, neutralMax, neutralMax, 9999, 71, 72, neutralMax, 9999},
},
},
{
name: "start with some data - second chunk",
chunkSize: 4,
validatorChunkSize: 2,
historyLength: 8,
currentEpoch: 5,
validatorChunkIndex: 21,
latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 1, 43: 2},
initialMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {14, 13, 9999, 9999, 15, 16, 17, 9999},
},
expectedMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {14, 13, neutralMin, neutralMin, 15, 16, 17, neutralMin},
// | validator 42 | validator 43 |
1: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin},
},
initialMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {70, 69, 9999, 9999, 71, 72, 73, 9999},
},
expectedMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {70, 69, neutralMax, neutralMax, 71, 72, 73, neutralMax},
// | validator 42 | validator 43 |
1: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax},
},
},
{
name: "start with some data - third chunk",
chunkSize: 4,
validatorChunkSize: 2,
historyLength: 12,
currentEpoch: 9,
validatorChunkIndex: 21,
latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 5, 43: 6},
initialMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
1: {14, 13, 9999, 9999, 15, 16, 17, 9999},
},
expectedMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
1: {14, 13, neutralMin, neutralMin, 15, 16, 17, neutralMin},
// | validator 42 | validator 43 |
2: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin},
},
initialMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
1: {70, 69, 9999, 9999, 71, 72, 73, 9999},
},
expectedMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
1: {70, 69, neutralMax, neutralMax, 71, 72, 73, neutralMax},
// | validator 42 | validator 43 |
2: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax},
},
},
{
name: "start with some data - third chunk - wrap to first chunk",
chunkSize: 4,
validatorChunkSize: 2,
historyLength: 12,
currentEpoch: 14,
validatorChunkIndex: 21,
latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 9, 43: 10},
initialMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {55, 55, 55, 55, 55, 55, 55, 55},
1: {66, 66, 66, 66, 66, 66, 66, 66},
// | validator 42 | validator 43 |
2: {77, 77, 9999, 9999, 77, 77, 77, 9999},
},
expectedMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
2: {77, 77, neutralMin, neutralMin, 77, 77, 77, neutralMin},
// | validator 42 | validator 43 |
0: {neutralMin, neutralMin, neutralMin, 55, neutralMin, neutralMin, neutralMin, 55},
},
initialMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {55, 55, 55, 55, 55, 55, 55, 55},
1: {66, 66, 66, 66, 66, 66, 66, 66},
// | validator 42 | validator 43 |
2: {77, 77, 9999, 9999, 77, 77, 77, 9999},
},
expectedMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
2: {77, 77, neutralMax, neutralMax, 77, 77, 77, neutralMax},
// | validator 42 | validator 43 |
0: {neutralMax, neutralMax, neutralMax, 55, neutralMax, neutralMax, neutralMax, 55},
},
},
{
name: "start with some data - high latest updated epoch",
chunkSize: 4,
validatorChunkSize: 2,
historyLength: 12,
currentEpoch: 16,
validatorChunkIndex: 21,
latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 2, 43: 3},
initialMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {55, 55, 55, 55, 55, 55, 55, 55},
1: {66, 66, 66, 66, 66, 66, 66, 66},
2: {77, 77, 77, 77, 77, 77, 77, 77},
},
expectedMinChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin},
1: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin},
2: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin},
},
initialMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {55, 55, 55, 55, 55, 55, 55, 55},
1: {66, 66, 66, 66, 66, 66, 66, 66},
2: {77, 77, 77, 77, 77, 77, 77, 77},
},
expectedMaxChunkByChunkIndex: map[uint64][]uint16{
// | validator 42 | validator 43 |
0: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax},
1: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax},
2: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax},
},
},
serviceCfg: &ServiceConfig{Database: slasherDB},
latestEpochWrittenForValidator: map[primitives.ValidatorIndex]primitives.Epoch{},
}
t.Run("no update if no latest written epoch", func(t *testing.T) {
validators := []primitives.ValidatorIndex{
1, 2,
}
currentEpoch := primitives.Epoch(3)
// No last written epoch for both validators.
s.latestEpochWrittenForValidator = map[primitives.ValidatorIndex]primitives.Epoch{}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Create context.
ctx := context.Background()
			// Because the validators have no recorded latest epoch written, we expect
			// no chunks to be loaded nor updated.
updatedChunks := make(map[uint64]Chunker)
for _, valIdx := range validators {
err := s.epochUpdateForValidator(
ctx,
updatedChunks,
0, // validatorChunkIndex
slashertypes.MinSpan,
currentEpoch,
valIdx,
// Initialize the slasher database.
slasherDB := dbtest.SetupSlasherDB(t)
			// Initialize the slasher service.
service := &Service{
params: &Parameters{
chunkSize: tt.chunkSize,
validatorChunkSize: tt.validatorChunkSize,
historyLength: tt.historyLength,
},
serviceCfg: &ServiceConfig{Database: slasherDB},
latestEpochWrittenForValidator: tt.latestUpdatedEpochByValidatorIndex,
}
// Save min initial chunks if they exist.
if tt.initialMinChunkByChunkIndex != nil {
minChunkerByChunkerIndex := map[uint64]Chunker{}
for chunkIndex, minChunk := range tt.initialMinChunkByChunkIndex {
minChunkerByChunkerIndex[chunkIndex] = &MinSpanChunksSlice{data: minChunk}
}
err := service.saveUpdatedChunks(ctx, minChunkerByChunkerIndex, slashertypes.MinSpan, tt.validatorChunkIndex)
require.NoError(t, err)
}
// Save max initial chunks if they exist.
if tt.initialMaxChunkByChunkIndex != nil {
maxChunkerByChunkerIndex := map[uint64]Chunker{}
for chunkIndex, maxChunk := range tt.initialMaxChunkByChunkIndex {
maxChunkerByChunkerIndex[chunkIndex] = &MaxSpanChunksSlice{data: maxChunk}
}
err := service.saveUpdatedChunks(ctx, maxChunkerByChunkerIndex, slashertypes.MaxSpan, tt.validatorChunkIndex)
require.NoError(t, err)
}
// Get chunks.
actualMinChunkByChunkIndex, err := service.updatedChunkByChunkIndex(
ctx, slashertypes.MinSpan, tt.currentEpoch, tt.validatorChunkIndex,
)
// Compare the actual and expected chunks.
require.NoError(t, err)
}
require.Equal(t, 0, len(updatedChunks))
})
require.Equal(t, len(tt.expectedMinChunkByChunkIndex), len(actualMinChunkByChunkIndex))
for chunkIndex, expectedMinChunk := range tt.expectedMinChunkByChunkIndex {
actualMinChunk, ok := actualMinChunkByChunkIndex[chunkIndex]
require.Equal(t, true, ok)
require.Equal(t, len(expectedMinChunk), len(actualMinChunk.Chunk()))
require.DeepSSZEqual(t, expectedMinChunk, actualMinChunk.Chunk())
}
t.Run("update from latest written epoch", func(t *testing.T) {
validators := []primitives.ValidatorIndex{
1, 2,
}
currentEpoch := primitives.Epoch(3)
// Set the latest written epoch for validators to current epoch - 1.
latestWrittenEpoch := currentEpoch - 1
s.latestEpochWrittenForValidator = map[primitives.ValidatorIndex]primitives.Epoch{
1: latestWrittenEpoch,
2: latestWrittenEpoch,
}
// Because the latest written epoch for the input validators is == 2, we expect
		// that we will update all epochs from 2 up to 3 (the current epoch). This is all
		// safely contained in chunk index 1.
updatedChunks := make(map[uint64]Chunker)
for _, valIdx := range validators {
err := s.epochUpdateForValidator(
ctx,
updatedChunks,
0, // validatorChunkIndex,
slashertypes.MinSpan,
currentEpoch,
valIdx,
actualMaxChunkByChunkIndex, err := service.updatedChunkByChunkIndex(
ctx, slashertypes.MaxSpan, tt.currentEpoch, tt.validatorChunkIndex,
)
require.NoError(t, err)
}
require.Equal(t, 1, len(updatedChunks))
_, ok := updatedChunks[1]
require.Equal(t, true, ok)
})
require.Equal(t, len(tt.expectedMaxChunkByChunkIndex), len(actualMaxChunkByChunkIndex))
for chunkIndex, expectedMaxChunk := range tt.expectedMaxChunkByChunkIndex {
actualMaxChunk, ok := actualMaxChunkByChunkIndex[chunkIndex]
require.Equal(t, true, ok)
require.Equal(t, len(expectedMaxChunk), len(actualMaxChunk.Chunk()))
require.DeepSSZEqual(t, expectedMaxChunk, actualMaxChunk.Chunk())
}
})
}
}
func Test_applyAttestationForValidator_MinSpanChunk(t *testing.T) {