electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

trezor.py (20338B)


      1 import traceback
      2 import sys
      3 from typing import NamedTuple, Any, Optional, Dict, Union, List, Tuple, TYPE_CHECKING
      4 
      5 from electrum.util import bfh, bh2u, versiontuple, UserCancelled, UserFacingException
      6 from electrum.bip32 import BIP32Node, convert_bip32_path_to_list_of_uint32 as parse_path
      7 from electrum import constants
      8 from electrum.i18n import _
      9 from electrum.plugin import Device, runs_in_hwd_thread
     10 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
     11 from electrum.keystore import Hardware_KeyStore
     12 from electrum.base_wizard import ScriptTypeNotSupported, HWD_SETUP_NEW_WALLET
     13 from electrum.logging import get_logger
     14 
     15 from ..hw_wallet import HW_PluginBase
     16 from ..hw_wallet.plugin import (is_any_tx_output_on_change_branch, trezor_validate_op_return_output_and_get_data,
     17                                 LibraryFoundButUnusable, OutdatedHwFirmwareException,
     18                                 get_xpubs_and_der_suffixes_from_txinout)
     19 
     20 _logger = get_logger(__name__)
     21 
     22 
     23 try:
     24     import trezorlib
     25     import trezorlib.transport
     26     from trezorlib.transport.bridge import BridgeTransport, call_bridge
     27 
     28     from .clientbase import TrezorClientBase
     29 
     30     from trezorlib.messages import (
     31         Capability, BackupType, RecoveryDeviceType, HDNodeType, HDNodePathType,
     32         InputScriptType, OutputScriptType, MultisigRedeemScriptType,
     33         TxInputType, TxOutputType, TxOutputBinType, TransactionType, SignTx)
     34 
     35     from trezorlib.client import PASSPHRASE_ON_DEVICE
     36 
     37     TREZORLIB = True
     38 except Exception as e:
     39     if not (isinstance(e, ModuleNotFoundError) and e.name == 'trezorlib'):
     40         _logger.exception('error importing trezor plugin deps')
     41     TREZORLIB = False
     42 
     43     class _EnumMissing:
     44         def __init__(self):
     45             self.counter = 0
     46             self.values = {}
     47 
     48         def __getattr__(self, key):
     49             if key not in self.values:
     50                 self.values[key] = self.counter
     51                 self.counter += 1
     52             return self.values[key]
     53 
     54     Capability = _EnumMissing()
     55     BackupType = _EnumMissing()
     56     RecoveryDeviceType = _EnumMissing()
     57 
     58     PASSPHRASE_ON_DEVICE = object()
     59 
     60 
# Trezor initialization methods: let the device generate a new seed (TIM_NEW)
# or recover from a previously written-down seed (TIM_RECOVER).
TIM_NEW, TIM_RECOVER = range(2)

# Product key attached to every Device returned by TrezorPlugin.enumerate().
TREZOR_PRODUCT_KEY = 'Trezor'
     65 
     66 
     67 class TrezorKeyStore(Hardware_KeyStore):
     68     hw_type = 'trezor'
     69     device = TREZOR_PRODUCT_KEY
     70 
     71     plugin: 'TrezorPlugin'
     72 
     73     def get_client(self, force_pair=True):
     74         return self.plugin.get_client(self, force_pair)
     75 
     76     def decrypt_message(self, sequence, message, password):
     77         raise UserFacingException(_('Encryption and decryption are not implemented by {}').format(self.device))
     78 
     79     def sign_message(self, sequence, message, password):
     80         client = self.get_client()
     81         address_path = self.get_derivation_prefix() + "/%d/%d"%sequence
     82         msg_sig = client.sign_message(address_path, message)
     83         return msg_sig.signature
     84 
     85     def sign_transaction(self, tx, password):
     86         if tx.is_complete():
     87             return
     88         # previous transactions used as inputs
     89         prev_tx = {}
     90         for txin in tx.inputs():
     91             tx_hash = txin.prevout.txid.hex()
     92             if txin.utxo is None:
     93                 raise UserFacingException(_('Missing previous tx.'))
     94             prev_tx[tx_hash] = txin.utxo
     95 
     96         self.plugin.sign_transaction(self, tx, prev_tx)
     97 
     98 
