Slasher: Reduce surrounding/surrounded attestations processing time (#13629)

* Improve package documentation.

* `processAttestations`: Improve logging.

* Add `Benchmark_checkSurroundVotes` benchmark.

* Implement `saveChunksToDisk` as remplacement of `saveUpdatedChunks`.

The idea is to open only on DB transaction for all validator chunk indexes instead of
one DB transaction per validator chunk index.

It saves the overhead due to transaction start/stop of the DB.

Result of `Benchmark_checkSurroundVotes`:
- Before this commit: 133 seconds
- After this commit: 5.05 seconds

* `LoadSlasherChunks` and `SaveSlasherChunks`: Batch.

* `loadChunks` ==> `loadChunksFromDisk`

* `updatedChunkByChunkIndex`: Don't update if `latestEpochWritten == currentEpoch `.

* `updatedChunkByChunkIndex`: Load all needed chunks once.

* `latestEpochWritten` ==> `latestEpochUpdated`.

* `checkSurroundVotes`: Dump to disk at most every `25_600` chunks.

* `SaveAttestationRecordsForValidators`: Batch.

* `batchSize`: Use as package const and add comment.
This commit is contained in:
Manu NALEPA 2024-02-21 16:12:37 +01:00 committed by GitHub
parent 24b029bbef
commit cb80d5ad32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 760 additions and 237 deletions

View File

@ -48,7 +48,6 @@ go_test(
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//time/slots:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",

View File

@ -23,6 +23,10 @@ import (
const (
attestationRecordKeySize = 32 // Bytes.
rootSize = 32 // Bytes.
// For database performance reasons, database read/write operations
// are chunked into batches of maximum `batchSize` elements.
batchSize = 10_000
)
// LastEpochWrittenForValidators given a list of validator indices returns the latest
@ -259,14 +263,23 @@ func (s *Store) AttestationRecordForValidator(
// then only the first one is (arbitrarily) saved in the `attestationDataRootsBucket` bucket.
func (s *Store) SaveAttestationRecordsForValidators(
ctx context.Context,
attestations []*slashertypes.IndexedAttestationWrapper,
attWrappers []*slashertypes.IndexedAttestationWrapper,
) error {
_, span := trace.StartSpan(ctx, "BeaconDB.SaveAttestationRecordsForValidators")
defer span.End()
encodedTargetEpoch := make([][]byte, len(attestations))
encodedRecords := make([][]byte, len(attestations))
for i, attestation := range attestations {
attWrappersCount := len(attWrappers)
// If no attestations are provided, skip.
if attWrappersCount == 0 {
return nil
}
// Build encoded target epochs and encoded records
encodedTargetEpoch := make([][]byte, attWrappersCount)
encodedRecords := make([][]byte, attWrappersCount)
for i, attestation := range attWrappers {
encEpoch := encodeTargetEpoch(attestation.IndexedAttestation.Data.Target.Epoch)
value, err := encodeAttestationRecord(attestation)
@ -278,60 +291,115 @@ func (s *Store) SaveAttestationRecordsForValidators(
encodedRecords[i] = value
}
return s.db.Update(func(tx *bolt.Tx) error {
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
dataRootsBkt := tx.Bucket(attestationDataRootsBucket)
// Save attestation records in the database by batch.
for stop := attWrappersCount; stop >= 0; stop -= batchSize {
start := max(0, stop-batchSize)
for i := len(attestations) - 1; i >= 0; i-- {
attestation := attestations[i]
attWrappersBatch := attWrappers[start:stop]
encodedTargetEpochBatch := encodedTargetEpoch[start:stop]
encodedRecordsBatch := encodedRecords[start:stop]
if err := attRecordsBkt.Put(attestation.DataRoot[:], encodedRecords[i]); err != nil {
return err
}
for _, valIdx := range attestation.IndexedAttestation.AttestingIndices {
encIdx := encodeValidatorIndex(primitives.ValidatorIndex(valIdx))
key := append(encodedTargetEpoch[i], encIdx...)
if err := dataRootsBkt.Put(key, attestation.DataRoot[:]); err != nil {
return err
}
}
// Perform basic check.
if len(encodedTargetEpochBatch) != len(encodedRecordsBatch) {
return fmt.Errorf(
"cannot save attestation records, got %d target epochs and %d records",
len(encodedTargetEpochBatch), len(encodedRecordsBatch),
)
}
return nil
})
currentBatchSize := len(encodedTargetEpochBatch)
// Save attestation records in the database.
if err := s.db.Update(func(tx *bolt.Tx) error {
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
dataRootsBkt := tx.Bucket(attestationDataRootsBucket)
for i := currentBatchSize - 1; i >= 0; i-- {
attWrapper := attWrappersBatch[i]
dataRoot := attWrapper.DataRoot
encodedTargetEpoch := encodedTargetEpochBatch[i]
encodedRecord := encodedRecordsBatch[i]
if err := attRecordsBkt.Put(dataRoot[:], encodedRecord); err != nil {
return err
}
for _, validatorIndex := range attWrapper.IndexedAttestation.AttestingIndices {
encodedIndex := encodeValidatorIndex(primitives.ValidatorIndex(validatorIndex))
key := append(encodedTargetEpoch, encodedIndex...)
if err := dataRootsBkt.Put(key, dataRoot[:]); err != nil {
return err
}
}
}
return nil
}); err != nil {
return errors.Wrap(err, "failed to save attestation records")
}
}
return nil
}
// LoadSlasherChunks given a chunk kind and a disk keys, retrieves chunks for a validator
// min or max span used by slasher from our database.
func (s *Store) LoadSlasherChunks(
ctx context.Context, kind slashertypes.ChunkKind, diskKeys [][]byte,
ctx context.Context, kind slashertypes.ChunkKind, chunkKeys [][]byte,
) ([][]uint16, []bool, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.LoadSlasherChunk")
defer span.End()
chunks := make([][]uint16, 0)
var exists []bool
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(slasherChunksBucket)
for _, diskKey := range diskKeys {
key := append(ssz.MarshalUint8(make([]byte, 0), uint8(kind)), diskKey...)
chunkBytes := bkt.Get(key)
if chunkBytes == nil {
chunks = append(chunks, []uint16{})
exists = append(exists, false)
continue
keysCount := len(chunkKeys)
chunks := make([][]uint16, 0, keysCount)
exists := make([]bool, 0, keysCount)
encodedKeys := make([][]byte, 0, keysCount)
// Encode kind.
encodedKind := ssz.MarshalUint8(make([]byte, 0), uint8(kind))
// Encode keys.
for _, chunkKey := range chunkKeys {
encodedKey := append(encodedKind, chunkKey...)
encodedKeys = append(encodedKeys, encodedKey)
}
// Read chunks from the database by batch.
for start := 0; start < keysCount; start += batchSize {
stop := min(start+batchSize, len(encodedKeys))
encodedKeysBatch := encodedKeys[start:stop]
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(slasherChunksBucket)
for _, encodedKey := range encodedKeysBatch {
chunkBytes := bkt.Get(encodedKey)
if chunkBytes == nil {
chunks = append(chunks, []uint16{})
exists = append(exists, false)
continue
}
chunk, err := decodeSlasherChunk(chunkBytes)
if err != nil {
return err
}
chunks = append(chunks, chunk)
exists = append(exists, true)
}
chunk, err := decodeSlasherChunk(chunkBytes)
if err != nil {
return err
}
chunks = append(chunks, chunk)
exists = append(exists, true)
return nil
}); err != nil {
return nil, nil, err
}
return nil
})
return chunks, exists, err
}
return chunks, exists, nil
}
// SaveSlasherChunks given a chunk kind, list of disk keys, and list of chunks,
@ -341,25 +409,60 @@ func (s *Store) SaveSlasherChunks(
) error {
_, span := trace.StartSpan(ctx, "BeaconDB.SaveSlasherChunks")
defer span.End()
encodedKeys := make([][]byte, len(chunkKeys))
encodedChunks := make([][]byte, len(chunkKeys))
for i := 0; i < len(chunkKeys); i++ {
encodedKeys[i] = append(ssz.MarshalUint8(make([]byte, 0), uint8(kind)), chunkKeys[i]...)
encodedChunk, err := encodeSlasherChunk(chunks[i])
// Ensure we have the same number of keys and chunks.
if len(chunkKeys) != len(chunks) {
return fmt.Errorf(
"cannot save slasher chunks, got %d keys and %d chunks",
len(chunkKeys), len(chunks),
)
}
chunksCount := len(chunks)
// Encode kind.
encodedKind := ssz.MarshalUint8(make([]byte, 0), uint8(kind))
// Encode keys and chunks.
encodedKeys := make([][]byte, chunksCount)
encodedChunks := make([][]byte, chunksCount)
for i := 0; i < chunksCount; i++ {
chunkKey, chunk := chunkKeys[i], chunks[i]
encodedKey := append(encodedKind, chunkKey...)
encodedChunk, err := encodeSlasherChunk(chunk)
if err != nil {
return err
return errors.Wrapf(err, "failed to encode slasher chunk for key %v", chunkKey)
}
encodedKeys[i] = encodedKey
encodedChunks[i] = encodedChunk
}
return s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(slasherChunksBucket)
for i := 0; i < len(chunkKeys); i++ {
if err := bkt.Put(encodedKeys[i], encodedChunks[i]); err != nil {
return err
// Save chunks in the database by batch.
for start := 0; start < chunksCount; start += batchSize {
stop := min(start+batchSize, len(encodedKeys))
encodedKeysBatch := encodedKeys[start:stop]
encodedChunksBatch := encodedChunks[start:stop]
batchSize := len(encodedKeysBatch)
if err := s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(slasherChunksBucket)
for i := 0; i < batchSize; i++ {
if err := bkt.Put(encodedKeysBatch[i], encodedChunksBatch[i]); err != nil {
return err
}
}
return nil
}); err != nil {
return errors.Wrap(err, "failed to save slasher chunks")
}
return nil
})
}
return nil
}
// CheckDoubleBlockProposals takes in a list of proposals and for each,

View File

@ -14,33 +14,56 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)
func TestStore_AttestationRecordForValidator_SaveRetrieve(t *testing.T) {
ctx := context.Background()
beaconDB := setupDB(t)
valIdx := primitives.ValidatorIndex(1)
target := primitives.Epoch(5)
source := primitives.Epoch(4)
attRecord, err := beaconDB.AttestationRecordForValidator(ctx, valIdx, target)
require.NoError(t, err)
require.Equal(t, true, attRecord == nil)
const attestationsCount = 11_000
sr := [32]byte{1}
err = beaconDB.SaveAttestationRecordsForValidators(
ctx,
[]*slashertypes.IndexedAttestationWrapper{
createAttestationWrapper(source, target, []uint64{uint64(valIdx)}, sr[:]),
},
)
// Create context.
ctx := context.Background()
// Create database.
beaconDB := setupDB(t)
// Define the validator index.
validatorIndex := primitives.ValidatorIndex(1)
// Defines attestations to save and retrieve.
attWrappers := make([]*slashertypes.IndexedAttestationWrapper, attestationsCount)
for i := 0; i < attestationsCount; i++ {
var dataRoot [32]byte
binary.LittleEndian.PutUint64(dataRoot[:], uint64(i))
attWrapper := createAttestationWrapper(
primitives.Epoch(i),
primitives.Epoch(i+1),
[]uint64{uint64(validatorIndex)},
dataRoot[:],
)
attWrappers[i] = attWrapper
}
// Check on a sample of validators that no attestation records are available.
for i := 0; i < attestationsCount; i += 100 {
attRecord, err := beaconDB.AttestationRecordForValidator(ctx, validatorIndex, primitives.Epoch(i+1))
require.NoError(t, err)
require.Equal(t, true, attRecord == nil)
}
// Save the attestation records to the database.
err := beaconDB.SaveAttestationRecordsForValidators(ctx, attWrappers)
require.NoError(t, err)
attRecord, err = beaconDB.AttestationRecordForValidator(ctx, valIdx, target)
require.NoError(t, err)
assert.DeepEqual(t, target, attRecord.IndexedAttestation.Data.Target.Epoch)
assert.DeepEqual(t, source, attRecord.IndexedAttestation.Data.Source.Epoch)
assert.DeepEqual(t, sr, attRecord.DataRoot)
// Check on a sample of validators that attestation records are available.
for i := 0; i < attestationsCount; i += 100 {
expected := attWrappers[i]
actual, err := beaconDB.AttestationRecordForValidator(ctx, validatorIndex, primitives.Epoch(i+1))
require.NoError(t, err)
require.DeepEqual(t, expected.IndexedAttestation.Data.Source.Epoch, actual.IndexedAttestation.Data.Source.Epoch)
}
}
func TestStore_LastEpochWrittenForValidators(t *testing.T) {
@ -138,61 +161,113 @@ func TestStore_CheckAttesterDoubleVotes(t *testing.T) {
}
func TestStore_SlasherChunk_SaveRetrieve(t *testing.T) {
// Define test parameters.
const (
elemsPerChunk = 16
totalChunks = 11_000
)
// Create context.
ctx := context.Background()
// Create database.
beaconDB := setupDB(t)
elemsPerChunk := 16
totalChunks := 64
chunkKeys := make([][]byte, totalChunks)
chunks := make([][]uint16, totalChunks)
// Create min chunk keys and chunks.
minChunkKeys := make([][]byte, totalChunks)
minChunks := make([][]uint16, totalChunks)
for i := 0; i < totalChunks; i++ {
// Create chunk key.
chunkKey := ssz.MarshalUint64(make([]byte, 0), uint64(i))
minChunkKeys[i] = chunkKey
// Create chunk.
chunk := make([]uint16, elemsPerChunk)
for j := 0; j < len(chunk); j++ {
chunk[j] = uint16(0)
chunk[j] = uint16(i + j)
}
chunks[i] = chunk
chunkKeys[i] = ssz.MarshalUint64(make([]byte, 0), uint64(i))
minChunks[i] = chunk
}
// We save chunks for min spans.
err := beaconDB.SaveSlasherChunks(ctx, slashertypes.MinSpan, chunkKeys, chunks)
// Create max chunk keys and chunks.
maxChunkKeys := make([][]byte, totalChunks)
maxChunks := make([][]uint16, totalChunks)
for i := 0; i < totalChunks; i++ {
// Create chunk key.
chunkKey := ssz.MarshalUint64(make([]byte, 0), uint64(i+1))
maxChunkKeys[i] = chunkKey
// Create chunk.
chunk := make([]uint16, elemsPerChunk)
for j := 0; j < len(chunk); j++ {
chunk[j] = uint16(i + j + 1)
}
maxChunks[i] = chunk
}
// Save chunks for min spans.
err := beaconDB.SaveSlasherChunks(ctx, slashertypes.MinSpan, minChunkKeys, minChunks)
require.NoError(t, err)
// We expect no chunks to be stored for max spans.
// Expect no chunks to be stored for max spans.
_, chunksExist, err := beaconDB.LoadSlasherChunks(
ctx, slashertypes.MaxSpan, chunkKeys,
ctx, slashertypes.MaxSpan, minChunkKeys,
)
require.NoError(t, err)
require.Equal(t, len(chunks), len(chunksExist))
require.Equal(t, len(minChunks), len(chunksExist))
for _, exists := range chunksExist {
require.Equal(t, false, exists)
}
// We check we saved the right chunks.
// Check the right chunks are saved.
retrievedChunks, chunksExist, err := beaconDB.LoadSlasherChunks(
ctx, slashertypes.MinSpan, chunkKeys,
ctx, slashertypes.MinSpan, minChunkKeys,
)
require.NoError(t, err)
require.Equal(t, len(chunks), len(retrievedChunks))
require.Equal(t, len(chunks), len(chunksExist))
require.Equal(t, len(minChunks), len(retrievedChunks))
require.Equal(t, len(minChunks), len(chunksExist))
for i, exists := range chunksExist {
require.Equal(t, true, exists)
require.DeepEqual(t, chunks[i], retrievedChunks[i])
require.DeepEqual(t, minChunks[i], retrievedChunks[i])
}
// We save chunks for max spans.
err = beaconDB.SaveSlasherChunks(ctx, slashertypes.MaxSpan, chunkKeys, chunks)
// Save chunks for max spans.
err = beaconDB.SaveSlasherChunks(ctx, slashertypes.MaxSpan, maxChunkKeys, maxChunks)
require.NoError(t, err)
// We check we saved the right chunks.
// Check right chunks are saved.
retrievedChunks, chunksExist, err = beaconDB.LoadSlasherChunks(
ctx, slashertypes.MaxSpan, chunkKeys,
ctx, slashertypes.MaxSpan, maxChunkKeys,
)
require.NoError(t, err)
require.Equal(t, len(chunks), len(retrievedChunks))
require.Equal(t, len(chunks), len(chunksExist))
require.Equal(t, len(maxChunks), len(retrievedChunks))
require.Equal(t, len(maxChunks), len(chunksExist))
for i, exists := range chunksExist {
require.Equal(t, true, exists)
require.DeepEqual(t, chunks[i], retrievedChunks[i])
require.DeepEqual(t, maxChunks[i], retrievedChunks[i])
}
// Check the right chunks are still saved for min span.
retrievedChunks, chunksExist, err = beaconDB.LoadSlasherChunks(
ctx, slashertypes.MinSpan, minChunkKeys,
)
require.NoError(t, err)
require.Equal(t, len(minChunks), len(retrievedChunks))
require.Equal(t, len(minChunks), len(chunksExist))
for i, exists := range chunksExist {
require.Equal(t, true, exists)
require.DeepEqual(t, minChunks[i], retrievedChunks[i])
}
}

View File

@ -4,14 +4,12 @@ import (
"bytes"
"context"
"fmt"
"time"
"github.com/pkg/errors"
slashertypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/slasher/types"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@ -20,8 +18,6 @@ import (
func (s *Service) checkSlashableAttestations(
ctx context.Context, currentEpoch primitives.Epoch, atts []*slashertypes.IndexedAttestationWrapper,
) (map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing, error) {
start := time.Now()
slashings := map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing{}
// Double votes
@ -30,8 +26,6 @@ func (s *Service) checkSlashableAttestations(
return nil, errors.Wrap(err, "could not check slashable double votes")
}
log.WithField("elapsed", time.Since(start)).Debug("Done checking double votes")
for root, slashing := range doubleVoteSlashings {
slashings[root] = slashing
}
@ -53,19 +47,6 @@ func (s *Service) checkSlashableAttestations(
slashings[root] = slashing
}
elapsed := time.Since(start)
fields := logrus.Fields{
"numAttestations": len(atts),
"elapsed": elapsed,
}
log.WithFields(fields).Info("Done checking slashable attestations")
if len(slashings) > 0 {
log.WithField("numSlashings", len(slashings)).Warn("Slashable attestation offenses found")
}
return slashings, nil
}
@ -75,10 +56,21 @@ func (s *Service) checkSurroundVotes(
attWrappers []*slashertypes.IndexedAttestationWrapper,
currentEpoch primitives.Epoch,
) (map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing, error) {
// With 256 validators and 16 epochs per chunk, there is 4096 `uint16` elements per chunk.
// 4096 `uint16` elements = 8192 bytes = 8KB
// 25_600 chunks * 8KB = 200MB
const maxChunkBeforeFlush = 25_600
slashings := map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing{}
// Group attestation wrappers by validator chunk index.
attWrappersByValidatorChunkIndex := s.groupByValidatorChunkIndex(attWrappers)
attWrappersByValidatorChunkIndexCount := len(attWrappersByValidatorChunkIndex)
minChunkByChunkIndexByValidatorChunkIndex := make(map[uint64]map[uint64]Chunker, attWrappersByValidatorChunkIndexCount)
maxChunkByChunkIndexByValidatorChunkIndex := make(map[uint64]map[uint64]Chunker, attWrappersByValidatorChunkIndexCount)
chunksCounts := 0
for validatorChunkIndex, attWrappers := range attWrappersByValidatorChunkIndex {
minChunkByChunkIndex, err := s.updatedChunkByChunkIndex(ctx, slashertypes.MinSpan, currentEpoch, validatorChunkIndex)
@ -91,6 +83,8 @@ func (s *Service) checkSurroundVotes(
return nil, errors.Wrap(err, "could not update updatedMaxChunks")
}
chunksCounts += len(minChunkByChunkIndex) + len(maxChunkByChunkIndex)
// Group (already grouped by validator chunk index) attestation wrappers by chunk index.
attWrappersByChunkIndex := s.groupByChunkIndex(attWrappers)
@ -114,20 +108,42 @@ func (s *Service) checkSurroundVotes(
slashings[root] = slashing
}
// Save updated chunks into the database.
if err := s.saveUpdatedChunks(ctx, minChunkByChunkIndex, slashertypes.MinSpan, validatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save chunks for min spans")
// Memoize the updated chunks for the current validator chunk index.
minChunkByChunkIndexByValidatorChunkIndex[validatorChunkIndex] = minChunkByChunkIndex
maxChunkByChunkIndexByValidatorChunkIndex[validatorChunkIndex] = maxChunkByChunkIndex
if chunksCounts >= maxChunkBeforeFlush {
// Save the updated chunks to disk if we have reached the maximum number of chunks to store in memory.
if err := s.saveChunksToDisk(ctx, slashertypes.MinSpan, minChunkByChunkIndexByValidatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save updated min chunks to disk")
}
if err := s.saveChunksToDisk(ctx, slashertypes.MaxSpan, maxChunkByChunkIndexByValidatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save updated max chunks to disk")
}
// Reset the chunks counts.
chunksCounts = 0
// Reset memoized chunks.
minChunkByChunkIndexByValidatorChunkIndex = make(map[uint64]map[uint64]Chunker, attWrappersByValidatorChunkIndexCount)
maxChunkByChunkIndexByValidatorChunkIndex = make(map[uint64]map[uint64]Chunker, attWrappersByValidatorChunkIndexCount)
}
if err := s.saveUpdatedChunks(ctx, maxChunkByChunkIndex, slashertypes.MaxSpan, validatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save chunks for max spans")
// Update the latest updated epoch for all validators involved to the current chunk.
indexes := s.params.validatorIndexesInChunk(validatorChunkIndex)
for _, index := range indexes {
s.latestEpochUpdatedForValidator[index] = currentEpoch
}
}
// Update the latest written epoch for all validators involved to the current chunk.
indices := s.params.validatorIndexesInChunk(validatorChunkIndex)
for _, idx := range indices {
s.latestEpochWrittenForValidator[idx] = currentEpoch
}
// Save the updated chunks to disk.
if err := s.saveChunksToDisk(ctx, slashertypes.MinSpan, minChunkByChunkIndexByValidatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save updated min chunks to disk")
}
if err := s.saveChunksToDisk(ctx, slashertypes.MaxSpan, maxChunkByChunkIndexByValidatorChunkIndex); err != nil {
return nil, errors.Wrap(err, "could not save updated max chunks to disk")
}
return slashings, nil
@ -239,7 +255,7 @@ func (s *Service) checkDoubleVotes(
// updatedChunkByChunkIndex loads the chunks from the database for validators corresponding to
// the `validatorChunkIndex`.
// It then updates the chunks with the neutral element for corresponding validators from
// the epoch just after the latest epoch written to the current epoch.
// the epoch just after the latest updated epoch to the current epoch.
// A mapping between chunk index and chunk is returned to the caller.
func (s *Service) updatedChunkByChunkIndex(
ctx context.Context,
@ -247,56 +263,97 @@ func (s *Service) updatedChunkByChunkIndex(
currentEpoch primitives.Epoch,
validatorChunkIndex uint64,
) (map[uint64]Chunker, error) {
chunkByChunkIndex := map[uint64]Chunker{}
// Every validator may have a first epoch to update.
// For a given validator,
// - If it has no latest updated epoch, then the first epoch to update is set to 0.
// - If the latest updated epoch is the current epoch, then there is no epoch to update.
// Thus, then there is no first epoch to update.
// - In all other cases, the first epoch to update is the latest updated epoch + 1.
// minFirstEpochToUpdate is set to the smallest first epoch to update for all validators in the chunk
// corresponding to the `validatorChunkIndex`.
var minFirstEpochToUpdate *primitives.Epoch
neededChunkIndexesMap := map[uint64]bool{}
validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex)
for _, validatorIndex := range validatorIndexes {
// Retrieve the latest epoch written for the validator.
latestEpochWritten, ok := s.latestEpochWrittenForValidator[validatorIndex]
// Start from the epoch just after the latest epoch written.
epochToWrite, err := latestEpochWritten.SafeAdd(1)
// Retrieve the first epoch to write for the validator index.
isAnEpochToUpdate, firstEpochToUpdate, err := s.firstEpochToUpdate(validatorIndex, currentEpoch)
if err != nil {
return nil, errors.Wrap(err, "could not add 1 to latest epoch written")
return nil, errors.Wrapf(err, "could not get first epoch to write for validator index %d with current epoch %d", validatorIndex, currentEpoch)
}
if !ok {
epochToWrite = 0
if !isAnEpochToUpdate {
// If there is no epoch to write, skip.
continue
}
// It is useless to update more than `historyLength` epochs, since
// the chunks are circular and we will be overwritten at least one.
if currentEpoch-epochToWrite >= s.params.historyLength {
epochToWrite = currentEpoch + 1 - s.params.historyLength
// If, for this validator index, the chunk corresponding to the first epoch to write
// (and all following epochs until the current epoch) are already flagged as needed,
// skip.
if minFirstEpochToUpdate != nil && *minFirstEpochToUpdate <= firstEpochToUpdate {
continue
}
for epochToWrite <= currentEpoch {
// Get the chunk index for the latest epoch written.
chunkIndex := s.params.chunkIndex(epochToWrite)
minFirstEpochToUpdate = &firstEpochToUpdate
// Add new needed chunk indexes to the map.
for i := firstEpochToUpdate; i <= currentEpoch; i++ {
chunkIndex := s.params.chunkIndex(i)
neededChunkIndexesMap[chunkIndex] = true
}
}
// Get the list of needed chunk indexes.
neededChunkIndexes := make([]uint64, 0, len(neededChunkIndexesMap))
for chunkIndex := range neededChunkIndexesMap {
neededChunkIndexes = append(neededChunkIndexes, chunkIndex)
}
// Retrieve needed chunks from the database.
chunkByChunkIndex, err := s.loadChunksFromDisk(ctx, validatorChunkIndex, chunkKind, neededChunkIndexes)
if err != nil {
return nil, errors.Wrap(err, "could not load chunks from disk")
}
for _, validatorIndex := range validatorIndexes {
// Retrieve the first epoch to write for the validator index.
isAnEpochToUpdate, firstEpochToUpdate, err := s.firstEpochToUpdate(validatorIndex, currentEpoch)
if err != nil {
return nil, errors.Wrapf(err, "could not get first epoch to write for validator index %d with current epoch %d", validatorIndex, currentEpoch)
}
if !isAnEpochToUpdate {
// If there is no epoch to write, skip.
continue
}
epochToUpdate := firstEpochToUpdate
for epochToUpdate <= currentEpoch {
// Get the chunk index for the ecpoh to write.
chunkIndex := s.params.chunkIndex(epochToUpdate)
// Get the chunk corresponding to the chunk index from the `chunkByChunkIndex` map.
currentChunk, ok := chunkByChunkIndex[chunkIndex]
if !ok {
// If the chunk is not in the map, retrieve it from the database.
currentChunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex)
if err != nil {
return nil, errors.Wrap(err, "could not get chunk")
}
return nil, errors.Errorf("chunk at index %d does not exist", chunkIndex)
}
// Update the current chunk with the neutral element for the validator index for the latest epoch written.
for s.params.chunkIndex(epochToWrite) == chunkIndex && epochToWrite <= currentEpoch {
// Update the current chunk with the neutral element for the validator index for the epoch to write.
for s.params.chunkIndex(epochToUpdate) == chunkIndex && epochToUpdate <= currentEpoch {
if err := setChunkRawDistance(
s.params,
currentChunk.Chunk(),
validatorIndex,
epochToWrite,
epochToUpdate,
currentChunk.NeutralElement(),
); err != nil {
return nil, err
}
epochToWrite++
epochToUpdate++
}
chunkByChunkIndex[chunkIndex] = currentChunk
@ -306,6 +363,40 @@ func (s *Service) updatedChunkByChunkIndex(
return chunkByChunkIndex, nil
}
// firstEpochToUpdate, given a validator index and the current epoch, returns a boolean indicating
// if there is an epoch to write. If it is the case, it returns the first epoch to write.
func (s *Service) firstEpochToUpdate(validatorIndex primitives.ValidatorIndex, currentEpoch primitives.Epoch) (bool, primitives.Epoch, error) {
latestEpochUpdated, ok := s.latestEpochUpdatedForValidator[validatorIndex]
// Start from the epoch just after the latest updated epoch.
epochToUpdate, err := latestEpochUpdated.SafeAdd(1)
if err != nil {
return false, primitives.Epoch(0), errors.Wrap(err, "could not add 1 to latest updated epoch")
}
if !ok {
epochToUpdate = 0
}
if latestEpochUpdated == currentEpoch {
// If the latest updated epoch is the current epoch, we do not need to update anything.
return false, primitives.Epoch(0), nil
}
// Latest updated epoch should not be greater than the current epoch.
if latestEpochUpdated > currentEpoch {
return false, primitives.Epoch(0), errors.Errorf("epoch to write `%d` should not be greater than the current epoch `%d`", epochToUpdate, currentEpoch)
}
// It is useless to update more than `historyLength` epochs, since
// the chunks are circular and we will be overwritten at least one.
if currentEpoch-epochToUpdate >= s.params.historyLength {
epochToUpdate = currentEpoch + 1 - s.params.historyLength
}
return true, epochToUpdate, nil
}
// Updates spans and detects any slashable attester offenses along the way.
// 1. Determine the chunks we need to use for updating for the validator indices
// in a validator chunk index, then retrieve those chunks from the database.
@ -487,7 +578,7 @@ func (s *Service) getChunkFromDatabase(
chunkIndex uint64,
) (Chunker, error) {
// We can ensure we load the appropriate chunk we need by fetching from the DB.
diskChunks, err := s.loadChunks(ctx, validatorChunkIndex, chunkKind, []uint64{chunkIndex})
diskChunks, err := s.loadChunksFromDisk(ctx, validatorChunkIndex, chunkKind, []uint64{chunkIndex})
if err != nil {
return nil, errors.Wrapf(err, "could not load chunk at index %d", chunkIndex)
}
@ -502,7 +593,7 @@ func (s *Service) getChunkFromDatabase(
// Load chunks for a specified list of chunk indices. We attempt to load it from the database.
// If the data exists, then we initialize a chunk of a specified kind. Otherwise, we create
// an empty chunk, add it to our map, and then return it to the caller.
func (s *Service) loadChunks(
func (s *Service) loadChunksFromDisk(
ctx context.Context,
validatorChunkIndex uint64,
chunkKind slashertypes.ChunkKind,
@ -511,17 +602,36 @@ func (s *Service) loadChunks(
ctx, span := trace.StartSpan(ctx, "Slasher.loadChunks")
defer span.End()
chunkKeys := make([][]byte, 0, len(chunkIndexes))
for _, chunkIndex := range chunkIndexes {
chunkKeys = append(chunkKeys, s.params.flatSliceID(validatorChunkIndex, chunkIndex))
chunksCount := len(chunkIndexes)
if chunksCount == 0 {
return map[uint64]Chunker{}, nil
}
// Build chunk keys.
chunkKeys := make([][]byte, 0, chunksCount)
for _, chunkIndex := range chunkIndexes {
chunkKey := s.params.flatSliceID(validatorChunkIndex, chunkIndex)
chunkKeys = append(chunkKeys, chunkKey)
}
// Load the chunks from the database.
rawChunks, chunksExist, err := s.serviceCfg.Database.LoadSlasherChunks(ctx, chunkKind, chunkKeys)
if err != nil {
return nil, errors.Wrapf(err, "could not load slasher chunk index")
}
chunksByChunkIdx := make(map[uint64]Chunker, len(rawChunks))
// Perform basic checks.
if len(rawChunks) != chunksCount {
return nil, errors.Errorf("expected %d chunks, got %d", chunksCount, len(rawChunks))
}
if len(chunksExist) != chunksCount {
return nil, errors.Errorf("expected %d chunks exist, got %d", chunksCount, len(chunksExist))
}
// Initialize the chunks.
chunksByChunkIdx := make(map[uint64]Chunker, chunksCount)
for i := 0; i < len(rawChunks); i++ {
// If the chunk exists in the database, we initialize it from the raw bytes data.
// If it does not exist, we initialize an empty chunk.
@ -558,21 +668,35 @@ func (s *Service) loadChunks(
return chunksByChunkIdx, nil
}
// Saves updated chunks to disk given the required database schema.
func (s *Service) saveUpdatedChunks(
func (s *Service) saveChunksToDisk(
ctx context.Context,
updatedChunksByChunkIdx map[uint64]Chunker,
chunkKind slashertypes.ChunkKind,
validatorChunkIndex uint64,
chunkByChunkIndexByValidatorChunkIndex map[uint64]map[uint64]Chunker,
) error {
ctx, span := trace.StartSpan(ctx, "Slasher.saveUpdatedChunks")
ctx, span := trace.StartSpan(ctx, "Slasher.saveChunksToDisk")
defer span.End()
chunkKeys := make([][]byte, 0, len(updatedChunksByChunkIdx))
chunks := make([][]uint16, 0, len(updatedChunksByChunkIdx))
for chunkIdx, chunk := range updatedChunksByChunkIdx {
chunkKeys = append(chunkKeys, s.params.flatSliceID(validatorChunkIndex, chunkIdx))
chunks = append(chunks, chunk.Chunk())
// Compute the total number of chunks to save.
chunksCount := 0
for _, chunkByChunkIndex := range chunkByChunkIndexByValidatorChunkIndex {
chunksCount += len(chunkByChunkIndex)
}
chunksSavedTotal.Add(float64(len(chunks)))
// Create needed arrays.
chunkKeys := make([][]byte, 0, chunksCount)
chunks := make([][]uint16, 0, chunksCount)
// Fill the arrays.
for validatorChunkIndex, chunkByChunkIndex := range chunkByChunkIndexByValidatorChunkIndex {
for chunkIndex, chunk := range chunkByChunkIndex {
chunkKeys = append(chunkKeys, s.params.flatSliceID(validatorChunkIndex, chunkIndex))
chunks = append(chunks, chunk.Chunk())
}
}
// Update prometheus metrics.
chunksSavedTotal.Add(float64(chunksCount))
// Save the chunks to disk.
return s.serviceCfg.Database.SaveSlasherChunks(ctx, chunkKind, chunkKeys, chunks)
}

View File

@ -3,6 +3,7 @@ package slasher
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
@ -833,7 +834,7 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) {
require.LogsDoNotContain(t, hook, "Could not detect")
}
func Test_epochUpdateForValidators(t *testing.T) {
func Test_updatedChunkByChunkIndex(t *testing.T) {
neutralMin, neutralMax := uint16(65535), uint16(0)
testCases := []struct {
@ -1066,7 +1067,7 @@ func Test_epochUpdateForValidators(t *testing.T) {
historyLength: tt.historyLength,
},
serviceCfg: &ServiceConfig{Database: slasherDB},
latestEpochWrittenForValidator: tt.latestUpdatedEpochByValidatorIndex,
latestEpochUpdatedForValidator: tt.latestUpdatedEpochByValidatorIndex,
}
// Save min initial chunks if they exist.
@ -1076,7 +1077,11 @@ func Test_epochUpdateForValidators(t *testing.T) {
minChunkerByChunkerIndex[chunkIndex] = &MinSpanChunksSlice{data: minChunk}
}
err := service.saveUpdatedChunks(ctx, minChunkerByChunkerIndex, slashertypes.MinSpan, tt.validatorChunkIndex)
minChunkerByChunkerIndexByValidatorChunkerIndex := map[uint64]map[uint64]Chunker{
tt.validatorChunkIndex: minChunkerByChunkerIndex,
}
err := service.saveChunksToDisk(ctx, slashertypes.MinSpan, minChunkerByChunkerIndexByValidatorChunkerIndex)
require.NoError(t, err)
}
@ -1087,7 +1092,11 @@ func Test_epochUpdateForValidators(t *testing.T) {
maxChunkerByChunkerIndex[chunkIndex] = &MaxSpanChunksSlice{data: maxChunk}
}
err := service.saveUpdatedChunks(ctx, maxChunkerByChunkerIndex, slashertypes.MaxSpan, tt.validatorChunkIndex)
maxChunkerByChunkerIndexByValidatorChunkerIndex := map[uint64]map[uint64]Chunker{
tt.validatorChunkIndex: maxChunkerByChunkerIndex,
}
err := service.saveChunksToDisk(ctx, slashertypes.MaxSpan, maxChunkerByChunkerIndexByValidatorChunkerIndex)
require.NoError(t, err)
}
@ -1269,7 +1278,7 @@ func testLoadChunks(t *testing.T, kind slashertypes.ChunkKind) {
emptyChunk = EmptyMaxSpanChunksSlice(defaultParams)
}
chunkIdx := uint64(2)
received, err := s.loadChunks(ctx, 0, kind, []uint64{chunkIdx})
received, err := s.loadChunksFromDisk(ctx, 0, kind, []uint64{chunkIdx})
require.NoError(t, err)
wanted := map[uint64]Chunker{
chunkIdx: emptyChunk,
@ -1301,15 +1310,15 @@ func testLoadChunks(t *testing.T, kind slashertypes.ChunkKind) {
4: existingChunk,
6: existingChunk,
}
err = s.saveUpdatedChunks(
ctx,
updatedChunks,
kind,
0, // validatorChunkIndex
)
chunkByChunkIndexByValidatorChunkIndex := map[uint64]map[uint64]Chunker{
0: updatedChunks,
}
err = s.saveChunksToDisk(ctx, kind, chunkByChunkIndexByValidatorChunkIndex)
require.NoError(t, err)
// Check if the retrieved chunks match what we just saved to disk.
received, err = s.loadChunks(ctx, 0, kind, []uint64{2, 4, 6})
received, err = s.loadChunksFromDisk(ctx, 0, kind, []uint64{2, 4, 6})
require.NoError(t, err)
require.DeepEqual(t, updatedChunks, received)
}
@ -1351,7 +1360,54 @@ func TestService_processQueuedAttestations(t *testing.T) {
tickerChan <- 1
cancel()
s.wg.Wait()
assert.LogsContain(t, hook, "Processing queued")
assert.LogsContain(t, hook, "Start processing queued attestations")
assert.LogsContain(t, hook, "Done processing queued attestations")
}
func Benchmark_saveChunksToDisk(b *testing.B) {
// Define the parameters.
const (
chunkKind = slashertypes.MinSpan
validatorsChunksCount = 6000 // Corresponds to 1_536_000 validators x 256 validators / chunk
chunkIndex uint64 = 13
validatorChunkIndex uint64 = 42
)
params := DefaultParams()
// Get a context.
ctx := context.Background()
chunkByChunkIndexByValidatorChunkIndex := make(map[uint64]map[uint64]Chunker, validatorsChunksCount)
// Populate the chunkers.
for i := 0; i < validatorsChunksCount; i++ {
data := make([]uint16, params.chunkSize)
for j := 0; j < int(params.chunkSize); j++ {
data[j] = uint16(rand.Intn(1 << 16))
}
chunker := map[uint64]Chunker{chunkIndex: &MinSpanChunksSlice{params: params, data: data}}
chunkByChunkIndexByValidatorChunkIndex[uint64(i)] = chunker
}
// Initialize the slasher database.
slasherDB := dbtest.SetupSlasherDB(b)
// Initialize the slasher service.
service, err := New(ctx, &ServiceConfig{Database: slasherDB})
require.NoError(b, err)
// Reset the benchmark timer.
b.ResetTimer()
// Run the benchmark.
for i := 0; i < b.N; i++ {
b.StartTimer()
err = service.saveChunksToDisk(ctx, slashertypes.MinSpan, chunkByChunkIndexByValidatorChunkIndex)
b.StopTimer()
require.NoError(b, err)
}
}
func BenchmarkCheckSlashableAttestations(b *testing.B) {
@ -1444,6 +1500,66 @@ func runAttestationsBenchmark(b *testing.B, s *Service, numAtts, numValidators u
}
}
func Benchmark_checkSurroundVotes(b *testing.B) {
const (
// Approximatively the number of Holesky active validators on 2024-02-16
// This number is both a multiple of 32 (the number of slots per epoch) and 256 (the number of validators per chunk)
validatorsCount = 1_638_400
slotsPerEpoch = 32
targetEpoch = 42
sourceEpoch = 43
currentEpoch = 43
)
// Create a context.
ctx := context.Background()
// Initialize the slasher database.
slasherDB := dbtest.SetupSlasherDB(b)
// Initialize the slasher service.
service, err := New(ctx, &ServiceConfig{Database: slasherDB})
require.NoError(b, err)
// Create the attesting validators indexes.
// The best case scenario would be to have all validators attesting for a slot with contiguous indexes.
// So for 1_638_400 validators with 32 slots per epoch, we would have 48_000 attestation wrappers per slot.
// With 256 validators per chunk, we would have only 188 modified chunks.
//
// In this benchmark, we use the worst case scenario where attestating validators are evenly splitted across all validators chunks.
// We also suppose that only one chunk per validator chunk index is modified.
// For one given validator index, multiple chunk indexes could be modified.
//
// With 1_638_400 validators we have 6400 chunks. If exactly 8 validators per chunks attest, we have:
// 6_400 chunks * 8 = 51_200 validators attesting per slot. And 51_200 validators * 32 slots = 1_638_400
// attesting validators per epoch.
// ==> Attesting validator indexes will be computed as follows:
// validator chunk index 0 validator chunk index 1 validator chunk index 6_399
// [0, 32, 64, 96, 128, 160, 192, 224 | 256, 288, 320, 352, 384, 416, 448, 480 | ... | ..., 1_638_606, 1_638_368, 1_638_400]
//
attestingValidatorsCount := validatorsCount / slotsPerEpoch
validatorIndexes := make([]uint64, attestingValidatorsCount)
for i := 0; i < attestingValidatorsCount; i++ {
validatorIndexes[i] = 32 * uint64(i)
}
// Create the attestation wrapper.
// This benchmark assume that all validators produced the exact same head, source and target votes.
attWrapper := createAttestationWrapperEmptySig(b, sourceEpoch, targetEpoch, validatorIndexes, nil)
attWrappers := []*slashertypes.IndexedAttestationWrapper{attWrapper}
// Run the benchmark.
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StartTimer()
_, err = service.checkSurroundVotes(ctx, attWrappers, currentEpoch)
b.StopTimer()
require.NoError(b, err)
}
}
// createAttestationWrapperEmptySig creates an attestation wrapper with source and target,
// for validators with indices, and a beacon block root (corresponding to the head vote).
// For source and target epochs, the corresponding root is null.

View File

@ -1,45 +1,142 @@
// nolint:dupword
//
// Package slasher defines an optimized implementation of Ethereum proof-of-stake slashing
// detection, namely focused on catching "surround vote" slashable
// offenses as explained here: https://blog.ethereum.org/2020/01/13/validated-staking-on-eth2-1-incentives/.
//
// Surround vote detection is a difficult problem if done naively, as slasher
// needs to keep track of every single attestation by every single validator
// in the network and be ready to efficiently detect whether incoming attestations
// are slashable with respect to older ones. To do this, the Sigma Prime team
// created an elaborate design document: https://hackmd.io/@sproul/min-max-slasher
// offering an optimal solution.
//
// Attesting histories are kept for each validator in two separate arrays known
// as min and max spans, which are explained in our design document:
// https://hackmd.io/@prysmaticlabs/slasher.
//
// A regular pair of min and max spans for a validator look as follows
// with length = H where H is the amount of epochs worth of history
// we want to persist for slashing detection.
//
// validator_1_min_span = [2, 2, 2, ..., 2]
// validator_1_max_span = [0, 0, 0, ..., 0]
//
// Instead of always dealing with length H arrays, which can be prohibitively
// expensive to handle in memory, we split these arrays into chunks of length C.
// For C = 3, for example, the 0th chunk of validator 1's min and max spans would look
// as follows:
//
// validator_1_min_span_chunk_0 = [2, 2, 2]
// validator_1_max_span_chunk_0 = [2, 2, 2]
//
// Next, on disk, we take chunks for K validators, and store them as flat slices.
// For example, if H = 3, C = 3, and K = 3, then we can store 3 validators' chunks as a flat
// slice as follows:
//
// val0 val1 val2
// | | |
// { } { } { }
// [2, 2, 2, 2, 2, 2, 2, 2, 2]
//
// This is known as 2D chunking, pioneered by the Sigma Prime team here:
// https://hackmd.io/@sproul/min-max-slasher. The parameters H, C, and K will be
// used extensively throughout this package.
/*
Package slasher defines an optimized implementation of Ethereum proof-of-stake slashing
detection, namely focused on catching "surround vote" slashable
offenses as explained here: https://blog.ethereum.org/2020/01/13/validated-staking-on-eth2-1-incentives/.
Surround vote detection is a difficult problem if done naively, as slasher
needs to keep track of every single attestation by every single validator
in the network and be ready to efficiently detect whether incoming attestations
are slashable with respect to older ones. To do this, the Sigma Prime team
created an elaborate design document: https://hackmd.io/@sproul/min-max-slasher
offering an optimal solution.
Attesting histories are kept for each validator in two separate arrays known
as min and max spans, which are explained in our design document:
https://hackmd.io/@prysmaticlabs/slasher.
This is known as 2D chunking, pioneered by the Sigma Prime team here:
https://hackmd.io/@sproul/min-max-slasher. The parameters H, C, and K will be
used extensively throughout this package.
Attestations are represented as following: `<source epoch>====><target epoch>`
N: Number of epochs worth of history we want to keep for each validator.
In the following example:
- N = 4096
- Validators 257 and 258 have some attestations
- All other validators have no attestations
For MIN SPAN, `∞“ is actually set to the max `uint16` value: 65535
validator 257 : 8193=======>8195 8196=>8197=============>8200 8204=>8205=>8206=>8207=========>8209=>8210=>8211=>8212=>8213=>8214 8219=>8220 8221=>8222
validator 258 : 8193=======>8196=>8197=>8198=>8199=>8200=>8201=======>8203=>8204=>8205=>8206=>8207===>8208=>8209=>8210=>8211=>8212=>8213=>8214=>8215=>8216=>8217=>8218=>8219=>8220=>8221
/----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\
| MIN SPAN | kN+0 kN+1 kN+2 kN+3 kN+4 kN+5 kN+6 kN+7 kN+8 kN+9 kN+10 kN+11 kN+12 kN+13 kN+14 kN+15 | kN+16 kN+17 kN+18 kN+19 kN+20 kN+21 kN+22 kN+23 kN+24 kN+25 kN+26 kN+27 kN+28 kN+29 kN+30 kN+31 | ... | (k+1)N-16 (k+1)N-15 (k+1)N-14 (k+1)N-13 (k+1)N-12 (k+1)N-11 (k+1)N-10 (k+1)N-9 (k+1)N-8 (k+1)N-7 (k+1)N-6 (k+1)N-5 (k+1)N-4 (k+1)N-3 (k+1)N-2 (k+1)N-1 |
|-------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| validator 0 | | | ... | |
| validator 1 | | | ... | |
| validator 2 | | | ... | |
| ................. | ............................................................................................... | ............................................................................................... | ... | .............................................................................................................................................................. |
| validator 254 | | | ... | |
| validator 255 | | | ... | |
|-------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| validator 256 | | | ... | |
| validator 257 | 3 4 3 2 4 8 7 6 5 4 3 2 2 2 3 3 | 2 2 2 2 2 7 6 5 4 3 2 3 2 | ... | |
| validator 258 | 4 3 3 2 2 2 2 2 3 3 2 2 2 2 2 2 | 2 2 2 2 2 2 2 2 2 2 2 2 | ... | |
| ................. | ............................................................................................... | ............................................................................................... | ... | .............................................................................................................................................................. |
| validator 510 | | | ... | |
| validator 511 | | | ... | |
|-------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ................. | ............................................................................................... | ............................................................................................... | ... | .............................................................................................................................................................. |
|-------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| validator M - 256 | | | ... | |
| validator M - 255 | | | ... | |
| ................. | ............................................................................................... | ............................................................................................... | ... | .............................................................................................................................................................. |
| validator M - 1 | | | ... | |
\----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------/
/----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\
| MAX SPAN | kN+0 kN+1 kN+2 kN+3 kN+4 kN+5 kN+6 kN+7 kN+8 kN+9 kN+10 kN+11 kN+12 kN+13 kN+14 kN+15 | kN+16 kN+17 kN+18 kN+19 kN+20 kN+21 kN+22 kN+23 kN+24 kN+25 kN+26 kN+27 kN+28 kN+29 kN+30 kN+31 | ... | (k+1)N-16 (k+1)N-15 (k+1)N-14 (k+1)N-13 (k+1)N-12 (k+1)N-11 (k+1)N-10 (k+1)N-9 (k+1)N-8 (k+1)N-7 (k+1)N-6 (k+1)N-5 (k+1)N-4 (k+1)N-3 (k+1)N-2 (k+1)N-1 |
|-------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| validator 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| validator 1 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| validator 2 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| ................. | ............................................................................................... | ............................................................................................... | ... | .............................................................................................................................................................. |
| validator 14 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| validator 15 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| ------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| validator 256 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| validator 257 | 0 0 1 0 0 0 2 1 0 0 0 0 0 0 0 0 | 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| validator 258 | 0 0 0 1 0 0 0 0 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| ................. | ............................................................................................... | ............................................................................................... | ... | .............................................................................................................................................................. |
| validator 510 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| validator 511 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| ------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ................. | ............................................................................................... | ............................................................................................... | ... | .............................................................................................................................................................. |
| ------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| validator M - 256 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| validator M - 255 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| ................. | ............................................................................................... | ............................................................................................... | ... | .............................................................................................................................................................. |
| validator M - 1 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | ... | 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
\ ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------/
How to know if an incoming attestation will surround a pre-existing one?
------------------------------------------------------------------------
Example with an incoming attestation 8197====>8199 for validator 257.
- First, we retrieve the MIN SPAN value for the source epoch of the incoming attestation (here the source epoch is 8197). We get the value 8.
- Then, for the incoming attestation, we compute `target - source`. We get the value 8199 - 8197 = 2.
- 8 >= 2, so the incoming attestation will NOT surround any pre-existing one.
Example with an incoming attestation 8202====>8206 for validator 257.
- First, we retrieve the MIN SPAN value for the source epoch of the incoming attestation (here the source epoch is 8202). We get the value 3.
- Then, for the incoming attestation, we compute `target - source`. We get the value 8206 - 8202 = 4.
- 3 < 4, so the incoming attestation will surround a pre-existing one. (In this precise case, it will surround 8204=>8205)
How to know if an incoming attestation will be surrounded by a pre-existing one?
--------------------------------------------------------------------------------
Example with an incoming attestation 8197====>8199 for validator 257.
- First, we retrieve the MAX SPAN value for the source epoch of the incoming attestation (here the source epoch is 8197). We get the value 0.
- Then, for the incoming attestation, we compute `target - source`. We get the value 8199 - 8197 = 2.
- 0 <= 2, so the incoming attestation will NOT be surrounded by any pre-existing one.
Example with an incoming attestation 8198====>8199 for validator 257.
- First, we retrieve the MAX SPAN value for the source epoch of the incoming attestation (here the source epoch is 8198). We get the value 2.
- Then, for the incoming attestation, we compute `target - source`. We get the value 8199 - 8198 = 1.
- 2 > 1, so the incoming attestation will be surrounded by a pre-existing one. (In this precise case, it will be surrounded by 8197=>8200)
Data are stored on disk by chunk.
For example: For MIN SPAN, validators 256 to 511 included, epochs 8208 to 8223 included, the corresponding chunk is:
/---------------------------------------------------------------------------------------------------------------------\
| MIN SPAN | kN+16 kN+17 kN+18 kN+19 kN+20 kN+21 kN+22 kN+23 kN+24 kN+25 kN+26 kN+27 kN+28 kN+29 kN+30 kN+31 |
|-------------------+-------------------------------------------------------------------------------------------------|
| validator 256 | |
| validator 257 | 2 2 2 2 2 7 6 5 4 3 2 3 2 |
| validator 258 | 2 2 2 2 2 2 2 2 2 2 2 2 |
| ................. | ............................................................................................... |
| validator 510 | |
| validator 511 | |
\---------------------------------------------------------------------------------------------------------------------/
Chunks are stored into the database a flat array of bytes.
For this example, the stored value will be:
| validator 256 | validator 257 | validator 258 |...| validator 510 | validator 511 |
[,,,,,,,,,,,,,,,,2,2,2,2,2,7,6,5,4,3,2,3,2,,,,2,2,2,2,2,2,2,2,2,2,2,2,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]
A chunk contains 256 validators * 16 epochs = 4096 values.
A chunk value is stored on 2 bytes (uint16).
==> A chunk takes 8192 bytes = 8KB
There is 4096 epochs / 16 epochs per chunk = 256 chunks per batch of 256 validators.
Storing all values fo a batch of 256 validators takes 256 * 8KB = 2MB
With 1_048_576 validators, we need 4096 * 2MB = 8GB
Storing both MIN and MAX spans for 1_048_576 validators takes 16GB.
Each chunk is stored snappy-compressed in the database.
If all validators attest ideally, a MIN SPAN chunk will contain only `2`s, and and MAX SPAN chunk will contain only `0`s.
This will compress very well, and will let us store a lot of data in a small amount of space.
*/
package slasher

View File

@ -129,7 +129,7 @@ func (s *Service) processAttestations(
validAttestationsCount := len(validAttestations)
validInFutureAttestationsCount := len(validInFutureAttestations)
// Log useful infrormation
// Log useful information.
log.WithFields(logrus.Fields{
"currentSlot": currentSlot,
"currentEpoch": currentEpoch,
@ -137,7 +137,9 @@ func (s *Service) processAttestations(
"numDeferredAtts": validInFutureAttestationsCount,
"numDroppedAtts": numDropped,
"attsQueueSize": queuedAttestationsCount,
}).Info("Processing queued attestations for slashing detection")
}).Info("Start processing queued attestations")
start := time.Now()
// Check for attestatinos slashings (double, sourrounding, surrounded votes).
slashings, err := s.checkSlashableAttestations(ctx, currentEpoch, validAttestations)
@ -154,6 +156,13 @@ func (s *Service) processAttestations(
return nil
}
end := time.Since(start)
log.WithField("elapsed", end).Info("Done processing queued attestations")
if len(slashings) > 0 {
log.WithField("numSlashings", len(slashings)).Warn("Slashable attestation offenses found")
}
return processedAttesterSlashings
}

View File

@ -58,7 +58,7 @@ type Service struct {
attsSlotTicker *slots.SlotTicker
blocksSlotTicker *slots.SlotTicker
pruningSlotTicker *slots.SlotTicker
latestEpochWrittenForValidator map[primitives.ValidatorIndex]primitives.Epoch
latestEpochUpdatedForValidator map[primitives.ValidatorIndex]primitives.Epoch
wg sync.WaitGroup
}
@ -74,7 +74,7 @@ func New(ctx context.Context, srvCfg *ServiceConfig) (*Service, error) {
blksQueue: newBlocksQueue(),
ctx: ctx,
cancel: cancel,
latestEpochWrittenForValidator: make(map[primitives.ValidatorIndex]primitives.Epoch),
latestEpochUpdatedForValidator: make(map[primitives.ValidatorIndex]primitives.Epoch),
}, nil
}
@ -111,7 +111,7 @@ func (s *Service) run() {
return
}
for _, item := range epochsByValidator {
s.latestEpochWrittenForValidator[item.ValidatorIndex] = item.Epoch
s.latestEpochUpdatedForValidator[item.ValidatorIndex] = item.Epoch
}
log.WithField("elapsed", time.Since(start)).Info(
"Finished retrieving last epoch written per validator",
@ -162,7 +162,7 @@ func (s *Service) Stop() error {
defer innerCancel()
log.Info("Flushing last epoch written for each validator to disk, please wait")
if err := s.serviceCfg.Database.SaveLastEpochsWrittenForValidators(
ctx, s.latestEpochWrittenForValidator,
ctx, s.latestEpochUpdatedForValidator,
); err != nil {
log.Error(err)
}