mirror of
https://gitlab.com/pulsechaincom/staking-deposit-cli.git
synced 2024-12-23 11:57:19 +00:00
Merge branch 'master' into hwwhww/tox
This commit is contained in:
commit
dd390b6463
@ -1,7 +1,7 @@
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
from typing import List
|
from typing import Dict, List
|
||||||
from py_ecc.bls import G2ProofOfPossession as bls
|
from py_ecc.bls import G2ProofOfPossession as bls
|
||||||
|
|
||||||
from eth2deposit.key_handling.key_derivation.path import mnemonic_and_path_to_key
|
from eth2deposit.key_handling.key_derivation.path import mnemonic_and_path_to_key
|
||||||
@ -9,6 +9,7 @@ from eth2deposit.key_handling.keystore import (
|
|||||||
Keystore,
|
Keystore,
|
||||||
ScryptKeystore,
|
ScryptKeystore,
|
||||||
)
|
)
|
||||||
|
from eth2deposit.utils.constants import BLS_WITHDRAWAL_PREFIX
|
||||||
from eth2deposit.utils.crypto import SHA256
|
from eth2deposit.utils.crypto import SHA256
|
||||||
from eth2deposit.utils.ssz import (
|
from eth2deposit.utils.ssz import (
|
||||||
compute_domain,
|
compute_domain,
|
||||||
@ -26,14 +27,20 @@ class ValidatorCredentials:
|
|||||||
self.amount = amount
|
self.amount = amount
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def signing_pk(self):
|
def signing_pk(self) -> bytes:
|
||||||
return bls.PrivToPub(self.signing_sk)
|
return bls.PrivToPub(self.signing_sk)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def withdrawal_pk(self):
|
def withdrawal_pk(self) -> bytes:
|
||||||
return bls.PrivToPub(self.withdrawal_sk)
|
return bls.PrivToPub(self.withdrawal_sk)
|
||||||
|
|
||||||
def signing_keystore(self, password: str) -> ScryptKeystore:
|
@property
|
||||||
|
def withdrawal_credentials(self) -> bytes:
|
||||||
|
withdrawal_credentials = BLS_WITHDRAWAL_PREFIX
|
||||||
|
withdrawal_credentials += SHA256(self.withdrawal_pk)[1:]
|
||||||
|
return withdrawal_credentials
|
||||||
|
|
||||||
|
def signing_keystore(self, password: str) -> Keystore:
|
||||||
secret = self.signing_sk.to_bytes(32, 'big')
|
secret = self.signing_sk.to_bytes(32, 'big')
|
||||||
return ScryptKeystore.encrypt(secret=secret, password=password, path=self.signing_key_path)
|
return ScryptKeystore.encrypt(secret=secret, password=password, path=self.signing_key_path)
|
||||||
|
|
||||||
@ -76,12 +83,12 @@ def sign_deposit_data(deposit_data: DepositMessage, sk: int) -> Deposit:
|
|||||||
return signed_deposit_data
|
return signed_deposit_data
|
||||||
|
|
||||||
|
|
||||||
def export_deposit_data_json(*, credentials: List[ValidatorCredentials], folder: str):
|
def export_deposit_data_json(*, credentials: List[ValidatorCredentials], folder: str) -> str:
|
||||||
deposit_data: List[dict] = []
|
deposit_data: List[Dict[bytes, bytes]] = []
|
||||||
for credential in credentials:
|
for credential in credentials:
|
||||||
deposit_datum = DepositMessage(
|
deposit_datum = DepositMessage(
|
||||||
pubkey=credential.signing_pk,
|
pubkey=credential.signing_pk,
|
||||||
withdrawal_credentials=SHA256(credential.withdrawal_pk),
|
withdrawal_credentials=credential.withdrawal_credentials,
|
||||||
amount=credential.amount,
|
amount=credential.amount,
|
||||||
)
|
)
|
||||||
signed_deposit_datum = sign_deposit_data(deposit_datum, credential.signing_sk)
|
signed_deposit_datum = sign_deposit_data(deposit_datum, credential.signing_sk)
|
||||||
|
@ -40,7 +40,7 @@ def generate_mnemonic(language: str, words_path: str) -> str:
|
|||||||
return mnemonic
|
return mnemonic
|
||||||
|
|
||||||
|
|
||||||
def check_python_version():
|
def check_python_version() -> None:
|
||||||
'''
|
'''
|
||||||
Checks that the python version running is sufficient and exits if not.
|
Checks that the python version running is sufficient and exits if not.
|
||||||
'''
|
'''
|
||||||
@ -54,12 +54,12 @@ def check_python_version():
|
|||||||
'--num_validators',
|
'--num_validators',
|
||||||
prompt='Please choose how many validators you wish to run',
|
prompt='Please choose how many validators you wish to run',
|
||||||
required=True,
|
required=True,
|
||||||
type=int,
|
type=int, # type: ignore
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
'--mnemonic_language',
|
'--mnemonic_language',
|
||||||
prompt='Please choose your mnemonic language',
|
prompt='Please choose your mnemonic language',
|
||||||
type=click.Choice(languages, case_sensitive=False), # type: ignore
|
type=click.Choice(languages, case_sensitive=False),
|
||||||
default='english',
|
default='english',
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
|
@ -4,6 +4,7 @@ from secrets import randbits
|
|||||||
from typing import (
|
from typing import (
|
||||||
List,
|
List,
|
||||||
Optional,
|
Optional,
|
||||||
|
Sequence,
|
||||||
)
|
)
|
||||||
|
|
||||||
from eth2deposit.utils.crypto import (
|
from eth2deposit.utils.crypto import (
|
||||||
@ -12,11 +13,11 @@ from eth2deposit.utils.crypto import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _get_word_list(language: str, path: str):
|
def _get_word_list(language: str, path: str) -> Sequence[str]:
|
||||||
return open(os.path.join(path, '%s.txt' % language)).readlines()
|
return open(os.path.join(path, '%s.txt' % language), encoding='utf-8').readlines()
|
||||||
|
|
||||||
|
|
||||||
def _get_word(*, word_list, index: int) -> str:
|
def _get_word(*, word_list: Sequence[str], index: int) -> str:
|
||||||
assert index < 2048
|
assert index < 2048
|
||||||
return word_list[index][:-1]
|
return word_list[index][:-1]
|
||||||
|
|
||||||
@ -30,7 +31,7 @@ def get_seed(*, mnemonic: str, password: str='') -> bytes:
|
|||||||
return PBKDF2(password=mnemonic, salt=salt, dklen=64, c=2048, prf='sha512')
|
return PBKDF2(password=mnemonic, salt=salt, dklen=64, c=2048, prf='sha512')
|
||||||
|
|
||||||
|
|
||||||
def get_languages(path) -> List[str]:
|
def get_languages(path: str) -> List[str]:
|
||||||
"""
|
"""
|
||||||
Walk the `path` and list all the languages with word-lists available.
|
Walk the `path` and list all the languages with word-lists available.
|
||||||
"""
|
"""
|
||||||
|
@ -6,6 +6,7 @@ from dataclasses import (
|
|||||||
)
|
)
|
||||||
import json
|
import json
|
||||||
from secrets import randbits
|
from secrets import randbits
|
||||||
|
from typing import Any, Dict, Union
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from eth2deposit.utils.crypto import (
|
from eth2deposit.utils.crypto import (
|
||||||
AES_128_CTR,
|
AES_128_CTR,
|
||||||
@ -18,21 +19,21 @@ from py_ecc.bls import G2ProofOfPossession as bls
|
|||||||
hexdigits = set('0123456789abcdef')
|
hexdigits = set('0123456789abcdef')
|
||||||
|
|
||||||
|
|
||||||
def to_bytes(obj):
|
def encode_bytes(obj: Union[str, Dict[str, Any]]) -> Union[bytes, str, Dict[str, Any]]:
|
||||||
if isinstance(obj, str):
|
if isinstance(obj, str) and all(c in hexdigits for c in obj):
|
||||||
if all(c in hexdigits for c in obj):
|
return bytes.fromhex(obj)
|
||||||
return bytes.fromhex(obj)
|
|
||||||
elif isinstance(obj, dict):
|
elif isinstance(obj, dict):
|
||||||
for key, value in obj.items():
|
for key, value in obj.items():
|
||||||
obj[key] = to_bytes(value)
|
obj[key] = encode_bytes(value)
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
|
|
||||||
class BytesDataclass:
|
class BytesDataclass:
|
||||||
def __post_init__(self):
|
def __post_init__(self) -> None:
|
||||||
for field in fields(self):
|
for field in fields(self):
|
||||||
if field.type in (dict, bytes):
|
if field.type in (bytes, Dict[str, Any]):
|
||||||
self.__setattr__(field.name, to_bytes(self.__getattribute__(field.name)))
|
# Convert hexstring to bytes
|
||||||
|
self.__setattr__(field.name, encode_bytes(self.__getattribute__(field.name)))
|
||||||
|
|
||||||
def as_json(self) -> str:
|
def as_json(self) -> str:
|
||||||
return json.dumps(asdict(self), default=lambda x: x.hex())
|
return json.dumps(asdict(self), default=lambda x: x.hex())
|
||||||
@ -41,7 +42,7 @@ class BytesDataclass:
|
|||||||
@dataclass
|
@dataclass
|
||||||
class KeystoreModule(BytesDataclass):
|
class KeystoreModule(BytesDataclass):
|
||||||
function: str = ''
|
function: str = ''
|
||||||
params: dict = dataclass_field(default_factory=dict)
|
params: Dict[str, Any] = dataclass_field(default_factory=dict)
|
||||||
message: bytes = bytes()
|
message: bytes = bytes()
|
||||||
|
|
||||||
|
|
||||||
@ -52,7 +53,7 @@ class KeystoreCrypto(BytesDataclass):
|
|||||||
cipher: KeystoreModule = KeystoreModule()
|
cipher: KeystoreModule = KeystoreModule()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_json(cls, json_dict: dict):
|
def from_json(cls, json_dict: Dict[Any, Any]) -> 'KeystoreCrypto':
|
||||||
kdf = KeystoreModule(**json_dict['kdf'])
|
kdf = KeystoreModule(**json_dict['kdf'])
|
||||||
checksum = KeystoreModule(**json_dict['checksum'])
|
checksum = KeystoreModule(**json_dict['checksum'])
|
||||||
cipher = KeystoreModule(**json_dict['cipher'])
|
cipher = KeystoreModule(**json_dict['cipher'])
|
||||||
@ -67,20 +68,20 @@ class Keystore(BytesDataclass):
|
|||||||
uuid: str = str(uuid4()) # Generate a new uuid
|
uuid: str = str(uuid4()) # Generate a new uuid
|
||||||
version: int = 4
|
version: int = 4
|
||||||
|
|
||||||
def kdf(self, **kwargs):
|
def kdf(self, **kwargs: Any) -> bytes:
|
||||||
return scrypt(**kwargs) if 'scrypt' in self.crypto.kdf.function else PBKDF2(**kwargs)
|
return scrypt(**kwargs) if 'scrypt' in self.crypto.kdf.function else PBKDF2(**kwargs)
|
||||||
|
|
||||||
def save(self, file: str):
|
def save(self, file: str) -> None:
|
||||||
with open(file, 'w') as f:
|
with open(file, 'w') as f:
|
||||||
f.write(self.as_json())
|
f.write(self.as_json())
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def open(cls, file: str):
|
def open(cls, file: str) -> 'Keystore':
|
||||||
with open(file, 'r') as f:
|
with open(file, 'r') as f:
|
||||||
return cls.from_json(f.read())
|
return cls.from_json(f.read())
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_json(cls, path: str):
|
def from_json(cls, path: str) -> 'Keystore':
|
||||||
with open(path, 'r') as f:
|
with open(path, 'r') as f:
|
||||||
json_dict = json.load(f)
|
json_dict = json.load(f)
|
||||||
crypto = KeystoreCrypto.from_json(json_dict['crypto'])
|
crypto = KeystoreCrypto.from_json(json_dict['crypto'])
|
||||||
@ -93,7 +94,7 @@ class Keystore(BytesDataclass):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def encrypt(cls, *, secret: bytes, password: str, path: str='',
|
def encrypt(cls, *, secret: bytes, password: str, path: str='',
|
||||||
kdf_salt: bytes=randbits(256).to_bytes(32, 'big'),
|
kdf_salt: bytes=randbits(256).to_bytes(32, 'big'),
|
||||||
aes_iv: bytes=randbits(128).to_bytes(16, 'big')):
|
aes_iv: bytes=randbits(128).to_bytes(16, 'big')) -> 'Keystore':
|
||||||
keystore = cls()
|
keystore = cls()
|
||||||
keystore.crypto.kdf.params['salt'] = kdf_salt
|
keystore.crypto.kdf.params['salt'] = kdf_salt
|
||||||
decryption_key = keystore.kdf(password=password, **keystore.crypto.kdf.params)
|
decryption_key = keystore.kdf(password=password, **keystore.crypto.kdf.params)
|
||||||
|
@ -1,12 +1,15 @@
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
# Spec constants
|
||||||
DOMAIN_DEPOSIT = bytes.fromhex('03000000')
|
DOMAIN_DEPOSIT = bytes.fromhex('03000000')
|
||||||
GENESIS_FORK_VERSION = bytes.fromhex('00000000')
|
GENESIS_FORK_VERSION = bytes.fromhex('00000000')
|
||||||
|
BLS_WITHDRAWAL_PREFIX = bytes.fromhex('00')
|
||||||
|
|
||||||
MIN_DEPOSIT_AMOUNT = 2 ** 0 * 10 ** 9
|
MIN_DEPOSIT_AMOUNT = 2 ** 0 * 10 ** 9
|
||||||
MAX_DEPOSIT_AMOUNT = 2 ** 5 * 10 ** 9
|
MAX_DEPOSIT_AMOUNT = 2 ** 5 * 10 ** 9
|
||||||
|
|
||||||
WORD_LISTS_PATH = os.path.join('eth2deposit', 'key_handling', 'key_derivation', 'word_lists')
|
|
||||||
|
|
||||||
|
# File/folder constants
|
||||||
|
WORD_LISTS_PATH = os.path.join('eth2deposit', 'key_handling', 'key_derivation', 'word_lists')
|
||||||
DEFAULT_VALIDATOR_KEYS_FOLDER_NAME = 'validator_keys'
|
DEFAULT_VALIDATOR_KEYS_FOLDER_NAME = 'validator_keys'
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
from typing import Any
|
||||||
|
|
||||||
from Crypto.Hash import (
|
from Crypto.Hash import (
|
||||||
SHA256 as _sha256,
|
SHA256 as _sha256,
|
||||||
SHA512 as _sha512,
|
SHA512 as _sha512,
|
||||||
@ -12,7 +14,7 @@ from Crypto.Cipher import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def SHA256(x):
|
def SHA256(x: bytes) -> bytes:
|
||||||
return _sha256.new(x).digest()
|
return _sha256.new(x).digest()
|
||||||
|
|
||||||
|
|
||||||
@ -35,5 +37,5 @@ def HKDF(*, salt: bytes, IKM: bytes, L: int) -> bytes:
|
|||||||
return res if isinstance(res, bytes) else res[0] # PyCryptodome can return Tuple[bytes]
|
return res if isinstance(res, bytes) else res[0] # PyCryptodome can return Tuple[bytes]
|
||||||
|
|
||||||
|
|
||||||
def AES_128_CTR(*, key: bytes, iv: bytes):
|
def AES_128_CTR(*, key: bytes, iv: bytes) -> Any:
|
||||||
return _AES.new(key=key, mode=_AES.MODE_CTR, initial_value=iv, nonce=b'')
|
return _AES.new(key=key, mode=_AES.MODE_CTR, initial_value=iv, nonce=b'')
|
||||||
|
@ -3,6 +3,8 @@ from eth_typing import (
|
|||||||
BLSPubkey,
|
BLSPubkey,
|
||||||
BLSSignature,
|
BLSSignature,
|
||||||
)
|
)
|
||||||
|
from typing import Any, Dict
|
||||||
|
|
||||||
from py_ecc.bls import G2ProofOfPossession as bls
|
from py_ecc.bls import G2ProofOfPossession as bls
|
||||||
|
|
||||||
from eth2deposit.utils.ssz import (
|
from eth2deposit.utils.ssz import (
|
||||||
@ -25,7 +27,7 @@ def verify_deposit_data_json(filefolder: str) -> bool:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def verify_deposit(deposit_data_dict: dict) -> bool:
|
def verify_deposit(deposit_data_dict: Dict[str, Any]) -> bool:
|
||||||
'''
|
'''
|
||||||
Checks whether a deposit is valid based on the eth2 rules.
|
Checks whether a deposit is valid based on the eth2 rules.
|
||||||
https://github.com/ethereum/eth2.0-specs/blob/dev/specs/phase0/beacon-chain.md#deposits
|
https://github.com/ethereum/eth2.0-specs/blob/dev/specs/phase0/beacon-chain.md#deposits
|
||||||
|
11
mypy.ini
11
mypy.ini
@ -1,3 +1,12 @@
|
|||||||
[mypy]
|
[mypy]
|
||||||
follow_imports = False
|
warn_unused_ignores = True
|
||||||
ignore_missing_imports = True
|
ignore_missing_imports = True
|
||||||
|
strict_optional = False
|
||||||
|
check_untyped_defs = True
|
||||||
|
disallow_incomplete_defs = True
|
||||||
|
disallow_untyped_defs = True
|
||||||
|
disallow_any_generics = True
|
||||||
|
disallow_untyped_calls = True
|
||||||
|
warn_redundant_casts = True
|
||||||
|
warn_unused_configs = True
|
||||||
|
strict_equality = True
|
||||||
|
@ -10,6 +10,18 @@ from eth2deposit.deposit import main
|
|||||||
from eth2deposit.utils.constants import DEFAULT_VALIDATOR_KEYS_FOLDER_NAME
|
from eth2deposit.utils.constants import DEFAULT_VALIDATOR_KEYS_FOLDER_NAME
|
||||||
|
|
||||||
|
|
||||||
|
def clean_key_folder(my_folder_path):
|
||||||
|
validator_keys_folder_path = os.path.join(my_folder_path, DEFAULT_VALIDATOR_KEYS_FOLDER_NAME)
|
||||||
|
if not os.path.exists(validator_keys_folder_path):
|
||||||
|
return
|
||||||
|
|
||||||
|
_, _, key_files = next(os.walk(validator_keys_folder_path))
|
||||||
|
for key_file_name in key_files:
|
||||||
|
os.remove(os.path.join(validator_keys_folder_path, key_file_name))
|
||||||
|
os.rmdir(validator_keys_folder_path)
|
||||||
|
os.rmdir(my_folder_path)
|
||||||
|
|
||||||
|
|
||||||
def test_deposit(monkeypatch):
|
def test_deposit(monkeypatch):
|
||||||
# monkeypatch get_mnemonic
|
# monkeypatch get_mnemonic
|
||||||
def get_mnemonic(language, words_path, entropy=None):
|
def get_mnemonic(language, words_path, entropy=None):
|
||||||
@ -19,6 +31,7 @@ def test_deposit(monkeypatch):
|
|||||||
|
|
||||||
# Prepare folder
|
# Prepare folder
|
||||||
my_folder_path = os.path.join(os.getcwd(), 'TESTING_TEMP_FOLDER')
|
my_folder_path = os.path.join(os.getcwd(), 'TESTING_TEMP_FOLDER')
|
||||||
|
clean_key_folder(my_folder_path)
|
||||||
if not os.path.exists(my_folder_path):
|
if not os.path.exists(my_folder_path):
|
||||||
os.mkdir(my_folder_path)
|
os.mkdir(my_folder_path)
|
||||||
|
|
||||||
@ -35,10 +48,7 @@ def test_deposit(monkeypatch):
|
|||||||
assert len(key_files) == 2
|
assert len(key_files) == 2
|
||||||
|
|
||||||
# Clean up
|
# Clean up
|
||||||
for key_file_name in key_files:
|
clean_key_folder(my_folder_path)
|
||||||
os.remove(os.path.join(validator_keys_folder_path, key_file_name))
|
|
||||||
os.rmdir(validator_keys_folder_path)
|
|
||||||
os.rmdir(my_folder_path)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -93,7 +103,4 @@ async def test_script():
|
|||||||
_, _, key_files = next(os.walk(validator_keys_folder_path))
|
_, _, key_files = next(os.walk(validator_keys_folder_path))
|
||||||
|
|
||||||
# Clean up
|
# Clean up
|
||||||
for key_file_name in key_files:
|
clean_key_folder(my_folder_path)
|
||||||
os.remove(os.path.join(validator_keys_folder_path, key_file_name))
|
|
||||||
os.rmdir(validator_keys_folder_path)
|
|
||||||
os.rmdir(my_folder_path)
|
|
||||||
|
@ -12,7 +12,7 @@ WORD_LISTS_PATH = os.path.join(os.getcwd(), 'eth2deposit', 'key_handling', 'key_
|
|||||||
|
|
||||||
test_vector_filefolder = os.path.join('tests', 'test_key_handling',
|
test_vector_filefolder = os.path.join('tests', 'test_key_handling',
|
||||||
'test_key_derivation', 'test_vectors', 'mnemonic.json')
|
'test_key_derivation', 'test_vectors', 'mnemonic.json')
|
||||||
with open(test_vector_filefolder, 'r') as f:
|
with open(test_vector_filefolder, 'r', encoding='utf-8') as f:
|
||||||
test_vectors = json.load(f)
|
test_vectors = json.load(f)
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user