keystore.py (40334B)
#!/usr/bin/env python3
# -*- mode: python -*-
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2016 The Electrum developers
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from unicodedata import normalize
import hashlib
import re
from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple
from functools import lru_cache
from abc import ABC, abstractmethod

from . import bitcoin, ecc, constants, bip32
from .bitcoin import deserialize_privkey, serialize_privkey, BaseDecodeError
from .transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput, TxInput
from .bip32 import (convert_bip32_path_to_list_of_uint32, BIP32_PRIME,
                    is_xpub, is_xprv, BIP32Node, normalize_bip32_derivation,
                    convert_bip32_intpath_to_strpath, is_xkey_consistent_with_key_origin_info)
from .ecc import string_to_number
from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST,
                     SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion, hash_160)
from .util import (InvalidPassword, WalletFileException,
                   BitcoinException, bh2u, bfh, inv_dict, is_hex_str)
from .mnemonic import Mnemonic, Wordlist, seed_type, is_seed
from .plugin import run_hook
from .logging import Logger

if TYPE_CHECKING:
    from .gui.qt.util import TaskThread
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
    from .wallet_db import WalletDB


class CannotDerivePubkey(Exception):
    """Raised by derive_pubkey() implementations for paths they refuse to derive."""


class KeyStore(Logger, ABC):
    """Abstract base class of all keystores.

    Provides the shared logic that works out which pubkeys of a partial
    transaction belong to this keystore; subclasses supply key lookup,
    signing and serialisation.
    """

    type: str

    def __init__(self):
        Logger.__init__(self)
        # Set to True when in-memory state changed and should be persisted.
        self.is_requesting_to_be_rewritten_to_wallet_file = False  # type: bool

    def has_seed(self) -> bool:
        return False

    def is_watching_only(self) -> bool:
        return False

    def can_import(self) -> bool:
        return False

    def get_type_text(self) -> str:
        return f'{self.type}'

    @abstractmethod
    def may_have_password(self) -> bool:
        """Returns whether the keystore can be encrypted with a password."""
        pass

    def _get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[str, Union[Sequence[int], str]]:
        """Merge the per-input derivations of all inputs of tx.

        Returns a dict mapping hex pubkey -> derivation
        (see get_pubkey_derivation for the possible derivation values).
        """
        keypairs = {}
        for txin in tx.inputs():
            keypairs.update(self._get_txin_derivations(txin))
        return keypairs

    def _get_txin_derivations(self, txin: 'PartialTxInput') -> Dict[str, Union[Sequence[int], str]]:
        """Return {hex pubkey: derivation} for the pubkeys of txin that are ours
        and have not signed yet. Empty if the input is already complete.
        """
        if txin.is_complete():
            return {}
        keypairs = {}
        for pubkey in txin.pubkeys:
            if pubkey in txin.part_sigs:
                # this pubkey already signed
                continue
            derivation = self.get_pubkey_derivation(pubkey, txin)
            if not derivation:
                continue
            keypairs[pubkey.hex()] = derivation
        return keypairs

    def can_sign(self, tx: 'Transaction', *, ignore_watching_only=False) -> bool:
        """Returns whether this keystore could sign *something* in this tx."""
        if not ignore_watching_only and self.is_watching_only():
            return False
        if not isinstance(tx, PartialTransaction):
            return False
        return bool(self._get_tx_derivations(tx))

    def can_sign_txin(self, txin: 'TxInput', *, ignore_watching_only=False) -> bool:
        """Returns whether this keystore could sign this txin."""
        if not ignore_watching_only and self.is_watching_only():
            return False
        if not isinstance(txin, PartialTxInput):
            return False
        return bool(self._get_txin_derivations(txin))

    def ready_to_sign(self) -> bool:
        return not self.is_watching_only()

    @abstractmethod
    def dump(self) -> dict:
        pass

    @abstractmethod
    def is_deterministic(self) -> bool:
        pass

    @abstractmethod
    def sign_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
        pass

    @abstractmethod
    def decrypt_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
        pass

    @abstractmethod
    def sign_transaction(self, tx: 'PartialTransaction', password) -> None:
        pass

    @abstractmethod
    def get_pubkey_derivation(self, pubkey: bytes,
                              txinout: Union['PartialTxInput', 'PartialTxOutput'],
                              *, only_der_suffix=True) \
            -> Union[Sequence[int], str, None]:
        """Returns either a derivation int-list if the pubkey can be HD derived from this keystore,
        the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived,
        or None if the pubkey is unrelated.
        """
        pass

    def find_my_pubkey_in_txinout(
            self, txinout: Union['PartialTxInput', 'PartialTxOutput'],
            *, only_der_suffix: bool = False
    ) -> Tuple[Optional[bytes], Optional[List[int]]]:
        """Return (pubkey, derivation path) of the first HD-derived pubkey in
        txinout.bip32_paths that belongs to this keystore, or (None, None).
        """
        # note: we assume that this cosigner only has one pubkey in this txin/txout
        for pubkey in txinout.bip32_paths:
            path = self.get_pubkey_derivation(pubkey, txinout, only_der_suffix=only_der_suffix)
            # str/bytes results denote non-HD keys; only int-paths are of interest here
            if path and not isinstance(path, (str, bytes)):
                return pubkey, list(path)
        return None, None


