# trezor.py (20338B)
"""Trezor hardware-wallet support: keystore and plugin classes.

The module degrades gracefully when ``trezorlib`` cannot be imported: the
``TREZORLIB`` flag is set to ``False`` and placeholder enum objects are
installed so that class bodies below can still be evaluated.
"""
import threading
import traceback
import sys
from typing import NamedTuple, Any, Optional, Dict, Union, List, Tuple, TYPE_CHECKING

from electrum.util import bfh, bh2u, versiontuple, UserCancelled, UserFacingException
from electrum.bip32 import BIP32Node, convert_bip32_path_to_list_of_uint32 as parse_path
from electrum import constants
from electrum.i18n import _
from electrum.plugin import Device, runs_in_hwd_thread
from electrum.transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
from electrum.keystore import Hardware_KeyStore
from electrum.base_wizard import ScriptTypeNotSupported, HWD_SETUP_NEW_WALLET
from electrum.logging import get_logger

from ..hw_wallet import HW_PluginBase
from ..hw_wallet.plugin import (is_any_tx_output_on_change_branch, trezor_validate_op_return_output_and_get_data,
                                LibraryFoundButUnusable, OutdatedHwFirmwareException,
                                get_xpubs_and_der_suffixes_from_txinout)

_logger = get_logger(__name__)


try:
    import trezorlib
    import trezorlib.transport
    from trezorlib.transport.bridge import BridgeTransport, call_bridge

    from .clientbase import TrezorClientBase

    from trezorlib.messages import (
        Capability, BackupType, RecoveryDeviceType, HDNodeType, HDNodePathType,
        InputScriptType, OutputScriptType, MultisigRedeemScriptType,
        TxInputType, TxOutputType, TxOutputBinType, TransactionType, SignTx)

    from trezorlib.client import PASSPHRASE_ON_DEVICE

    TREZORLIB = True
except Exception as e:
    # A plain "trezorlib is not installed" is expected and stays silent;
    # anything else (e.g. an incompatible version) is logged.
    if not (isinstance(e, ModuleNotFoundError) and e.name == 'trezorlib'):
        _logger.exception('error importing trezor plugin deps')
    TREZORLIB = False

    class _EnumMissing:
        """Stand-in for a trezorlib enum when the library is unavailable.

        Every attribute name that is looked up is assigned the next integer
        from a counter (starting at 0), and the same name always yields the
        same integer afterwards.
        """

        def __init__(self):
            self.counter = 0
            self.values = {}

        def __getattr__(self, key):
            if key not in self.values:
                self.values[key] = self.counter
                self.counter += 1
            return self.values[key]

    Capability = _EnumMissing()
    BackupType = _EnumMissing()
    RecoveryDeviceType = _EnumMissing()

    PASSPHRASE_ON_DEVICE = object()


# Trezor initialization methods
TIM_NEW, TIM_RECOVER = range(2)

TREZOR_PRODUCT_KEY = 'Trezor'


class TrezorKeyStore(Hardware_KeyStore):
    """Keystore that delegates signing to the Trezor plugin."""

    hw_type = 'trezor'
    device = TREZOR_PRODUCT_KEY

    plugin: 'TrezorPlugin'

    def get_client(self, force_pair=True):
        """Return the plugin's client for this keystore."""
        return self.plugin.get_client(self, force_pair)

    def decrypt_message(self, sequence, message, password):
        """Always raise: message decryption is not implemented for this device."""
        raise UserFacingException(_('Encryption and decryption are not implemented by {}').format(self.device))

    def sign_message(self, sequence, message, password):
        """Sign *message* on the device and return the signature.

        ``sequence`` is a pair of integers appended to the keystore's
        derivation prefix to form the address path. ``password`` is unused.
        """
        client = self.get_client()
        address_path = self.get_derivation_prefix() + "/%d/%d" % sequence
        msg_sig = client.sign_message(address_path, message)
        return msg_sig.signature

    def sign_transaction(self, tx, password):
        """Sign *tx* via the plugin; do nothing if it is already complete.

        Raises:
            UserFacingException: if any input lacks its previous transaction
                (``txin.utxo is None``).
        """
        if tx.is_complete():
            return
        # previous transactions used as inputs, keyed by hex txid
        prev_tx = {}
        for txin in tx.inputs():
            if txin.utxo is None:
                raise UserFacingException(_('Missing previous tx.'))
            prev_tx[txin.prevout.txid.hex()] = txin.utxo

        self.plugin.sign_transaction(self, tx, prev_tx)


