def check_if_validator_active(state: BeaconState, validator_index: ValidatorIndex) -> bool:
    """
    Return whether the validator at ``validator_index`` in ``state.validators``
    is active in the current epoch of ``state``.
    """
    validator = state.validators[validator_index]
    return is_active_validator(validator, get_current_epoch(state))


def get_committee_assignment(state: BeaconState,
                             epoch: Epoch,
                             validator_index: ValidatorIndex
                             ) -> Optional[Tuple[Sequence[ValidatorIndex], CommitteeIndex, Slot]]:
    """
    Return the committee assignment in the ``epoch`` for ``validator_index``.
    ``assignment`` returned is a tuple of the following form:
        * ``assignment[0]`` is the list of validators in the committee
        * ``assignment[1]`` is the index to which the committee is assigned
        * ``assignment[2]`` is the slot at which the committee is assigned
    Return None if no assignment.

    Raises ``AssertionError`` if ``epoch`` is later than the epoch following
    the current epoch of ``state``.
    """
    next_epoch = Epoch(get_current_epoch(state) + 1)
    assert epoch <= next_epoch

    start_slot = compute_start_slot_at_epoch(epoch)
    committee_count_per_slot = get_committee_count_per_slot(state, epoch)
    # Scan every committee of every slot in the epoch; the first committee
    # containing the validator is the assignment.
    for slot in range(start_slot, start_slot + SLOTS_PER_EPOCH):
        for index in range(committee_count_per_slot):
            committee = get_beacon_committee(state, Slot(slot), CommitteeIndex(index))
            if validator_index in committee:
                return committee, CommitteeIndex(index), Slot(slot)
    return None


def is_proposer(state: BeaconState, validator_index: ValidatorIndex) -> bool:
    """
    Return whether ``validator_index`` equals ``get_beacon_proposer_index(state)``.
    """
    return get_beacon_proposer_index(state) == validator_index


def get_epoch_signature(state: BeaconState, block: BeaconBlock, privkey: int) -> BLSSignature:
    """
    Sign the epoch of ``block.slot`` with ``privkey`` under ``DOMAIN_RANDAO``.
    """
    # The same epoch is used both to derive the domain and as the signed object.
    epoch = compute_epoch_at_slot(block.slot)
    domain = get_domain(state, DOMAIN_RANDAO, epoch)
    signing_root = compute_signing_root(epoch, domain)
    return bls.Sign(privkey, signing_root)


def compute_time_at_slot(state: BeaconState, slot: Slot) -> uint64:
    """
    Return ``state.genesis_time + slot * SECONDS_PER_SLOT`` as a ``uint64``.
    """
    return uint64(state.genesis_time + slot * SECONDS_PER_SLOT)


def voting_period_start_time(state: BeaconState) -> uint64:
    """
    Return the time (via ``compute_time_at_slot``) of the first slot of the
    eth1 voting period that contains ``state.slot``.
    """
    # Round ``state.slot`` down to a multiple of the voting period length in slots.
    eth1_voting_period_start_slot = Slot(state.slot - state.slot % (EPOCHS_PER_ETH1_VOTING_PERIOD * SLOTS_PER_EPOCH))
    return compute_time_at_slot(state, eth1_voting_period_start_slot)


def is_candidate_block(block: Eth1Block, period_start: uint64) -> bool:
    """
    Return whether ``block.timestamp`` lies between one and two follow
    distances (``SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE``, inclusive
    on both ends) before ``period_start``.
    """
    return (
        block.timestamp + SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE <= period_start
        and block.timestamp + SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE * 2 >= period_start
    )


def get_eth1_vote(state: BeaconState, eth1_chain: Sequence[Eth1Block]) -> Eth1Data:
    """
    Return the ``Eth1Data`` to vote for given ``state`` and ``eth1_chain``.

    The result is the most frequent entry of ``state.eth1_data_votes`` that is
    also a candidate (ties broken in favour of the earliest occurrence). If no
    such vote exists, the last candidate in ``eth1_chain`` order is returned,
    or ``state.eth1_data`` when there are no candidates at all.
    """
    period_start = voting_period_start_time(state)
    # `eth1_chain` abstractly represents all blocks in the eth1 chain sorted by ascending block height
    votes_to_consider = []
    for block in eth1_chain:
        if not is_candidate_block(block, period_start):
            continue
        # Fetch once per block; assumes `get_eth1_data` is deterministic for a given block.
        eth1_data = get_eth1_data(block)
        # Ensure cannot move back to earlier deposit contract states
        if eth1_data.deposit_count >= state.eth1_data.deposit_count:
            votes_to_consider.append(eth1_data)

    # Valid votes already cast during this period
    valid_votes = [vote for vote in state.eth1_data_votes if vote in votes_to_consider]

    # Default vote on latest eth1 block data in the period range unless eth1 chain is not live
    # Non-substantive casting for linter
    state_eth1_data: Eth1Data = state.eth1_data
    # Test the list itself for emptiness: `any()` would inspect element
    # truthiness rather than whether any candidate exists.
    default_vote = votes_to_consider[-1] if votes_to_consider else state_eth1_data

    return max(
        valid_votes,
        key=lambda v: (valid_votes.count(v), -valid_votes.index(v)),  # Tiebreak by smallest distance
        default=default_vote
    )