class Software_KeyStore(KeyStore):
    """Base class of keystores whose private keys are held by this software
    (as opposed to a hardware device), encrypted via pw_encode/pw_decode.
    """

    def __init__(self, d):
        KeyStore.__init__(self)
        self.pw_hash_version = d.get('pw_hash_version', 1)
        if self.pw_hash_version not in SUPPORTED_PW_HASH_VERSIONS:
            raise UnsupportedPasswordHashVersion(self.pw_hash_version)

    def may_have_password(self) -> bool:
        return not self.is_watching_only()

    def sign_message(self, sequence, message, password) -> bytes:
        privkey, compressed = self.get_private_key(sequence, password)
        key = ecc.ECPrivkey(privkey)
        return key.sign_message(message, compressed)

    def decrypt_message(self, sequence, message, password) -> bytes:
        privkey, compressed = self.get_private_key(sequence, password)
        ec = ecc.ECPrivkey(privkey)
        decrypted = ec.decrypt_message(message)
        return decrypted

    def sign_transaction(self, tx, password):
        """Sign every input of tx this keystore has keys for. No-op if watching-only."""
        if self.is_watching_only():
            return
        # Raise if password is not correct.
        self.check_password(password)
        # Resolve each derivation to its private key. A fresh dict is built
        # instead of overwriting the values of the dict being iterated.
        derivations = self._get_tx_derivations(tx)
        keypairs = {
            pubkey_hex: self.get_private_key(derivation, password)
            for pubkey_hex, derivation in derivations.items()
        }
        # Sign
        if keypairs:
            tx.sign(keypairs)

    @abstractmethod
    def update_password(self, old_password, new_password):
        pass

    @abstractmethod
    def check_password(self, password):
        pass

    @abstractmethod
    def get_private_key(self, sequence: 'AddressIndexGeneric', password) -> Tuple[bytes, bool]:
        """Returns (privkey, is_compressed)"""
        pass


class Imported_KeyStore(Software_KeyStore):
    """Keystore for imported private keys.

    self.keypairs maps hex pubkey -> password-encoded serialized private key.
    """

    type = 'imported'

    def __init__(self, d):
        Software_KeyStore.__init__(self, d)
        self.keypairs = d.get('keypairs', {})  # type: Dict[str, str]

    def is_deterministic(self):
        return False

    def dump(self):
        return {
            'type': self.type,
            'keypairs': self.keypairs,
            'pw_hash_version': self.pw_hash_version,
        }

    def can_import(self):
        return True

    def check_password(self, password):
        """Verify password by decrypting the first stored key.

        Raises InvalidPassword (via get_private_key) on mismatch.
        """
        pubkey = list(self.keypairs.keys())[0]
        self.get_private_key(pubkey, password)

    def import_privkey(self, sec, password):
        """Store the serialized private key sec, encrypted with password.

        Returns (txin_type, hex pubkey).
        """
        txin_type, privkey, compressed = deserialize_privkey(sec)
        pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
        # re-serialize the key so the internal storage format is consistent
        serialized_privkey = serialize_privkey(
            privkey, compressed, txin_type, internal_use=True)
        # NOTE: if the same pubkey is reused for multiple addresses (script types),
        # there will only be one pubkey-privkey pair for it in self.keypairs,
        # and the privkey will encode a txin_type but that txin_type cannot be trusted.
        # Removing keys complicates this further.
        self.keypairs[pubkey] = pw_encode(serialized_privkey, password, version=self.pw_hash_version)
        return txin_type, pubkey

    def delete_imported_key(self, key):
        self.keypairs.pop(key)

    def get_private_key(self, pubkey: str, password):
        """Returns (privkey, is_compressed) for the given hex pubkey.

        Raises InvalidPassword if the decrypted data does not deserialize,
        or does not correspond to pubkey.
        """
        sec = pw_decode(self.keypairs[pubkey], password, version=self.pw_hash_version)
        try:
            txin_type, privkey, compressed = deserialize_privkey(sec)
        except BaseDecodeError as e:
            raise InvalidPassword() from e
        if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed):
            raise InvalidPassword()
        return privkey, compressed

    def get_pubkey_derivation(self, pubkey, txin, *, only_der_suffix=True):
        pubkey_hex = pubkey.hex()
        if pubkey_hex in self.keypairs:
            return pubkey_hex
        return None

    def update_password(self, old_password, new_password):
        """Re-encrypt all keys with new_password, upgrading to the latest pw hash version."""
        self.check_password(old_password)
        if new_password == '':
            new_password = None
        # Only values of existing keys are replaced, so the dict size is stable
        # while iterating.
        for k, v in self.keypairs.items():
            b = pw_decode(v, old_password, version=self.pw_hash_version)
            c = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
            self.keypairs[k] = c
        self.pw_hash_version = PW_HASH_VERSION_LATEST


class Deterministic_KeyStore(Software_KeyStore):
    """Software keystore that may hold a (password-encoded) seed and passphrase."""

    def __init__(self, d):
        Software_KeyStore.__init__(self, d)
        self.seed = d.get('seed', '')
        self.passphrase = d.get('passphrase', '')

    def is_deterministic(self):
        return True

    def dump(self):
        d = {
            'type': self.type,
            'pw_hash_version': self.pw_hash_version,
        }
        if self.seed:
            d['seed'] = self.seed
        if self.passphrase:
            d['passphrase'] = self.passphrase
        return d

    def has_seed(self):
        return bool(self.seed)

    def is_watching_only(self):
        return not self.has_seed()

    @abstractmethod
    def format_seed(self, seed: str) -> str:
        pass

    def add_seed(self, seed):
        if self.seed:
            raise Exception("a seed exists")
        self.seed = self.format_seed(seed)

    def get_seed(self, password):
        if not self.has_seed():
            raise Exception("This wallet has no seed words")
        return pw_decode(self.seed, password, version=self.pw_hash_version)

    def get_passphrase(self, password):
        if self.passphrase:
            return pw_decode(self.passphrase, password, version=self.pw_hash_version)
        else:
            return ''


