electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

digitalbitbox.py (31895B)


      1 # ----------------------------------------------------------------------------------
      2 # Electrum plugin for the Digital Bitbox hardware wallet by Shift Devices AG
      3 # digitalbitbox.com
      4 #
      5 
      6 import base64
      7 import binascii
      8 import hashlib
      9 import hmac
     10 import json
     11 import math
     12 import os
     13 import re
     14 import struct
     15 import sys
     16 import time
     17 import copy
     18 
     19 from electrum.crypto import sha256d, EncodeAES_base64, EncodeAES_bytes, DecodeAES_bytes, hmac_oneshot
     20 from electrum.bitcoin import public_key_to_p2pkh
     21 from electrum.bip32 import BIP32Node, convert_bip32_intpath_to_strpath, is_all_public_derivation
     22 from electrum import ecc
     23 from electrum.ecc import msg_magic
     24 from electrum.wallet import Standard_Wallet
     25 from electrum import constants
     26 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput
     27 from electrum.i18n import _
     28 from electrum.keystore import Hardware_KeyStore
     29 from electrum.util import to_string, UserCancelled, UserFacingException, bfh
     30 from electrum.base_wizard import ScriptTypeNotSupported, HWD_SETUP_NEW_WALLET
     31 from electrum.network import Network
     32 from electrum.logging import get_logger
     33 from electrum.plugin import runs_in_hwd_thread, run_in_hwd_thread
     34 
     35 from ..hw_wallet import HW_PluginBase, HardwareClientBase
     36 
     37 
     38 _logger = get_logger(__name__)
     39 
     40 
     41 try:
     42     import hid
     43     DIGIBOX = True
     44 except ImportError as e:
     45     DIGIBOX = False
     46 
     47 
     48 
     49 # ----------------------------------------------------------------------------------
     50 # USB HID interface
     51 #
     52 
     53 def to_hexstr(s):
     54     return binascii.hexlify(s).decode('ascii')
     55 
     56 
     57 def derive_keys(x):
     58     h = sha256d(x)
     59     h = hashlib.sha512(h).digest()
     60     return (h[:32],h[32:])
     61 
     62 MIN_MAJOR_VERSION = 5
     63 
     64 ENCRYPTION_PRIVKEY_KEY = 'encryptionprivkey'
     65 CHANNEL_ID_KEY = 'comserverchannelid'
     66 
     67 class DigitalBitbox_Client(HardwareClientBase):
     68 
     69     def __init__(self, plugin, hidDevice):
     70         HardwareClientBase.__init__(self, plugin=plugin)
     71         self.dbb_hid = hidDevice
     72         self.opened = True
     73         self.password = None
     74         self.isInitialized = False
     75         self.setupRunning = False
     76         self.usbReportSize = 64 # firmware > v2.0.0
     77 
     78     @runs_in_hwd_thread
     79     def close(self):
     80         if self.opened:
     81             try:
     82                 self.dbb_hid.close()
     83             except:
     84                 pass
     85         self.opened = False
     86 
     87 
     88     def is_pairable(self):
     89         return True
     90 
     91 
     92     def is_initialized(self):
     93         return self.dbb_has_password()
     94 
     95 
     96     def is_paired(self):
     97         return self.password is not None
     98 
     99     def has_usable_connection_with_device(self):
    100         try:
    101             self.dbb_has_password()
    102         except BaseException:
    103             return False
    104         return True
    105 
    106     def _get_xpub(self, bip32_path):
    107         if self.check_device_dialog():
    108             return self.hid_send_encrypt(('{"xpub": "%s"}' % bip32_path).encode('utf8'))
    109 
    110     def get_xpub(self, bip32_path, xtype):
    111         assert xtype in self.plugin.SUPPORTED_XTYPES
    112         reply = self._get_xpub(bip32_path)
    113         if reply:
    114             xpub = reply['xpub']
    115             # Change type of xpub to the requested type. The firmware
    116             # only ever returns the mainnet standard type, but it is agnostic
    117             # to the type when signing.
    118             if xtype != 'standard' or constants.net.TESTNET:
    119                 node = BIP32Node.from_xkey(xpub, net=constants.BitcoinMainnet)
    120                 xpub = node._replace(xtype=xtype).to_xpub()
    121             return xpub
    122         else:
    123             raise Exception('no reply')
    124 
    125     def dbb_has_password(self):
    126         reply = self.hid_send_plain(b'{"ping":""}')
    127         if 'ping' not in reply:
    128             raise UserFacingException(_('Device communication error. Please unplug and replug your Digital Bitbox.'))
    129         if reply['ping'] == 'password':
    130             return True
    131         return False
    132 
    133 
    134     def stretch_key(self, key: bytes):
    135         return to_hexstr(hashlib.pbkdf2_hmac('sha512', key, b'Digital Bitbox', iterations = 20480))
    136 
    137 
    138     def backup_password_dialog(self):
    139         msg = _("Enter the password used when the backup was created:")
    140         while True:
    141             password = self.handler.get_passphrase(msg, False)
    142             if password is None:
    143                 return None
    144             if len(password) < 4:
    145                 msg = _("Password must have at least 4 characters.") \
    146                       + "\n\n" + _("Enter password:")
    147             elif len(password) > 64:
    148                 msg = _("Password must have less than 64 characters.") \
    149                       + "\n\n" + _("Enter password:")
    150             else:
    151                 return password.encode('utf8')
    152 
    153 
    154     def password_dialog(self, msg):
    155         while True:
    156             password = self.handler.get_passphrase(msg, False)
    157             if password is None:
    158                 return False
    159             if len(password) < 4:
    160                 msg = _("Password must have at least 4 characters.") + \
    161                       "\n\n" + _("Enter password:")
    162             elif len(password) > 64:
    163                 msg = _("Password must have less than 64 characters.") + \
    164                       "\n\n" + _("Enter password:")
    165             else:
    166                 self.password = password.encode('utf8')
    167                 return True
    168 
    169     def check_device_dialog(self):
    170         match = re.search(r'v([0-9])+\.[0-9]+\.[0-9]+',
    171                           run_in_hwd_thread(self.dbb_hid.get_serial_number_string))
    172         if match is None:
    173             raise Exception("error detecting firmware version")
    174         major_version = int(match.group(1))
    175         if major_version < MIN_MAJOR_VERSION:
    176             raise Exception("Please upgrade to the newest firmware using the BitBox Desktop app: https://shiftcrypto.ch/start")
    177         # Set password if fresh device
    178         if self.password is None and not self.dbb_has_password():
    179             if not self.setupRunning:
    180                 return False # A fresh device cannot connect to an existing wallet
    181             msg = _("An uninitialized Digital Bitbox is detected.") + " " + \
    182                   _("Enter a new password below.") + "\n\n" + \
    183                   _("REMEMBER THE PASSWORD!") + "\n\n" + \
    184                   _("You cannot access your coins or a backup without the password.") + "\n" + \
    185                   _("A backup is saved automatically when generating a new wallet.")
    186             if self.password_dialog(msg):
    187                 reply = self.hid_send_plain(b'{"password":"' + self.password + b'"}')
    188             else:
    189                 return False
    190 
    191         # Get password from user if not yet set
    192         msg = _("Enter your Digital Bitbox password:")
    193         while self.password is None:
    194             if not self.password_dialog(msg):
    195                 raise UserCancelled()
    196             reply = self.hid_send_encrypt(b'{"led":"blink"}')
    197             if 'error' in reply:
    198                 self.password = None
    199                 if reply['error']['code'] == 109:
    200                     msg = _("Incorrect password entered.") + "\n\n" + \
    201                           reply['error']['message'] + "\n\n" + \
    202                           _("Enter your Digital Bitbox password:")
    203                 else:
    204                     # Should never occur
    205                     msg = _("Unexpected error occurred.") + "\n\n" + \
    206                           reply['error']['message'] + "\n\n" + \
    207                           _("Enter your Digital Bitbox password:")
    208 
    209         # Initialize device if not yet initialized
    210         if not self.setupRunning:
    211             self.isInitialized = True # Wallet exists. Electrum code later checks if the device matches the wallet
    212         elif not self.isInitialized:
    213             reply = self.hid_send_encrypt(b'{"device":"info"}')
    214             if reply['device']['id'] != "":
    215                 self.recover_or_erase_dialog() # Already seeded
    216             else:
    217                 self.seed_device_dialog() # Seed if not initialized
    218             self.mobile_pairing_dialog()
    219         return self.isInitialized
    220 
    221 
    222     def recover_or_erase_dialog(self):
    223         msg = _("The Digital Bitbox is already seeded. Choose an option:") + "\n"
    224         choices = [
    225             (_("Create a wallet using the current seed")),
    226             (_("Load a wallet from the micro SD card (the current seed is overwritten)")),
    227             (_("Erase the Digital Bitbox"))
    228         ]
    229         reply = self.handler.query_choice(msg, choices)
    230         if reply is None:
    231             return  # user cancelled
    232         if reply == 2:
    233             self.dbb_erase()
    234         elif reply == 1:
    235             if not self.dbb_load_backup():
    236                 return
    237         else:
    238             if self.hid_send_encrypt(b'{"device":"info"}')['device']['lock']:
    239                 raise UserFacingException(_("Full 2FA enabled. This is not supported yet."))
    240             # Use existing seed
    241         self.isInitialized = True
    242 
    243 
    244     def seed_device_dialog(self):
    245         msg = _("Choose how to initialize your Digital Bitbox:") + "\n"
    246         choices = [
    247             (_("Generate a new random wallet")),
    248             (_("Load a wallet from the micro SD card"))
    249         ]
    250         reply = self.handler.query_choice(msg, choices)
    251         if reply is None:
    252             return  # user cancelled
    253         if reply == 0:
    254             self.dbb_generate_wallet()
    255         else:
    256             if not self.dbb_load_backup(show_msg=False):
    257                 return
    258         self.isInitialized = True
    259 
    260     def mobile_pairing_dialog(self):
    261         dbb_user_dir = None
    262         if sys.platform == 'darwin':
    263             dbb_user_dir = os.path.join(os.environ.get("HOME", ""), "Library", "Application Support", "DBB")
    264         elif sys.platform == 'win32':
    265             dbb_user_dir = os.path.join(os.environ["APPDATA"], "DBB")
    266         else:
    267             dbb_user_dir = os.path.join(os.environ["HOME"], ".dbb")
    268 
    269         if not dbb_user_dir:
    270             return
    271 
    272         try:
    273             # Python 3.5+
    274             jsonDecodeError = json.JSONDecodeError
    275         except AttributeError:
    276             jsonDecodeError = ValueError
    277         try:
    278             with open(os.path.join(dbb_user_dir, "config.dat")) as f:
    279                 dbb_config = json.load(f)
    280         except (FileNotFoundError, jsonDecodeError):
    281             return
    282 
    283         if ENCRYPTION_PRIVKEY_KEY not in dbb_config or CHANNEL_ID_KEY not in dbb_config:
    284             return
    285 
    286         choices = [
    287             _('Do not pair'),
    288             _('Import pairing from the Digital Bitbox desktop app'),
    289         ]
    290         reply = self.handler.query_choice(_('Mobile pairing options'), choices)
    291         if reply is None:
    292             return  # user cancelled
    293 
    294         if reply == 0:
    295             if self.plugin.is_mobile_paired():
    296                 del self.plugin.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY]
    297                 del self.plugin.digitalbitbox_config[CHANNEL_ID_KEY]
    298         elif reply == 1:
    299             # import pairing from dbb app
    300             self.plugin.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY] = dbb_config[ENCRYPTION_PRIVKEY_KEY]
    301             self.plugin.digitalbitbox_config[CHANNEL_ID_KEY] = dbb_config[CHANNEL_ID_KEY]
    302         self.plugin.config.set_key('digitalbitbox', self.plugin.digitalbitbox_config)
    303 
    304     def dbb_generate_wallet(self):
    305         key = self.stretch_key(self.password)
    306         filename = ("Electrum-" + time.strftime("%Y-%m-%d-%H-%M-%S") + ".pdf")
    307         msg = ('{"seed":{"source": "create", "key": "%s", "filename": "%s", "entropy": "%s"}}' % (key, filename, to_hexstr(os.urandom(32)))).encode('utf8')
    308         reply = self.hid_send_encrypt(msg)
    309         if 'error' in reply:
    310             raise UserFacingException(reply['error']['message'])
    311 
    312 
    313     def dbb_erase(self):
    314         self.handler.show_message(_("Are you sure you want to erase the Digital Bitbox?") + "\n\n" +
    315                                   _("To continue, touch the Digital Bitbox's light for 3 seconds.") + "\n\n" +
    316                                   _("To cancel, briefly touch the light or wait for the timeout."))
    317         hid_reply = self.hid_send_encrypt(b'{"reset":"__ERASE__"}')
    318         self.handler.finished()
    319         if 'error' in hid_reply:
    320             raise UserFacingException(hid_reply['error']['message'])
    321         else:
    322             self.password = None
    323             raise UserFacingException('Device erased')
    324 
    325 
    326     def dbb_load_backup(self, show_msg=True):
    327         backups = self.hid_send_encrypt(b'{"backup":"list"}')
    328         if 'error' in backups:
    329             raise UserFacingException(backups['error']['message'])
    330         f = self.handler.query_choice(_("Choose a backup file:"), backups['backup'])
    331         if f is None:
    332             return False  # user cancelled
    333         key = self.backup_password_dialog()
    334         if key is None:
    335             raise Exception('Canceled by user')
    336         key = self.stretch_key(key)
    337         if show_msg:
    338             self.handler.show_message(_("Loading backup...") + "\n\n" +
    339                                       _("To continue, touch the Digital Bitbox's light for 3 seconds.") + "\n\n" +
    340                                       _("To cancel, briefly touch the light or wait for the timeout."))
    341         msg = ('{"seed":{"source": "backup", "key": "%s", "filename": "%s"}}' % (key, backups['backup'][f])).encode('utf8')
    342         hid_reply = self.hid_send_encrypt(msg)
    343         self.handler.finished()
    344         if 'error' in hid_reply:
    345             raise UserFacingException(hid_reply['error']['message'])
    346         return True
    347 
    348     @runs_in_hwd_thread
    349     def hid_send_frame(self, data):
    350         HWW_CID = 0xFF000000
    351         HWW_CMD = 0x80 + 0x40 + 0x01
    352         data_len = len(data)
    353         seq = 0;
    354         idx = 0;
    355         write = []
    356         while idx < data_len:
    357             if idx == 0:
    358                 # INIT frame
    359                 write = data[idx : idx + min(data_len, self.usbReportSize - 7)]
    360                 self.dbb_hid.write(b'\0' + struct.pack(">IBH", HWW_CID, HWW_CMD, data_len & 0xFFFF) + write + b'\xEE' * (self.usbReportSize - 7 - len(write)))
    361             else:
    362                 # CONT frame
    363                 write = data[idx : idx + min(data_len, self.usbReportSize - 5)]
    364                 self.dbb_hid.write(b'\0' + struct.pack(">IB", HWW_CID, seq) + write + b'\xEE' * (self.usbReportSize - 5 - len(write)))
    365                 seq += 1
    366             idx += len(write)
    367 
    368     @runs_in_hwd_thread
    369     def hid_read_frame(self):
    370         # INIT response
    371         read = bytearray(self.dbb_hid.read(self.usbReportSize))
    372         cid = ((read[0] * 256 + read[1]) * 256 + read[2]) * 256 + read[3]
    373         cmd = read[4]
    374         data_len = read[5] * 256 + read[6]
    375         data = read[7:]
    376         idx = len(read) - 7;
    377         while idx < data_len:
    378             # CONT response
    379             read = bytearray(self.dbb_hid.read(self.usbReportSize))
    380             data += read[5:]
    381             idx += len(read) - 5
    382         return data
    383 
    384     @runs_in_hwd_thread
    385     def hid_send_plain(self, msg):
    386         reply = ""
    387         try:
    388             serial_number = self.dbb_hid.get_serial_number_string()
    389             if "v2.0." in serial_number or "v1." in serial_number:
    390                 hidBufSize = 4096
    391                 self.dbb_hid.write('\0' + msg + '\0' * (hidBufSize - len(msg)))
    392                 r = bytearray()
    393                 while len(r) < hidBufSize:
    394                     r += bytearray(self.dbb_hid.read(hidBufSize))
    395             else:
    396                 self.hid_send_frame(msg)
    397                 r = self.hid_read_frame()
    398             r = r.rstrip(b' \t\r\n\0')
    399             r = r.replace(b"\0", b'')
    400             r = to_string(r, 'utf8')
    401             reply = json.loads(r)
    402         except Exception as e:
    403             _logger.info(f'Exception caught {repr(e)}')
    404         return reply
    405 
    406     @runs_in_hwd_thread
    407     def hid_send_encrypt(self, msg):
    408         sha256_byte_len = 32
    409         reply = ""
    410         try:
    411             encryption_key, authentication_key = derive_keys(self.password)
    412             msg = EncodeAES_bytes(encryption_key, msg)
    413             hmac_digest = hmac_oneshot(authentication_key, msg, hashlib.sha256)
    414             authenticated_msg = base64.b64encode(msg + hmac_digest)
    415             reply = self.hid_send_plain(authenticated_msg)
    416             if 'ciphertext' in reply:
    417                 b64_unencoded = bytes(base64.b64decode(''.join(reply["ciphertext"])))
    418                 reply_hmac = b64_unencoded[-sha256_byte_len:]
    419                 hmac_calculated = hmac_oneshot(authentication_key, b64_unencoded[:-sha256_byte_len], hashlib.sha256)
    420                 if not hmac.compare_digest(reply_hmac, hmac_calculated):
    421                     raise Exception("Failed to validate HMAC")
    422                 reply = DecodeAES_bytes(encryption_key, b64_unencoded[:-sha256_byte_len])
    423                 reply = to_string(reply, 'utf8')
    424                 reply = json.loads(reply)
    425             if 'error' in reply:
    426                 self.password = None
    427         except Exception as e:
    428             _logger.info(f'Exception caught {repr(e)}')
    429         return reply
    430 
    431 
    432 
    433 # ----------------------------------------------------------------------------------
    434 #
    435 #
    436 
    437 class DigitalBitbox_KeyStore(Hardware_KeyStore):
    438     hw_type = 'digitalbitbox'
    439     device = 'DigitalBitbox'
    440 
    441     plugin: 'DigitalBitboxPlugin'
    442 
    443     def __init__(self, d):
    444         Hardware_KeyStore.__init__(self, d)
    445         self.force_watching_only = False
    446         self.maxInputs = 14 # maximum inputs per single sign command
    447 
    448     def give_error(self, message, clear_client = False):
    449         if clear_client:
    450             self.client = None
    451         raise Exception(message)
    452 
    453 
    454     def decrypt_message(self, pubkey, message, password):
    455         raise RuntimeError(_('Encryption and decryption are currently not supported for {}').format(self.device))
    456 
    457 
    458     def sign_message(self, sequence, message, password):
    459         sig = None
    460         try:
    461             message = message.encode('utf8')
    462             inputPath = self.get_derivation_prefix() + "/%d/%d" % sequence
    463             msg_hash = sha256d(msg_magic(message))
    464             inputHash = to_hexstr(msg_hash)
    465             hasharray = []
    466             hasharray.append({'hash': inputHash, 'keypath': inputPath})
    467             hasharray = json.dumps(hasharray)
    468 
    469             msg = ('{"sign":{"meta":"sign message", "data":%s}}' % hasharray).encode('utf8')
    470 
    471             dbb_client = self.plugin.get_client(self)
    472 
    473             if not dbb_client.is_paired():
    474                 raise Exception(_("Could not sign message."))
    475 
    476             reply = dbb_client.hid_send_encrypt(msg)
    477             self.handler.show_message(_("Signing message ...") + "\n\n" +
    478                                       _("To continue, touch the Digital Bitbox's blinking light for 3 seconds.") + "\n\n" +
    479                                       _("To cancel, briefly touch the blinking light or wait for the timeout."))
    480             reply = dbb_client.hid_send_encrypt(msg) # Send twice, first returns an echo for smart verification (not implemented)
    481             self.handler.finished()
    482 
    483             if 'error' in reply:
    484                 raise Exception(reply['error']['message'])
    485 
    486             if 'sign' not in reply:
    487                 raise Exception(_("Could not sign message."))
    488 
    489             if 'recid' in reply['sign'][0]:
    490                 # firmware > v2.1.1
    491                 sig_string = binascii.unhexlify(reply['sign'][0]['sig'])
    492                 recid = int(reply['sign'][0]['recid'], 16)
    493                 sig = ecc.construct_sig65(sig_string, recid, True)
    494                 pubkey, compressed = ecc.ECPubkey.from_signature65(sig, msg_hash)
    495                 addr = public_key_to_p2pkh(pubkey.get_public_key_bytes(compressed=compressed))
    496                 if ecc.verify_message_with_address(addr, sig, message) is False:
    497                     raise Exception(_("Could not sign message"))
    498             elif 'pubkey' in reply['sign'][0]:
    499                 # firmware <= v2.1.1
    500                 for recid in range(4):
    501                     sig_string = binascii.unhexlify(reply['sign'][0]['sig'])
    502                     sig = ecc.construct_sig65(sig_string, recid, True)
    503                     try:
    504                         addr = public_key_to_p2pkh(binascii.unhexlify(reply['sign'][0]['pubkey']))
    505                         if ecc.verify_message_with_address(addr, sig, message):
    506                             break
    507                     except Exception:
    508                         continue
    509                 else:
    510                     raise Exception(_("Could not sign message"))
    511 
    512 
    513         except BaseException as e:
    514             self.give_error(e)
    515         return sig
    516 
    517 
    518     def sign_transaction(self, tx, password):
    519         if tx.is_complete():
    520             return
    521 
    522         try:
    523             p2pkhTransaction = True
    524             inputhasharray = []
    525             hasharray = []
    526             pubkeyarray = []
    527 
    528             # Build hasharray from inputs
    529             for i, txin in enumerate(tx.inputs()):
    530                 if txin.is_coinbase_input():
    531                     self.give_error("Coinbase not supported") # should never happen
    532 
    533                 if txin.script_type != 'p2pkh':
    534                     p2pkhTransaction = False
    535 
    536                 my_pubkey, inputPath = self.find_my_pubkey_in_txinout(txin)
    537                 if not inputPath:
    538                     self.give_error("No matching pubkey for sign_transaction")  # should never happen
    539                 inputPath = convert_bip32_intpath_to_strpath(inputPath)
    540                 inputHash = sha256d(bfh(tx.serialize_preimage(i)))
    541                 hasharray_i = {'hash': to_hexstr(inputHash), 'keypath': inputPath}
    542                 hasharray.append(hasharray_i)
    543                 inputhasharray.append(inputHash)
    544 
    545             # Build pubkeyarray from outputs
    546             for txout in tx.outputs():
    547                 assert txout.address
    548                 if txout.is_change:
    549                     changePubkey, changePath = self.find_my_pubkey_in_txinout(txout)
    550                     assert changePath
    551                     changePath = convert_bip32_intpath_to_strpath(changePath)
    552                     changePubkey = changePubkey.hex()
    553                     pubkeyarray_i = {'pubkey': changePubkey, 'keypath': changePath}
    554                     pubkeyarray.append(pubkeyarray_i)
    555 
    556             # Special serialization of the unsigned transaction for
    557             # the mobile verification app.
    558             # At the moment, verification only works for p2pkh transactions.
    559             if p2pkhTransaction:
    560                 tx_copy = copy.deepcopy(tx)
    561                 # monkey-patch method of tx_copy instance to change serialization
    562                 def input_script(self, txin: PartialTxInput, *, estimate_size=False):
    563                     if txin.script_type == 'p2pkh':
    564                         return Transaction.get_preimage_script(txin)
    565                     raise Exception("unsupported type %s" % txin.script_type)
    566                 tx_copy.input_script = input_script.__get__(tx_copy, PartialTransaction)
    567                 tx_dbb_serialized = tx_copy.serialize_to_network()
    568             else:
    569                 # We only need this for the signing echo / verification.
    570                 tx_dbb_serialized = None
    571 
    572             # Build sign command
    573             dbb_signatures = []
    574             steps = math.ceil(1.0 * len(hasharray) / self.maxInputs)
    575             for step in range(int(steps)):
    576                 hashes = hasharray[step * self.maxInputs : (step + 1) * self.maxInputs]
    577 
    578                 msg = {
    579                     "sign": {
    580                         "data": hashes,
    581                         "checkpub": pubkeyarray,
    582                     },
    583                 }
    584                 if tx_dbb_serialized is not None:
    585                     msg["sign"]["meta"] = to_hexstr(sha256d(tx_dbb_serialized))
    586                 msg = json.dumps(msg).encode('ascii')
    587                 dbb_client = self.plugin.get_client(self)
    588 
    589                 if not dbb_client.is_paired():
    590                     raise Exception("Could not sign transaction.")
    591 
    592                 reply = dbb_client.hid_send_encrypt(msg)
    593                 if 'error' in reply:
    594                     raise Exception(reply['error']['message'])
    595 
    596                 if 'echo' not in reply:
    597                     raise Exception("Could not sign transaction.")
    598 
    599                 if self.plugin.is_mobile_paired() and tx_dbb_serialized is not None:
    600                     reply['tx'] = tx_dbb_serialized
    601                     self.plugin.comserver_post_notification(reply)
    602 
    603                 if steps > 1:
    604                     self.handler.show_message(_("Signing large transaction. Please be patient ...") + "\n\n" +
    605                                               _("To continue, touch the Digital Bitbox's blinking light for 3 seconds.") + " " +
    606                                               _("(Touch {} of {})").format((step + 1), steps) + "\n\n" +
    607                                               _("To cancel, briefly touch the blinking light or wait for the timeout.") + "\n\n")
    608                 else:
    609                     self.handler.show_message(_("Signing transaction...") + "\n\n" +
    610                                               _("To continue, touch the Digital Bitbox's blinking light for 3 seconds.") + "\n\n" +
    611                                               _("To cancel, briefly touch the blinking light or wait for the timeout."))
    612 
    613                 # Send twice, first returns an echo for smart verification
    614                 reply = dbb_client.hid_send_encrypt(msg)
    615                 self.handler.finished()
    616 
    617                 if 'error' in reply:
    618                     if reply["error"].get('code') in (600, 601):
    619                         # aborted via LED short touch or timeout
    620                         raise UserCancelled()
    621                     raise Exception(reply['error']['message'])
    622 
    623                 if 'sign' not in reply:
    624                     raise Exception("Could not sign transaction.")
    625 
    626                 dbb_signatures.extend(reply['sign'])
    627 
    628             # Fill signatures
    629             if len(dbb_signatures) != len(tx.inputs()):
    630                 raise Exception("Incorrect number of transactions signed.") # Should never occur
    631             for i, txin in enumerate(tx.inputs()):
    632                 for pubkey_bytes in txin.pubkeys:
    633                     if txin.is_complete():
    634                         break
    635                     signed = dbb_signatures[i]
    636                     if 'recid' in signed:
    637                         # firmware > v2.1.1
    638                         recid = int(signed['recid'], 16)
    639                         s = binascii.unhexlify(signed['sig'])
    640                         h = inputhasharray[i]
    641                         pk = ecc.ECPubkey.from_sig_string(s, recid, h)
    642                         pk = pk.get_public_key_hex(compressed=True)
    643                     elif 'pubkey' in signed:
    644                         # firmware <= v2.1.1
    645                         pk = signed['pubkey']
    646                     if pk != pubkey_bytes.hex():
    647                         continue
    648                     sig_r = int(signed['sig'][:64], 16)
    649                     sig_s = int(signed['sig'][64:], 16)
    650                     sig = ecc.der_sig_from_r_and_s(sig_r, sig_s)
    651                     sig = to_hexstr(sig) + '01'
    652                     tx.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_bytes.hex(), sig=sig)
    653         except UserCancelled:
    654             raise
    655         except BaseException as e:
    656             self.give_error(e, True)
    657         else:
    658             _logger.info(f"Transaction is_complete {tx.is_complete()}")
    659 
    660 
    661 class DigitalBitboxPlugin(HW_PluginBase):
    662 
    663     libraries_available = DIGIBOX
    664     keystore_class = DigitalBitbox_KeyStore
    665     client = None
    666     DEVICE_IDS = [
    667                    (0x03eb, 0x2402) # Digital Bitbox
    668                  ]
    669     SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
    670 
    671     def __init__(self, parent, config, name):
    672         HW_PluginBase.__init__(self, parent, config, name)
    673         if self.libraries_available:
    674             self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)
    675 
    676         self.digitalbitbox_config = self.config.get('digitalbitbox', {})
    677 
    678     @runs_in_hwd_thread
    679     def get_dbb_device(self, device):
    680         dev = hid.device()
    681         dev.open_path(device.path)
    682         return dev
    683 
    684 
    685     def create_client(self, device, handler):
    686         if device.interface_number == 0 or device.usage_page == 0xffff:
    687             if handler:
    688                 self.handler = handler
    689             client = self.get_dbb_device(device)
    690             if client is not None:
    691                 client = DigitalBitbox_Client(self, client)
    692             return client
    693         else:
    694             return None
    695 
    696 
    697     def setup_device(self, device_info, wizard, purpose):
    698         device_id = device_info.device.id_
    699         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
    700         if purpose == HWD_SETUP_NEW_WALLET:
    701             client.setupRunning = True
    702         wizard.run_task_without_blocking_gui(
    703             task=lambda: client.get_xpub("m/44'/0'", 'standard'))
    704         return client
    705 
    706 
    707     def is_mobile_paired(self):
    708         return ENCRYPTION_PRIVKEY_KEY in self.digitalbitbox_config
    709 
    710 
    711     def comserver_post_notification(self, payload):
    712         assert self.is_mobile_paired(), "unexpected mobile pairing error"
    713         url = 'https://digitalbitbox.com/smartverification/index.php'
    714         key_s = base64.b64decode(self.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY])
    715         args = 'c=data&s=0&dt=0&uuid=%s&pl=%s' % (
    716             self.digitalbitbox_config[CHANNEL_ID_KEY],
    717             EncodeAES_base64(key_s, json.dumps(payload).encode('ascii')).decode('ascii'),
    718         )
    719         try:
    720             text = Network.send_http_on_proxy('post', url, body=args.encode('ascii'), headers={'content-type': 'application/x-www-form-urlencoded'})
    721             _logger.info(f'digitalbitbox reply from server {text}')
    722         except Exception as e:
    723             self.handler.show_error(repr(e)) # repr because str(Exception()) == ''
    724 
    725 
    726     def get_xpub(self, device_id, derivation, xtype, wizard):
    727         if xtype not in self.SUPPORTED_XTYPES:
    728             raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
    729         if is_all_public_derivation(derivation):
    730             raise Exception(f"The {self.device} does not reveal xpubs corresponding to non-hardened paths. (path: {derivation})")
    731         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
    732         client.check_device_dialog()
    733         xpub = client.get_xpub(derivation, xtype)
    734         return xpub
    735 
    736 
    737     def get_client(self, keystore, force_pair=True, *,
    738                    devices=None, allow_user_interaction=True):
    739         client = super().get_client(keystore, force_pair,
    740                                     devices=devices,
    741                                     allow_user_interaction=allow_user_interaction)
    742         if client is not None:
    743             client.check_device_dialog()
    744         return client
    745 
    746     def show_address(self, wallet, address, keystore=None):
    747         if keystore is None:
    748             keystore = wallet.get_keystore()
    749         if not self.show_address_helper(wallet, address, keystore):
    750             return
    751         if type(wallet) is not Standard_Wallet:
    752             keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device))
    753             return
    754         if not self.is_mobile_paired():
    755             keystore.handler.show_error(_('This function is only available after pairing your {} with a mobile device.').format(self.device))
    756             return
    757         if wallet.get_txin_type(address) != 'p2pkh':
    758             keystore.handler.show_error(_('This function is only available for p2pkh keystores when using {}.').format(self.device))
    759             return
    760         change, index = wallet.get_address_index(address)
    761         keypath = '%s/%d/%d' % (keystore.get_derivation_prefix(), change, index)
    762         xpub = self.get_client(keystore)._get_xpub(keypath)
    763         verify_request_payload = {
    764             "type": 'p2pkh',
    765             "echo": xpub['echo'],
    766         }
    767         self.comserver_post_notification(verify_request_payload)