package slasherkv import ( "bytes" "context" "encoding/binary" "fmt" "sort" "sync" ssz "github.com/ferranbt/fastssz" "github.com/golang/snappy" "github.com/pkg/errors" types "github.com/prysmaticlabs/eth2-types" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types" slashpb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" bolt "go.etcd.io/bbolt" "go.opencensus.io/trace" "golang.org/x/sync/errgroup" ) const ( // Signing root (32 bytes) + fastsum64(attesting_indices) (8 bytes). attestationRecordKeySize = 40 signingRootSize = 32 // Bytes. ) // LastEpochWrittenForValidators given a list of validator indices returns the latest // epoch we have recorded the validators writing data for. func (s *Store) LastEpochWrittenForValidators( ctx context.Context, validatorIndices []types.ValidatorIndex, ) ([]*slashertypes.AttestedEpochForValidator, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.LastEpochWrittenForValidators") defer span.End() attestedEpochs := make([]*slashertypes.AttestedEpochForValidator, 0) encodedIndices := make([][]byte, len(validatorIndices)) for i, valIdx := range validatorIndices { encodedIndices[i] = encodeValidatorIndex(valIdx) } err := s.db.View(func(tx *bolt.Tx) error { bkt := tx.Bucket(attestedEpochsByValidator) for i, encodedIndex := range encodedIndices { epochBytes := bkt.Get(encodedIndex) if epochBytes != nil { var epoch types.Epoch if err := epoch.UnmarshalSSZ(epochBytes); err != nil { return err } attestedEpochs = append(attestedEpochs, &slashertypes.AttestedEpochForValidator{ ValidatorIndex: validatorIndices[i], Epoch: epoch, }) } } return nil }) return attestedEpochs, err } // SaveLastEpochWrittenForValidators updates the latest epoch a slice // of validator indices has attested to. func (s *Store) SaveLastEpochWrittenForValidators( ctx context.Context, validatorIndices []types.ValidatorIndex, epoch types.Epoch, ) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveLastEpochWrittenForValidators") defer span.End() encodedIndices := make([][]byte, len(validatorIndices)) for i, valIdx := range validatorIndices { encodedIndices[i] = encodeValidatorIndex(valIdx) } encodedEpoch, err := epoch.MarshalSSZ() if err != nil { return err } return s.db.Update(func(tx *bolt.Tx) error { bkt := tx.Bucket(attestedEpochsByValidator) for _, encodedIndex := range encodedIndices { if err = bkt.Put(encodedIndex, encodedEpoch); err != nil { return err } } return nil }) } // CheckAttesterDoubleVotes retries any slashable double votes that exist // for a series of input attestations. func (s *Store) CheckAttesterDoubleVotes( ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper, ) ([]*slashertypes.AttesterDoubleVote, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.CheckAttesterDoubleVotes") defer span.End() doubleVotes := make([]*slashertypes.AttesterDoubleVote, 0) doubleVotesMu := sync.Mutex{} eg, egctx := errgroup.WithContext(ctx) for _, att := range attestations { // Copy the iteration instance to a local variable to give each go-routine its own copy to play with. // See https://golang.org/doc/faq#closures_and_goroutines for more details. attToProcess := att // process every attestation parallelly. eg.Go(func() error { err := s.db.View(func(tx *bolt.Tx) error { signingRootsBkt := tx.Bucket(attestationDataRootsBucket) encEpoch := encodeTargetEpoch(attToProcess.IndexedAttestation.Data.Target.Epoch) localDoubleVotes := make([]*slashertypes.AttesterDoubleVote, 0) for _, valIdx := range attToProcess.IndexedAttestation.AttestingIndices { encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx)) validatorEpochKey := append(encEpoch, encIdx...) encExistingAttRecord := signingRootsBkt.Get(validatorEpochKey) if encExistingAttRecord == nil { continue } existingSigningRoot := bytesutil.ToBytes32(encExistingAttRecord[:signingRootSize]) if existingSigningRoot != attToProcess.SigningRoot { existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord[signingRootSize:]) if err != nil { return err } slashAtt := &slashertypes.AttesterDoubleVote{ ValidatorIndex: types.ValidatorIndex(valIdx), Target: attToProcess.IndexedAttestation.Data.Target.Epoch, PrevAttestationWrapper: existingAttRecord, AttestationWrapper: attToProcess, } localDoubleVotes = append(localDoubleVotes, slashAtt) } } // if any routine is cancelled, then cancel this routine too select { case <-egctx.Done(): return egctx.Err() default: } // if there are any doible votes in this attestation, add it to the global double votes if len(localDoubleVotes) > 0 { doubleVotesMu.Lock() defer doubleVotesMu.Unlock() doubleVotes = append(doubleVotes, localDoubleVotes...) } return nil }) return err }) } return doubleVotes, eg.Wait() } // AttestationRecordForValidator given a validator index and a target epoch, // retrieves an existing attestation record we have stored in the database. func (s *Store) AttestationRecordForValidator( ctx context.Context, validatorIdx types.ValidatorIndex, targetEpoch types.Epoch, ) (*slashertypes.IndexedAttestationWrapper, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.AttestationRecordForValidator") defer span.End() var record *slashertypes.IndexedAttestationWrapper encIdx := encodeValidatorIndex(validatorIdx) encEpoch := encodeTargetEpoch(targetEpoch) key := append(encEpoch, encIdx...) err := s.db.View(func(tx *bolt.Tx) error { signingRootsBkt := tx.Bucket(attestationDataRootsBucket) indexedAttBytes := signingRootsBkt.Get(key) if indexedAttBytes == nil { return nil } decoded, err := decodeAttestationRecord(indexedAttBytes[signingRootSize:]) if err != nil { return err } record = decoded return nil }) return record, err } // SaveAttestationRecordsForValidators saves attestation records for the specified indices. func (s *Store) SaveAttestationRecordsForValidators( ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper, ) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveAttestationRecordsForValidators") defer span.End() encodedTargetEpoch := make([][]byte, len(attestations)) encodedRecords := make([][]byte, len(attestations)) encodedIndices := make([][]byte, len(attestations)) for i, att := range attestations { encEpoch := encodeTargetEpoch(att.IndexedAttestation.Data.Target.Epoch) value, err := encodeAttestationRecord(att) if err != nil { return err } indicesBytes := make([]byte, len(att.IndexedAttestation.AttestingIndices)*8) for _, idx := range att.IndexedAttestation.AttestingIndices { encodedIdx := encodeValidatorIndex(types.ValidatorIndex(idx)) indicesBytes = append(indicesBytes, encodedIdx...) } encodedIndices[i] = indicesBytes encodedTargetEpoch[i] = encEpoch encodedRecords[i] = value } return s.db.Update(func(tx *bolt.Tx) error { signingRootsBkt := tx.Bucket(attestationDataRootsBucket) for i, att := range attestations { for _, valIdx := range att.IndexedAttestation.AttestingIndices { encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx)) key := append(encodedTargetEpoch[i], encIdx...) // signature is prefixed so that double votest can be checked without decoding attestations // and save some cpu cycles value := append(att.SigningRoot[:], encodedRecords[i]...) if err := signingRootsBkt.Put(key, value); err != nil { return err } } } return nil }) } // LoadSlasherChunks given a chunk kind and a disk keys, retrieves chunks for a validator // min or max span used by slasher from our database. func (s *Store) LoadSlasherChunks( ctx context.Context, kind slashertypes.ChunkKind, diskKeys [][]byte, ) ([][]uint16, []bool, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.LoadSlasherChunk") defer span.End() chunks := make([][]uint16, 0) var exists []bool err := s.db.View(func(tx *bolt.Tx) error { bkt := tx.Bucket(slasherChunksBucket) for _, diskKey := range diskKeys { key := append(ssz.MarshalUint8(make([]byte, 0), uint8(kind)), diskKey...) chunkBytes := bkt.Get(key) if chunkBytes == nil { chunks = append(chunks, []uint16{}) exists = append(exists, false) continue } chunk, err := decodeSlasherChunk(chunkBytes) if err != nil { return err } chunks = append(chunks, chunk) exists = append(exists, true) } return nil }) return chunks, exists, err } // SaveSlasherChunks given a chunk kind, list of disk keys, and list of chunks, // saves the chunks to our database for use by slasher in slashing detection. func (s *Store) SaveSlasherChunks( ctx context.Context, kind slashertypes.ChunkKind, chunkKeys [][]byte, chunks [][]uint16, ) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveSlasherChunks") defer span.End() encodedKeys := make([][]byte, len(chunkKeys)) encodedChunks := make([][]byte, len(chunkKeys)) for i := 0; i < len(chunkKeys); i++ { encodedKeys[i] = append(ssz.MarshalUint8(make([]byte, 0), uint8(kind)), chunkKeys[i]...) encodedChunks[i] = encodeSlasherChunk(chunks[i]) } return s.db.Update(func(tx *bolt.Tx) error { bkt := tx.Bucket(slasherChunksBucket) for i := 0; i < len(chunkKeys); i++ { if err := bkt.Put(encodedKeys[i], encodedChunks[i]); err != nil { return err } } return nil }) } // CheckDoubleBlockProposals takes in a list of proposals and for each, // checks if there already exists a proposal at the same slot+validatorIndex combination. If so, // We check if the existing signing root is not-empty and is different than the incoming // proposal signing root. If so, we return a double block proposal object. func (s *Store) CheckDoubleBlockProposals( ctx context.Context, proposals []*slashertypes.SignedBlockHeaderWrapper, ) ([]*ethpb.ProposerSlashing, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.CheckDoubleBlockProposals") defer span.End() proposerSlashings := make([]*ethpb.ProposerSlashing, 0, len(proposals)) err := s.db.View(func(tx *bolt.Tx) error { bkt := tx.Bucket(proposalRecordsBucket) for _, proposal := range proposals { key, err := keyForValidatorProposal(proposal) if err != nil { return err } encExistingProposalWrapper := bkt.Get(key) if len(encExistingProposalWrapper) < signingRootSize { continue } existingSigningRoot := bytesutil.ToBytes32(encExistingProposalWrapper[:signingRootSize]) if existingSigningRoot != proposal.SigningRoot { existingProposalWrapper, err := decodeProposalRecord(encExistingProposalWrapper) if err != nil { return err } proposerSlashings = append(proposerSlashings, ðpb.ProposerSlashing{ Header_1: existingProposalWrapper.SignedBeaconBlockHeader, Header_2: proposal.SignedBeaconBlockHeader, }) } } return nil }) return proposerSlashings, err } // SaveBlockProposals takes in a list of block proposals and saves them to our // proposal records bucket in the database. func (s *Store) SaveBlockProposals( ctx context.Context, proposals []*slashertypes.SignedBlockHeaderWrapper, ) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlockProposals") defer span.End() encodedKeys := make([][]byte, len(proposals)) encodedProposals := make([][]byte, len(proposals)) for i, proposal := range proposals { key, err := keyForValidatorProposal(proposal) if err != nil { return err } enc, err := encodeProposalRecord(proposal) if err != nil { return err } encodedKeys[i] = key encodedProposals[i] = enc } return s.db.Update(func(tx *bolt.Tx) error { bkt := tx.Bucket(proposalRecordsBucket) for i := range proposals { if err := bkt.Put(encodedKeys[i], encodedProposals[i]); err != nil { return err } } return nil }) } // HighestAttestations retrieves the last attestation data from the database for all indices. func (s *Store) HighestAttestations( ctx context.Context, indices []types.ValidatorIndex, ) ([]*slashpb.HighestAttestation, error) { if len(indices) == 0 { return nil, nil } // Sort indices to keep DB interactions short. sort.SliceStable(indices, func(i, j int) bool { return uint64(indices[i]) < uint64(indices[j]) }) var err error encodedIndices := make([][]byte, len(indices)) for i, valIdx := range indices { encodedIndices[i] = encodeValidatorIndex(valIdx) } history := make([]*slashpb.HighestAttestation, 0, len(encodedIndices)) err = s.db.View(func(tx *bolt.Tx) error { signingRootsBkt := tx.Bucket(attestationDataRootsBucket) for i := 0; i < len(encodedIndices); i++ { c := signingRootsBkt.Cursor() for k, v := c.Last(); k != nil; k, v = c.Prev() { if suffixForAttestationRecordsKey(k, encodedIndices[i]) { encodedAttRecord := v[signingRootSize:] if encodedAttRecord == nil { continue } attWrapper, decodeErr := decodeAttestationRecord(encodedAttRecord) if decodeErr != nil { return decodeErr } highestAtt := &slashpb.HighestAttestation{ ValidatorIndex: uint64(indices[i]), HighestSourceEpoch: attWrapper.IndexedAttestation.Data.Source.Epoch, HighestTargetEpoch: attWrapper.IndexedAttestation.Data.Target.Epoch, } history = append(history, highestAtt) break } } } return nil }) return history, err } func suffixForAttestationRecordsKey(key, encodedValidatorIndex []byte) bool { encIdx := key[8:] return bytes.Equal(encIdx, encodedValidatorIndex) } // Disk key for a validator proposal, including a slot+validatorIndex as a byte slice. func keyForValidatorProposal(proposal *slashertypes.SignedBlockHeaderWrapper) ([]byte, error) { encSlot, err := proposal.SignedBeaconBlockHeader.Header.Slot.MarshalSSZ() if err != nil { return nil, err } encValidatorIdx := encodeValidatorIndex(proposal.SignedBeaconBlockHeader.Header.ProposerIndex) return append(encSlot, encValidatorIdx...), nil } func encodeSlasherChunk(chunk []uint16) []byte { val := make([]byte, 0) for i := 0; i < len(chunk); i++ { val = append(val, ssz.MarshalUint16(make([]byte, 0), chunk[i])...) } return snappy.Encode(nil, val) } func decodeSlasherChunk(enc []byte) ([]uint16, error) { chunkBytes, err := snappy.Decode(nil, enc) if err != nil { return nil, err } chunk := make([]uint16, 0) for i := 0; i < len(chunkBytes); i += 2 { distance := ssz.UnmarshallUint16(chunkBytes[i : i+2]) chunk = append(chunk, distance) } return chunk, nil } // Decode attestation record from bytes. func encodeAttestationRecord(att *slashertypes.IndexedAttestationWrapper) ([]byte, error) { if att == nil || att.IndexedAttestation == nil { return []byte{}, errors.New("nil proposal record") } encodedAtt, err := att.IndexedAttestation.MarshalSSZ() if err != nil { return nil, err } compressedAtt := snappy.Encode(nil, encodedAtt) return append(att.SigningRoot[:], compressedAtt...), nil } // Decode attestation record from bytes. func decodeAttestationRecord(encoded []byte) (*slashertypes.IndexedAttestationWrapper, error) { if len(encoded) < signingRootSize { return nil, fmt.Errorf("wrong length for encoded attestation record, want 32, got %d", len(encoded)) } signingRoot := encoded[:signingRootSize] decodedAtt := ðpb.IndexedAttestation{} decodedAttBytes, err := snappy.Decode(nil, encoded[signingRootSize:]) if err != nil { return nil, err } if err := decodedAtt.UnmarshalSSZ(decodedAttBytes); err != nil { return nil, err } return &slashertypes.IndexedAttestationWrapper{ IndexedAttestation: decodedAtt, SigningRoot: bytesutil.ToBytes32(signingRoot), }, nil } func encodeProposalRecord(blkHdr *slashertypes.SignedBlockHeaderWrapper) ([]byte, error) { if blkHdr == nil || blkHdr.SignedBeaconBlockHeader == nil { return []byte{}, errors.New("nil proposal record") } encodedHdr, err := blkHdr.SignedBeaconBlockHeader.MarshalSSZ() if err != nil { return nil, err } compressedHdr := snappy.Encode(nil, encodedHdr) return append(blkHdr.SigningRoot[:], compressedHdr...), nil } func decodeProposalRecord(encoded []byte) (*slashertypes.SignedBlockHeaderWrapper, error) { if len(encoded) < signingRootSize { return nil, fmt.Errorf( "wrong length for encoded proposal record, want %d, got %d", signingRootSize, len(encoded), ) } signingRoot := encoded[:signingRootSize] decodedBlkHdr := ðpb.SignedBeaconBlockHeader{} decodedHdrBytes, err := snappy.Decode(nil, encoded[signingRootSize:]) if err != nil { return nil, err } if err := decodedBlkHdr.UnmarshalSSZ(decodedHdrBytes); err != nil { return nil, err } return &slashertypes.SignedBlockHeaderWrapper{ SignedBeaconBlockHeader: decodedBlkHdr, SigningRoot: bytesutil.ToBytes32(signingRoot), }, nil } // Encodes an epoch into little-endian bytes. func encodeTargetEpoch(epoch types.Epoch) []byte { buf := make([]byte, 8) binary.LittleEndian.PutUint64(buf, uint64(epoch)) return buf } // Encodes a validator index using 5 bytes instead of 8 as a // client optimization to save space in the database. Because the max validator // registry size is 2**40, this is a safe optimization. func encodeValidatorIndex(index types.ValidatorIndex) []byte { buf := make([]byte, 5) v := uint64(index) buf[0] = byte(v) buf[1] = byte(v >> 8) buf[2] = byte(v >> 16) buf[3] = byte(v >> 24) buf[4] = byte(v >> 32) return buf }