class MasterPublicKeyMixin(ABC):
    """Mixin for keystores that derive pubkeys from a master public key."""

    @abstractmethod
    def get_master_public_key(self) -> str:
        pass

    @abstractmethod
    def get_derivation_prefix(self) -> Optional[str]:
        """Returns to bip32 path from some root node to self.xpub
        Note that the return value might be None; if it is unknown.
        """
        pass

    @abstractmethod
    def get_root_fingerprint(self) -> Optional[str]:
        """Returns the bip32 fingerprint of the top level node.
        This top level node is the node at the beginning of the derivation prefix,
        i.e. applying the derivation prefix to it will result self.xpub
        Note that the return value might be None; if it is unknown.
        """
        pass

    @abstractmethod
    def get_fp_and_derivation_to_be_used_in_partial_tx(
            self,
            der_suffix: Sequence[int],
            *,
            only_der_suffix: bool,
    ) -> Tuple[bytes, Sequence[int]]:
        """Returns fingerprint and derivation path corresponding to a derivation suffix.
        The fingerprint is either the root fp or the intermediate fp, depending on what is available
        and 'only_der_suffix', and the derivation path is adjusted accordingly.
        """
        pass

    @abstractmethod
    def derive_pubkey(self, for_change: int, n: int) -> bytes:
        """Returns pubkey at given path.
        May raise CannotDerivePubkey.
        """
        pass

    def get_pubkey_derivation(
            self,
            pubkey: bytes,
            txinout: Union['PartialTxInput', 'PartialTxOutput'],
            *,
            only_der_suffix=True,
    ) -> Union[Sequence[int], str, None]:
        """Try to match pubkey (via txinout.bip32_paths) against this keystore.

        Three strategies are tried in order: matching the root fingerprint,
        matching the fingerprint of our own xpub node, and finally ignoring
        the fingerprint altogether. A candidate suffix is only accepted if
        derive_pubkey() reproduces pubkey from it.

        Returns the derivation suffix if only_der_suffix is set, otherwise
        the full path (None if our derivation prefix is unknown); or None
        if the pubkey could not be matched.
        """
        EXPECTED_DER_SUFFIX_LEN = 2

        def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
            if len(der_suffix) != EXPECTED_DER_SUFFIX_LEN:
                return False
            try:
                if pubkey != self.derive_pubkey(*der_suffix):
                    return False
            except CannotDerivePubkey:
                return False
            return True

        if pubkey not in txinout.bip32_paths:
            return None
        fp_found, path_found = txinout.bip32_paths[pubkey]
        der_suffix = None
        full_path = None
        # 1. try fp against our root
        ks_root_fingerprint_hex = self.get_root_fingerprint()
        ks_der_prefix_str = self.get_derivation_prefix()
        ks_der_prefix = convert_bip32_path_to_list_of_uint32(ks_der_prefix_str) if ks_der_prefix_str else None
        if (ks_root_fingerprint_hex is not None and ks_der_prefix is not None and
                fp_found.hex() == ks_root_fingerprint_hex):
            if path_found[:len(ks_der_prefix)] == ks_der_prefix:
                der_suffix = path_found[len(ks_der_prefix):]
                if not test_der_suffix_against_pubkey(der_suffix, pubkey):
                    der_suffix = None
        # 2. try fp against our intermediate fingerprint
        if (der_suffix is None and isinstance(self, Xpub) and
                fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
            der_suffix = path_found
            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
                der_suffix = None
        # 3. hack/bruteforce: ignore fp and check pubkey anyway
        #    This is only to resolve the following scenario/problem:
        #    problem: if we don't know our root fp, but tx contains root fp and full path,
        #             we will miss the pubkey (false negative match). Though it might still work
        #             within gap limit due to tx.add_info_from_wallet overwriting the fields.
        #             Example: keystore has intermediate xprv without root fp; tx contains root fp and full path.
        if der_suffix is None:
            der_suffix = path_found[-EXPECTED_DER_SUFFIX_LEN:]
            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
                der_suffix = None
        # if all attempts/methods failed, we give up now:
        if der_suffix is None:
            return None
        if ks_der_prefix is not None:
            full_path = ks_der_prefix + list(der_suffix)
        return der_suffix if only_der_suffix else full_path


class Xpub(MasterPublicKeyMixin):
    """MasterPublicKeyMixin implementation backed by a bip32 xpub,
    plus optional "key origin" info (derivation prefix and root fingerprint).
    """

    def __init__(self, *, derivation_prefix: Optional[str] = None, root_fingerprint: Optional[str] = None):
        self.xpub = None
        # lazily filled caches, see derive_pubkey()
        self.xpub_receive = None
        self.xpub_change = None
        self._xpub_bip32_node = None  # type: Optional[BIP32Node]

        # "key origin" info (subclass should persist these):
        self._derivation_prefix = derivation_prefix  # type: Optional[str]
        self._root_fingerprint = root_fingerprint  # type: Optional[str]

    def get_master_public_key(self):
        return self.xpub

    def get_bip32_node_for_xpub(self) -> Optional[BIP32Node]:
        """Return the (cached) BIP32Node parsed from self.xpub, or None if there is no xpub."""
        if self._xpub_bip32_node is None:
            if self.xpub is None:
                return None
            self._xpub_bip32_node = BIP32Node.from_xkey(self.xpub)
        return self._xpub_bip32_node

    def get_derivation_prefix(self) -> Optional[str]:
        return self._derivation_prefix

    def get_root_fingerprint(self) -> Optional[str]:
        return self._root_fingerprint

    def get_fp_and_derivation_to_be_used_in_partial_tx(
            self,
            der_suffix: Sequence[int],
            *,
            only_der_suffix: bool,
    ) -> Tuple[bytes, Sequence[int]]:
        fingerprint_hex = self.get_root_fingerprint()
        der_prefix_str = self.get_derivation_prefix()
        if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None:
            # use root fp, and true full path
            fingerprint_bytes = bfh(fingerprint_hex)
            der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
        else:
            # use intermediate fp, and claim der suffix is the full path
            fingerprint_bytes = self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()
            der_prefix_ints = convert_bip32_path_to_list_of_uint32('m')
        der_full = der_prefix_ints + list(der_suffix)
        return fingerprint_bytes, der_full

    def get_xpub_to_be_used_in_partial_tx(self, *, only_der_suffix: bool) -> str:
        """Return self.xpub re-serialized with depth, parent fingerprint and child
        number adjusted to the derivation that will be claimed in the partial tx.
        """
        assert self.xpub
        fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[],
                                                                                 only_der_suffix=only_der_suffix)
        bip32node = self.get_bip32_node_for_xpub()
        depth = len(der_full)
        child_number_int = der_full[-1] if len(der_full) >= 1 else 0
        child_number_bytes = child_number_int.to_bytes(length=4, byteorder="big")
        fingerprint = bytes(4) if depth == 0 else bip32node.fingerprint
        bip32node = bip32node._replace(depth=depth,
                                       fingerprint=fingerprint,
                                       child_number=child_number_bytes)
        return bip32node.to_xpub()

    def add_key_origin_from_root_node(self, *, derivation_prefix: str, root_node: BIP32Node):
        """Set key origin info after checking that root_node, derived along
        derivation_prefix, yields the same pubkey as self.xpub.
        """
        assert self.xpub
        # try to derive ourselves from what we were given
        child_node1 = root_node.subkey_at_private_derivation(derivation_prefix)
        child_pubkey_bytes1 = child_node1.eckey.get_public_key_bytes(compressed=True)
        child_node2 = self.get_bip32_node_for_xpub()
        child_pubkey_bytes2 = child_node2.eckey.get_public_key_bytes(compressed=True)
        if child_pubkey_bytes1 != child_pubkey_bytes2:
            raise Exception("(xpub, derivation_prefix, root_node) inconsistency")
        self.add_key_origin(derivation_prefix=derivation_prefix,
                            root_fingerprint=root_node.calc_fingerprint_of_this_node().hex().lower())

    def add_key_origin(self, *, derivation_prefix: Optional[str] = None,
                       root_fingerprint: Optional[str] = None) -> None:
        """Validate and store key origin info; values that are None leave the
        corresponding stored field unchanged.
        """
        assert self.xpub
        if not (root_fingerprint is None or (is_hex_str(root_fingerprint) and len(root_fingerprint) == 8)):
            raise Exception("root fp must be 8 hex characters")
        derivation_prefix = normalize_bip32_derivation(derivation_prefix)
        if not is_xkey_consistent_with_key_origin_info(self.xpub,
                                                       derivation_prefix=derivation_prefix,
                                                       root_fingerprint=root_fingerprint):
            raise Exception("xpub inconsistent with provided key origin info")
        if root_fingerprint is not None:
            self._root_fingerprint = root_fingerprint
        if derivation_prefix is not None:
            self._derivation_prefix = derivation_prefix
        self.is_requesting_to_be_rewritten_to_wallet_file = True

    # NOTE: lru_cache on a method keys the cache on (self, for_change, n) and
    # holds a reference to self for as long as the entry is cached.
    @lru_cache(maxsize=None)
    def derive_pubkey(self, for_change: int, n: int) -> bytes:
        for_change = int(for_change)
        if for_change not in (0, 1):
            raise CannotDerivePubkey("forbidden path")
        xpub = self.xpub_change if for_change else self.xpub_receive
        if xpub is None:
            # derive and remember the intermediate (receive/change) xpub
            rootnode = self.get_bip32_node_for_xpub()
            xpub = rootnode.subkey_at_public_derivation((for_change,)).to_xpub()
            if for_change:
                self.xpub_change = xpub
            else:
                self.xpub_receive = xpub
        return self.get_pubkey_from_xpub(xpub, (n,))

    @classmethod
    def get_pubkey_from_xpub(cls, xpub: str, sequence) -> bytes:
        """Return the compressed pubkey at public derivation `sequence` below xpub."""
        node = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(sequence)
        return node.eckey.get_public_key_bytes(compressed=True)


