electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

safe_t.py (19535B)


      1 from binascii import hexlify, unhexlify
      2 import traceback
      3 import sys
      4 from typing import NamedTuple, Any, Optional, Dict, Union, List, Tuple, TYPE_CHECKING
      5 
      6 from electrum.util import bfh, bh2u, versiontuple, UserCancelled, UserFacingException
      7 from electrum.bip32 import BIP32Node
      8 from electrum import constants
      9 from electrum.i18n import _
     10 from electrum.plugin import Device, runs_in_hwd_thread
     11 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
     12 from electrum.keystore import Hardware_KeyStore
     13 from electrum.base_wizard import ScriptTypeNotSupported
     14 
     15 from ..hw_wallet import HW_PluginBase
     16 from ..hw_wallet.plugin import (is_any_tx_output_on_change_branch, trezor_validate_op_return_output_and_get_data,
     17                                 get_xpubs_and_der_suffixes_from_txinout)
     18 
     19 if TYPE_CHECKING:
     20     from .client import SafeTClient
     21 
     22 # Safe-T mini initialization methods
     23 TIM_NEW, TIM_RECOVER, TIM_MNEMONIC, TIM_PRIVKEY = range(0, 4)
     24 
     25 
     26 class SafeTKeyStore(Hardware_KeyStore):
     27     hw_type = 'safe_t'
     28     device = 'Safe-T mini'
     29 
     30     plugin: 'SafeTPlugin'
     31 
     32     def get_client(self, force_pair=True):
     33         return self.plugin.get_client(self, force_pair)
     34 
     35     def decrypt_message(self, sequence, message, password):
     36         raise UserFacingException(_('Encryption and decryption are not implemented by {}').format(self.device))
     37 
     38     @runs_in_hwd_thread
     39     def sign_message(self, sequence, message, password):
     40         client = self.get_client()
     41         address_path = self.get_derivation_prefix() + "/%d/%d"%sequence
     42         address_n = client.expand_path(address_path)
     43         msg_sig = client.sign_message(self.plugin.get_coin_name(), address_n, message)
     44         return msg_sig.signature
     45 
     46     @runs_in_hwd_thread
     47     def sign_transaction(self, tx, password):
     48         if tx.is_complete():
     49             return
     50         # previous transactions used as inputs
     51         prev_tx = {}
     52         for txin in tx.inputs():
     53             tx_hash = txin.prevout.txid.hex()
     54             if txin.utxo is None and not txin.is_segwit():
     55                 raise UserFacingException(_('Missing previous tx for legacy input.'))
     56             prev_tx[tx_hash] = txin.utxo
     57 
     58         self.plugin.sign_transaction(self, tx, prev_tx)
     59 
     60 
     61 class SafeTPlugin(HW_PluginBase):
     62     # Derived classes provide:
     63     #
     64     #  class-static variables: client_class, firmware_URL, handler_class,
     65     #     libraries_available, libraries_URL, minimum_firmware,
     66     #     wallet_class, types
     67 
     68     firmware_URL = 'https://safe-t.io'
     69     libraries_URL = 'https://github.com/archos-safe-t/python-safet'
     70     minimum_firmware = (1, 0, 5)
     71     keystore_class = SafeTKeyStore
     72     minimum_library = (0, 1, 0)
     73     SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
     74 
     75     MAX_LABEL_LEN = 32
     76 
     77     def __init__(self, parent, config, name):
     78         HW_PluginBase.__init__(self, parent, config, name)
     79 
     80         self.libraries_available = self.check_libraries_available()
     81         if not self.libraries_available:
     82             return
     83 
     84         from . import client
     85         from . import transport
     86         import safetlib.messages
     87         self.client_class = client.SafeTClient
     88         self.types = safetlib.messages
     89         self.DEVICE_IDS = ('Safe-T mini',)
     90 
     91         self.transport_handler = transport.SafeTTransport()
     92         self.device_manager().register_enumerate_func(self.enumerate)
     93 
     94     def get_library_version(self):
     95         import safetlib
     96         try:
     97             return safetlib.__version__
     98         except AttributeError:
     99             return 'unknown'
    100 
    101     @runs_in_hwd_thread
    102     def enumerate(self):
    103         devices = self.transport_handler.enumerate_devices()
    104         return [Device(path=d.get_path(),
    105                        interface_number=-1,
    106                        id_=d.get_path(),
    107                        product_key='Safe-T mini',
    108                        usage_page=0,
    109                        transport_ui_string=d.get_path())
    110                 for d in devices]
    111 
    112     @runs_in_hwd_thread
    113     def create_client(self, device, handler):
    114         try:
    115             self.logger.info(f"connecting to device at {device.path}")
    116             transport = self.transport_handler.get_transport(device.path)
    117         except BaseException as e:
    118             self.logger.info(f"cannot connect at {device.path} {e}")
    119             return None
    120 
    121         if not transport:
    122             self.logger.info(f"cannot connect at {device.path}")
    123             return
    124 
    125         self.logger.info(f"connected to device at {device.path}")
    126         client = self.client_class(transport, handler, self)
    127 
    128         # Try a ping for device sanity
    129         try:
    130             client.ping('t')
    131         except BaseException as e:
    132             self.logger.info(f"ping failed {e}")
    133             return None
    134 
    135         if not client.atleast_version(*self.minimum_firmware):
    136             msg = (_('Outdated {} firmware for device labelled {}. Please '
    137                      'download the updated firmware from {}')
    138                    .format(self.device, client.label(), self.firmware_URL))
    139             self.logger.info(msg)
    140             if handler:
    141                 handler.show_error(msg)
    142             else:
    143                 raise UserFacingException(msg)
    144             return None
    145 
    146         return client
    147 
    148     @runs_in_hwd_thread
    149     def get_client(self, keystore, force_pair=True, *,
    150                    devices=None, allow_user_interaction=True) -> Optional['SafeTClient']:
    151         client = super().get_client(keystore, force_pair,
    152                                     devices=devices,
    153                                     allow_user_interaction=allow_user_interaction)
    154         # returns the client for a given keystore. can use xpub
    155         if client:
    156             client.used()
    157         return client
    158 
    159     def get_coin_name(self):
    160         return "Testnet" if constants.net.TESTNET else "Bitcoin"
    161 
    162     def initialize_device(self, device_id, wizard, handler):
    163         # Initialization method
    164         msg = _("Choose how you want to initialize your {}.\n\n"
    165                 "The first two methods are secure as no secret information "
    166                 "is entered into your computer.\n\n"
    167                 "For the last two methods you input secrets on your keyboard "
    168                 "and upload them to your {}, and so you should "
    169                 "only do those on a computer you know to be trustworthy "
    170                 "and free of malware."
    171         ).format(self.device, self.device)
    172         choices = [
    173             # Must be short as QT doesn't word-wrap radio button text
    174             (TIM_NEW, _("Let the device generate a completely new seed randomly")),
    175             (TIM_RECOVER, _("Recover from a seed you have previously written down")),
    176             (TIM_MNEMONIC, _("Upload a BIP39 mnemonic to generate the seed")),
    177             (TIM_PRIVKEY, _("Upload a master private key"))
    178         ]
    179         def f(method):
    180             import threading
    181             settings = self.request_safe_t_init_settings(wizard, method, self.device)
    182             t = threading.Thread(target=self._initialize_device_safe, args=(settings, method, device_id, wizard, handler))
    183             t.setDaemon(True)
    184             t.start()
    185             exit_code = wizard.loop.exec_()
    186             if exit_code != 0:
    187                 # this method (initialize_device) was called with the expectation
    188                 # of leaving the device in an initialized state when finishing.
    189                 # signal that this is not the case:
    190                 raise UserCancelled()
    191         wizard.choice_dialog(title=_('Initialize Device'), message=msg, choices=choices, run_next=f)
    192 
    193     def _initialize_device_safe(self, settings, method, device_id, wizard, handler):
    194         exit_code = 0
    195         try:
    196             self._initialize_device(settings, method, device_id, wizard, handler)
    197         except UserCancelled:
    198             exit_code = 1
    199         except BaseException as e:
    200             self.logger.exception('')
    201             handler.show_error(repr(e))
    202             exit_code = 1
    203         finally:
    204             wizard.loop.exit(exit_code)
    205 
    206     @runs_in_hwd_thread
    207     def _initialize_device(self, settings, method, device_id, wizard, handler):
    208         item, label, pin_protection, passphrase_protection = settings
    209 
    210         if method == TIM_RECOVER:
    211             handler.show_error(_(
    212                 "You will be asked to enter 24 words regardless of your "
    213                 "seed's actual length.  If you enter a word incorrectly or "
    214                 "misspell it, you cannot change it or go back - you will need "
    215                 "to start again from the beginning.\n\nSo please enter "
    216                 "the words carefully!"),
    217                 blocking=True)
    218 
    219         language = 'english'
    220         devmgr = self.device_manager()
    221         client = devmgr.client_by_id(device_id)
    222         if not client:
    223             raise Exception(_("The device was disconnected."))
    224 
    225         if method == TIM_NEW:
    226             strength = 64 * (item + 2)  # 128, 192 or 256
    227             u2f_counter = 0
    228             skip_backup = False
    229             client.reset_device(True, strength, passphrase_protection,
    230                                 pin_protection, label, language,
    231                                 u2f_counter, skip_backup)
    232         elif method == TIM_RECOVER:
    233             word_count = 6 * (item + 2)  # 12, 18 or 24
    234             client.step = 0
    235             client.recovery_device(word_count, passphrase_protection,
    236                                        pin_protection, label, language)
    237         elif method == TIM_MNEMONIC:
    238             pin = pin_protection  # It's the pin, not a boolean
    239             client.load_device_by_mnemonic(str(item), pin,
    240                                            passphrase_protection,
    241                                            label, language)
    242         else:
    243             pin = pin_protection  # It's the pin, not a boolean
    244             client.load_device_by_xprv(item, pin, passphrase_protection,
    245                                        label, language)
    246 
    247     def _make_node_path(self, xpub, address_n):
    248         bip32node = BIP32Node.from_xkey(xpub)
    249         node = self.types.HDNodeType(
    250             depth=bip32node.depth,
    251             fingerprint=int.from_bytes(bip32node.fingerprint, 'big'),
    252             child_num=int.from_bytes(bip32node.child_number, 'big'),
    253             chain_code=bip32node.chaincode,
    254             public_key=bip32node.eckey.get_public_key_bytes(compressed=True),
    255         )
    256         return self.types.HDNodePathType(node=node, address_n=address_n)
    257 
    258     def setup_device(self, device_info, wizard, purpose):
    259         device_id = device_info.device.id_
    260         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
    261         if not device_info.initialized:
    262             self.initialize_device(device_id, wizard, client.handler)
    263         wizard.run_task_without_blocking_gui(
    264             task=lambda: client.get_xpub("m", 'standard'))
    265         client.used()
    266         return client
    267 
    268     def get_xpub(self, device_id, derivation, xtype, wizard):
    269         if xtype not in self.SUPPORTED_XTYPES:
    270             raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
    271         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
    272         xpub = client.get_xpub(derivation, xtype)
    273         client.used()
    274         return xpub
    275 
    276     def get_safet_input_script_type(self, electrum_txin_type: str):
    277         if electrum_txin_type in ('p2wpkh', 'p2wsh'):
    278             return self.types.InputScriptType.SPENDWITNESS
    279         if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
    280             return self.types.InputScriptType.SPENDP2SHWITNESS
    281         if electrum_txin_type in ('p2pkh', ):
    282             return self.types.InputScriptType.SPENDADDRESS
    283         if electrum_txin_type in ('p2sh', ):
    284             return self.types.InputScriptType.SPENDMULTISIG
    285         raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
    286 
    287     def get_safet_output_script_type(self, electrum_txin_type: str):
    288         if electrum_txin_type in ('p2wpkh', 'p2wsh'):
    289             return self.types.OutputScriptType.PAYTOWITNESS
    290         if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
    291             return self.types.OutputScriptType.PAYTOP2SHWITNESS
    292         if electrum_txin_type in ('p2pkh', ):
    293             return self.types.OutputScriptType.PAYTOADDRESS
    294         if electrum_txin_type in ('p2sh', ):
    295             return self.types.OutputScriptType.PAYTOMULTISIG
    296         raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
    297 
    298     @runs_in_hwd_thread
    299     def sign_transaction(self, keystore, tx: PartialTransaction, prev_tx):
    300         self.prev_tx = prev_tx
    301         client = self.get_client(keystore)
    302         inputs = self.tx_inputs(tx, for_sig=True, keystore=keystore)
    303         outputs = self.tx_outputs(tx, keystore=keystore)
    304         signatures = client.sign_tx(self.get_coin_name(), inputs, outputs,
    305                                     lock_time=tx.locktime, version=tx.version)[0]
    306         signatures = [(bh2u(x) + '01') for x in signatures]
    307         tx.update_signatures(signatures)
    308 
    309     @runs_in_hwd_thread
    310     def show_address(self, wallet, address, keystore=None):
    311         if keystore is None:
    312             keystore = wallet.get_keystore()
    313         if not self.show_address_helper(wallet, address, keystore):
    314             return
    315         client = self.get_client(keystore)
    316         if not client.atleast_version(1, 0):
    317             keystore.handler.show_error(_("Your device firmware is too old"))
    318             return
    319         deriv_suffix = wallet.get_address_index(address)
    320         derivation = keystore.get_derivation_prefix()
    321         address_path = "%s/%d/%d"%(derivation, *deriv_suffix)
    322         address_n = client.expand_path(address_path)
    323         script_type = self.get_safet_input_script_type(wallet.txin_type)
    324 
    325         # prepare multisig, if available:
    326         xpubs = wallet.get_master_public_keys()
    327         if len(xpubs) > 1:
    328             pubkeys = wallet.get_public_keys(address)
    329             # sort xpubs using the order of pubkeys
    330             sorted_pairs = sorted(zip(pubkeys, xpubs))
    331             multisig = self._make_multisig(
    332                 wallet.m,
    333                 [(xpub, deriv_suffix) for pubkey, xpub in sorted_pairs])
    334         else:
    335             multisig = None
    336 
    337         client.get_address(self.get_coin_name(), address_n, True, multisig=multisig, script_type=script_type)
    338 
    339     def tx_inputs(self, tx: Transaction, *, for_sig=False, keystore: 'SafeTKeyStore' = None):
    340         inputs = []
    341         for txin in tx.inputs():
    342             txinputtype = self.types.TxInputType()
    343             if txin.is_coinbase_input():
    344                 prev_hash = b"\x00"*32
    345                 prev_index = 0xffffffff  # signed int -1
    346             else:
    347                 if for_sig:
    348                     assert isinstance(tx, PartialTransaction)
    349                     assert isinstance(txin, PartialTxInput)
    350                     assert keystore
    351                     if len(txin.pubkeys) > 1:
    352                         xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txin)
    353                         multisig = self._make_multisig(txin.num_sig, xpubs_and_deriv_suffixes)
    354                     else:
    355                         multisig = None
    356                     script_type = self.get_safet_input_script_type(txin.script_type)
    357                     txinputtype = self.types.TxInputType(
    358                         script_type=script_type,
    359                         multisig=multisig)
    360                     my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txin)
    361                     if full_path:
    362                         txinputtype._extend_address_n(full_path)
    363 
    364                 prev_hash = txin.prevout.txid
    365                 prev_index = txin.prevout.out_idx
    366 
    367             if txin.value_sats() is not None:
    368                 txinputtype.amount = txin.value_sats()
    369             txinputtype.prev_hash = prev_hash
    370             txinputtype.prev_index = prev_index
    371 
    372             if txin.script_sig is not None:
    373                 txinputtype.script_sig = txin.script_sig
    374 
    375             txinputtype.sequence = txin.nsequence
    376 
    377             inputs.append(txinputtype)
    378 
    379         return inputs
    380 
    381     def _make_multisig(self, m, xpubs):
    382         if len(xpubs) == 1:
    383             return None
    384         pubkeys = [self._make_node_path(xpub, deriv) for xpub, deriv in xpubs]
    385         return self.types.MultisigRedeemScriptType(
    386             pubkeys=pubkeys,
    387             signatures=[b''] * len(pubkeys),
    388             m=m)
    389 
    390     def tx_outputs(self, tx: PartialTransaction, *, keystore: 'SafeTKeyStore'):
    391 
    392         def create_output_by_derivation():
    393             script_type = self.get_safet_output_script_type(txout.script_type)
    394             if len(txout.pubkeys) > 1:
    395                 xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txout)
    396                 multisig = self._make_multisig(txout.num_sig, xpubs_and_deriv_suffixes)
    397             else:
    398                 multisig = None
    399             my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txout)
    400             assert full_path
    401             txoutputtype = self.types.TxOutputType(
    402                 multisig=multisig,
    403                 amount=txout.value,
    404                 address_n=full_path,
    405                 script_type=script_type)
    406             return txoutputtype
    407 
    408         def create_output_by_address():
    409             txoutputtype = self.types.TxOutputType()
    410             txoutputtype.amount = txout.value
    411             if address:
    412                 txoutputtype.script_type = self.types.OutputScriptType.PAYTOADDRESS
    413                 txoutputtype.address = address
    414             else:
    415                 txoutputtype.script_type = self.types.OutputScriptType.PAYTOOPRETURN
    416                 txoutputtype.op_return_data = trezor_validate_op_return_output_and_get_data(txout)
    417             return txoutputtype
    418 
    419         outputs = []
    420         has_change = False
    421         any_output_on_change_branch = is_any_tx_output_on_change_branch(tx)
    422 
    423         for txout in tx.outputs():
    424             address = txout.address
    425             use_create_by_derivation = False
    426 
    427             if txout.is_mine and not has_change:
    428                 # prioritise hiding outputs on the 'change' branch from user
    429                 # because no more than one change address allowed
    430                 # note: ^ restriction can be removed once we require fw
    431                 # that has https://github.com/trezor/trezor-mcu/pull/306
    432                 if txout.is_change == any_output_on_change_branch:
    433                     use_create_by_derivation = True
    434                     has_change = True
    435 
    436             if use_create_by_derivation:
    437                 txoutputtype = create_output_by_derivation()
    438             else:
    439                 txoutputtype = create_output_by_address()
    440             outputs.append(txoutputtype)
    441 
    442         return outputs
    443 
    444     def electrum_tx_to_txtype(self, tx: Optional[Transaction]):
    445         t = self.types.TransactionType()
    446         if tx is None:
    447             # probably for segwit input and we don't need this prev txn
    448             return t
    449         tx.deserialize()
    450         t.version = tx.version
    451         t.lock_time = tx.locktime
    452         inputs = self.tx_inputs(tx)
    453         t._extend_inputs(inputs)
    454         for out in tx.outputs():
    455             o = t._add_bin_outputs()
    456             o.amount = out.value
    457             o.script_pubkey = out.scriptpubkey
    458         return t
    459 
    460     # This function is called from the TREZOR libraries (via tx_api)
    461     def get_tx(self, tx_hash):
    462         tx = self.prev_tx[tx_hash]
    463         return self.electrum_tx_to_txtype(tx)