def compute_new_state_root(state: BeaconState, block: BeaconBlock) -> Root:
    """
    Return ``hash_tree_root`` of a copy of ``state`` after applying ``block``
    to it with ``state_transition`` (``validate_result=False``).
    """
    # Work on a copy so the transition is not applied to the caller's ``state`` object.
    temp_state: BeaconState = state.copy()
    # Only ``message`` is set on the signed wrapper; no signature is supplied.
    signed_block = SignedBeaconBlock(message=block)
    state_transition(temp_state, signed_block, validate_result=False)
    return hash_tree_root(temp_state)


def get_block_signature(state: BeaconState, block: BeaconBlock, privkey: int) -> BLSSignature:
    """
    Sign ``block`` with ``privkey`` under ``DOMAIN_BEACON_PROPOSER`` for the
    epoch of ``block.slot``.
    """
    domain = get_domain(state, DOMAIN_BEACON_PROPOSER, compute_epoch_at_slot(block.slot))
    signing_root = compute_signing_root(block, domain)
    return bls.Sign(privkey, signing_root)


def get_attestation_signature(state: BeaconState, attestation_data: AttestationData, privkey: int) -> BLSSignature:
    """
    Sign ``attestation_data`` with ``privkey`` under ``DOMAIN_BEACON_ATTESTER``
    for ``attestation_data.target.epoch``.
    """
    domain = get_domain(state, DOMAIN_BEACON_ATTESTER, attestation_data.target.epoch)
    signing_root = compute_signing_root(attestation_data, domain)
    return bls.Sign(privkey, signing_root)


def compute_subnet_for_attestation(committees_per_slot: uint64, slot: Slot, committee_index: CommitteeIndex) -> uint64:
    """
    Compute the correct subnet for an attestation for Phase 0.
    Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
    """
    slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
    committees_since_epoch_start = committees_per_slot * slots_since_epoch_start

    return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)


def get_slot_signature(state: BeaconState, slot: Slot, privkey: int) -> BLSSignature:
    """
    Sign ``slot`` with ``privkey`` under ``DOMAIN_SELECTION_PROOF`` for the
    epoch of ``slot``.
    """
    domain = get_domain(state, DOMAIN_SELECTION_PROOF, compute_epoch_at_slot(slot))
    signing_root = compute_signing_root(slot, domain)
    return bls.Sign(privkey, signing_root)


def is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, slot_signature: BLSSignature) -> bool:
    """
    Return whether ``slot_signature`` selects its signer as an aggregator for
    the committee at (``slot``, ``index``).

    The first 8 bytes of ``hash(slot_signature)`` are converted with
    ``bytes_to_uint64`` and must be divisible by
    ``max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)``.
    """
    committee = get_beacon_committee(state, slot, index)
    # ``max(1, ...)`` keeps the modulus non-zero for committees smaller than the target.
    modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)
    return bytes_to_uint64(hash(slot_signature)[0:8]) % modulo == 0


def get_aggregate_signature(attestations: Sequence[Attestation]) -> BLSSignature:
    """
    Return ``bls.Aggregate`` of the ``signature`` fields of ``attestations``.
    """
    signatures = [attestation.signature for attestation in attestations]
    return bls.Aggregate(signatures)


def get_aggregate_and_proof(state: BeaconState,
                            aggregator_index: ValidatorIndex,
                            aggregate: Attestation,
                            privkey: int) -> AggregateAndProof:
    """
    Build an ``AggregateAndProof`` for ``aggregate`` whose ``selection_proof``
    is ``get_slot_signature`` over ``aggregate.data.slot`` made with ``privkey``.
    """
    return AggregateAndProof(
        aggregator_index=aggregator_index,
        aggregate=aggregate,
        selection_proof=get_slot_signature(state, aggregate.data.slot, privkey),
    )


def get_aggregate_and_proof_signature(state: BeaconState,
                                      aggregate_and_proof: AggregateAndProof,
                                      privkey: int) -> BLSSignature:
    """
    Sign ``aggregate_and_proof`` with ``privkey`` under
    ``DOMAIN_AGGREGATE_AND_PROOF`` for the epoch of the aggregate's slot.
    """
    aggregate = aggregate_and_proof.aggregate
    domain = get_domain(state, DOMAIN_AGGREGATE_AND_PROOF, compute_epoch_at_slot(aggregate.data.slot))
    signing_root = compute_signing_root(aggregate_and_proof, domain)
    return bls.Sign(privkey, signing_root)