class BIP32_KeyStore(Xpub, Deterministic_KeyStore):
    """Deterministic software keystore based on a bip32 xpub/xprv pair."""

    type = 'bip32'

    def __init__(self, d):
        Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
        Deterministic_KeyStore.__init__(self, d)
        self.xpub = d.get('xpub')
        self.xprv = d.get('xprv')

    def format_seed(self, seed):
        # collapse all whitespace runs into single spaces
        return ' '.join(seed.split())

    def dump(self):
        d = Deterministic_KeyStore.dump(self)
        d['xpub'] = self.xpub
        d['xprv'] = self.xprv
        d['derivation'] = self.get_derivation_prefix()
        d['root_fingerprint'] = self.get_root_fingerprint()
        return d

    def get_master_private_key(self, password):
        return pw_decode(self.xprv, password, version=self.pw_hash_version)

    def check_password(self, password):
        """Raise InvalidPassword unless the decrypted xprv parses and its
        chaincode matches that of our xpub.
        """
        xprv = pw_decode(self.xprv, password, version=self.pw_hash_version)
        try:
            bip32node = BIP32Node.from_xkey(xprv)
        except BaseDecodeError as e:
            raise InvalidPassword() from e
        if bip32node.chaincode != self.get_bip32_node_for_xpub().chaincode:
            raise InvalidPassword()

    def update_password(self, old_password, new_password):
        """Re-encrypt seed, passphrase and xprv with new_password,
        upgrading to the latest pw hash version.
        """
        self.check_password(old_password)
        if new_password == '':
            new_password = None
        if self.has_seed():
            decoded = self.get_seed(old_password)
            self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
        if self.passphrase:
            decoded = self.get_passphrase(old_password)
            self.passphrase = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
        if self.xprv is not None:
            b = pw_decode(self.xprv, old_password, version=self.pw_hash_version)
            self.xprv = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
        self.pw_hash_version = PW_HASH_VERSION_LATEST

    def is_watching_only(self):
        return self.xprv is None

    def add_xpub(self, xpub):
        assert is_xpub(xpub)
        self.xpub = xpub
        root_fingerprint, derivation_prefix = bip32.root_fp_and_der_prefix_from_xkey(xpub)
        self.add_key_origin(derivation_prefix=derivation_prefix, root_fingerprint=root_fingerprint)

    def add_xprv(self, xprv):
        assert is_xprv(xprv)
        self.xprv = xprv
        self.add_xpub(bip32.xpub_from_xprv(xprv))

    def add_xprv_from_seed(self, bip32_seed, xtype, derivation):
        rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype)
        node = rootnode.subkey_at_private_derivation(derivation)
        self.add_xprv(node.to_xprv())
        self.add_key_origin_from_root_node(derivation_prefix=derivation, root_node=rootnode)

    def get_private_key(self, sequence: Sequence[int], password):
        """Returns (privkey, True) for the key at private derivation `sequence` below the xprv."""
        xprv = self.get_master_private_key(password)
        node = BIP32Node.from_xkey(xprv).subkey_at_private_derivation(sequence)
        pk = node.eckey.get_secret_bytes()
        return pk, True

    def get_keypair(self, sequence, password):
        """Returns (pubkey bytes, privkey bytes) for the given derivation."""
        k, _ = self.get_private_key(sequence, password)
        cK = ecc.ECPrivkey(k).get_public_key_bytes()
        return cK, k


