electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

ledger.py (32370B)


      1 from struct import pack, unpack
      2 import hashlib
      3 import sys
      4 import traceback
      5 from typing import Optional, Tuple
      6 
      7 from electrum import ecc
      8 from electrum import bip32
      9 from electrum.crypto import hash_160
     10 from electrum.bitcoin import int_to_hex, var_int, is_segwit_script_type, is_b58_address
     11 from electrum.bip32 import BIP32Node, convert_bip32_intpath_to_strpath
     12 from electrum.i18n import _
     13 from electrum.keystore import Hardware_KeyStore
     14 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
     15 from electrum.wallet import Standard_Wallet
     16 from electrum.util import bfh, bh2u, versiontuple, UserFacingException
     17 from electrum.base_wizard import ScriptTypeNotSupported
     18 from electrum.logging import get_logger
     19 from electrum.plugin import runs_in_hwd_thread, Device
     20 
     21 from ..hw_wallet import HW_PluginBase, HardwareClientBase
     22 from ..hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output, LibraryFoundButUnusable
     23 
     24 
     25 _logger = get_logger(__name__)
     26 
     27 
     28 try:
     29     import hid
     30     from btchip.btchipComm import HIDDongleHIDAPI, DongleWait
     31     from btchip.btchip import btchip
     32     from btchip.btchipUtils import compress_public_key,format_transaction, get_regular_input_script, get_p2sh_input_script
     33     from btchip.bitcoinTransaction import bitcoinTransaction
     34     from btchip.btchipFirmwareWizard import checkFirmware, updateFirmware
     35     from btchip.btchipException import BTChipException
     36     BTCHIP = True
     37     BTCHIP_DEBUG = False
     38 except ImportError as e:
     39     if not (isinstance(e, ModuleNotFoundError) and e.name == 'btchip'):
     40         _logger.exception('error importing ledger plugin deps')
     41     BTCHIP = False
     42 
     43 MSG_NEEDS_FW_UPDATE_GENERIC = _('Firmware version too old. Please update at') + \
     44                       ' https://www.ledgerwallet.com'
     45 MSG_NEEDS_FW_UPDATE_SEGWIT = _('Firmware version (or "Bitcoin" app) too old for Segwit support. Please update at') + \
     46                       ' https://www.ledgerwallet.com'
     47 MULTI_OUTPUT_SUPPORT = '1.1.4'
     48 SEGWIT_SUPPORT = '1.1.10'
     49 SEGWIT_SUPPORT_SPECIAL = '1.0.4'
     50 SEGWIT_TRUSTEDINPUTS = '1.4.0'
     51 
     52 
     53 def test_pin_unlocked(func):
     54     """Function decorator to test the Ledger for being unlocked, and if not,
     55     raise a human-readable exception.
     56     """
     57     def catch_exception(self, *args, **kwargs):
     58         try:
     59             return func(self, *args, **kwargs)
     60         except BTChipException as e:
     61             if e.sw == 0x6982:
     62                 raise UserFacingException(_('Your Ledger is locked. Please unlock it.'))
     63             else:
     64                 raise
     65     return catch_exception
     66 
     67 
     68 class Ledger_Client(HardwareClientBase):
     69     def __init__(self, hidDevice, *, product_key: Tuple[int, int],
     70                  plugin: HW_PluginBase):
     71         HardwareClientBase.__init__(self, plugin=plugin)
     72         self.dongleObject = btchip(hidDevice)
     73         self.preflightDone = False
     74         self._product_key = product_key
     75         self._soft_device_id = None
     76 
     77     def is_pairable(self):
     78         return True
     79 
     80     @runs_in_hwd_thread
     81     def close(self):
     82         self.dongleObject.dongle.close()
     83 
     84     def is_initialized(self):
     85         return True
     86 
     87     @runs_in_hwd_thread
     88     def get_soft_device_id(self):
     89         if self._soft_device_id is None:
     90             # modern ledger can provide xpub without user interaction
     91             # (hw1 would prompt for PIN)
     92             if not self.is_hw1():
     93                 self._soft_device_id = self.request_root_fingerprint_from_device()
     94         return self._soft_device_id
     95 
     96     def is_hw1(self) -> bool:
     97         return self._product_key[0] == 0x2581
     98 
     99     def device_model_name(self):
    100         return LedgerPlugin.device_name_from_product_key(self._product_key)
    101 
    102     @runs_in_hwd_thread
    103     def has_usable_connection_with_device(self):
    104         try:
    105             self.dongleObject.getFirmwareVersion()
    106         except BaseException:
    107             return False
    108         return True
    109 
    110     @runs_in_hwd_thread
    111     @test_pin_unlocked
    112     def get_xpub(self, bip32_path, xtype):
    113         self.checkDevice()
    114         # bip32_path is of the form 44'/0'/1'
    115         # S-L-O-W - we don't handle the fingerprint directly, so compute
    116         # it manually from the previous node
    117         # This only happens once so it's bearable
    118         #self.get_client() # prompt for the PIN before displaying the dialog if necessary
    119         #self.handler.show_message("Computing master public key")
    120         if xtype in ['p2wpkh', 'p2wsh'] and not self.supports_native_segwit():
    121             raise UserFacingException(MSG_NEEDS_FW_UPDATE_SEGWIT)
    122         if xtype in ['p2wpkh-p2sh', 'p2wsh-p2sh'] and not self.supports_segwit():
    123             raise UserFacingException(MSG_NEEDS_FW_UPDATE_SEGWIT)
    124         bip32_path = bip32.normalize_bip32_derivation(bip32_path)
    125         bip32_intpath = bip32.convert_bip32_path_to_list_of_uint32(bip32_path)
    126         bip32_path = bip32_path[2:]  # cut off "m/"
    127         if len(bip32_intpath) >= 1:
    128             prevPath = bip32.convert_bip32_intpath_to_strpath(bip32_intpath[:-1])[2:]
    129             nodeData = self.dongleObject.getWalletPublicKey(prevPath)
    130             publicKey = compress_public_key(nodeData['publicKey'])
    131             fingerprint_bytes = hash_160(publicKey)[0:4]
    132             childnum_bytes = bip32_intpath[-1].to_bytes(length=4, byteorder="big")
    133         else:
    134             fingerprint_bytes = bytes(4)
    135             childnum_bytes = bytes(4)
    136         nodeData = self.dongleObject.getWalletPublicKey(bip32_path)
    137         publicKey = compress_public_key(nodeData['publicKey'])
    138         depth = len(bip32_intpath)
    139         return BIP32Node(xtype=xtype,
    140                          eckey=ecc.ECPubkey(bytes(publicKey)),
    141                          chaincode=nodeData['chainCode'],
    142                          depth=depth,
    143                          fingerprint=fingerprint_bytes,
    144                          child_number=childnum_bytes).to_xpub()
    145 
    146     def has_detached_pin_support(self, client):
    147         try:
    148             client.getVerifyPinRemainingAttempts()
    149             return True
    150         except BTChipException as e:
    151             if e.sw == 0x6d00:
    152                 return False
    153             raise e
    154 
    155     def is_pin_validated(self, client):
    156         try:
    157             # Invalid SET OPERATION MODE to verify the PIN status
    158             client.dongle.exchange(bytearray([0xe0, 0x26, 0x00, 0x00, 0x01, 0xAB]))
    159         except BTChipException as e:
    160             if (e.sw == 0x6982):
    161                 return False
    162             if (e.sw == 0x6A80):
    163                 return True
    164             raise e
    165 
    166     def supports_multi_output(self):
    167         return self.multiOutputSupported
    168 
    169     def supports_segwit(self):
    170         return self.segwitSupported
    171 
    172     def supports_native_segwit(self):
    173         return self.nativeSegwitSupported
    174 
    175     def supports_segwit_trustedInputs(self):
    176         return self.segwitTrustedInputs
    177 
    178     @runs_in_hwd_thread
    179     def perform_hw1_preflight(self):
    180         try:
    181             firmwareInfo = self.dongleObject.getFirmwareVersion()
    182             firmware = firmwareInfo['version']
    183             self.multiOutputSupported = versiontuple(firmware) >= versiontuple(MULTI_OUTPUT_SUPPORT)
    184             self.nativeSegwitSupported = versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT)
    185             self.segwitSupported = self.nativeSegwitSupported or (firmwareInfo['specialVersion'] == 0x20 and versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT_SPECIAL))
    186             self.segwitTrustedInputs = versiontuple(firmware) >= versiontuple(SEGWIT_TRUSTEDINPUTS)
    187 
    188             if not checkFirmware(firmwareInfo):
    189                 self.close()
    190                 raise UserFacingException(MSG_NEEDS_FW_UPDATE_GENERIC)
    191             try:
    192                 self.dongleObject.getOperationMode()
    193             except BTChipException as e:
    194                 if (e.sw == 0x6985):
    195                     self.close()
    196                     self.handler.get_setup( )
    197                     # Acquire the new client on the next run
    198                 else:
    199                     raise e
    200             if self.has_detached_pin_support(self.dongleObject) and not self.is_pin_validated(self.dongleObject):
    201                 assert self.handler, "no handler for client"
    202                 remaining_attempts = self.dongleObject.getVerifyPinRemainingAttempts()
    203                 if remaining_attempts != 1:
    204                     msg = "Enter your Ledger PIN - remaining attempts : " + str(remaining_attempts)
    205                 else:
    206                     msg = "Enter your Ledger PIN - WARNING : LAST ATTEMPT. If the PIN is not correct, the dongle will be wiped."
    207                 confirmed, p, pin = self.password_dialog(msg)
    208                 if not confirmed:
    209                     raise UserFacingException('Aborted by user - please unplug the dongle and plug it again before retrying')
    210                 pin = pin.encode()
    211                 self.dongleObject.verifyPin(pin)
    212         except BTChipException as e:
    213             if (e.sw == 0x6faa):
    214                 raise UserFacingException("Dongle is temporarily locked - please unplug it and replug it again")
    215             if ((e.sw & 0xFFF0) == 0x63c0):
    216                 raise UserFacingException("Invalid PIN - please unplug the dongle and plug it again before retrying")
    217             if e.sw == 0x6f00 and e.message == 'Invalid channel':
    218                 # based on docs 0x6f00 might be a more general error, hence we also compare message to be sure
    219                 raise UserFacingException("Invalid channel.\n"
    220                                           "Please make sure that 'Browser support' is disabled on your device.")
    221             raise e
    222 
    223     @runs_in_hwd_thread
    224     def checkDevice(self):
    225         if not self.preflightDone:
    226             try:
    227                 self.perform_hw1_preflight()
    228             except BTChipException as e:
    229                 if (e.sw == 0x6d00 or e.sw == 0x6700):
    230                     raise UserFacingException(_("Device not in Bitcoin mode")) from e
    231                 raise e
    232             self.preflightDone = True
    233 
    234     def password_dialog(self, msg=None):
    235         response = self.handler.get_word(msg)
    236         if response is None:
    237             return False, None, None
    238         return True, response, response
    239 
    240 
    241 class Ledger_KeyStore(Hardware_KeyStore):
    242     hw_type = 'ledger'
    243     device = 'Ledger'
    244 
    245     plugin: 'LedgerPlugin'
    246 
    247     def __init__(self, d):
    248         Hardware_KeyStore.__init__(self, d)
    249         # Errors and other user interaction is done through the wallet's
    250         # handler.  The handler is per-window and preserved across
    251         # device reconnects
    252         self.force_watching_only = False
    253         self.signing = False
    254         self.cfg = d.get('cfg', {'mode': 0})
    255 
    256     def dump(self):
    257         obj = Hardware_KeyStore.dump(self)
    258         obj['cfg'] = self.cfg
    259         return obj
    260 
    261     def get_client(self):
    262         return self.plugin.get_client(self).dongleObject
    263 
    264     def get_client_electrum(self) -> Optional[Ledger_Client]:
    265         return self.plugin.get_client(self)
    266 
    267     def give_error(self, message, clear_client = False):
    268         _logger.info(message)
    269         if not self.signing:
    270             self.handler.show_error(message)
    271         else:
    272             self.signing = False
    273         if clear_client:
    274             self.client = None
    275         raise UserFacingException(message)
    276 
    277     def set_and_unset_signing(func):
    278         """Function decorator to set and unset self.signing."""
    279         def wrapper(self, *args, **kwargs):
    280             try:
    281                 self.signing = True
    282                 return func(self, *args, **kwargs)
    283             finally:
    284                 self.signing = False
    285         return wrapper
    286 
    287     def decrypt_message(self, pubkey, message, password):
    288         raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device))
    289 
    290     @runs_in_hwd_thread
    291     @test_pin_unlocked
    292     @set_and_unset_signing
    293     def sign_message(self, sequence, message, password):
    294         message = message.encode('utf8')
    295         message_hash = hashlib.sha256(message).hexdigest().upper()
    296         # prompt for the PIN before displaying the dialog if necessary
    297         client_ledger = self.get_client()
    298         client_electrum = self.get_client_electrum()
    299         address_path = self.get_derivation_prefix()[2:] + "/%d/%d"%sequence
    300         self.handler.show_message("Signing message ...\r\nMessage hash: "+message_hash)
    301         try:
    302             info = client_ledger.signMessagePrepare(address_path, message)
    303             pin = ""
    304             if info['confirmationNeeded']:
    305                 # do the authenticate dialog and get pin:
    306                 pin = self.handler.get_auth(info, client=client_electrum)
    307                 if not pin:
    308                     raise UserWarning(_('Cancelled by user'))
    309                 pin = str(pin).encode()
    310             signature = client_ledger.signMessageSign(pin)
    311         except BTChipException as e:
    312             if e.sw == 0x6a80:
    313                 self.give_error("Unfortunately, this message cannot be signed by the Ledger wallet. Only alphanumerical messages shorter than 140 characters are supported. Please remove any extra characters (tab, carriage return) and retry.")
    314             elif e.sw == 0x6985:  # cancelled by user
    315                 return b''
    316             elif e.sw == 0x6982:
    317                 raise  # pin lock. decorator will catch it
    318             else:
    319                 self.give_error(e, True)
    320         except UserWarning:
    321             self.handler.show_error(_('Cancelled by user'))
    322             return b''
    323         except Exception as e:
    324             self.give_error(e, True)
    325         finally:
    326             self.handler.finished()
    327         # Parse the ASN.1 signature
    328         rLength = signature[3]
    329         r = signature[4 : 4 + rLength]
    330         sLength = signature[4 + rLength + 1]
    331         s = signature[4 + rLength + 2:]
    332         if rLength == 33:
    333             r = r[1:]
    334         if sLength == 33:
    335             s = s[1:]
    336         # And convert it
    337 
    338         # Pad r and s points with 0x00 bytes when the point is small to get valid signature.
    339         r_padded = bytes([0x00]) * (32 - len(r)) + r
    340         s_padded = bytes([0x00]) * (32 - len(s)) + s
    341         
    342         return bytes([27 + 4 + (signature[0] & 0x01)]) + r_padded + s_padded
    343 
    344     @runs_in_hwd_thread
    345     @test_pin_unlocked
    346     @set_and_unset_signing
    347     def sign_transaction(self, tx, password):
    348         if tx.is_complete():
    349             return
    350         inputs = []
    351         inputsPaths = []
    352         chipInputs = []
    353         redeemScripts = []
    354         changePath = ""
    355         output = None
    356         p2shTransaction = False
    357         segwitTransaction = False
    358         pin = ""
    359         client_ledger = self.get_client() # prompt for the PIN before displaying the dialog if necessary
    360         client_electrum = self.get_client_electrum()
    361         assert client_electrum
    362 
    363         # Fetch inputs of the transaction to sign
    364         for txin in tx.inputs():
    365             if txin.is_coinbase_input():
    366                 self.give_error("Coinbase not supported")     # should never happen
    367 
    368             if txin.script_type in ['p2sh']:
    369                 p2shTransaction = True
    370 
    371             if txin.script_type in ['p2wpkh-p2sh', 'p2wsh-p2sh']:
    372                 if not client_electrum.supports_segwit():
    373                     self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT)
    374                 segwitTransaction = True
    375 
    376             if txin.script_type in ['p2wpkh', 'p2wsh']:
    377                 if not client_electrum.supports_native_segwit():
    378                     self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT)
    379                 segwitTransaction = True
    380 
    381             my_pubkey, full_path = self.find_my_pubkey_in_txinout(txin)
    382             if not full_path:
    383                 self.give_error("No matching pubkey for sign_transaction")  # should never happen
    384             full_path = convert_bip32_intpath_to_strpath(full_path)[2:]
    385 
    386             redeemScript = Transaction.get_preimage_script(txin)
    387             txin_prev_tx = txin.utxo
    388             if txin_prev_tx is None and not txin.is_segwit():
    389                 raise UserFacingException(_('Missing previous tx for legacy input.'))
    390             txin_prev_tx_raw = txin_prev_tx.serialize() if txin_prev_tx else None
    391             inputs.append([txin_prev_tx_raw,
    392                            txin.prevout.out_idx,
    393                            redeemScript,
    394                            txin.prevout.txid.hex(),
    395                            my_pubkey,
    396                            txin.nsequence,
    397                            txin.value_sats()])
    398             inputsPaths.append(full_path)
    399 
    400         # Sanity check
    401         if p2shTransaction:
    402             for txin in tx.inputs():
    403                 if txin.script_type != 'p2sh':
    404                     self.give_error("P2SH / regular input mixed in same transaction not supported") # should never happen
    405 
    406         txOutput = var_int(len(tx.outputs()))
    407         for o in tx.outputs():
    408             txOutput += int_to_hex(o.value, 8)
    409             script = o.scriptpubkey.hex()
    410             txOutput += var_int(len(script)//2)
    411             txOutput += script
    412         txOutput = bfh(txOutput)
    413 
    414         if not client_electrum.supports_multi_output():
    415             if len(tx.outputs()) > 2:
    416                 self.give_error("Transaction with more than 2 outputs not supported")
    417         for txout in tx.outputs():
    418             if client_electrum.is_hw1() and txout.address and not is_b58_address(txout.address):
    419                 self.give_error(_("This {} device can only send to base58 addresses.").format(self.device))
    420             if not txout.address:
    421                 if client_electrum.is_hw1():
    422                     self.give_error(_("Only address outputs are supported by {}").format(self.device))
    423                 # note: max_size based on https://github.com/LedgerHQ/ledger-app-btc/commit/3a78dee9c0484821df58975803e40d58fbfc2c38#diff-c61ccd96a6d8b54d48f54a3bc4dfa7e2R26
    424                 validate_op_return_output(txout, max_size=190)
    425 
    426         # Output "change" detection
    427         # - only one output and one change is authorized (for hw.1 and nano)
    428         # - at most one output can bypass confirmation (~change) (for all)
    429         if not p2shTransaction:
    430             has_change = False
    431             any_output_on_change_branch = is_any_tx_output_on_change_branch(tx)
    432             for txout in tx.outputs():
    433                 if txout.is_mine and len(tx.outputs()) > 1 \
    434                         and not has_change:
    435                     # prioritise hiding outputs on the 'change' branch from user
    436                     # because no more than one change address allowed
    437                     if txout.is_change == any_output_on_change_branch:
    438                         my_pubkey, changePath = self.find_my_pubkey_in_txinout(txout)
    439                         assert changePath
    440                         changePath = convert_bip32_intpath_to_strpath(changePath)[2:]
    441                         has_change = True
    442                     else:
    443                         output = txout.address
    444                 else:
    445                     output = txout.address
    446 
    447         self.handler.show_message(_("Confirm Transaction on your Ledger device..."))
    448         try:
    449             # Get trusted inputs from the original transactions
    450             for utxo in inputs:
    451                 sequence = int_to_hex(utxo[5], 4)
    452                 if segwitTransaction and not client_electrum.supports_segwit_trustedInputs():
    453                     tmp = bfh(utxo[3])[::-1]
    454                     tmp += bfh(int_to_hex(utxo[1], 4))
    455                     tmp += bfh(int_to_hex(utxo[6], 8))  # txin['value']
    456                     chipInputs.append({'value' : tmp, 'witness' : True, 'sequence' : sequence})
    457                     redeemScripts.append(bfh(utxo[2]))
    458                 elif (not p2shTransaction) or client_electrum.supports_multi_output():
    459                     txtmp = bitcoinTransaction(bfh(utxo[0]))
    460                     trustedInput = client_ledger.getTrustedInput(txtmp, utxo[1])
    461                     trustedInput['sequence'] = sequence
    462                     if segwitTransaction:
    463                         trustedInput['witness'] = True
    464                     chipInputs.append(trustedInput)
    465                     if p2shTransaction or segwitTransaction:
    466                         redeemScripts.append(bfh(utxo[2]))
    467                     else:
    468                         redeemScripts.append(txtmp.outputs[utxo[1]].script)
    469                 else:
    470                     tmp = bfh(utxo[3])[::-1]
    471                     tmp += bfh(int_to_hex(utxo[1], 4))
    472                     chipInputs.append({'value' : tmp, 'sequence' : sequence})
    473                     redeemScripts.append(bfh(utxo[2]))
    474 
    475             # Sign all inputs
    476             firstTransaction = True
    477             inputIndex = 0
    478             rawTx = tx.serialize_to_network()
    479             client_ledger.enableAlternate2fa(False)
    480             if segwitTransaction:
    481                 client_ledger.startUntrustedTransaction(True, inputIndex,
    482                                                             chipInputs, redeemScripts[inputIndex], version=tx.version)
    483                 # we don't set meaningful outputAddress, amount and fees
    484                 # as we only care about the alternateEncoding==True branch
    485                 outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
    486                 outputData['outputData'] = txOutput
    487                 if outputData['confirmationNeeded']:
    488                     outputData['address'] = output
    489                     self.handler.finished()
    490                     # do the authenticate dialog and get pin:
    491                     pin = self.handler.get_auth(outputData, client=client_electrum)
    492                     if not pin:
    493                         raise UserWarning()
    494                     self.handler.show_message(_("Confirmed. Signing Transaction..."))
    495                 while inputIndex < len(inputs):
    496                     singleInput = [ chipInputs[inputIndex] ]
    497                     client_ledger.startUntrustedTransaction(False, 0,
    498                                                             singleInput, redeemScripts[inputIndex], version=tx.version)
    499                     inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
    500                     inputSignature[0] = 0x30 # force for 1.4.9+
    501                     my_pubkey = inputs[inputIndex][4]
    502                     tx.add_signature_to_txin(txin_idx=inputIndex,
    503                                              signing_pubkey=my_pubkey.hex(),
    504                                              sig=inputSignature.hex())
    505                     inputIndex = inputIndex + 1
    506             else:
    507                 while inputIndex < len(inputs):
    508                     client_ledger.startUntrustedTransaction(firstTransaction, inputIndex,
    509                                                                 chipInputs, redeemScripts[inputIndex], version=tx.version)
    510                     # we don't set meaningful outputAddress, amount and fees
    511                     # as we only care about the alternateEncoding==True branch
    512                     outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
    513                     outputData['outputData'] = txOutput
    514                     if outputData['confirmationNeeded']:
    515                         outputData['address'] = output
    516                         self.handler.finished()
    517                         # do the authenticate dialog and get pin:
    518                         pin = self.handler.get_auth(outputData, client=client_electrum)
    519                         if not pin:
    520                             raise UserWarning()
    521                         self.handler.show_message(_("Confirmed. Signing Transaction..."))
    522                     else:
    523                         # Sign input with the provided PIN
    524                         inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
    525                         inputSignature[0] = 0x30 # force for 1.4.9+
    526                         my_pubkey = inputs[inputIndex][4]
    527                         tx.add_signature_to_txin(txin_idx=inputIndex,
    528                                                  signing_pubkey=my_pubkey.hex(),
    529                                                  sig=inputSignature.hex())
    530                         inputIndex = inputIndex + 1
    531                     firstTransaction = False
    532         except UserWarning:
    533             self.handler.show_error(_('Cancelled by user'))
    534             return
    535         except BTChipException as e:
    536             if e.sw in (0x6985, 0x6d00):  # cancelled by user
    537                 return
    538             elif e.sw == 0x6982:
    539                 raise  # pin lock. decorator will catch it
    540             else:
    541                 self.logger.exception('')
    542                 self.give_error(e, True)
    543         except BaseException as e:
    544             self.logger.exception('')
    545             self.give_error(e, True)
    546         finally:
    547             self.handler.finished()
    548 
    549     @runs_in_hwd_thread
    550     @test_pin_unlocked
    551     @set_and_unset_signing
    552     def show_address(self, sequence, txin_type):
    553         client = self.get_client()
    554         address_path = self.get_derivation_prefix()[2:] + "/%d/%d"%sequence
    555         self.handler.show_message(_("Showing address ..."))
    556         segwit = is_segwit_script_type(txin_type)
    557         segwitNative = txin_type == 'p2wpkh'
    558         try:
    559             client.getWalletPublicKey(address_path, showOnScreen=True, segwit=segwit, segwitNative=segwitNative)
    560         except BTChipException as e:
    561             if e.sw == 0x6985:  # cancelled by user
    562                 pass
    563             elif e.sw == 0x6982:
    564                 raise  # pin lock. decorator will catch it
    565             elif e.sw == 0x6b00:  # hw.1 raises this
    566                 self.handler.show_error('{}\n{}\n{}'.format(
    567                     _('Error showing address') + ':',
    568                     e,
    569                     _('Your device might not have support for this functionality.')))
    570             else:
    571                 self.logger.exception('')
    572                 self.handler.show_error(e)
    573         except BaseException as e:
    574             self.logger.exception('')
    575             self.handler.show_error(e)
    576         finally:
    577             self.handler.finished()
    578 
    579 class LedgerPlugin(HW_PluginBase):
    580     keystore_class = Ledger_KeyStore
    581     minimum_library = (0, 1, 32)
    582     client = None
    583     DEVICE_IDS = [
    584                    (0x2581, 0x1807), # HW.1 legacy btchip
    585                    (0x2581, 0x2b7c), # HW.1 transitional production
    586                    (0x2581, 0x3b7c), # HW.1 ledger production
    587                    (0x2581, 0x4b7c), # HW.1 ledger test
    588                    (0x2c97, 0x0000), # Blue
    589                    (0x2c97, 0x0001), # Nano-S
    590                    (0x2c97, 0x0004), # Nano-X
    591                    (0x2c97, 0x0005), # RFU
    592                    (0x2c97, 0x0006), # RFU
    593                    (0x2c97, 0x0007), # RFU
    594                    (0x2c97, 0x0008), # RFU
    595                    (0x2c97, 0x0009), # RFU
    596                    (0x2c97, 0x000a)  # RFU
    597                  ]
    598     VENDOR_IDS = (0x2c97, )
    599     LEDGER_MODEL_IDS = {
    600         0x10: "Ledger Nano S",
    601         0x40: "Ledger Nano X",
    602     }
    603     SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
    604 
    605     def __init__(self, parent, config, name):
    606         self.segwit = config.get("segwit")
    607         HW_PluginBase.__init__(self, parent, config, name)
    608         self.libraries_available = self.check_libraries_available()
    609         if not self.libraries_available:
    610             return
    611         # to support legacy devices and legacy firmwares
    612         self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)
    613         # to support modern firmware
    614         self.device_manager().register_vendor_ids(self.VENDOR_IDS, plugin=self)
    615 
    616     def get_library_version(self):
    617         try:
    618             import btchip
    619             version = btchip.__version__
    620         except ImportError:
    621             raise
    622         except:
    623             version = "unknown"
    624         if BTCHIP:
    625             return version
    626         else:
    627             raise LibraryFoundButUnusable(library_version=version)
    628 
    629     @classmethod
    630     def _recognize_device(cls, product_key) -> Tuple[bool, Optional[str]]:
    631         """Returns (can_recognize, model_name) tuple."""
    632         # legacy product_keys
    633         if product_key in cls.DEVICE_IDS:
    634             if product_key[0] == 0x2581:
    635                 return True, "Ledger HW.1"
    636             if product_key == (0x2c97, 0x0000):
    637                 return True, "Ledger Blue"
    638             if product_key == (0x2c97, 0x0001):
    639                 return True, "Ledger Nano S"
    640             if product_key == (0x2c97, 0x0004):
    641                 return True, "Ledger Nano X"
    642             return True, None
    643         # modern product_keys
    644         if product_key[0] == 0x2c97:
    645             product_id = product_key[1]
    646             model_id = product_id >> 8
    647             if model_id in cls.LEDGER_MODEL_IDS:
    648                 model_name = cls.LEDGER_MODEL_IDS[model_id]
    649                 return True, model_name
    650         # give up
    651         return False, None
    652 
    653     def can_recognize_device(self, device: Device) -> bool:
    654         return self._recognize_device(device.product_key)[0]
    655 
    656     @classmethod
    657     def device_name_from_product_key(cls, product_key) -> Optional[str]:
    658         return cls._recognize_device(product_key)[1]
    659 
    660     def create_device_from_hid_enumeration(self, d, *, product_key):
    661         device = super().create_device_from_hid_enumeration(d, product_key=product_key)
    662         if not self.can_recognize_device(device):
    663             return None
    664         return device
    665 
    666     @runs_in_hwd_thread
    667     def get_btchip_device(self, device):
    668         ledger = False
    669         if device.product_key[0] == 0x2581 and device.product_key[1] == 0x3b7c:
    670             ledger = True
    671         if device.product_key[0] == 0x2581 and device.product_key[1] == 0x4b7c:
    672             ledger = True
    673         if device.product_key[0] == 0x2c97:
    674             if device.interface_number == 0 or device.usage_page == 0xffa0:
    675                 ledger = True
    676             else:
    677                 return None  # non-compatible interface of a Nano S or Blue
    678         dev = hid.device()
    679         dev.open_path(device.path)
    680         dev.set_nonblocking(True)
    681         return HIDDongleHIDAPI(dev, ledger, BTCHIP_DEBUG)
    682 
    683     @runs_in_hwd_thread
    684     def create_client(self, device, handler):
    685         if handler:
    686             self.handler = handler
    687 
    688         client = self.get_btchip_device(device)
    689         if client is not None:
    690             client = Ledger_Client(client, product_key=device.product_key, plugin=self)
    691         return client
    692 
    693     def setup_device(self, device_info, wizard, purpose):
    694         device_id = device_info.device.id_
    695         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
    696         wizard.run_task_without_blocking_gui(
    697             task=lambda: client.get_xpub("m/0'", 'standard'))  # TODO replace by direct derivation once Nano S > 1.1
    698         return client
    699 
    700     def get_xpub(self, device_id, derivation, xtype, wizard):
    701         if xtype not in self.SUPPORTED_XTYPES:
    702             raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
    703         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
    704         client.checkDevice()
    705         xpub = client.get_xpub(derivation, xtype)
    706         return xpub
    707 
    708     @runs_in_hwd_thread
    709     def get_client(self, keystore, force_pair=True, *,
    710                    devices=None, allow_user_interaction=True):
    711         # All client interaction should not be in the main GUI thread
    712         client = super().get_client(keystore, force_pair,
    713                                     devices=devices,
    714                                     allow_user_interaction=allow_user_interaction)
    715         # returns the client for a given keystore. can use xpub
    716         #if client:
    717         #    client.used()
    718         if client is not None:
    719             client.checkDevice()
    720         return client
    721 
    722     @runs_in_hwd_thread
    723     def show_address(self, wallet, address, keystore=None):
    724         if keystore is None:
    725             keystore = wallet.get_keystore()
    726         if not self.show_address_helper(wallet, address, keystore):
    727             return
    728         if type(wallet) is not Standard_Wallet:
    729             keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device))
    730             return
    731         sequence = wallet.get_address_index(address)
    732         txin_type = wallet.get_txin_type(address)
    733         keystore.show_address(sequence, txin_type)