class TrezorInitSettings(NamedTuple):
    """Settings collected from the user before initializing a device."""

    word_count: int
    label: str
    pin_enabled: bool
    passphrase_enabled: bool
    recovery_type: Any = None
    backup_type: int = BackupType.Bip39
    no_backup: bool = False


class TrezorPlugin(HW_PluginBase):
    """Electrum plugin for Trezor devices."""

    # Derived classes provide:
    #
    #  class-static variables: client_class, firmware_URL, handler_class,
    #     libraries_available, libraries_URL, minimum_firmware,
    #     wallet_class, types

    firmware_URL = 'https://wallet.trezor.io'
    libraries_URL = 'https://pypi.org/project/trezor/'
    minimum_firmware = (1, 5, 2)
    keystore_class = TrezorKeyStore
    minimum_library = (0, 12, 0)
    maximum_library = (0, 13)
    SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
    DEVICE_IDS = (TREZOR_PRODUCT_KEY,)

    MAX_LABEL_LEN = 32

    def __init__(self, parent, config, name):
        super().__init__(parent, config, name)

        # Cached result of the bridge probe; None means "not probed yet".
        # Set before the early return so the attribute always exists.
        self._is_bridge_available: Optional[bool] = None

        self.libraries_available = self.check_libraries_available()
        if not self.libraries_available:
            return
        self.device_manager().register_enumerate_func(self.enumerate)

    def get_library_version(self):
        """Return the trezorlib version string, or ``'unknown'``.

        Raises:
            LibraryFoundButUnusable: if trezorlib can be imported here but the
                module-level import of the plugin dependencies failed.
        """
        import trezorlib
        try:
            version = trezorlib.__version__
        except Exception:
            version = 'unknown'
        if TREZORLIB:
            return version
        raise LibraryFoundButUnusable(library_version=version)

    @runs_in_hwd_thread
    def is_bridge_available(self) -> bool:
        """Return whether the Trezor Bridge answered an ``enumerate`` call."""
        # Testing whether the Bridge is available can take several seconds
        # (when it is not), as it is slow to timeout, hence we cache it.
        if self._is_bridge_available is None:
            try:
                call_bridge("enumerate")
            except Exception:
                self._is_bridge_available = False
                # never again try with Bridge due to slow timeout
                BridgeTransport.ENABLED = False
            else:
                self._is_bridge_available = True
        return self._is_bridge_available

    @runs_in_hwd_thread
    def enumerate(self):
        """Return a ``Device`` entry for every Trezor transport found."""
        # If there is a bridge, prefer that.
        # On Windows, the bridge runs as Admin (and Electrum usually does not),
        # so the bridge has better chances of finding devices. see #5420
        # This also avoids duplicate entries.
        if self.is_bridge_available():
            devices = BridgeTransport.enumerate()
        else:
            devices = trezorlib.transport.enumerate_devices()
        return [Device(path=d.get_path(),
                       interface_number=-1,
                       id_=d.get_path(),
                       product_key=TREZOR_PRODUCT_KEY,
                       usage_page=0,
                       transport_ui_string=d.get_path())
                for d in devices]

    @runs_in_hwd_thread
    def create_client(self, device, handler):
        """Open a transport to *device* and wrap it in a ``TrezorClientBase``.

        Returns ``None`` if the transport cannot be obtained.
        """
        try:
            self.logger.info(f"connecting to device at {device.path}")
            transport = trezorlib.transport.get_transport(device.path)
        except BaseException as e:
            self.logger.info(f"cannot connect at {device.path} {e}")
            return None

        if not transport:
            self.logger.info(f"cannot connect at {device.path}")
            return None

        self.logger.info(f"connected to device at {device.path}")
        # note that this call can still raise!
        return TrezorClientBase(transport, handler, self)

    @runs_in_hwd_thread
    def get_client(self, keystore, force_pair=True, *,
                   devices=None, allow_user_interaction=True) -> Optional['TrezorClientBase']:
        """Return the client for *keystore* and mark it as used."""
        client = super().get_client(keystore, force_pair,
                                    devices=devices,
                                    allow_user_interaction=allow_user_interaction)
        # returns the client for a given keystore. can use xpub
        if client:
            client.used()
        return client

    def get_coin_name(self):
        """Return ``"Testnet"`` on testnet, otherwise ``"Bitcoin"``."""
        return "Testnet" if constants.net.TESTNET else "Bitcoin"

    def initialize_device(self, device_id, wizard, handler):
        """Ask the user how to initialize the device, then do it.

        The device work runs on a background thread while ``wizard.loop``
        is executed; ``UserCancelled`` is raised from the dialog callback
        when that loop exits with a non-zero code.
        """
        # Initialization method
        msg = _("Choose how you want to initialize your {}.").format(self.device)
        choices = [
            # Must be short as QT doesn't word-wrap radio button text
            (TIM_NEW, _("Let the device generate a completely new seed randomly")),
            (TIM_RECOVER, _("Recover from a seed you have previously written down")),
        ]

        def f(method):
            settings = self.request_trezor_init_settings(wizard, method, device_id)
            t = threading.Thread(
                target=self._initialize_device_safe,
                args=(settings, method, device_id, wizard, handler),
                daemon=True)
            t.start()
            exit_code = wizard.loop.exec_()
            if exit_code != 0:
                # this method (initialize_device) was called with the expectation
                # of leaving the device in an initialized state when finishing.
                # signal that this is not the case:
                raise UserCancelled()

        wizard.choice_dialog(title=_('Initialize Device'), message=msg, choices=choices, run_next=f)

    def _initialize_device_safe(self, settings, method, device_id, wizard, handler):
        """Run ``_initialize_device`` and always exit ``wizard.loop``.

        The loop is exited with code 0 on success and 1 on cancellation or
        error; errors other than ``UserCancelled`` are logged and shown via
        *handler*.
        """
        exit_code = 0
        try:
            self._initialize_device(settings, method, device_id, wizard, handler)
        except UserCancelled:
            exit_code = 1
        except BaseException as e:
            self.logger.exception('')
            handler.show_error(repr(e))
            exit_code = 1
        finally:
            wizard.loop.exit(exit_code)

    @runs_in_hwd_thread
    def _initialize_device(self, settings: TrezorInitSettings, method, device_id, wizard, handler):
        """Reset or recover the device according to *method* and *settings*.

        Raises:
            Exception: if no client is found for *device_id*.
            KeyError: if ``settings.word_count`` has no known strength
                (``TIM_NEW`` only).
            RuntimeError: if *method* is neither ``TIM_NEW`` nor ``TIM_RECOVER``.
        """
        if method == TIM_RECOVER and settings.recovery_type == RecoveryDeviceType.ScrambledWords:
            # NOTE(review): this literal was reconstructed from a
            # whitespace-collapsed source; confirm the spacing after
            # "length." against the translation catalogue.
            handler.show_error(_(
                "You will be asked to enter 24 words regardless of your "
                "seed's actual length. If you enter a word incorrectly or "
                "misspell it, you cannot change it or go back - you will need "
                "to start again from the beginning.\n\nSo please enter "
                "the words carefully!"),
                blocking=True)

        devmgr = self.device_manager()
        client = devmgr.client_by_id(device_id)
        if not client:
            raise Exception(_("The device was disconnected."))

        if method == TIM_NEW:
            strength_from_word_count = {12: 128, 18: 192, 20: 128, 24: 256, 33: 256}
            client.reset_device(
                strength=strength_from_word_count[settings.word_count],
                passphrase_protection=settings.passphrase_enabled,
                pin_protection=settings.pin_enabled,
                label=settings.label,
                backup_type=settings.backup_type,
                no_backup=settings.no_backup)
        elif method == TIM_RECOVER:
            client.recover_device(
                recovery_type=settings.recovery_type,
                word_count=settings.word_count,
                passphrase_protection=settings.passphrase_enabled,
                pin_protection=settings.pin_enabled,
                label=settings.label)
            if settings.recovery_type == RecoveryDeviceType.Matrix:
                handler.close_matrix_dialog()
        else:
            raise RuntimeError("Unsupported recovery method")

    def _make_node_path(self, xpub, address_n):
        """Build an ``HDNodePathType`` from *xpub* and the path *address_n*."""
        bip32node = BIP32Node.from_xkey(xpub)
        node = HDNodeType(
            depth=bip32node.depth,
            fingerprint=int.from_bytes(bip32node.fingerprint, 'big'),
            child_num=int.from_bytes(bip32node.child_number, 'big'),
            chain_code=bip32node.chaincode,
            public_key=bip32node.eckey.get_public_key_bytes(compressed=True),
        )
        return HDNodePathType(node=node, address_n=address_n)

    def setup_device(self, device_info, wizard, purpose):
        """Prepare the device for use in the wizard and return its client.

        Initializes the device first if ``device_info.initialized`` is false.

        Raises:
            OutdatedHwFirmwareException: if ``client.is_uptodate()`` is false.
        """
        device_id = device_info.device.id_
        client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)

        if not client.is_uptodate():
            msg = (_('Outdated {} firmware for device labelled {}. Please '
                     'download the updated firmware from {}')
                   .format(self.device, client.label(), self.firmware_URL))
            raise OutdatedHwFirmwareException(msg)

        if not device_info.initialized:
            self.initialize_device(device_id, wizard, client.handler)
        is_creating_wallet = purpose == HWD_SETUP_NEW_WALLET
        wizard.run_task_without_blocking_gui(
            task=lambda: client.get_xpub('m', 'standard', creating=is_creating_wallet))
        client.used()
        return client

    def get_xpub(self, device_id, derivation, xtype, wizard):
        """Return the xpub at *derivation* for script type *xtype*.

        Raises:
            ScriptTypeNotSupported: if *xtype* is not in ``SUPPORTED_XTYPES``.
        """
        if xtype not in self.SUPPORTED_XTYPES:
            raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
        client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
        xpub = client.get_xpub(derivation, xtype)
        client.used()
        return xpub

    def get_trezor_input_script_type(self, electrum_txin_type: str):
        """Map an Electrum txin type to a trezorlib ``InputScriptType``.

        Raises:
            ValueError: for an unrecognised type.
        """
        if electrum_txin_type in ('p2wpkh', 'p2wsh'):
            return InputScriptType.SPENDWITNESS
        if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
            return InputScriptType.SPENDP2SHWITNESS
        if electrum_txin_type in ('p2pkh', ):
            return InputScriptType.SPENDADDRESS
        if electrum_txin_type in ('p2sh', ):
            return InputScriptType.SPENDMULTISIG
        raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))

    def get_trezor_output_script_type(self, electrum_txin_type: str):
        """Map an Electrum script type to a trezorlib ``OutputScriptType``.

        Raises:
            ValueError: for an unrecognised type.
        """
        if electrum_txin_type in ('p2wpkh', 'p2wsh'):
            return OutputScriptType.PAYTOWITNESS
        if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
            return OutputScriptType.PAYTOP2SHWITNESS
        if electrum_txin_type in ('p2pkh', ):
            return OutputScriptType.PAYTOADDRESS
        if electrum_txin_type in ('p2sh', ):
            return OutputScriptType.PAYTOMULTISIG
        raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))

    @runs_in_hwd_thread
    def sign_transaction(self, keystore, tx: PartialTransaction, prev_tx):
        """Sign *tx* on the device and store the signatures in it.

        ``prev_tx`` maps hex txids to the previous transactions spent by
        *tx* (values may be ``None``).
        """
        prev_txes = {
            bfh(txhash): self.electrum_tx_to_txtype(prev)
            for txhash, prev in prev_tx.items()
        }
        client = self.get_client(keystore)
        inputs = self.tx_inputs(tx, for_sig=True, keystore=keystore)
        outputs = self.tx_outputs(tx, keystore=keystore)
        details = SignTx(lock_time=tx.locktime, version=tx.version)
        # The second element of the result is unused; it is deliberately not
        # bound to `_`, which is the gettext function in this module.
        signatures, _signed_tx = client.sign_tx(
            self.get_coin_name(), inputs, outputs, details=details, prev_txes=prev_txes)
        # '01' suffix: presumably the SIGHASH_ALL type byte -- confirm.
        signatures = [(bh2u(x) + '01') for x in signatures]
        tx.update_signatures(signatures)

    @runs_in_hwd_thread
    def show_address(self, wallet, address, keystore=None):
        """Display *address* on the device.

        Uses the wallet's own keystore when *keystore* is ``None``, and
        returns early if ``show_address_helper`` reports failure.
        """
        if keystore is None:
            keystore = wallet.get_keystore()
        if not self.show_address_helper(wallet, address, keystore):
            return
        deriv_suffix = wallet.get_address_index(address)
        derivation = keystore.get_derivation_prefix()
        address_path = "%s/%d/%d" % (derivation, *deriv_suffix)
        script_type = self.get_trezor_input_script_type(wallet.txin_type)

        # prepare multisig, if available:
        xpubs = wallet.get_master_public_keys()
        if len(xpubs) > 1:
            pubkeys = wallet.get_public_keys(address)
            # sort xpubs using the order of pubkeys
            sorted_pairs = sorted(zip(pubkeys, xpubs))
            multisig = self._make_multisig(
                wallet.m,
                [(xpub, deriv_suffix) for pubkey, xpub in sorted_pairs])
        else:
            multisig = None

        client = self.get_client(keystore)
        client.show_address(address_path, script_type, multisig)

    def tx_inputs(self, tx: Transaction, *, for_sig=False, keystore: 'TrezorKeyStore' = None):
        """Convert the inputs of *tx* into a list of ``TxInputType``.

        With ``for_sig=True`` the script type, multisig data and derivation
        path are filled in as well; *keystore* is then required.
        """
        inputs = []
        for txin in tx.inputs():
            txinputtype = TxInputType()
            if txin.is_coinbase_input():
                prev_hash = b"\x00" * 32
                prev_index = 0xffffffff  # signed int -1
            else:
                if for_sig:
                    assert isinstance(tx, PartialTransaction)
                    assert isinstance(txin, PartialTxInput)
                    assert keystore
                    if len(txin.pubkeys) > 1:
                        xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txin)
                        multisig = self._make_multisig(txin.num_sig, xpubs_and_deriv_suffixes)
                    else:
                        multisig = None
                    script_type = self.get_trezor_input_script_type(txin.script_type)
                    txinputtype = TxInputType(
                        script_type=script_type,
                        multisig=multisig)
                    my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txin)
                    if full_path:
                        txinputtype.address_n = full_path

                prev_hash = txin.prevout.txid
                prev_index = txin.prevout.out_idx

            if txin.value_sats() is not None:
                txinputtype.amount = txin.value_sats()
            txinputtype.prev_hash = prev_hash
            txinputtype.prev_index = prev_index

            if txin.script_sig is not None:
                txinputtype.script_sig = txin.script_sig

            txinputtype.sequence = txin.nsequence

            inputs.append(txinputtype)

        return inputs

    def _make_multisig(self, m, xpubs):
        """Build a ``MultisigRedeemScriptType`` requiring *m* signatures.

        *xpubs* is a sequence of ``(xpub, derivation_suffix)`` pairs.
        Returns ``None`` when there is exactly one pair.
        """
        if len(xpubs) == 1:
            return None
        pubkeys = [self._make_node_path(xpub, deriv) for xpub, deriv in xpubs]
        return MultisigRedeemScriptType(
            pubkeys=pubkeys,
            signatures=[b''] * len(pubkeys),
            m=m)

    def tx_outputs(self, tx: PartialTransaction, *, keystore: 'TrezorKeyStore'):
        """Convert the outputs of *tx* into a list of ``TxOutputType``.

        At most one output that belongs to the wallet is described by its
        derivation path; every other output is described by address, or as
        OP_RETURN data when it has no address.
        """

        def create_output_by_derivation(txout):
            script_type = self.get_trezor_output_script_type(txout.script_type)
            if len(txout.pubkeys) > 1:
                xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txout)
                multisig = self._make_multisig(txout.num_sig, xpubs_and_deriv_suffixes)
            else:
                multisig = None
            my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txout)
            assert full_path
            return TxOutputType(
                multisig=multisig,
                amount=txout.value,
                address_n=full_path,
                script_type=script_type)

        def create_output_by_address(txout):
            address = txout.address
            txoutputtype = TxOutputType()
            txoutputtype.amount = txout.value
            if address:
                txoutputtype.script_type = OutputScriptType.PAYTOADDRESS
                txoutputtype.address = address
            else:
                txoutputtype.script_type = OutputScriptType.PAYTOOPRETURN
                txoutputtype.op_return_data = trezor_validate_op_return_output_and_get_data(txout)
            return txoutputtype

        outputs = []
        has_change = False
        any_output_on_change_branch = is_any_tx_output_on_change_branch(tx)

        for txout in tx.outputs():
            use_create_by_derivation = False

            if txout.is_mine and not has_change:
                # prioritise hiding outputs on the 'change' branch from user
                # because no more than one change address allowed
                # note: ^ restriction can be removed once we require fw
                # that has https://github.com/trezor/trezor-mcu/pull/306
                if txout.is_change == any_output_on_change_branch:
                    use_create_by_derivation = True
                    has_change = True

            if use_create_by_derivation:
                txoutputtype = create_output_by_derivation(txout)
            else:
                txoutputtype = create_output_by_address(txout)
            outputs.append(txoutputtype)

        return outputs

    def electrum_tx_to_txtype(self, tx: Optional[Transaction]):
        """Convert an Electrum transaction into a trezorlib ``TransactionType``.

        Returns an empty ``TransactionType`` when *tx* is ``None``.
        """
        t = TransactionType()
        if tx is None:
            # probably for segwit input and we don't need this prev txn
            return t
        tx.deserialize()
        t.version = tx.version
        t.lock_time = tx.locktime
        t.inputs = self.tx_inputs(tx)
        t.bin_outputs = [
            TxOutputBinType(amount=o.value, script_pubkey=o.scriptpubkey)
            for o in tx.outputs()
        ]
        return t