class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore):
    """Deterministic keystore of type 'old', based on a stretched seed and a
    master public key (mpk) stored as hex without the leading '04' byte.
    """

    type = 'old'

    def __init__(self, d):
        Deterministic_KeyStore.__init__(self, d)
        self.mpk = d.get('mpk')
        self._root_fingerprint = None

    def get_hex_seed(self, password):
        """Return the decrypted seed, utf8-encoded to bytes."""
        return pw_decode(self.seed, password, version=self.pw_hash_version).encode('utf8')

    def dump(self):
        d = Deterministic_KeyStore.dump(self)
        d['mpk'] = self.mpk
        return d

    def add_seed(self, seedphrase):
        Deterministic_KeyStore.add_seed(self, seedphrase)
        s = self.get_hex_seed(None)
        self.mpk = self.mpk_from_seed(s)

    def add_master_public_key(self, mpk):
        self.mpk = mpk

    def format_seed(self, seed):
        """Normalize seed to its stored form.

        Accepts either a hex string (returned as-is after text normalization)
        or mnemonic words, which are decoded via old_mnemonic.mn_decode.
        Raises Exception("Invalid seed") if decoding yields nothing.
        """
        from . import old_mnemonic, mnemonic
        seed = mnemonic.normalize_text(seed)
        # see if seed was entered as hex
        if seed:
            try:
                bfh(seed)
                return str(seed)
            except Exception:
                # not hex; fall through and treat it as mnemonic words
                pass
        words = seed.split()
        seed = old_mnemonic.mn_decode(words)
        if not seed:
            raise Exception("Invalid seed")
        return seed

    def get_seed(self, password):
        from . import old_mnemonic
        s = self.get_hex_seed(password)
        return ' '.join(old_mnemonic.mn_encode(s))

    @classmethod
    def mpk_from_seed(cls, seed):
        secexp = cls.stretch_key(seed)
        privkey = ecc.ECPrivkey.from_secret_scalar(secexp)
        # drop the first two hex chars (one byte) of the uncompressed pubkey
        return privkey.get_public_key_hex(compressed=False)[2:]

    @classmethod
    def stretch_key(cls, seed):
        """Stretch seed (bytes) with 100000 rounds of sha256 and return the result as an int."""
        x = seed
        for i in range(100000):
            x = hashlib.sha256(x + seed).digest()
        return string_to_number(x)

    @classmethod
    def get_sequence(cls, mpk, for_change, n):
        return string_to_number(sha256d(("%d:%d:"%(n, for_change)).encode('ascii') + bfh(mpk)))

    @classmethod
    def get_pubkey_from_mpk(cls, mpk, for_change, n) -> bytes:
        z = cls.get_sequence(mpk, for_change, n)
        master_public_key = ecc.ECPubkey(bfh('04'+mpk))
        public_key = master_public_key + z*ecc.GENERATOR
        return public_key.get_public_key_bytes(compressed=False)

    # NOTE: lru_cache on a method keys the cache on (self, for_change, n) and
    # holds a reference to self for as long as the entry is cached.
    @lru_cache(maxsize=None)
    def derive_pubkey(self, for_change, n) -> bytes:
        for_change = int(for_change)
        if for_change not in (0, 1):
            raise CannotDerivePubkey("forbidden path")
        return self.get_pubkey_from_mpk(self.mpk, for_change, n)

    def _get_private_key_from_stretched_exponent(self, for_change, n, secexp):
        secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER
        pk = int.to_bytes(secexp, length=32, byteorder='big', signed=False)
        return pk

    def get_private_key(self, sequence: Sequence[int], password):
        """Returns (privkey, False) for sequence == (for_change, n).

        Raises InvalidPassword (via _check_seed) if the decrypted seed does
        not match self.mpk.
        """
        seed = self.get_hex_seed(password)
        secexp = self.stretch_key(seed)
        self._check_seed(seed, secexp=secexp)
        for_change, n = sequence
        pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp)
        return pk, False

    def _check_seed(self, seed, *, secexp=None):
        """Raise InvalidPassword unless seed stretches to the key behind self.mpk.

        secexp may be passed in to avoid stretching the seed a second time.
        """
        if secexp is None:
            secexp = self.stretch_key(seed)
        master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp)
        master_public_key = master_private_key.get_public_key_bytes(compressed=False)[1:]
        if master_public_key != bfh(self.mpk):
            raise InvalidPassword()

    def check_password(self, password):
        seed = self.get_hex_seed(password)
        self._check_seed(seed)

    def get_master_public_key(self):
        return self.mpk

    def get_derivation_prefix(self) -> str:
        return 'm'

    def get_root_fingerprint(self) -> str:
        """Return (and cache) the first 4 bytes, as lowercase hex, of the
        hash_160 of the compressed master public key.
        """
        if self._root_fingerprint is None:
            master_public_key = ecc.ECPubkey(bfh('04'+self.mpk))
            xfp = hash_160(master_public_key.get_public_key_bytes(compressed=True))[0:4]
            self._root_fingerprint = xfp.hex().lower()
        return self._root_fingerprint

    def get_fp_and_derivation_to_be_used_in_partial_tx(
            self,
            der_suffix: Sequence[int],
            *,
            only_der_suffix: bool,
    ) -> Tuple[bytes, Sequence[int]]:
        # only_der_suffix is not consulted here: root fp and prefix are always used
        fingerprint_hex = self.get_root_fingerprint()
        der_prefix_str = self.get_derivation_prefix()
        fingerprint_bytes = bfh(fingerprint_hex)
        der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
        der_full = der_prefix_ints + list(der_suffix)
        return fingerprint_bytes, der_full

    def update_password(self, old_password, new_password):
        """Re-encrypt the seed with new_password, upgrading to the latest pw hash version."""
        self.check_password(old_password)
        if new_password == '':
            new_password = None
        if self.has_seed():
            decoded = pw_decode(self.seed, old_password, version=self.pw_hash_version)
            self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
        self.pw_hash_version = PW_HASH_VERSION_LATEST


