electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

keystore.py (40334B)


      1 #!/usr/bin/env python2
      2 # -*- mode: python -*-
      3 #
      4 # Electrum - lightweight Bitcoin client
      5 # Copyright (C) 2016  The Electrum developers
      6 #
      7 # Permission is hereby granted, free of charge, to any person
      8 # obtaining a copy of this software and associated documentation files
      9 # (the "Software"), to deal in the Software without restriction,
     10 # including without limitation the rights to use, copy, modify, merge,
     11 # publish, distribute, sublicense, and/or sell copies of the Software,
     12 # and to permit persons to whom the Software is furnished to do so,
     13 # subject to the following conditions:
     14 #
     15 # The above copyright notice and this permission notice shall be
     16 # included in all copies or substantial portions of the Software.
     17 #
     18 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     19 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     20 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     21 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     22 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     23 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     24 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     25 # SOFTWARE.
     26 
     27 from unicodedata import normalize
     28 import hashlib
     29 import re
     30 from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple
     31 from functools import lru_cache
     32 from abc import ABC, abstractmethod
     33 
     34 from . import bitcoin, ecc, constants, bip32
     35 from .bitcoin import deserialize_privkey, serialize_privkey, BaseDecodeError
     36 from .transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput, TxInput
     37 from .bip32 import (convert_bip32_path_to_list_of_uint32, BIP32_PRIME,
     38                     is_xpub, is_xprv, BIP32Node, normalize_bip32_derivation,
     39                     convert_bip32_intpath_to_strpath, is_xkey_consistent_with_key_origin_info)
     40 from .ecc import string_to_number
     41 from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST,
     42                      SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion, hash_160)
     43 from .util import (InvalidPassword, WalletFileException,
     44                    BitcoinException, bh2u, bfh, inv_dict, is_hex_str)
     45 from .mnemonic import Mnemonic, Wordlist, seed_type, is_seed
     46 from .plugin import run_hook
     47 from .logging import Logger
     48 
     49 if TYPE_CHECKING:
     50     from .gui.qt.util import TaskThread
     51     from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
     52     from .wallet_db import WalletDB
     53 
     54 
     55 class CannotDerivePubkey(Exception): pass
     56 
     57 
     58 class KeyStore(Logger, ABC):
     59     type: str
     60 
     61     def __init__(self):
     62         Logger.__init__(self)
     63         self.is_requesting_to_be_rewritten_to_wallet_file = False  # type: bool
     64 
     65     def has_seed(self) -> bool:
     66         return False
     67 
     68     def is_watching_only(self) -> bool:
     69         return False
     70 
     71     def can_import(self) -> bool:
     72         return False
     73 
     74     def get_type_text(self) -> str:
     75         return f'{self.type}'
     76 
     77     @abstractmethod
     78     def may_have_password(self):
     79         """Returns whether the keystore can be encrypted with a password."""
     80         pass
     81 
     82     def _get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[str, Union[Sequence[int], str]]:
     83         keypairs = {}
     84         for txin in tx.inputs():
     85             keypairs.update(self._get_txin_derivations(txin))
     86         return keypairs
     87 
     88     def _get_txin_derivations(self, txin: 'PartialTxInput') -> Dict[str, Union[Sequence[int], str]]:
     89         if txin.is_complete():
     90             return {}
     91         keypairs = {}
     92         for pubkey in txin.pubkeys:
     93             if pubkey in txin.part_sigs:
     94                 # this pubkey already signed
     95                 continue
     96             derivation = self.get_pubkey_derivation(pubkey, txin)
     97             if not derivation:
     98                 continue
     99             keypairs[pubkey.hex()] = derivation
    100         return keypairs
    101 
    102     def can_sign(self, tx: 'Transaction', *, ignore_watching_only=False) -> bool:
    103         """Returns whether this keystore could sign *something* in this tx."""
    104         if not ignore_watching_only and self.is_watching_only():
    105             return False
    106         if not isinstance(tx, PartialTransaction):
    107             return False
    108         return bool(self._get_tx_derivations(tx))
    109 
    110     def can_sign_txin(self, txin: 'TxInput', *, ignore_watching_only=False) -> bool:
    111         """Returns whether this keystore could sign this txin."""
    112         if not ignore_watching_only and self.is_watching_only():
    113             return False
    114         if not isinstance(txin, PartialTxInput):
    115             return False
    116         return bool(self._get_txin_derivations(txin))
    117 
    118     def ready_to_sign(self) -> bool:
    119         return not self.is_watching_only()
    120 
    121     @abstractmethod
    122     def dump(self) -> dict:
    123         pass
    124 
    125     @abstractmethod
    126     def is_deterministic(self) -> bool:
    127         pass
    128 
    129     @abstractmethod
    130     def sign_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
    131         pass
    132 
    133     @abstractmethod
    134     def decrypt_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
    135         pass
    136 
    137     @abstractmethod
    138     def sign_transaction(self, tx: 'PartialTransaction', password) -> None:
    139         pass
    140 
    141     @abstractmethod
    142     def get_pubkey_derivation(self, pubkey: bytes,
    143                               txinout: Union['PartialTxInput', 'PartialTxOutput'],
    144                               *, only_der_suffix=True) \
    145             -> Union[Sequence[int], str, None]:
    146         """Returns either a derivation int-list if the pubkey can be HD derived from this keystore,
    147         the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived,
    148         or None if the pubkey is unrelated.
    149         """
    150         pass
    151 
    152     def find_my_pubkey_in_txinout(
    153             self, txinout: Union['PartialTxInput', 'PartialTxOutput'],
    154             *, only_der_suffix: bool = False
    155     ) -> Tuple[Optional[bytes], Optional[List[int]]]:
    156         # note: we assume that this cosigner only has one pubkey in this txin/txout
    157         for pubkey in txinout.bip32_paths:
    158             path = self.get_pubkey_derivation(pubkey, txinout, only_der_suffix=only_der_suffix)
    159             if path and not isinstance(path, (str, bytes)):
    160                 return pubkey, list(path)
    161         return None, None
    162 
    163 
    164 class Software_KeyStore(KeyStore):
    165 
    166     def __init__(self, d):
    167         KeyStore.__init__(self)
    168         self.pw_hash_version = d.get('pw_hash_version', 1)
    169         if self.pw_hash_version not in SUPPORTED_PW_HASH_VERSIONS:
    170             raise UnsupportedPasswordHashVersion(self.pw_hash_version)
    171 
    172     def may_have_password(self):
    173         return not self.is_watching_only()
    174 
    175     def sign_message(self, sequence, message, password) -> bytes:
    176         privkey, compressed = self.get_private_key(sequence, password)
    177         key = ecc.ECPrivkey(privkey)
    178         return key.sign_message(message, compressed)
    179 
    180     def decrypt_message(self, sequence, message, password) -> bytes:
    181         privkey, compressed = self.get_private_key(sequence, password)
    182         ec = ecc.ECPrivkey(privkey)
    183         decrypted = ec.decrypt_message(message)
    184         return decrypted
    185 
    186     def sign_transaction(self, tx, password):
    187         if self.is_watching_only():
    188             return
    189         # Raise if password is not correct.
    190         self.check_password(password)
    191         # Add private keys
    192         keypairs = self._get_tx_derivations(tx)
    193         for k, v in keypairs.items():
    194             keypairs[k] = self.get_private_key(v, password)
    195         # Sign
    196         if keypairs:
    197             tx.sign(keypairs)
    198 
    199     @abstractmethod
    200     def update_password(self, old_password, new_password):
    201         pass
    202 
    203     @abstractmethod
    204     def check_password(self, password):
    205         pass
    206 
    207     @abstractmethod
    208     def get_private_key(self, sequence: 'AddressIndexGeneric', password) -> Tuple[bytes, bool]:
    209         """Returns (privkey, is_compressed)"""
    210         pass
    211 
    212 
    213 class Imported_KeyStore(Software_KeyStore):
    214     # keystore for imported private keys
    215 
    216     type = 'imported'
    217 
    218     def __init__(self, d):
    219         Software_KeyStore.__init__(self, d)
    220         self.keypairs = d.get('keypairs', {})  # type: Dict[str, str]
    221 
    222     def is_deterministic(self):
    223         return False
    224 
    225     def dump(self):
    226         return {
    227             'type': self.type,
    228             'keypairs': self.keypairs,
    229             'pw_hash_version': self.pw_hash_version,
    230         }
    231 
    232     def can_import(self):
    233         return True
    234 
    235     def check_password(self, password):
    236         pubkey = list(self.keypairs.keys())[0]
    237         self.get_private_key(pubkey, password)
    238 
    239     def import_privkey(self, sec, password):
    240         txin_type, privkey, compressed = deserialize_privkey(sec)
    241         pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
    242         # re-serialize the key so the internal storage format is consistent
    243         serialized_privkey = serialize_privkey(
    244             privkey, compressed, txin_type, internal_use=True)
    245         # NOTE: if the same pubkey is reused for multiple addresses (script types),
    246         # there will only be one pubkey-privkey pair for it in self.keypairs,
    247         # and the privkey will encode a txin_type but that txin_type cannot be trusted.
    248         # Removing keys complicates this further.
    249         self.keypairs[pubkey] = pw_encode(serialized_privkey, password, version=self.pw_hash_version)
    250         return txin_type, pubkey
    251 
    252     def delete_imported_key(self, key):
    253         self.keypairs.pop(key)
    254 
    255     def get_private_key(self, pubkey: str, password):
    256         sec = pw_decode(self.keypairs[pubkey], password, version=self.pw_hash_version)
    257         try:
    258             txin_type, privkey, compressed = deserialize_privkey(sec)
    259         except BaseDecodeError as e:
    260             raise InvalidPassword() from e
    261         if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed):
    262             raise InvalidPassword()
    263         return privkey, compressed
    264 
    265     def get_pubkey_derivation(self, pubkey, txin, *, only_der_suffix=True):
    266         if pubkey.hex() in self.keypairs:
    267             return pubkey.hex()
    268         return None
    269 
    270     def update_password(self, old_password, new_password):
    271         self.check_password(old_password)
    272         if new_password == '':
    273             new_password = None
    274         for k, v in self.keypairs.items():
    275             b = pw_decode(v, old_password, version=self.pw_hash_version)
    276             c = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
    277             self.keypairs[k] = c
    278         self.pw_hash_version = PW_HASH_VERSION_LATEST
    279 
    280 
    281 class Deterministic_KeyStore(Software_KeyStore):
    282 
    283     def __init__(self, d):
    284         Software_KeyStore.__init__(self, d)
    285         self.seed = d.get('seed', '')
    286         self.passphrase = d.get('passphrase', '')
    287 
    288     def is_deterministic(self):
    289         return True
    290 
    291     def dump(self):
    292         d = {
    293             'type': self.type,
    294             'pw_hash_version': self.pw_hash_version,
    295         }
    296         if self.seed:
    297             d['seed'] = self.seed
    298         if self.passphrase:
    299             d['passphrase'] = self.passphrase
    300         return d
    301 
    302     def has_seed(self):
    303         return bool(self.seed)
    304 
    305     def is_watching_only(self):
    306         return not self.has_seed()
    307 
    308     @abstractmethod
    309     def format_seed(self, seed: str) -> str:
    310         pass
    311 
    312     def add_seed(self, seed):
    313         if self.seed:
    314             raise Exception("a seed exists")
    315         self.seed = self.format_seed(seed)
    316 
    317     def get_seed(self, password):
    318         if not self.has_seed():
    319             raise Exception("This wallet has no seed words")
    320         return pw_decode(self.seed, password, version=self.pw_hash_version)
    321 
    322     def get_passphrase(self, password):
    323         if self.passphrase:
    324             return pw_decode(self.passphrase, password, version=self.pw_hash_version)
    325         else:
    326             return ''
    327 
    328 
    329 class MasterPublicKeyMixin(ABC):
    330 
    331     @abstractmethod
    332     def get_master_public_key(self) -> str:
    333         pass
    334 
    335     @abstractmethod
    336     def get_derivation_prefix(self) -> Optional[str]:
    337         """Returns to bip32 path from some root node to self.xpub
    338         Note that the return value might be None; if it is unknown.
    339         """
    340         pass
    341 
    342     @abstractmethod
    343     def get_root_fingerprint(self) -> Optional[str]:
    344         """Returns the bip32 fingerprint of the top level node.
    345         This top level node is the node at the beginning of the derivation prefix,
    346         i.e. applying the derivation prefix to it will result self.xpub
    347         Note that the return value might be None; if it is unknown.
    348         """
    349         pass
    350 
    351     @abstractmethod
    352     def get_fp_and_derivation_to_be_used_in_partial_tx(
    353             self,
    354             der_suffix: Sequence[int],
    355             *,
    356             only_der_suffix: bool,
    357     ) -> Tuple[bytes, Sequence[int]]:
    358         """Returns fingerprint and derivation path corresponding to a derivation suffix.
    359         The fingerprint is either the root fp or the intermediate fp, depending on what is available
    360         and 'only_der_suffix', and the derivation path is adjusted accordingly.
    361         """
    362         pass
    363 
    364     @abstractmethod
    365     def derive_pubkey(self, for_change: int, n: int) -> bytes:
    366         """Returns pubkey at given path.
    367         May raise CannotDerivePubkey.
    368         """
    369         pass
    370 
    371     def get_pubkey_derivation(
    372             self,
    373             pubkey: bytes,
    374             txinout: Union['PartialTxInput', 'PartialTxOutput'],
    375             *,
    376             only_der_suffix=True,
    377     ) -> Union[Sequence[int], str, None]:
    378         EXPECTED_DER_SUFFIX_LEN = 2
    379         def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
    380             if len(der_suffix) != EXPECTED_DER_SUFFIX_LEN:
    381                 return False
    382             try:
    383                 if pubkey != self.derive_pubkey(*der_suffix):
    384                     return False
    385             except CannotDerivePubkey:
    386                 return False
    387             return True
    388 
    389         if pubkey not in txinout.bip32_paths:
    390             return None
    391         fp_found, path_found = txinout.bip32_paths[pubkey]
    392         der_suffix = None
    393         full_path = None
    394         # 1. try fp against our root
    395         ks_root_fingerprint_hex = self.get_root_fingerprint()
    396         ks_der_prefix_str = self.get_derivation_prefix()
    397         ks_der_prefix = convert_bip32_path_to_list_of_uint32(ks_der_prefix_str) if ks_der_prefix_str else None
    398         if (ks_root_fingerprint_hex is not None and ks_der_prefix is not None and
    399                 fp_found.hex() == ks_root_fingerprint_hex):
    400             if path_found[:len(ks_der_prefix)] == ks_der_prefix:
    401                 der_suffix = path_found[len(ks_der_prefix):]
    402                 if not test_der_suffix_against_pubkey(der_suffix, pubkey):
    403                     der_suffix = None
    404         # 2. try fp against our intermediate fingerprint
    405         if (der_suffix is None and isinstance(self, Xpub) and
    406                 fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
    407             der_suffix = path_found
    408             if not test_der_suffix_against_pubkey(der_suffix, pubkey):
    409                 der_suffix = None
    410         # 3. hack/bruteforce: ignore fp and check pubkey anyway
    411         #    This is only to resolve the following scenario/problem:
    412         #    problem: if we don't know our root fp, but tx contains root fp and full path,
    413         #             we will miss the pubkey (false negative match). Though it might still work
    414         #             within gap limit due to tx.add_info_from_wallet overwriting the fields.
    415         #             Example: keystore has intermediate xprv without root fp; tx contains root fp and full path.
    416         if der_suffix is None:
    417             der_suffix = path_found[-EXPECTED_DER_SUFFIX_LEN:]
    418             if not test_der_suffix_against_pubkey(der_suffix, pubkey):
    419                 der_suffix = None
    420         # if all attempts/methods failed, we give up now:
    421         if der_suffix is None:
    422             return None
    423         if ks_der_prefix is not None:
    424             full_path = ks_der_prefix + list(der_suffix)
    425         return der_suffix if only_der_suffix else full_path
    426 
    427 
    428 class Xpub(MasterPublicKeyMixin):
    429 
    430     def __init__(self, *, derivation_prefix: str = None, root_fingerprint: str = None):
    431         self.xpub = None
    432         self.xpub_receive = None
    433         self.xpub_change = None
    434         self._xpub_bip32_node = None  # type: Optional[BIP32Node]
    435 
    436         # "key origin" info (subclass should persist these):
    437         self._derivation_prefix = derivation_prefix  # type: Optional[str]
    438         self._root_fingerprint = root_fingerprint  # type: Optional[str]
    439 
    440     def get_master_public_key(self):
    441         return self.xpub
    442 
    443     def get_bip32_node_for_xpub(self) -> Optional[BIP32Node]:
    444         if self._xpub_bip32_node is None:
    445             if self.xpub is None:
    446                 return None
    447             self._xpub_bip32_node = BIP32Node.from_xkey(self.xpub)
    448         return self._xpub_bip32_node
    449 
    450     def get_derivation_prefix(self) -> Optional[str]:
    451         return self._derivation_prefix
    452 
    453     def get_root_fingerprint(self) -> Optional[str]:
    454         return self._root_fingerprint
    455 
    456     def get_fp_and_derivation_to_be_used_in_partial_tx(
    457             self,
    458             der_suffix: Sequence[int],
    459             *,
    460             only_der_suffix: bool,
    461     ) -> Tuple[bytes, Sequence[int]]:
    462         fingerprint_hex = self.get_root_fingerprint()
    463         der_prefix_str = self.get_derivation_prefix()
    464         if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None:
    465             # use root fp, and true full path
    466             fingerprint_bytes = bfh(fingerprint_hex)
    467             der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
    468         else:
    469             # use intermediate fp, and claim der suffix is the full path
    470             fingerprint_bytes = self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()
    471             der_prefix_ints = convert_bip32_path_to_list_of_uint32('m')
    472         der_full = der_prefix_ints + list(der_suffix)
    473         return fingerprint_bytes, der_full
    474 
    475     def get_xpub_to_be_used_in_partial_tx(self, *, only_der_suffix: bool) -> str:
    476         assert self.xpub
    477         fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[],
    478                                                                                  only_der_suffix=only_der_suffix)
    479         bip32node = self.get_bip32_node_for_xpub()
    480         depth = len(der_full)
    481         child_number_int = der_full[-1] if len(der_full) >= 1 else 0
    482         child_number_bytes = child_number_int.to_bytes(length=4, byteorder="big")
    483         fingerprint = bytes(4) if depth == 0 else bip32node.fingerprint
    484         bip32node = bip32node._replace(depth=depth,
    485                                        fingerprint=fingerprint,
    486                                        child_number=child_number_bytes)
    487         return bip32node.to_xpub()
    488 
    489     def add_key_origin_from_root_node(self, *, derivation_prefix: str, root_node: BIP32Node):
    490         assert self.xpub
    491         # try to derive ourselves from what we were given
    492         child_node1 = root_node.subkey_at_private_derivation(derivation_prefix)
    493         child_pubkey_bytes1 = child_node1.eckey.get_public_key_bytes(compressed=True)
    494         child_node2 = self.get_bip32_node_for_xpub()
    495         child_pubkey_bytes2 = child_node2.eckey.get_public_key_bytes(compressed=True)
    496         if child_pubkey_bytes1 != child_pubkey_bytes2:
    497             raise Exception("(xpub, derivation_prefix, root_node) inconsistency")
    498         self.add_key_origin(derivation_prefix=derivation_prefix,
    499                             root_fingerprint=root_node.calc_fingerprint_of_this_node().hex().lower())
    500 
    501     def add_key_origin(self, *, derivation_prefix: str = None, root_fingerprint: str = None) -> None:
    502         assert self.xpub
    503         if not (root_fingerprint is None or (is_hex_str(root_fingerprint) and len(root_fingerprint) == 8)):
    504             raise Exception("root fp must be 8 hex characters")
    505         derivation_prefix = normalize_bip32_derivation(derivation_prefix)
    506         if not is_xkey_consistent_with_key_origin_info(self.xpub,
    507                                                        derivation_prefix=derivation_prefix,
    508                                                        root_fingerprint=root_fingerprint):
    509             raise Exception("xpub inconsistent with provided key origin info")
    510         if root_fingerprint is not None:
    511             self._root_fingerprint = root_fingerprint
    512         if derivation_prefix is not None:
    513             self._derivation_prefix = derivation_prefix
    514         self.is_requesting_to_be_rewritten_to_wallet_file = True
    515 
    516     @lru_cache(maxsize=None)
    517     def derive_pubkey(self, for_change: int, n: int) -> bytes:
    518         for_change = int(for_change)
    519         if for_change not in (0, 1):
    520             raise CannotDerivePubkey("forbidden path")
    521         xpub = self.xpub_change if for_change else self.xpub_receive
    522         if xpub is None:
    523             rootnode = self.get_bip32_node_for_xpub()
    524             xpub = rootnode.subkey_at_public_derivation((for_change,)).to_xpub()
    525             if for_change:
    526                 self.xpub_change = xpub
    527             else:
    528                 self.xpub_receive = xpub
    529         return self.get_pubkey_from_xpub(xpub, (n,))
    530 
    531     @classmethod
    532     def get_pubkey_from_xpub(self, xpub: str, sequence) -> bytes:
    533         node = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(sequence)
    534         return node.eckey.get_public_key_bytes(compressed=True)
    535 
    536 
    537 class BIP32_KeyStore(Xpub, Deterministic_KeyStore):
    538 
    539     type = 'bip32'
    540 
    541     def __init__(self, d):
    542         Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
    543         Deterministic_KeyStore.__init__(self, d)
    544         self.xpub = d.get('xpub')
    545         self.xprv = d.get('xprv')
    546 
    547     def format_seed(self, seed):
    548         return ' '.join(seed.split())
    549 
    550     def dump(self):
    551         d = Deterministic_KeyStore.dump(self)
    552         d['xpub'] = self.xpub
    553         d['xprv'] = self.xprv
    554         d['derivation'] = self.get_derivation_prefix()
    555         d['root_fingerprint'] = self.get_root_fingerprint()
    556         return d
    557 
    558     def get_master_private_key(self, password):
    559         return pw_decode(self.xprv, password, version=self.pw_hash_version)
    560 
    561     def check_password(self, password):
    562         xprv = pw_decode(self.xprv, password, version=self.pw_hash_version)
    563         try:
    564             bip32node = BIP32Node.from_xkey(xprv)
    565         except BaseDecodeError as e:
    566             raise InvalidPassword() from e
    567         if bip32node.chaincode != self.get_bip32_node_for_xpub().chaincode:
    568             raise InvalidPassword()
    569 
    570     def update_password(self, old_password, new_password):
    571         self.check_password(old_password)
    572         if new_password == '':
    573             new_password = None
    574         if self.has_seed():
    575             decoded = self.get_seed(old_password)
    576             self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
    577         if self.passphrase:
    578             decoded = self.get_passphrase(old_password)
    579             self.passphrase = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
    580         if self.xprv is not None:
    581             b = pw_decode(self.xprv, old_password, version=self.pw_hash_version)
    582             self.xprv = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
    583         self.pw_hash_version = PW_HASH_VERSION_LATEST
    584 
    585     def is_watching_only(self):
    586         return self.xprv is None
    587 
    588     def add_xpub(self, xpub):
    589         assert is_xpub(xpub)
    590         self.xpub = xpub
    591         root_fingerprint, derivation_prefix = bip32.root_fp_and_der_prefix_from_xkey(xpub)
    592         self.add_key_origin(derivation_prefix=derivation_prefix, root_fingerprint=root_fingerprint)
    593 
    594     def add_xprv(self, xprv):
    595         assert is_xprv(xprv)
    596         self.xprv = xprv
    597         self.add_xpub(bip32.xpub_from_xprv(xprv))
    598 
    599     def add_xprv_from_seed(self, bip32_seed, xtype, derivation):
    600         rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype)
    601         node = rootnode.subkey_at_private_derivation(derivation)
    602         self.add_xprv(node.to_xprv())
    603         self.add_key_origin_from_root_node(derivation_prefix=derivation, root_node=rootnode)
    604 
    605     def get_private_key(self, sequence: Sequence[int], password):
    606         xprv = self.get_master_private_key(password)
    607         node = BIP32Node.from_xkey(xprv).subkey_at_private_derivation(sequence)
    608         pk = node.eckey.get_secret_bytes()
    609         return pk, True
    610 
    611     def get_keypair(self, sequence, password):
    612         k, _ = self.get_private_key(sequence, password)
    613         cK = ecc.ECPrivkey(k).get_public_key_bytes()
    614         return cK, k
    615 
    616 
    617 class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore):
    618 
    619     type = 'old'
    620 
    621     def __init__(self, d):
    622         Deterministic_KeyStore.__init__(self, d)
    623         self.mpk = d.get('mpk')
    624         self._root_fingerprint = None
    625 
    626     def get_hex_seed(self, password):
    627         return pw_decode(self.seed, password, version=self.pw_hash_version).encode('utf8')
    628 
    629     def dump(self):
    630         d = Deterministic_KeyStore.dump(self)
    631         d['mpk'] = self.mpk
    632         return d
    633 
    634     def add_seed(self, seedphrase):
    635         Deterministic_KeyStore.add_seed(self, seedphrase)
    636         s = self.get_hex_seed(None)
    637         self.mpk = self.mpk_from_seed(s)
    638 
    639     def add_master_public_key(self, mpk):
    640         self.mpk = mpk
    641 
    642     def format_seed(self, seed):
    643         from . import old_mnemonic, mnemonic
    644         seed = mnemonic.normalize_text(seed)
    645         # see if seed was entered as hex
    646         if seed:
    647             try:
    648                 bfh(seed)
    649                 return str(seed)
    650             except Exception:
    651                 pass
    652         words = seed.split()
    653         seed = old_mnemonic.mn_decode(words)
    654         if not seed:
    655             raise Exception("Invalid seed")
    656         return seed
    657 
    658     def get_seed(self, password):
    659         from . import old_mnemonic
    660         s = self.get_hex_seed(password)
    661         return ' '.join(old_mnemonic.mn_encode(s))
    662 
    663     @classmethod
    664     def mpk_from_seed(klass, seed):
    665         secexp = klass.stretch_key(seed)
    666         privkey = ecc.ECPrivkey.from_secret_scalar(secexp)
    667         return privkey.get_public_key_hex(compressed=False)[2:]
    668 
    669     @classmethod
    670     def stretch_key(self, seed):
    671         x = seed
    672         for i in range(100000):
    673             x = hashlib.sha256(x + seed).digest()
    674         return string_to_number(x)
    675 
    676     @classmethod
    677     def get_sequence(self, mpk, for_change, n):
    678         return string_to_number(sha256d(("%d:%d:"%(n, for_change)).encode('ascii') + bfh(mpk)))
    679 
    680     @classmethod
    681     def get_pubkey_from_mpk(cls, mpk, for_change, n) -> bytes:
    682         z = cls.get_sequence(mpk, for_change, n)
    683         master_public_key = ecc.ECPubkey(bfh('04'+mpk))
    684         public_key = master_public_key + z*ecc.GENERATOR
    685         return public_key.get_public_key_bytes(compressed=False)
    686 
    687     @lru_cache(maxsize=None)
    688     def derive_pubkey(self, for_change, n) -> bytes:
    689         for_change = int(for_change)
    690         if for_change not in (0, 1):
    691             raise CannotDerivePubkey("forbidden path")
    692         return self.get_pubkey_from_mpk(self.mpk, for_change, n)
    693 
    694     def _get_private_key_from_stretched_exponent(self, for_change, n, secexp):
    695         secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER
    696         pk = int.to_bytes(secexp, length=32, byteorder='big', signed=False)
    697         return pk
    698 
    699     def get_private_key(self, sequence: Sequence[int], password):
    700         seed = self.get_hex_seed(password)
    701         secexp = self.stretch_key(seed)
    702         self._check_seed(seed, secexp=secexp)
    703         for_change, n = sequence
    704         pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp)
    705         return pk, False
    706 
    707     def _check_seed(self, seed, *, secexp=None):
    708         if secexp is None:
    709             secexp = self.stretch_key(seed)
    710         master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp)
    711         master_public_key = master_private_key.get_public_key_bytes(compressed=False)[1:]
    712         if master_public_key != bfh(self.mpk):
    713             raise InvalidPassword()
    714 
    715     def check_password(self, password):
    716         seed = self.get_hex_seed(password)
    717         self._check_seed(seed)
    718 
    719     def get_master_public_key(self):
    720         return self.mpk
    721 
    722     def get_derivation_prefix(self) -> str:
    723         return 'm'
    724 
    725     def get_root_fingerprint(self) -> str:
    726         if self._root_fingerprint is None:
    727             master_public_key = ecc.ECPubkey(bfh('04'+self.mpk))
    728             xfp = hash_160(master_public_key.get_public_key_bytes(compressed=True))[0:4]
    729             self._root_fingerprint = xfp.hex().lower()
    730         return self._root_fingerprint
    731 
    732     def get_fp_and_derivation_to_be_used_in_partial_tx(
    733             self,
    734             der_suffix: Sequence[int],
    735             *,
    736             only_der_suffix: bool,
    737     ) -> Tuple[bytes, Sequence[int]]:
    738         fingerprint_hex = self.get_root_fingerprint()
    739         der_prefix_str = self.get_derivation_prefix()
    740         fingerprint_bytes = bfh(fingerprint_hex)
    741         der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
    742         der_full = der_prefix_ints + list(der_suffix)
    743         return fingerprint_bytes, der_full
    744 
    745     def update_password(self, old_password, new_password):
    746         self.check_password(old_password)
    747         if new_password == '':
    748             new_password = None
    749         if self.has_seed():
    750             decoded = pw_decode(self.seed, old_password, version=self.pw_hash_version)
    751             self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
    752         self.pw_hash_version = PW_HASH_VERSION_LATEST
    753 
    754 
    755 class Hardware_KeyStore(Xpub, KeyStore):
    756     hw_type: str
    757     device: str
    758     plugin: 'HW_PluginBase'
    759     thread: Optional['TaskThread'] = None
    760 
    761     type = 'hardware'
    762 
    763     def __init__(self, d):
    764         Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
    765         KeyStore.__init__(self)
    766         # Errors and other user interaction is done through the wallet's
    767         # handler.  The handler is per-window and preserved across
    768         # device reconnects
    769         self.xpub = d.get('xpub')
    770         self.label = d.get('label')
    771         self.soft_device_id = d.get('soft_device_id')  # type: Optional[str]
    772         self.handler = None  # type: Optional[HardwareHandlerBase]
    773         run_hook('init_keystore', self)
    774 
    775     def set_label(self, label):
    776         self.label = label
    777 
    778     def may_have_password(self):
    779         return False
    780 
    781     def is_deterministic(self):
    782         return True
    783 
    784     def get_type_text(self) -> str:
    785         return f'hw[{self.hw_type}]'
    786 
    787     def dump(self):
    788         return {
    789             'type': self.type,
    790             'hw_type': self.hw_type,
    791             'xpub': self.xpub,
    792             'derivation': self.get_derivation_prefix(),
    793             'root_fingerprint': self.get_root_fingerprint(),
    794             'label':self.label,
    795             'soft_device_id': self.soft_device_id,
    796         }
    797 
    798     def unpaired(self):
    799         '''A device paired with the wallet was disconnected.  This can be
    800         called in any thread context.'''
    801         self.logger.info("unpaired")
    802 
    803     def paired(self):
    804         '''A device paired with the wallet was (re-)connected.  This can be
    805         called in any thread context.'''
    806         self.logger.info("paired")
    807 
    808     def is_watching_only(self):
    809         '''The wallet is not watching-only; the user will be prompted for
    810         pin and passphrase as appropriate when needed.'''
    811         assert not self.has_seed()
    812         return False
    813 
    814     def get_password_for_storage_encryption(self) -> str:
    815         client = self.plugin.get_client(self)
    816         return client.get_password_for_storage_encryption()
    817 
    818     def has_usable_connection_with_device(self) -> bool:
    819         if not hasattr(self, 'plugin'):
    820             return False
    821         client = self.plugin.get_client(self, force_pair=False)
    822         if client is None:
    823             return False
    824         return client.has_usable_connection_with_device()
    825 
    826     def ready_to_sign(self):
    827         return super().ready_to_sign() and self.has_usable_connection_with_device()
    828 
    829     def opportunistically_fill_in_missing_info_from_device(self, client: 'HardwareClientBase'):
    830         assert client is not None
    831         if self._root_fingerprint is None:
    832             self._root_fingerprint = client.request_root_fingerprint_from_device()
    833             self.is_requesting_to_be_rewritten_to_wallet_file = True
    834         if self.label != client.label():
    835             self.label = client.label()
    836             self.is_requesting_to_be_rewritten_to_wallet_file = True
    837         if self.soft_device_id != client.get_soft_device_id():
    838             self.soft_device_id = client.get_soft_device_id()
    839             self.is_requesting_to_be_rewritten_to_wallet_file = True
    840 
    841 
    842 KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin]  # intersection really...
    843 AddressIndexGeneric = Union[Sequence[int], str]  # can be hex pubkey str
    844 
    845 
    846 def bip39_normalize_passphrase(passphrase):
    847     return normalize('NFKD', passphrase or '')
    848 
    849 def bip39_to_seed(mnemonic, passphrase):
    850     import hashlib, hmac
    851     PBKDF2_ROUNDS = 2048
    852     mnemonic = normalize('NFKD', ' '.join(mnemonic.split()))
    853     passphrase = bip39_normalize_passphrase(passphrase)
    854     return hashlib.pbkdf2_hmac('sha512', mnemonic.encode('utf-8'),
    855         b'mnemonic' + passphrase.encode('utf-8'), iterations = PBKDF2_ROUNDS)
    856 
    857 
    858 def bip39_is_checksum_valid(mnemonic: str) -> Tuple[bool, bool]:
    859     """Test checksum of bip39 mnemonic assuming English wordlist.
    860     Returns tuple (is_checksum_valid, is_wordlist_valid)
    861     """
    862     words = [ normalize('NFKD', word) for word in mnemonic.split() ]
    863     words_len = len(words)
    864     wordlist = Wordlist.from_file("english.txt")
    865     n = len(wordlist)
    866     i = 0
    867     words.reverse()
    868     while words:
    869         w = words.pop()
    870         try:
    871             k = wordlist.index(w)
    872         except ValueError:
    873             return False, False
    874         i = i*n + k
    875     if words_len not in [12, 15, 18, 21, 24]:
    876         return False, True
    877     checksum_length = 11 * words_len // 33  # num bits
    878     entropy_length = 32 * checksum_length  # num bits
    879     entropy = i >> checksum_length
    880     checksum = i % 2**checksum_length
    881     entropy_bytes = int.to_bytes(entropy, length=entropy_length//8, byteorder="big")
    882     hashed = int.from_bytes(sha256(entropy_bytes), byteorder="big")
    883     calculated_checksum = hashed >> (256 - checksum_length)
    884     return checksum == calculated_checksum, True
    885 
    886 
    887 def from_bip39_seed(seed, passphrase, derivation, xtype=None):
    888     k = BIP32_KeyStore({})
    889     bip32_seed = bip39_to_seed(seed, passphrase)
    890     if xtype is None:
    891         xtype = xtype_from_derivation(derivation)
    892     k.add_xprv_from_seed(bip32_seed, xtype, derivation)
    893     return k
    894 
    895 
    896 PURPOSE48_SCRIPT_TYPES = {
    897     'p2wsh-p2sh': 1,  # specifically multisig
    898     'p2wsh': 2,       # specifically multisig
    899 }
    900 PURPOSE48_SCRIPT_TYPES_INV = inv_dict(PURPOSE48_SCRIPT_TYPES)
    901 
    902 
    903 def xtype_from_derivation(derivation: str) -> str:
    904     """Returns the script type to be used for this derivation."""
    905     bip32_indices = convert_bip32_path_to_list_of_uint32(derivation)
    906     if len(bip32_indices) >= 1:
    907         if bip32_indices[0] == 84 + BIP32_PRIME:
    908             return 'p2wpkh'
    909         elif bip32_indices[0] == 49 + BIP32_PRIME:
    910             return 'p2wpkh-p2sh'
    911         elif bip32_indices[0] == 44 + BIP32_PRIME:
    912             return 'standard'
    913         elif bip32_indices[0] == 45 + BIP32_PRIME:
    914             return 'standard'
    915 
    916     if len(bip32_indices) >= 4:
    917         if bip32_indices[0] == 48 + BIP32_PRIME:
    918             # m / purpose' / coin_type' / account' / script_type' / change / address_index
    919             script_type_int = bip32_indices[3] - BIP32_PRIME
    920             script_type = PURPOSE48_SCRIPT_TYPES_INV.get(script_type_int)
    921             if script_type is not None:
    922                 return script_type
    923     return 'standard'
    924 
    925 
    926 hw_keystores = {}
    927 
    928 def register_keystore(hw_type, constructor):
    929     hw_keystores[hw_type] = constructor
    930 
    931 def hardware_keystore(d) -> Hardware_KeyStore:
    932     hw_type = d['hw_type']
    933     if hw_type in hw_keystores:
    934         constructor = hw_keystores[hw_type]
    935         return constructor(d)
    936     raise WalletFileException(f'unknown hardware type: {hw_type}. '
    937                               f'hw_keystores: {list(hw_keystores)}')
    938 
    939 def load_keystore(db: 'WalletDB', name: str) -> KeyStore:
    940     d = db.get(name, {})
    941     t = d.get('type')
    942     if not t:
    943         raise WalletFileException(
    944             'Wallet format requires update.\n'
    945             'Cannot find keystore for name {}'.format(name))
    946     keystore_constructors = {ks.type: ks for ks in [Old_KeyStore, Imported_KeyStore, BIP32_KeyStore]}
    947     keystore_constructors['hardware'] = hardware_keystore
    948     try:
    949         ks_constructor = keystore_constructors[t]
    950     except KeyError:
    951         raise WalletFileException(f'Unknown type {t} for keystore named {name}')
    952     k = ks_constructor(d)
    953     return k
    954 
    955 
    956 def is_old_mpk(mpk: str) -> bool:
    957     try:
    958         int(mpk, 16)  # test if hex string
    959     except:
    960         return False
    961     if len(mpk) != 128:
    962         return False
    963     try:
    964         ecc.ECPubkey(bfh('04' + mpk))
    965     except:
    966         return False
    967     return True
    968 
    969 
    970 def is_address_list(text):
    971     parts = text.split()
    972     return bool(parts) and all(bitcoin.is_address(x) for x in parts)
    973 
    974 
    975 def get_private_keys(text, *, allow_spaces_inside_key=True, raise_on_error=False):
    976     if allow_spaces_inside_key:  # see #1612
    977         parts = text.split('\n')
    978         parts = map(lambda x: ''.join(x.split()), parts)
    979         parts = list(filter(bool, parts))
    980     else:
    981         parts = text.split()
    982     if bool(parts) and all(bitcoin.is_private_key(x, raise_on_error=raise_on_error) for x in parts):
    983         return parts
    984 
    985 
    986 def is_private_key_list(text, *, allow_spaces_inside_key=True, raise_on_error=False):
    987     return bool(get_private_keys(text,
    988                                  allow_spaces_inside_key=allow_spaces_inside_key,
    989                                  raise_on_error=raise_on_error))
    990 
    991 
    992 def is_master_key(x):
    993     return is_old_mpk(x) or is_bip32_key(x)
    994 
    995 
    996 def is_bip32_key(x):
    997     return is_xprv(x) or is_xpub(x)
    998 
    999 
   1000 def bip44_derivation(account_id, bip43_purpose=44):
   1001     coin = constants.net.BIP44_COIN_TYPE
   1002     der = "m/%d'/%d'/%d'" % (bip43_purpose, coin, int(account_id))
   1003     return normalize_bip32_derivation(der)
   1004 
   1005 
   1006 def purpose48_derivation(account_id: int, xtype: str) -> str:
   1007     # m / purpose' / coin_type' / account' / script_type' / change / address_index
   1008     bip43_purpose = 48
   1009     coin = constants.net.BIP44_COIN_TYPE
   1010     account_id = int(account_id)
   1011     script_type_int = PURPOSE48_SCRIPT_TYPES.get(xtype)
   1012     if script_type_int is None:
   1013         raise Exception('unknown xtype: {}'.format(xtype))
   1014     der = "m/%d'/%d'/%d'/%d'" % (bip43_purpose, coin, account_id, script_type_int)
   1015     return normalize_bip32_derivation(der)
   1016 
   1017 
   1018 def from_seed(seed, passphrase, is_p2sh=False):
   1019     t = seed_type(seed)
   1020     if t == 'old':
   1021         keystore = Old_KeyStore({})
   1022         keystore.add_seed(seed)
   1023     elif t in ['standard', 'segwit']:
   1024         keystore = BIP32_KeyStore({})
   1025         keystore.add_seed(seed)
   1026         keystore.passphrase = passphrase
   1027         bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase)
   1028         if t == 'standard':
   1029             der = "m/"
   1030             xtype = 'standard'
   1031         else:
   1032             der = "m/1'/" if is_p2sh else "m/0'/"
   1033             xtype = 'p2wsh' if is_p2sh else 'p2wpkh'
   1034         keystore.add_xprv_from_seed(bip32_seed, xtype, der)
   1035     else:
   1036         raise BitcoinException('Unexpected seed type {}'.format(repr(t)))
   1037     return keystore
   1038 
   1039 def from_private_key_list(text):
   1040     keystore = Imported_KeyStore({})
   1041     for x in get_private_keys(text):
   1042         keystore.import_privkey(x, None)
   1043     return keystore
   1044 
   1045 def from_old_mpk(mpk):
   1046     keystore = Old_KeyStore({})
   1047     keystore.add_master_public_key(mpk)
   1048     return keystore
   1049 
   1050 def from_xpub(xpub):
   1051     k = BIP32_KeyStore({})
   1052     k.add_xpub(xpub)
   1053     return k
   1054 
   1055 def from_xprv(xprv):
   1056     k = BIP32_KeyStore({})
   1057     k.add_xprv(xprv)
   1058     return k
   1059 
   1060 def from_master_key(text):
   1061     if is_xprv(text):
   1062         k = from_xprv(text)
   1063     elif is_old_mpk(text):
   1064         k = from_old_mpk(text)
   1065     elif is_xpub(text):
   1066         k = from_xpub(text)
   1067     else:
   1068         raise BitcoinException('Invalid master key')
   1069     return k