class TrezorInitSettings(NamedTuple):
    """Settings handed to ``TrezorPlugin._initialize_device`` for a reset or recovery."""
    word_count: int  # seed length in words
    label: str  # forwarded to the device as ``label``
    pin_enabled: bool  # forwarded as ``pin_protection``
    passphrase_enabled: bool  # forwarded as ``passphrase_protection``
    recovery_type: Any = None  # compared against RecoveryDeviceType members; only forwarded on recovery
    backup_type: int = BackupType.Bip39  # only forwarded when generating a new seed
    no_backup: bool = False  # only forwarded when generating a new seed
    107 
    108 
class TrezorPlugin(HW_PluginBase):
    """Hardware-wallet plugin that talks to Trezor devices through trezorlib."""
    # Derived classes provide:
    #
    #  class-static variables: client_class, firmware_URL, handler_class,
    #     libraries_available, libraries_URL, minimum_firmware,
    #     wallet_class, types

    # Included in the error raised by setup_device() when firmware is outdated.
    firmware_URL = 'https://wallet.trezor.io'
    libraries_URL = 'https://pypi.org/project/trezor/'
    minimum_firmware = (1, 5, 2)
    keystore_class = TrezorKeyStore
    # NOTE(review): presumably the accepted trezorlib version range, enforced
    # by the base class's library check -- confirm.
    minimum_library = (0, 12, 0)
    maximum_library = (0, 13)
    # xpub types accepted by get_xpub(); others raise ScriptTypeNotSupported.
    SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
    DEVICE_IDS = (TREZOR_PRODUCT_KEY,)

    # NOTE(review): not referenced in the code visible here; presumably the
    # maximum length of a device label -- confirm.
    MAX_LABEL_LEN = 32
    126 
    def __init__(self, parent, config, name):
        """Initialize the plugin and register device enumeration if the libraries are usable."""
        super().__init__(parent, config, name)

        self.libraries_available = self.check_libraries_available()
        if self.libraries_available:
            self.device_manager().register_enumerate_func(self.enumerate)
            # Cache for is_bridge_available(); None means "not probed yet".
            self._is_bridge_available = None
    135 
    def get_library_version(self):
        """Return trezorlib's version string, or 'unknown' if it cannot be read.

        Raises ``LibraryFoundButUnusable`` (carrying the version) when
        trezorlib itself imports but the module-level imports of this plugin
        failed (``TREZORLIB`` is False).
        """
        import trezorlib
        try:
            lib_version = trezorlib.__version__
        except Exception:
            lib_version = 'unknown'
        if not TREZORLIB:
            raise LibraryFoundButUnusable(library_version=lib_version)
        return lib_version
    146 
    @runs_in_hwd_thread
    def is_bridge_available(self) -> bool:
        """Report whether the Trezor Bridge answers, probing it at most once."""
        # Testing whether the Bridge is available can take several seconds
        # (when it is not), as it is slow to timeout, hence we cache it.
        cached = self._is_bridge_available
        if cached is not None:
            return cached
        try:
            call_bridge("enumerate")
        except Exception:
            self._is_bridge_available = False
            # never again try with Bridge due to slow timeout
            BridgeTransport.ENABLED = False
        else:
            self._is_bridge_available = True
        return self._is_bridge_available
    161 
    @runs_in_hwd_thread
    def enumerate(self):
        """Return a list of ``Device`` entries, one per Trezor transport found."""
        # If there is a bridge, prefer that.
        # On Windows, the bridge runs as Admin (and Electrum usually does not),
        # so the bridge has better chances of finding devices. see #5420
        # This also avoids duplicate entries.
        if self.is_bridge_available():
            transports = BridgeTransport.enumerate()
        else:
            transports = trezorlib.transport.enumerate_devices()

        found = []
        for transport in transports:
            # The transport path doubles as device id and as UI string.
            found.append(Device(path=transport.get_path(),
                                interface_number=-1,
                                id_=transport.get_path(),
                                product_key=TREZOR_PRODUCT_KEY,
                                usage_page=0,
                                transport_ui_string=transport.get_path()))
        return found
    179 
    @runs_in_hwd_thread
    def create_client(self, device, handler):
        """Open a transport to ``device`` and wrap it in a ``TrezorClientBase``.

        Returns None when no transport could be obtained for ``device.path``.
        Errors raised while constructing the client itself are propagated.
        """
        try:
            self.logger.info(f"connecting to device at {device.path}")
            transport = trezorlib.transport.get_transport(device.path)
        except Exception as e:
            # Only ordinary errors mean "cannot connect"; KeyboardInterrupt,
            # SystemExit and cancellation must not be swallowed here.
            self.logger.info(f"cannot connect at {device.path} {e}")
            return None

        if not transport:
            self.logger.info(f"cannot connect at {device.path}")
            return None

        self.logger.info(f"connected to device at {device.path}")
        # note that this call can still raise!
        return TrezorClientBase(transport, handler, self)
    196 
    @runs_in_hwd_thread
    def get_client(self, keystore, force_pair=True, *,
                   devices=None, allow_user_interaction=True) -> Optional['TrezorClientBase']:
        """Return the client for ``keystore`` as found by the base class.

        When a client is found, ``client.used()`` is called on it before it
        is returned; a falsy result from the base class is returned as-is.
        """
        # returns the client for a given keystore. can use xpub
        paired_client = super().get_client(keystore, force_pair,
                                           devices=devices,
                                           allow_user_interaction=allow_user_interaction)
        if not paired_client:
            return paired_client
        paired_client.used()
        return paired_client
    207 
    def get_coin_name(self):
        """Return the coin name passed to the device: "Testnet" or "Bitcoin"."""
        if constants.net.TESTNET:
            return "Testnet"
        return "Bitcoin"
    210 
    def initialize_device(self, device_id, wizard, handler):
        """Ask the user how to initialize the device, then run that initialization.

        The choice dialog's continuation collects the init settings, starts
        ``_initialize_device_safe`` on a daemon thread and blocks in
        ``wizard.loop.exec_()`` until that thread exits the loop.

        Raises ``UserCancelled`` (from the continuation) when the loop exits
        with a non-zero code, i.e. the device was not initialized.
        """
        # Initialization method
        msg = _("Choose how you want to initialize your {}.").format(self.device)
        choices = [
            # Must be short as QT doesn't word-wrap radio button text
            (TIM_NEW, _("Let the device generate a completely new seed randomly")),
            (TIM_RECOVER, _("Recover from a seed you have previously written down")),
        ]
        def f(method):
            import threading
            settings = self.request_trezor_init_settings(wizard, method, device_id)
            # daemon=True replaces the deprecated Thread.setDaemon(True)
            t = threading.Thread(
                target=self._initialize_device_safe,
                args=(settings, method, device_id, wizard, handler),
                daemon=True)
            t.start()
            exit_code = wizard.loop.exec_()
            if exit_code != 0:
                # this method (initialize_device) was called with the expectation
                # of leaving the device in an initialized state when finishing.
                # signal that this is not the case:
                raise UserCancelled()
        wizard.choice_dialog(title=_('Initialize Device'), message=msg, choices=choices, run_next=f)
    232 
    def _initialize_device_safe(self, settings, method, device_id, wizard, handler):
        """Run ``_initialize_device`` and always exit ``wizard.loop`` afterwards.

        The loop is exited with 0 only when initialization completed without
        raising; cancellation and every other error exit it with 1. Errors
        other than ``UserCancelled`` are logged and shown through ``handler``.
        """
        # Assume failure until initialization has actually succeeded, so that a
        # failure while reporting an error cannot be mistaken for success.
        exit_code = 1
        try:
            self._initialize_device(settings, method, device_id, wizard, handler)
        except UserCancelled:
            # The user aborted on purpose: nothing to report, exit code stays 1.
            pass
        except BaseException as e:
            self.logger.exception('')
            handler.show_error(repr(e))
        else:
            exit_code = 0
        finally:
            wizard.loop.exit(exit_code)
    245 
    @runs_in_hwd_thread
    def _initialize_device(self, settings: TrezorInitSettings, method, device_id, wizard, handler):
        """Reset (``TIM_NEW``) or recover (``TIM_RECOVER``) the device ``device_id``.

        Raises ``Exception`` when no client exists for ``device_id``,
        ``KeyError`` for a word count with no known strength, and
        ``RuntimeError`` for any other ``method``.
        """
        recovering = method == TIM_RECOVER
        if recovering and settings.recovery_type == RecoveryDeviceType.ScrambledWords:
            handler.show_error(_(
                "You will be asked to enter 24 words regardless of your "
                "seed's actual length.  If you enter a word incorrectly or "
                "misspell it, you cannot change it or go back - you will need "
                "to start again from the beginning.\n\nSo please enter "
                "the words carefully!"),
                blocking=True)

        client = self.device_manager().client_by_id(device_id)
        if not client:
            raise Exception(_("The device was disconnected."))

        if method == TIM_NEW:
            # seed word count -> strength value handed to the device
            strength_from_word_count = {12: 128, 18: 192, 20: 128, 24: 256, 33: 256}
            entropy_bits = strength_from_word_count[settings.word_count]
            client.reset_device(
                strength=entropy_bits,
                passphrase_protection=settings.passphrase_enabled,
                pin_protection=settings.pin_enabled,
                label=settings.label,
                backup_type=settings.backup_type,
                no_backup=settings.no_backup)
            return

        if not recovering:
            raise RuntimeError("Unsupported recovery method")

        client.recover_device(
            recovery_type=settings.recovery_type,
            word_count=settings.word_count,
            passphrase_protection=settings.passphrase_enabled,
            pin_protection=settings.pin_enabled,
            label=settings.label)
        if settings.recovery_type == RecoveryDeviceType.Matrix:
            handler.close_matrix_dialog()
    282 
    def _make_node_path(self, xpub, address_n):
        """Build an ``HDNodePathType`` from an xpub string and a derivation path ``address_n``."""
        parsed = BIP32Node.from_xkey(xpub)
        depth = parsed.depth
        # fingerprint and child number are stored as bytes; the node takes ints
        fingerprint = int.from_bytes(parsed.fingerprint, 'big')
        child_num = int.from_bytes(parsed.child_number, 'big')
        chain_code = parsed.chaincode
        public_key = parsed.eckey.get_public_key_bytes(compressed=True)
        hd_node = HDNodeType(
            depth=depth,
            fingerprint=fingerprint,
            child_num=child_num,
            chain_code=chain_code,
            public_key=public_key,
        )
        return HDNodePathType(node=hd_node, address_n=address_n)
    293 
    def setup_device(self, device_info, wizard, purpose):
        """Prepare the device described by ``device_info`` and return its client.

        Raises ``OutdatedHwFirmwareException`` when the client reports that it
        is not up to date. An uninitialized device is initialized first; the
        'm' xpub is then requested through the wizard's task runner.
        """
        device_id = device_info.device.id_
        client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)

        if not client.is_uptodate():
            template = _('Outdated {} firmware for device labelled {}. Please '
                         'download the updated firmware from {}')
            raise OutdatedHwFirmwareException(
                template.format(self.device, client.label(), self.firmware_URL))

        if not device_info.initialized:
            self.initialize_device(device_id, wizard, client.handler)

        is_creating_wallet = purpose == HWD_SETUP_NEW_WALLET

        def fetch_root_xpub():
            return client.get_xpub('m', 'standard', creating=is_creating_wallet)

        wizard.run_task_without_blocking_gui(task=fetch_root_xpub)
        client.used()
        return client
    311 
    def get_xpub(self, device_id, derivation, xtype, wizard):
        """Return the xpub of type ``xtype`` at ``derivation`` from the device.

        Raises ``ScriptTypeNotSupported`` when ``xtype`` is not listed in
        ``SUPPORTED_XTYPES``.
        """
        if xtype not in self.SUPPORTED_XTYPES:
            unsupported_msg = _('This type of script is not supported with {}.').format(self.device)
            raise ScriptTypeNotSupported(unsupported_msg)
        client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
        result = client.get_xpub(derivation, xtype)
        client.used()
        return result
    319 
    def get_trezor_input_script_type(self, electrum_txin_type: str):
        """Map an Electrum txin type to the matching ``InputScriptType`` member.

        Raises ``ValueError`` for a txin type that has no mapping.
        """
        # (electrum txin types, name of the InputScriptType member)
        mapping = (
            (('p2wpkh', 'p2wsh'), 'SPENDWITNESS'),
            (('p2wpkh-p2sh', 'p2wsh-p2sh'), 'SPENDP2SHWITNESS'),
            (('p2pkh', ), 'SPENDADDRESS'),
            (('p2sh', ), 'SPENDMULTISIG'),
        )
        for electrum_types, member_name in mapping:
            if electrum_txin_type in electrum_types:
                return getattr(InputScriptType, member_name)
        raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
    330 
    def get_trezor_output_script_type(self, electrum_txin_type: str):
        """Map an Electrum script type to the matching ``OutputScriptType`` member.

        Raises ``ValueError`` for a script type that has no mapping.
        """
        # (electrum script types, name of the OutputScriptType member)
        mapping = (
            (('p2wpkh', 'p2wsh'), 'PAYTOWITNESS'),
            (('p2wpkh-p2sh', 'p2wsh-p2sh'), 'PAYTOP2SHWITNESS'),
            (('p2pkh', ), 'PAYTOADDRESS'),
            (('p2sh', ), 'PAYTOMULTISIG'),
        )
        for electrum_types, member_name in mapping:
            if electrum_txin_type in electrum_types:
                return getattr(OutputScriptType, member_name)
        raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
    341 
    @runs_in_hwd_thread
    def sign_transaction(self, keystore, tx: PartialTransaction, prev_tx):
        """Have the device sign ``tx`` and store the signatures on ``tx``.

        ``prev_tx`` maps hex txids to the previous transactions spent by
        ``tx``; each is converted to a ``TransactionType`` keyed by the raw
        txid bytes before being sent to the device.
        """
        prev_txes = {}
        for txid_hex, parent_tx in prev_tx.items():
            txid_bytes = bfh(txid_hex)
            prev_txes[txid_bytes] = self.electrum_tx_to_txtype(parent_tx)

        client = self.get_client(keystore)
        inputs = self.tx_inputs(tx, for_sig=True, keystore=keystore)
        outputs = self.tx_outputs(tx, keystore=keystore)
        details = SignTx(lock_time=tx.locktime, version=tx.version)
        # The second element of the result is not used here; it is bound to a
        # name other than ``_`` so the i18n function is not shadowed.
        raw_signatures, _unused = client.sign_tx(
            self.get_coin_name(), inputs, outputs, details=details, prev_txes=prev_txes)
        # hex-encode each signature and append '01'
        hex_signatures = [(bh2u(sig) + '01') for sig in raw_signatures]
        tx.update_signatures(hex_signatures)
    352 
    @runs_in_hwd_thread
    def show_address(self, wallet, address, keystore=None):
        """Ask the device to display ``address`` of ``wallet``.

        ``keystore`` defaults to ``wallet.get_keystore()``. Nothing happens
        when ``show_address_helper`` returns a falsy value.
        """
        if keystore is None:
            keystore = wallet.get_keystore()
        if not self.show_address_helper(wallet, address, keystore):
            return

        deriv_suffix = wallet.get_address_index(address)
        derivation = keystore.get_derivation_prefix()
        address_path = "%s/%d/%d" % (derivation, *deriv_suffix)
        script_type = self.get_trezor_input_script_type(wallet.txin_type)

        # prepare multisig, if available:
        multisig = None
        xpubs = wallet.get_master_public_keys()
        if len(xpubs) > 1:
            pubkeys = wallet.get_public_keys(address)
            # sort xpubs using the order of pubkeys
            ordered_pairs = sorted(zip(pubkeys, xpubs))
            multisig = self._make_multisig(
                wallet.m,
                [(xpub, deriv_suffix) for _pubkey, xpub in ordered_pairs])

        client = self.get_client(keystore)
        client.show_address(address_path, script_type, multisig)
    378 
    def tx_inputs(self, tx: Transaction, *, for_sig=False, keystore: 'TrezorKeyStore' = None):
        """Convert the inputs of ``tx`` into a list of ``TxInputType`` messages.

        With ``for_sig=True`` every non-coinbase input additionally gets its
        script type, multisig description and (when found) the derivation
        path of our key; ``tx`` must then be a ``PartialTransaction`` and
        ``keystore`` must be given.
        """
        trezor_inputs = []
        for txin in tx.inputs():
            trezor_txin = TxInputType()
            is_coinbase = txin.is_coinbase_input()

            if not is_coinbase and for_sig:
                assert isinstance(tx, PartialTransaction)
                assert isinstance(txin, PartialTxInput)
                assert keystore
                multisig = None
                if len(txin.pubkeys) > 1:
                    xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txin)
                    multisig = self._make_multisig(txin.num_sig, xpubs_and_deriv_suffixes)
                trezor_txin = TxInputType(
                    script_type=self.get_trezor_input_script_type(txin.script_type),
                    multisig=multisig)
                _my_pubkey, derivation_path = keystore.find_my_pubkey_in_txinout(txin)
                if derivation_path:
                    trezor_txin.address_n = derivation_path

            if is_coinbase:
                prev_hash = b"\x00" * 32
                prev_index = 0xffffffff  # signed int -1
            else:
                prev_hash = txin.prevout.txid
                prev_index = txin.prevout.out_idx

            amount = txin.value_sats()
            if amount is not None:
                trezor_txin.amount = amount
            trezor_txin.prev_hash = prev_hash
            trezor_txin.prev_index = prev_index

            if txin.script_sig is not None:
                trezor_txin.script_sig = txin.script_sig

            trezor_txin.sequence = txin.nsequence

            trezor_inputs.append(trezor_txin)

        return trezor_inputs
    420 
    def _make_multisig(self, m, xpubs):
        """Build a ``MultisigRedeemScriptType`` requiring ``m`` signatures.

        ``xpubs`` is a sequence of ``(xpub, derivation suffix)`` pairs.
        Returns None when it contains exactly one entry.
        """
        if len(xpubs) == 1:
            return None
        node_paths = []
        for xpub, deriv in xpubs:
            node_paths.append(self._make_node_path(xpub, deriv))
        # one empty signature slot per cosigner
        empty_signatures = [b''] * len(node_paths)
        return MultisigRedeemScriptType(
            pubkeys=node_paths,
            signatures=empty_signatures,
            m=m)
    429 
    def tx_outputs(self, tx: PartialTransaction, *, keystore: 'TrezorKeyStore'):
        """Convert the outputs of ``tx`` into a list of ``TxOutputType`` messages.

        At most one of our own outputs is described by its derivation path;
        every other output is described by address, or as OP_RETURN data when
        it has no address.
        """

        def output_by_derivation(txout):
            # Describe one of our own outputs through its derivation path.
            script_type = self.get_trezor_output_script_type(txout.script_type)
            multisig = None
            if len(txout.pubkeys) > 1:
                xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txout)
                multisig = self._make_multisig(txout.num_sig, xpubs_and_deriv_suffixes)
            _my_pubkey, derivation_path = keystore.find_my_pubkey_in_txinout(txout)
            assert derivation_path
            return TxOutputType(
                multisig=multisig,
                amount=txout.value,
                address_n=derivation_path,
                script_type=script_type)

        def output_by_address(txout, address):
            # Describe an output by address, or as OP_RETURN when it has none.
            trezor_txout = TxOutputType()
            trezor_txout.amount = txout.value
            if address:
                trezor_txout.script_type = OutputScriptType.PAYTOADDRESS
                trezor_txout.address = address
            else:
                trezor_txout.script_type = OutputScriptType.PAYTOOPRETURN
                trezor_txout.op_return_data = trezor_validate_op_return_output_and_get_data(txout)
            return trezor_txout

        trezor_outputs = []
        change_output_chosen = False
        any_output_on_change_branch = is_any_tx_output_on_change_branch(tx)

        for txout in tx.outputs():
            address = txout.address
            # prioritise hiding outputs on the 'change' branch from user
            # because no more than one change address allowed
            # note: ^ restriction can be removed once we require fw
            # that has https://github.com/trezor/trezor-mcu/pull/306
            hide_from_user = (txout.is_mine
                              and not change_output_chosen
                              and txout.is_change == any_output_on_change_branch)
            if hide_from_user:
                change_output_chosen = True
                trezor_outputs.append(output_by_derivation(txout))
            else:
                trezor_outputs.append(output_by_address(txout, address))

        return trezor_outputs
    483 
    def electrum_tx_to_txtype(self, tx: Optional[Transaction]):
        """Convert an Electrum transaction into a ``TransactionType`` message.

        Returns an empty ``TransactionType`` when ``tx`` is None.
        """
        trezor_tx = TransactionType()
        if tx is None:
            # probably for segwit input and we don't need this prev txn
            return trezor_tx
        tx.deserialize()
        trezor_tx.version = tx.version
        trezor_tx.lock_time = tx.locktime
        trezor_tx.inputs = self.tx_inputs(tx)
        bin_outputs = []
        for txout in tx.outputs():
            bin_outputs.append(
                TxOutputBinType(amount=txout.value, script_pubkey=txout.scriptpubkey))
        trezor_tx.bin_outputs = bin_outputs
        return trezor_tx
    497         return t