class Hardware_KeyStore(Xpub, KeyStore):
    """Base class of keystores whose keys live on a hardware device.

    hw_type, device and plugin are declared here but not assigned in this class.
    """

    hw_type: str
    device: str
    plugin: 'HW_PluginBase'
    thread: Optional['TaskThread'] = None

    type = 'hardware'

    def __init__(self, d):
        Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
        KeyStore.__init__(self)
        # Errors and other user interaction is done through the wallet's
        # handler.  The handler is per-window and preserved across
        # device reconnects
        self.xpub = d.get('xpub')
        self.label = d.get('label')
        self.soft_device_id = d.get('soft_device_id')  # type: Optional[str]
        self.handler = None  # type: Optional[HardwareHandlerBase]
        run_hook('init_keystore', self)

    def set_label(self, label):
        self.label = label

    def may_have_password(self) -> bool:
        return False

    def is_deterministic(self):
        return True

    def get_type_text(self) -> str:
        return f'hw[{self.hw_type}]'

    def dump(self):
        return {
            'type': self.type,
            'hw_type': self.hw_type,
            'xpub': self.xpub,
            'derivation': self.get_derivation_prefix(),
            'root_fingerprint': self.get_root_fingerprint(),
            'label': self.label,
            'soft_device_id': self.soft_device_id,
        }

    def unpaired(self):
        """A device paired with the wallet was disconnected.  This can be
        called in any thread context."""
        self.logger.info("unpaired")

    def paired(self):
        """A device paired with the wallet was (re-)connected.  This can be
        called in any thread context."""
        self.logger.info("paired")

    def is_watching_only(self):
        """The wallet is not watching-only; the user will be prompted for
        pin and passphrase as appropriate when needed."""
        assert not self.has_seed()
        return False

    def get_password_for_storage_encryption(self) -> str:
        client = self.plugin.get_client(self)
        return client.get_password_for_storage_encryption()

    def has_usable_connection_with_device(self) -> bool:
        # `plugin` is only annotated at class level, so it may be unset
        if not hasattr(self, 'plugin'):
            return False
        client = self.plugin.get_client(self, force_pair=False)
        if client is None:
            return False
        return client.has_usable_connection_with_device()

    def ready_to_sign(self):
        return super().ready_to_sign() and self.has_usable_connection_with_device()

    def opportunistically_fill_in_missing_info_from_device(self, client: 'HardwareClientBase'):
        """Sync root fingerprint, label and soft device id from client,
        flagging the keystore for rewrite whenever something changed.
        """
        assert client is not None
        if self._root_fingerprint is None:
            self._root_fingerprint = client.request_root_fingerprint_from_device()
            self.is_requesting_to_be_rewritten_to_wallet_file = True
        if self.label != client.label():
            self.label = client.label()
            self.is_requesting_to_be_rewritten_to_wallet_file = True
        if self.soft_device_id != client.get_soft_device_id():
            self.soft_device_id = client.get_soft_device_id()
            self.is_requesting_to_be_rewritten_to_wallet_file = True


KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin]  # intersection really...
AddressIndexGeneric = Union[Sequence[int], str]  # can be hex pubkey str


def bip39_normalize_passphrase(passphrase):
    """Return passphrase (or '' if falsy) in NFKD normal form."""
    return normalize('NFKD', passphrase or '')


def bip39_to_seed(mnemonic, passphrase):
    """Derive the seed bytes from a mnemonic and passphrase.

    Uses PBKDF2-HMAC-SHA512 with 2048 rounds over the whitespace-collapsed,
    NFKD-normalized mnemonic, salted with b'mnemonic' + passphrase.
    """
    import hashlib
    PBKDF2_ROUNDS = 2048
    mnemonic = normalize('NFKD', ' '.join(mnemonic.split()))
    passphrase = bip39_normalize_passphrase(passphrase)
    return hashlib.pbkdf2_hmac('sha512', mnemonic.encode('utf-8'),
                               b'mnemonic' + passphrase.encode('utf-8'), iterations=PBKDF2_ROUNDS)


def bip39_is_checksum_valid(mnemonic: str) -> Tuple[bool, bool]:
    """Test checksum of bip39 mnemonic assuming English wordlist.
    Returns tuple (is_checksum_valid, is_wordlist_valid)
    """
    words = [normalize('NFKD', word) for word in mnemonic.split()]
    words_len = len(words)
    wordlist = Wordlist.from_file("english.txt")
    n = len(wordlist)
    # interpret the words, first word most significant, as digits in base n
    i = 0
    for w in words:
        try:
            k = wordlist.index(w)
        except ValueError:
            return False, False
        i = i*n + k
    if words_len not in [12, 15, 18, 21, 24]:
        return False, True
    checksum_length = 11 * words_len // 33  # num bits
    entropy_length = 32 * checksum_length  # num bits
    entropy = i >> checksum_length
    checksum = i % 2**checksum_length
    entropy_bytes = int.to_bytes(entropy, length=entropy_length//8, byteorder="big")
    hashed = int.from_bytes(sha256(entropy_bytes), byteorder="big")
    # the expected checksum is the top checksum_length bits of the 256-bit hash
    calculated_checksum = hashed >> (256 - checksum_length)
    return checksum == calculated_checksum, True


def from_bip39_seed(seed, passphrase, derivation, xtype=None):
    """Create a BIP32_KeyStore from a bip39 mnemonic at the given derivation.

    If xtype is None it is chosen via xtype_from_derivation(derivation).
    """
    k = BIP32_KeyStore({})
    bip32_seed = bip39_to_seed(seed, passphrase)
    if xtype is None:
        xtype = xtype_from_derivation(derivation)
    k.add_xprv_from_seed(bip32_seed, xtype, derivation)
    return k


PURPOSE48_SCRIPT_TYPES = {
    'p2wsh-p2sh': 1,  # specifically multisig
    'p2wsh': 2,       # specifically multisig
}
PURPOSE48_SCRIPT_TYPES_INV = inv_dict(PURPOSE48_SCRIPT_TYPES)


def xtype_from_derivation(derivation: str) -> str:
    """Returns the script type to be used for this derivation."""
    bip32_indices = convert_bip32_path_to_list_of_uint32(derivation)
    if len(bip32_indices) >= 1:
        if bip32_indices[0] == 84 + BIP32_PRIME:
            return 'p2wpkh'
        elif bip32_indices[0] == 49 + BIP32_PRIME:
            return 'p2wpkh-p2sh'
        elif bip32_indices[0] == 44 + BIP32_PRIME:
            return 'standard'
        elif bip32_indices[0] == 45 + BIP32_PRIME:
            return 'standard'

    if len(bip32_indices) >= 4:
        if bip32_indices[0] == 48 + BIP32_PRIME:
            # m / purpose' / coin_type' / account' / script_type' / change / address_index
            script_type_int = bip32_indices[3] - BIP32_PRIME
            script_type = PURPOSE48_SCRIPT_TYPES_INV.get(script_type_int)
            if script_type is not None:
                return script_type
    return 'standard'


# registry: hw_type -> constructor taking the keystore dict
hw_keystores = {}


def register_keystore(hw_type, constructor):
    """Register constructor as the factory for hardware keystores of hw_type."""
    hw_keystores[hw_type] = constructor


def hardware_keystore(d) -> Hardware_KeyStore:
    """Instantiate the registered hardware keystore for d['hw_type'].

    Raises WalletFileException if no constructor is registered for that type.
    """
    hw_type = d['hw_type']
    if hw_type in hw_keystores:
        constructor = hw_keystores[hw_type]
        return constructor(d)
    raise WalletFileException(f'unknown hardware type: {hw_type}. '
                              f'hw_keystores: {list(hw_keystores)}')


def load_keystore(db: 'WalletDB', name: str) -> KeyStore:
    """Load the keystore stored under `name` in db.

    Raises WalletFileException if the entry has no 'type' or the type is unknown.
    """
    d = db.get(name, {})
    t = d.get('type')
    if not t:
        raise WalletFileException(
            'Wallet format requires update.\n'
            'Cannot find keystore for name {}'.format(name))
    keystore_constructors = {ks.type: ks for ks in [Old_KeyStore, Imported_KeyStore, BIP32_KeyStore]}
    keystore_constructors['hardware'] = hardware_keystore
    try:
        ks_constructor = keystore_constructors[t]
    except KeyError as e:
        raise WalletFileException(f'Unknown type {t} for keystore named {name}') from e
    k = ks_constructor(d)
    return k


def is_old_mpk(mpk: str) -> bool:
    """Return whether mpk is a 128-char hex string that, prefixed with '04',
    is accepted by ecc.ECPubkey.
    """
    try:
        int(mpk, 16)  # test if hex string
    except Exception:
        return False
    if len(mpk) != 128:
        return False
    try:
        ecc.ECPubkey(bfh('04' + mpk))
    except Exception:
        return False
    return True


def is_address_list(text):
    """Return whether text is a non-empty, whitespace-separated list of addresses."""
    parts = text.split()
    return bool(parts) and all(bitcoin.is_address(x) for x in parts)


def get_private_keys(text, *, allow_spaces_inside_key=True, raise_on_error=False):
    """Split text into private keys.

    With allow_spaces_inside_key, each line is one key and whitespace inside
    a line is removed; otherwise keys are separated by any whitespace.
    Returns the list of keys if it is non-empty and every entry passes
    bitcoin.is_private_key, else None.
    """
    if allow_spaces_inside_key:  # see #1612
        stripped_lines = (''.join(line.split()) for line in text.split('\n'))
        parts = [line for line in stripped_lines if line]
    else:
        parts = text.split()
    if parts and all(bitcoin.is_private_key(x, raise_on_error=raise_on_error) for x in parts):
        return parts
    return None


def is_private_key_list(text, *, allow_spaces_inside_key=True, raise_on_error=False):
    return bool(get_private_keys(text,
                                 allow_spaces_inside_key=allow_spaces_inside_key,
                                 raise_on_error=raise_on_error))


def is_master_key(x):
    return is_old_mpk(x) or is_bip32_key(x)


def is_bip32_key(x):
    return is_xprv(x) or is_xpub(x)


def bip44_derivation(account_id, bip43_purpose=44):
    coin = constants.net.BIP44_COIN_TYPE
    der = "m/%d'/%d'/%d'" % (bip43_purpose, coin, int(account_id))
return normalize_bip32_derivation(der) 1004 1005 1006 def purpose48_derivation(account_id: int, xtype: str) -> str: 1007 # m / purpose' / coin_type' / account' / script_type' / change / address_index 1008 bip43_purpose = 48 1009 coin = constants.net.BIP44_COIN_TYPE 1010 account_id = int(account_id) 1011 script_type_int = PURPOSE48_SCRIPT_TYPES.get(xtype) 1012 if script_type_int is None: 1013 raise Exception('unknown xtype: {}'.format(xtype)) 1014 der = "m/%d'/%d'/%d'/%d'" % (bip43_purpose, coin, account_id, script_type_int) 1015 return normalize_bip32_derivation(der) 1016 1017 1018 def from_seed(seed, passphrase, is_p2sh=False): 1019 t = seed_type(seed) 1020 if t == 'old': 1021 keystore = Old_KeyStore({}) 1022 keystore.add_seed(seed) 1023 elif t in ['standard', 'segwit']: 1024 keystore = BIP32_KeyStore({}) 1025 keystore.add_seed(seed) 1026 keystore.passphrase = passphrase 1027 bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase) 1028 if t == 'standard': 1029 der = "m/" 1030 xtype = 'standard' 1031 else: 1032 der = "m/1'/" if is_p2sh else "m/0'/" 1033 xtype = 'p2wsh' if is_p2sh else 'p2wpkh' 1034 keystore.add_xprv_from_seed(bip32_seed, xtype, der) 1035 else: 1036 raise BitcoinException('Unexpected seed type {}'.format(repr(t))) 1037 return keystore 1038 1039 def from_private_key_list(text): 1040 keystore = Imported_KeyStore({}) 1041 for x in get_private_keys(text): 1042 keystore.import_privkey(x, None) 1043 return keystore 1044 1045 def from_old_mpk(mpk): 1046 keystore = Old_KeyStore({}) 1047 keystore.add_master_public_key(mpk) 1048 return keystore 1049 1050 def from_xpub(xpub): 1051 k = BIP32_KeyStore({}) 1052 k.add_xpub(xpub) 1053 return k 1054 1055 def from_xprv(xprv): 1056 k = BIP32_KeyStore({}) 1057 k.add_xprv(xprv) 1058 return k 1059 1060 def from_master_key(text): 1061 if is_xprv(text): 1062 k = from_xprv(text) 1063 elif is_old_mpk(text): 1064 k = from_old_mpk(text) 1065 elif is_xpub(text): 1066 k = from_xpub(text) 1067 else: 1068 raise 
BitcoinException('Invalid master key') 1069 return k