mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-19 00:04:12 +00:00
3d3b9d1217
* Add specdocs static code analyzer * docs pulling script * update content pulling script * add test * better parsing of incoming docs * update test * implements analyzer * separate tool * remove analyzer code * cleanup * deep source fixes * untrack raw specs files * add back phase0 defs * update spec texts * re-arrange code * updated spec list * cleanup * more comments and readme * add merkle proofs specs * add extra.md * mark wrong length issue * update readme * update readme * remove non-def snippets * update comment * check numrows * ignore last empty line Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
6.8 KiB
6.8 KiB
def check_if_validator_active(state: BeaconState, validator_index: ValidatorIndex) -> bool:
validator = state.validators[validator_index]
return is_active_validator(validator, get_current_epoch(state))
def get_committee_assignment(state: BeaconState,
epoch: Epoch,
validator_index: ValidatorIndex
) -> Optional[Tuple[Sequence[ValidatorIndex], CommitteeIndex, Slot]]:
"""
Return the committee assignment in the ``epoch`` for ``validator_index``.
``assignment`` returned is a tuple of the following form:
* ``assignment[0]`` is the list of validators in the committee
* ``assignment[1]`` is the index to which the committee is assigned
* ``assignment[2]`` is the slot at which the committee is assigned
Return None if no assignment.
"""
next_epoch = Epoch(get_current_epoch(state) + 1)
assert epoch <= next_epoch
start_slot = compute_start_slot_at_epoch(epoch)
committee_count_per_slot = get_committee_count_per_slot(state, epoch)
for slot in range(start_slot, start_slot + SLOTS_PER_EPOCH):
for index in range(committee_count_per_slot):
committee = get_beacon_committee(state, Slot(slot), CommitteeIndex(index))
if validator_index in committee:
return committee, CommitteeIndex(index), Slot(slot)
return None
def is_proposer(state: BeaconState, validator_index: ValidatorIndex) -> bool:
return get_beacon_proposer_index(state) == validator_index
def get_epoch_signature(state: BeaconState, block: BeaconBlock, privkey: int) -> BLSSignature:
domain = get_domain(state, DOMAIN_RANDAO, compute_epoch_at_slot(block.slot))
signing_root = compute_signing_root(compute_epoch_at_slot(block.slot), domain)
return bls.Sign(privkey, signing_root)
def compute_time_at_slot(state: BeaconState, slot: Slot) -> uint64:
return uint64(state.genesis_time + slot * SECONDS_PER_SLOT)
def voting_period_start_time(state: BeaconState) -> uint64:
eth1_voting_period_start_slot = Slot(state.slot - state.slot % (EPOCHS_PER_ETH1_VOTING_PERIOD * SLOTS_PER_EPOCH))
return compute_time_at_slot(state, eth1_voting_period_start_slot)
def is_candidate_block(block: Eth1Block, period_start: uint64) -> bool:
return (
block.timestamp + SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE <= period_start
and block.timestamp + SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE * 2 >= period_start
)
def get_eth1_vote(state: BeaconState, eth1_chain: Sequence[Eth1Block]) -> Eth1Data:
period_start = voting_period_start_time(state)
# `eth1_chain` abstractly represents all blocks in the eth1 chain sorted by ascending block height
votes_to_consider = [
get_eth1_data(block) for block in eth1_chain
if (
is_candidate_block(block, period_start)
# Ensure cannot move back to earlier deposit contract states
and get_eth1_data(block).deposit_count >= state.eth1_data.deposit_count
)
]
# Valid votes already cast during this period
valid_votes = [vote for vote in state.eth1_data_votes if vote in votes_to_consider]
# Default vote on latest eth1 block data in the period range unless eth1 chain is not live
# Non-substantive casting for linter
state_eth1_data: Eth1Data = state.eth1_data
default_vote = votes_to_consider[len(votes_to_consider) - 1] if any(votes_to_consider) else state_eth1_data
return max(
valid_votes,
key=lambda v: (valid_votes.count(v), -valid_votes.index(v)), # Tiebreak by smallest distance
default=default_vote
)
def compute_new_state_root(state: BeaconState, block: BeaconBlock) -> Root:
temp_state: BeaconState = state.copy()
signed_block = SignedBeaconBlock(message=block)
state_transition(temp_state, signed_block, validate_result=False)
return hash_tree_root(temp_state)
def get_block_signature(state: BeaconState, block: BeaconBlock, privkey: int) -> BLSSignature:
domain = get_domain(state, DOMAIN_BEACON_PROPOSER, compute_epoch_at_slot(block.slot))
signing_root = compute_signing_root(block, domain)
return bls.Sign(privkey, signing_root)
def get_attestation_signature(state: BeaconState, attestation_data: AttestationData, privkey: int) -> BLSSignature:
domain = get_domain(state, DOMAIN_BEACON_ATTESTER, attestation_data.target.epoch)
signing_root = compute_signing_root(attestation_data, domain)
return bls.Sign(privkey, signing_root)
def compute_subnet_for_attestation(committees_per_slot: uint64, slot: Slot, committee_index: CommitteeIndex) -> uint64:
"""
Compute the correct subnet for an attestation for Phase 0.
Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
"""
slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
committees_since_epoch_start = committees_per_slot * slots_since_epoch_start
return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)
def get_slot_signature(state: BeaconState, slot: Slot, privkey: int) -> BLSSignature:
domain = get_domain(state, DOMAIN_SELECTION_PROOF, compute_epoch_at_slot(slot))
signing_root = compute_signing_root(slot, domain)
return bls.Sign(privkey, signing_root)
def is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, slot_signature: BLSSignature) -> bool:
committee = get_beacon_committee(state, slot, index)
modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)
return bytes_to_uint64(hash(slot_signature)[0:8]) % modulo == 0
def get_aggregate_signature(attestations: Sequence[Attestation]) -> BLSSignature:
signatures = [attestation.signature for attestation in attestations]
return bls.Aggregate(signatures)
def get_aggregate_and_proof(state: BeaconState,
aggregator_index: ValidatorIndex,
aggregate: Attestation,
privkey: int) -> AggregateAndProof:
return AggregateAndProof(
aggregator_index=aggregator_index,
aggregate=aggregate,
selection_proof=get_slot_signature(state, aggregate.data.slot, privkey),
)
def get_aggregate_and_proof_signature(state: BeaconState,
aggregate_and_proof: AggregateAndProof,
privkey: int) -> BLSSignature:
aggregate = aggregate_and_proof.aggregate
domain = get_domain(state, DOMAIN_AGGREGATE_AND_PROOF, compute_epoch_at_slot(aggregate.data.slot))
signing_root = compute_signing_root(aggregate_and_proof, domain)
return bls.Sign(privkey, signing_root)