lighthouse-pulse/slasher/src/slasher.rs
Michael Sproul 3b61ac9cbf Optimise slasher DB layout and switch to MDBX (#2776)
## Issue Addressed

Closes #2286
Closes #2538
Closes #2342

## Proposed Changes

Part II of major slasher optimisations after #2767

These changes will be backwards-incompatible due to the move to MDBX (and the schema change) 😱 

* [x] Shrink attester keys from 16 bytes to 7 bytes.
* [x] Shrink attester records from 64 bytes to 6 bytes.
* [x] Separate `DiskConfig` from regular `Config`.
* [x] Add configuration for the LRU cache size.
* [x] Add a "migration" that deletes any legacy LMDB database.
2021-12-21 08:23:17 +00:00

356 lines
12 KiB
Rust

use crate::batch_stats::{AttestationStats, BatchStats, BlockStats};
use crate::metrics::{
self, SLASHER_NUM_ATTESTATIONS_DEFERRED, SLASHER_NUM_ATTESTATIONS_DROPPED,
SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH, SLASHER_NUM_ATTESTATIONS_VALID,
SLASHER_NUM_BLOCKS_PROCESSED,
};
use crate::{
array, AttestationBatch, AttestationQueue, AttesterRecord, BlockQueue, Config, Error,
IndexedAttestationId, ProposerSlashingStatus, RwTransaction, SimpleBatch, SlasherDB,
};
use parking_lot::Mutex;
use slog::{debug, error, info, Logger};
use std::collections::HashSet;
use std::sync::Arc;
use types::{
AttesterSlashing, Epoch, EthSpec, IndexedAttestation, ProposerSlashing, SignedBeaconBlockHeader,
};
#[derive(Debug)]
pub struct Slasher<E: EthSpec> {
db: SlasherDB<E>,
attestation_queue: AttestationQueue<E>,
block_queue: BlockQueue,
attester_slashings: Mutex<HashSet<AttesterSlashing<E>>>,
proposer_slashings: Mutex<HashSet<ProposerSlashing>>,
config: Arc<Config>,
log: Logger,
}
impl<E: EthSpec> Slasher<E> {
pub fn open(config: Config, log: Logger) -> Result<Self, Error> {
config.validate()?;
let config = Arc::new(config);
let db = SlasherDB::open(config.clone(), log.clone())?;
let attester_slashings = Mutex::new(HashSet::new());
let proposer_slashings = Mutex::new(HashSet::new());
let attestation_queue = AttestationQueue::default();
let block_queue = BlockQueue::default();
Ok(Self {
db,
attestation_queue,
block_queue,
attester_slashings,
proposer_slashings,
config,
log,
})
}
/// Harvest all attester slashings found, removing them from the slasher.
pub fn get_attester_slashings(&self) -> HashSet<AttesterSlashing<E>> {
std::mem::take(&mut self.attester_slashings.lock())
}
/// Harvest all proposer slashings found, removing them from the slasher.
pub fn get_proposer_slashings(&self) -> HashSet<ProposerSlashing> {
std::mem::take(&mut self.proposer_slashings.lock())
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn log(&self) -> &Logger {
&self.log
}
/// Accept an attestation from the network and queue it for processing.
pub fn accept_attestation(&self, attestation: IndexedAttestation<E>) {
self.attestation_queue.queue(attestation);
}
/// Accept a block from the network and queue it for processing.
pub fn accept_block_header(&self, block_header: SignedBeaconBlockHeader) {
self.block_queue.queue(block_header);
}
/// Apply queued blocks and attestations to the on-disk database, and detect slashings!
pub fn process_queued(&self, current_epoch: Epoch) -> Result<BatchStats, Error> {
let mut txn = self.db.begin_rw_txn()?;
let block_stats = self.process_blocks(&mut txn)?;
let attestation_stats = self.process_attestations(current_epoch, &mut txn)?;
txn.commit()?;
Ok(BatchStats {
block_stats,
attestation_stats,
})
}
/// Apply queued blocks to the on-disk database.
///
/// Return the number of blocks
pub fn process_blocks(&self, txn: &mut RwTransaction<'_>) -> Result<BlockStats, Error> {
let blocks = self.block_queue.dequeue();
let num_processed = blocks.len();
let mut slashings = vec![];
metrics::set_gauge(&SLASHER_NUM_BLOCKS_PROCESSED, blocks.len() as i64);
for block in blocks {
if let ProposerSlashingStatus::DoubleVote(slashing) =
self.db.check_or_insert_block_proposal(txn, block)?
{
slashings.push(*slashing);
}
}
let num_slashings = slashings.len();
if !slashings.is_empty() {
info!(
self.log,
"Found {} new proposer slashings!",
slashings.len(),
);
self.proposer_slashings.lock().extend(slashings);
}
Ok(BlockStats {
num_processed,
num_slashings,
})
}
/// Apply queued attestations to the on-disk database.
pub fn process_attestations(
&self,
current_epoch: Epoch,
txn: &mut RwTransaction<'_>,
) -> Result<AttestationStats, Error> {
let snapshot = self.attestation_queue.dequeue();
let num_processed = snapshot.len();
// Filter attestations for relevance.
let (snapshot, deferred, num_dropped) = self.validate(snapshot, current_epoch);
let num_valid = snapshot.len();
let num_deferred = deferred.len();
self.attestation_queue.requeue(deferred);
debug!(
self.log,
"Pre-processing attestations for slasher";
"num_valid" => num_valid,
"num_deferred" => num_deferred,
"num_dropped" => num_dropped,
);
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_VALID, num_valid as i64);
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DEFERRED, num_deferred as i64);
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DROPPED, num_dropped as i64);
// De-duplicate attestations and sort by validator index.
let mut batch = AttestationBatch::default();
for indexed_record in snapshot {
batch.queue(indexed_record);
}
// Insert relevant attestations into database.
let mut num_stored = 0;
for weak_record in &batch.attestations {
if let Some(indexed_record) = weak_record.upgrade() {
let indexed_attestation_id = self.db.store_indexed_attestation(
txn,
indexed_record.record.indexed_attestation_hash,
&indexed_record.indexed,
)?;
indexed_record.set_id(indexed_attestation_id);
// Prime the attestation data root LRU cache.
self.db.cache_attestation_data_root(
IndexedAttestationId::new(indexed_attestation_id),
indexed_record.record.attestation_data_hash,
);
num_stored += 1;
}
}
debug!(
self.log,
"Stored attestations in slasher DB";
"num_stored" => num_stored,
"num_valid" => num_valid,
);
metrics::set_gauge(
&SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH,
num_stored as i64,
);
// Group attestations into chunked batches and process them.
let grouped_attestations = batch.group_by_validator_chunk_index(&self.config);
for (subqueue_id, subqueue) in grouped_attestations.subqueues.into_iter().enumerate() {
self.process_batch(txn, subqueue_id, subqueue, current_epoch)?;
}
metrics::set_gauge(
&metrics::SLASHER_ATTESTATION_ROOT_CACHE_SIZE,
self.db.attestation_root_cache_size() as i64,
);
Ok(AttestationStats { num_processed })
}
/// Process a batch of attestations for a range of validator indices.
fn process_batch(
&self,
txn: &mut RwTransaction<'_>,
subqueue_id: usize,
batch: SimpleBatch<E>,
current_epoch: Epoch,
) -> Result<(), Error> {
// First, check for double votes.
for attestation in &batch {
let indexed_attestation_id = IndexedAttestationId::new(attestation.get_id());
match self.check_double_votes(
txn,
subqueue_id,
&attestation.indexed,
&attestation.record,
indexed_attestation_id,
) {
Ok(slashings) => {
if !slashings.is_empty() {
info!(
self.log,
"Found {} new double-vote slashings!",
slashings.len()
);
}
self.attester_slashings.lock().extend(slashings);
}
Err(e) => {
error!(
self.log,
"Error checking for double votes";
"error" => format!("{:?}", e)
);
return Err(e);
}
}
}
// Then check for surrounds using the min-max arrays.
match array::update(
&self.db,
txn,
subqueue_id,
batch,
current_epoch,
&self.config,
) {
Ok(slashings) => {
if !slashings.is_empty() {
info!(
self.log,
"Found {} new surround slashings!",
slashings.len()
);
}
self.attester_slashings.lock().extend(slashings);
}
Err(e) => {
error!(
self.log,
"Error processing array update";
"error" => format!("{:?}", e),
);
return Err(e);
}
}
Ok(())
}
/// Check for double votes from all validators on `attestation` who match the `subqueue_id`.
fn check_double_votes(
&self,
txn: &mut RwTransaction<'_>,
subqueue_id: usize,
attestation: &IndexedAttestation<E>,
attester_record: &AttesterRecord,
indexed_attestation_id: IndexedAttestationId,
) -> Result<HashSet<AttesterSlashing<E>>, Error> {
let mut slashings = HashSet::new();
for validator_index in self
.config
.attesting_validators_in_chunk(attestation, subqueue_id)
{
let slashing_status = self.db.check_and_update_attester_record(
txn,
validator_index,
attestation,
attester_record,
indexed_attestation_id,
)?;
if let Some(slashing) = slashing_status.into_slashing(attestation) {
debug!(
self.log,
"Found double-vote slashing";
"validator_index" => validator_index,
"epoch" => slashing.attestation_1.data.target.epoch,
);
slashings.insert(slashing);
}
}
Ok(slashings)
}
/// Validate the attestations in `batch` for ingestion during `current_epoch`.
///
/// Drop any attestations that are too old to ever be relevant, and return any attestations
/// that might be valid in the future.
///
/// Returns `(valid, deferred, num_dropped)`.
fn validate(
&self,
batch: SimpleBatch<E>,
current_epoch: Epoch,
) -> (SimpleBatch<E>, SimpleBatch<E>, usize) {
let mut keep = Vec::with_capacity(batch.len());
let mut defer = vec![];
let mut drop_count = 0;
for indexed_record in batch {
let attestation = &indexed_record.indexed;
let target_epoch = attestation.data.target.epoch;
let source_epoch = attestation.data.source.epoch;
if source_epoch > target_epoch
|| source_epoch + self.config.history_length as u64 <= current_epoch
{
drop_count += 1;
continue;
}
// Check that the attestation's target epoch is acceptable, and defer it
// if it's not.
if target_epoch > current_epoch {
defer.push(indexed_record);
} else {
// Otherwise the attestation is OK to process.
keep.push(indexed_record);
}
}
(keep, defer, drop_count)
}
/// Prune unnecessary attestations and blocks from the on-disk database.
pub fn prune_database(&self, current_epoch: Epoch) -> Result<(), Error> {
self.db.prune(current_epoch)
}
}