safe_t.py (19535B)
1 from binascii import hexlify, unhexlify 2 import traceback 3 import sys 4 from typing import NamedTuple, Any, Optional, Dict, Union, List, Tuple, TYPE_CHECKING 5 6 from electrum.util import bfh, bh2u, versiontuple, UserCancelled, UserFacingException 7 from electrum.bip32 import BIP32Node 8 from electrum import constants 9 from electrum.i18n import _ 10 from electrum.plugin import Device, runs_in_hwd_thread 11 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput 12 from electrum.keystore import Hardware_KeyStore 13 from electrum.base_wizard import ScriptTypeNotSupported 14 15 from ..hw_wallet import HW_PluginBase 16 from ..hw_wallet.plugin import (is_any_tx_output_on_change_branch, trezor_validate_op_return_output_and_get_data, 17 get_xpubs_and_der_suffixes_from_txinout) 18 19 if TYPE_CHECKING: 20 from .client import SafeTClient 21 22 # Safe-T mini initialization methods 23 TIM_NEW, TIM_RECOVER, TIM_MNEMONIC, TIM_PRIVKEY = range(0, 4) 24 25 26 class SafeTKeyStore(Hardware_KeyStore): 27 hw_type = 'safe_t' 28 device = 'Safe-T mini' 29 30 plugin: 'SafeTPlugin' 31 32 def get_client(self, force_pair=True): 33 return self.plugin.get_client(self, force_pair) 34 35 def decrypt_message(self, sequence, message, password): 36 raise UserFacingException(_('Encryption and decryption are not implemented by {}').format(self.device)) 37 38 @runs_in_hwd_thread 39 def sign_message(self, sequence, message, password): 40 client = self.get_client() 41 address_path = self.get_derivation_prefix() + "/%d/%d"%sequence 42 address_n = client.expand_path(address_path) 43 msg_sig = client.sign_message(self.plugin.get_coin_name(), address_n, message) 44 return msg_sig.signature 45 46 @runs_in_hwd_thread 47 def sign_transaction(self, tx, password): 48 if tx.is_complete(): 49 return 50 # previous transactions used as inputs 51 prev_tx = {} 52 for txin in tx.inputs(): 53 tx_hash = txin.prevout.txid.hex() 54 if txin.utxo is None and not txin.is_segwit(): 55 raise UserFacingException(_('Missing previous tx for legacy input.')) 56 prev_tx[tx_hash] = txin.utxo 57 58 self.plugin.sign_transaction(self, tx, prev_tx) 59 60 61 class SafeTPlugin(HW_PluginBase): 62 # Derived classes provide: 63 # 64 # class-static variables: client_class, firmware_URL, handler_class, 65 # libraries_available, libraries_URL, minimum_firmware, 66 # wallet_class, types 67 68 firmware_URL = 'https://safe-t.io' 69 libraries_URL = 'https://github.com/archos-safe-t/python-safet' 70 minimum_firmware = (1, 0, 5) 71 keystore_class = SafeTKeyStore 72 minimum_library = (0, 1, 0) 73 SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') 74 75 MAX_LABEL_LEN = 32 76 77 def __init__(self, parent, config, name): 78 HW_PluginBase.__init__(self, parent, config, name) 79 80 self.libraries_available = self.check_libraries_available() 81 if not self.libraries_available: 82 return 83 84 from . import client 85 from . import transport 86 import safetlib.messages 87 self.client_class = client.SafeTClient 88 self.types = safetlib.messages 89 self.DEVICE_IDS = ('Safe-T mini',) 90 91 self.transport_handler = transport.SafeTTransport() 92 self.device_manager().register_enumerate_func(self.enumerate) 93 94 def get_library_version(self): 95 import safetlib 96 try: 97 return safetlib.__version__ 98 except AttributeError: 99 return 'unknown' 100 101 @runs_in_hwd_thread 102 def enumerate(self): 103 devices = self.transport_handler.enumerate_devices() 104 return [Device(path=d.get_path(), 105 interface_number=-1, 106 id_=d.get_path(), 107 product_key='Safe-T mini', 108 usage_page=0, 109 transport_ui_string=d.get_path()) 110 for d in devices] 111 112 @runs_in_hwd_thread 113 def create_client(self, device, handler): 114 try: 115 self.logger.info(f"connecting to device at {device.path}") 116 transport = self.transport_handler.get_transport(device.path) 117 except BaseException as e: 118 self.logger.info(f"cannot connect at {device.path} {e}") 119 return None 120 121 if not transport: 122 self.logger.info(f"cannot connect at {device.path}") 123 return 124 125 self.logger.info(f"connected to device at {device.path}") 126 client = self.client_class(transport, handler, self) 127 128 # Try a ping for device sanity 129 try: 130 client.ping('t') 131 except BaseException as e: 132 self.logger.info(f"ping failed {e}") 133 return None 134 135 if not client.atleast_version(*self.minimum_firmware): 136 msg = (_('Outdated {} firmware for device labelled {}. Please ' 137 'download the updated firmware from {}') 138 .format(self.device, client.label(), self.firmware_URL)) 139 self.logger.info(msg) 140 if handler: 141 handler.show_error(msg) 142 else: 143 raise UserFacingException(msg) 144 return None 145 146 return client 147 148 @runs_in_hwd_thread 149 def get_client(self, keystore, force_pair=True, *, 150 devices=None, allow_user_interaction=True) -> Optional['SafeTClient']: 151 client = super().get_client(keystore, force_pair, 152 devices=devices, 153 allow_user_interaction=allow_user_interaction) 154 # returns the client for a given keystore. can use xpub 155 if client: 156 client.used() 157 return client 158 159 def get_coin_name(self): 160 return "Testnet" if constants.net.TESTNET else "Bitcoin" 161 162 def initialize_device(self, device_id, wizard, handler): 163 # Initialization method 164 msg = _("Choose how you want to initialize your {}.\n\n" 165 "The first two methods are secure as no secret information " 166 "is entered into your computer.\n\n" 167 "For the last two methods you input secrets on your keyboard " 168 "and upload them to your {}, and so you should " 169 "only do those on a computer you know to be trustworthy " 170 "and free of malware." 171 ).format(self.device, self.device) 172 choices = [ 173 # Must be short as QT doesn't word-wrap radio button text 174 (TIM_NEW, _("Let the device generate a completely new seed randomly")), 175 (TIM_RECOVER, _("Recover from a seed you have previously written down")), 176 (TIM_MNEMONIC, _("Upload a BIP39 mnemonic to generate the seed")), 177 (TIM_PRIVKEY, _("Upload a master private key")) 178 ] 179 def f(method): 180 import threading 181 settings = self.request_safe_t_init_settings(wizard, method, self.device) 182 t = threading.Thread(target=self._initialize_device_safe, args=(settings, method, device_id, wizard, handler)) 183 t.setDaemon(True) 184 t.start() 185 exit_code = wizard.loop.exec_() 186 if exit_code != 0: 187 # this method (initialize_device) was called with the expectation 188 # of leaving the device in an initialized state when finishing. 189 # signal that this is not the case: 190 raise UserCancelled() 191 wizard.choice_dialog(title=_('Initialize Device'), message=msg, choices=choices, run_next=f) 192 193 def _initialize_device_safe(self, settings, method, device_id, wizard, handler): 194 exit_code = 0 195 try: 196 self._initialize_device(settings, method, device_id, wizard, handler) 197 except UserCancelled: 198 exit_code = 1 199 except BaseException as e: 200 self.logger.exception('') 201 handler.show_error(repr(e)) 202 exit_code = 1 203 finally: 204 wizard.loop.exit(exit_code) 205 206 @runs_in_hwd_thread 207 def _initialize_device(self, settings, method, device_id, wizard, handler): 208 item, label, pin_protection, passphrase_protection = settings 209 210 if method == TIM_RECOVER: 211 handler.show_error(_( 212 "You will be asked to enter 24 words regardless of your " 213 "seed's actual length. If you enter a word incorrectly or " 214 "misspell it, you cannot change it or go back - you will need " 215 "to start again from the beginning.\n\nSo please enter " 216 "the words carefully!"), 217 blocking=True) 218 219 language = 'english' 220 devmgr = self.device_manager() 221 client = devmgr.client_by_id(device_id) 222 if not client: 223 raise Exception(_("The device was disconnected.")) 224 225 if method == TIM_NEW: 226 strength = 64 * (item + 2) # 128, 192 or 256 227 u2f_counter = 0 228 skip_backup = False 229 client.reset_device(True, strength, passphrase_protection, 230 pin_protection, label, language, 231 u2f_counter, skip_backup) 232 elif method == TIM_RECOVER: 233 word_count = 6 * (item + 2) # 12, 18 or 24 234 client.step = 0 235 client.recovery_device(word_count, passphrase_protection, 236 pin_protection, label, language) 237 elif method == TIM_MNEMONIC: 238 pin = pin_protection # It's the pin, not a boolean 239 client.load_device_by_mnemonic(str(item), pin, 240 passphrase_protection, 241 label, language) 242 else: 243 pin = pin_protection # It's the pin, not a boolean 244 client.load_device_by_xprv(item, pin, passphrase_protection, 245 label, language) 246 247 def _make_node_path(self, xpub, address_n): 248 bip32node = BIP32Node.from_xkey(xpub) 249 node = self.types.HDNodeType( 250 depth=bip32node.depth, 251 fingerprint=int.from_bytes(bip32node.fingerprint, 'big'), 252 child_num=int.from_bytes(bip32node.child_number, 'big'), 253 chain_code=bip32node.chaincode, 254 public_key=bip32node.eckey.get_public_key_bytes(compressed=True), 255 ) 256 return self.types.HDNodePathType(node=node, address_n=address_n) 257 258 def setup_device(self, device_info, wizard, purpose): 259 device_id = device_info.device.id_ 260 client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) 261 if not device_info.initialized: 262 self.initialize_device(device_id, wizard, client.handler) 263 wizard.run_task_without_blocking_gui( 264 task=lambda: client.get_xpub("m", 'standard')) 265 client.used() 266 return client 267 268 def get_xpub(self, device_id, derivation, xtype, wizard): 269 if xtype not in self.SUPPORTED_XTYPES: 270 raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device)) 271 client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) 272 xpub = client.get_xpub(derivation, xtype) 273 client.used() 274 return xpub 275 276 def get_safet_input_script_type(self, electrum_txin_type: str): 277 if electrum_txin_type in ('p2wpkh', 'p2wsh'): 278 return self.types.InputScriptType.SPENDWITNESS 279 if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'): 280 return self.types.InputScriptType.SPENDP2SHWITNESS 281 if electrum_txin_type in ('p2pkh', ): 282 return self.types.InputScriptType.SPENDADDRESS 283 if electrum_txin_type in ('p2sh', ): 284 return self.types.InputScriptType.SPENDMULTISIG 285 raise ValueError('unexpected txin type: {}'.format(electrum_txin_type)) 286 287 def get_safet_output_script_type(self, electrum_txin_type: str): 288 if electrum_txin_type in ('p2wpkh', 'p2wsh'): 289 return self.types.OutputScriptType.PAYTOWITNESS 290 if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'): 291 return self.types.OutputScriptType.PAYTOP2SHWITNESS 292 if electrum_txin_type in ('p2pkh', ): 293 return self.types.OutputScriptType.PAYTOADDRESS 294 if electrum_txin_type in ('p2sh', ): 295 return self.types.OutputScriptType.PAYTOMULTISIG 296 raise ValueError('unexpected txin type: {}'.format(electrum_txin_type)) 297 298 @runs_in_hwd_thread 299 def sign_transaction(self, keystore, tx: PartialTransaction, prev_tx): 300 self.prev_tx = prev_tx 301 client = self.get_client(keystore) 302 inputs = self.tx_inputs(tx, for_sig=True, keystore=keystore) 303 outputs = self.tx_outputs(tx, keystore=keystore) 304 signatures = client.sign_tx(self.get_coin_name(), inputs, outputs, 305 lock_time=tx.locktime, version=tx.version)[0] 306 signatures = [(bh2u(x) + '01') for x in signatures] 307 tx.update_signatures(signatures) 308 309 @runs_in_hwd_thread 310 def show_address(self, wallet, address, keystore=None): 311 if keystore is None: 312 keystore = wallet.get_keystore() 313 if not self.show_address_helper(wallet, address, keystore): 314 return 315 client = self.get_client(keystore) 316 if not client.atleast_version(1, 0): 317 keystore.handler.show_error(_("Your device firmware is too old")) 318 return 319 deriv_suffix = wallet.get_address_index(address) 320 derivation = keystore.get_derivation_prefix() 321 address_path = "%s/%d/%d"%(derivation, *deriv_suffix) 322 address_n = client.expand_path(address_path) 323 script_type = self.get_safet_input_script_type(wallet.txin_type) 324 325 # prepare multisig, if available: 326 xpubs = wallet.get_master_public_keys() 327 if len(xpubs) > 1: 328 pubkeys = wallet.get_public_keys(address) 329 # sort xpubs using the order of pubkeys 330 sorted_pairs = sorted(zip(pubkeys, xpubs)) 331 multisig = self._make_multisig( 332 wallet.m, 333 [(xpub, deriv_suffix) for pubkey, xpub in sorted_pairs]) 334 else: 335 multisig = None 336 337 client.get_address(self.get_coin_name(), address_n, True, multisig=multisig, script_type=script_type) 338 339 def tx_inputs(self, tx: Transaction, *, for_sig=False, keystore: 'SafeTKeyStore' = None): 340 inputs = [] 341 for txin in tx.inputs(): 342 txinputtype = self.types.TxInputType() 343 if txin.is_coinbase_input(): 344 prev_hash = b"\x00"*32 345 prev_index = 0xffffffff # signed int -1 346 else: 347 if for_sig: 348 assert isinstance(tx, PartialTransaction) 349 assert isinstance(txin, PartialTxInput) 350 assert keystore 351 if len(txin.pubkeys) > 1: 352 xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txin) 353 multisig = self._make_multisig(txin.num_sig, xpubs_and_deriv_suffixes) 354 else: 355 multisig = None 356 script_type = self.get_safet_input_script_type(txin.script_type) 357 txinputtype = self.types.TxInputType( 358 script_type=script_type, 359 multisig=multisig) 360 my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txin) 361 if full_path: 362 txinputtype._extend_address_n(full_path) 363 364 prev_hash = txin.prevout.txid 365 prev_index = txin.prevout.out_idx 366 367 if txin.value_sats() is not None: 368 txinputtype.amount = txin.value_sats() 369 txinputtype.prev_hash = prev_hash 370 txinputtype.prev_index = prev_index 371 372 if txin.script_sig is not None: 373 txinputtype.script_sig = txin.script_sig 374 375 txinputtype.sequence = txin.nsequence 376 377 inputs.append(txinputtype) 378 379 return inputs 380 381 def _make_multisig(self, m, xpubs): 382 if len(xpubs) == 1: 383 return None 384 pubkeys = [self._make_node_path(xpub, deriv) for xpub, deriv in xpubs] 385 return self.types.MultisigRedeemScriptType( 386 pubkeys=pubkeys, 387 signatures=[b''] * len(pubkeys), 388 m=m) 389 390 def tx_outputs(self, tx: PartialTransaction, *, keystore: 'SafeTKeyStore'): 391 392 def create_output_by_derivation(): 393 script_type = self.get_safet_output_script_type(txout.script_type) 394 if len(txout.pubkeys) > 1: 395 xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txout) 396 multisig = self._make_multisig(txout.num_sig, xpubs_and_deriv_suffixes) 397 else: 398 multisig = None 399 my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txout) 400 assert full_path 401 txoutputtype = self.types.TxOutputType( 402 multisig=multisig, 403 amount=txout.value, 404 address_n=full_path, 405 script_type=script_type) 406 return txoutputtype 407 408 def create_output_by_address(): 409 txoutputtype = self.types.TxOutputType() 410 txoutputtype.amount = txout.value 411 if address: 412 txoutputtype.script_type = self.types.OutputScriptType.PAYTOADDRESS 413 txoutputtype.address = address 414 else: 415 txoutputtype.script_type = self.types.OutputScriptType.PAYTOOPRETURN 416 txoutputtype.op_return_data = trezor_validate_op_return_output_and_get_data(txout) 417 return txoutputtype 418 419 outputs = [] 420 has_change = False 421 any_output_on_change_branch = is_any_tx_output_on_change_branch(tx) 422 423 for txout in tx.outputs(): 424 address = txout.address 425 use_create_by_derivation = False 426 427 if txout.is_mine and not has_change: 428 # prioritise hiding outputs on the 'change' branch from user 429 # because no more than one change address allowed 430 # note: ^ restriction can be removed once we require fw 431 # that has https://github.com/trezor/trezor-mcu/pull/306 432 if txout.is_change == any_output_on_change_branch: 433 use_create_by_derivation = True 434 has_change = True 435 436 if use_create_by_derivation: 437 txoutputtype = create_output_by_derivation() 438 else: 439 txoutputtype = create_output_by_address() 440 outputs.append(txoutputtype) 441 442 return outputs 443 444 def electrum_tx_to_txtype(self, tx: Optional[Transaction]): 445 t = self.types.TransactionType() 446 if tx is None: 447 # probably for segwit input and we don't need this prev txn 448 return t 449 tx.deserialize() 450 t.version = tx.version 451 t.lock_time = tx.locktime 452 inputs = self.tx_inputs(tx) 453 t._extend_inputs(inputs) 454 for out in tx.outputs(): 455 o = t._add_bin_outputs() 456 o.amount = out.value 457 o.script_pubkey = out.scriptpubkey 458 return t 459 460 # This function is called from the TREZOR libraries (via tx_api) 461 def get_tx(self, tx_hash): 462 tx = self.prev_tx[tx_hash] 463 return self.electrum_tx_to_txtype(tx)