digitalbitbox.py (31895B)
1 # ---------------------------------------------------------------------------------- 2 # Electrum plugin for the Digital Bitbox hardware wallet by Shift Devices AG 3 # digitalbitbox.com 4 # 5 6 import base64 7 import binascii 8 import hashlib 9 import hmac 10 import json 11 import math 12 import os 13 import re 14 import struct 15 import sys 16 import time 17 import copy 18 19 from electrum.crypto import sha256d, EncodeAES_base64, EncodeAES_bytes, DecodeAES_bytes, hmac_oneshot 20 from electrum.bitcoin import public_key_to_p2pkh 21 from electrum.bip32 import BIP32Node, convert_bip32_intpath_to_strpath, is_all_public_derivation 22 from electrum import ecc 23 from electrum.ecc import msg_magic 24 from electrum.wallet import Standard_Wallet 25 from electrum import constants 26 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput 27 from electrum.i18n import _ 28 from electrum.keystore import Hardware_KeyStore 29 from electrum.util import to_string, UserCancelled, UserFacingException, bfh 30 from electrum.base_wizard import ScriptTypeNotSupported, HWD_SETUP_NEW_WALLET 31 from electrum.network import Network 32 from electrum.logging import get_logger 33 from electrum.plugin import runs_in_hwd_thread, run_in_hwd_thread 34 35 from ..hw_wallet import HW_PluginBase, HardwareClientBase 36 37 38 _logger = get_logger(__name__) 39 40 41 try: 42 import hid 43 DIGIBOX = True 44 except ImportError as e: 45 DIGIBOX = False 46 47 48 49 # ---------------------------------------------------------------------------------- 50 # USB HID interface 51 # 52 53 def to_hexstr(s): 54 return binascii.hexlify(s).decode('ascii') 55 56 57 def derive_keys(x): 58 h = sha256d(x) 59 h = hashlib.sha512(h).digest() 60 return (h[:32],h[32:]) 61 62 MIN_MAJOR_VERSION = 5 63 64 ENCRYPTION_PRIVKEY_KEY = 'encryptionprivkey' 65 CHANNEL_ID_KEY = 'comserverchannelid' 66 67 class DigitalBitbox_Client(HardwareClientBase): 68 69 def __init__(self, plugin, hidDevice): 70 HardwareClientBase.__init__(self, plugin=plugin) 71 self.dbb_hid = hidDevice 72 self.opened = True 73 self.password = None 74 self.isInitialized = False 75 self.setupRunning = False 76 self.usbReportSize = 64 # firmware > v2.0.0 77 78 @runs_in_hwd_thread 79 def close(self): 80 if self.opened: 81 try: 82 self.dbb_hid.close() 83 except: 84 pass 85 self.opened = False 86 87 88 def is_pairable(self): 89 return True 90 91 92 def is_initialized(self): 93 return self.dbb_has_password() 94 95 96 def is_paired(self): 97 return self.password is not None 98 99 def has_usable_connection_with_device(self): 100 try: 101 self.dbb_has_password() 102 except BaseException: 103 return False 104 return True 105 106 def _get_xpub(self, bip32_path): 107 if self.check_device_dialog(): 108 return self.hid_send_encrypt(('{"xpub": "%s"}' % bip32_path).encode('utf8')) 109 110 def get_xpub(self, bip32_path, xtype): 111 assert xtype in self.plugin.SUPPORTED_XTYPES 112 reply = self._get_xpub(bip32_path) 113 if reply: 114 xpub = reply['xpub'] 115 # Change type of xpub to the requested type. The firmware 116 # only ever returns the mainnet standard type, but it is agnostic 117 # to the type when signing. 118 if xtype != 'standard' or constants.net.TESTNET: 119 node = BIP32Node.from_xkey(xpub, net=constants.BitcoinMainnet) 120 xpub = node._replace(xtype=xtype).to_xpub() 121 return xpub 122 else: 123 raise Exception('no reply') 124 125 def dbb_has_password(self): 126 reply = self.hid_send_plain(b'{"ping":""}') 127 if 'ping' not in reply: 128 raise UserFacingException(_('Device communication error. Please unplug and replug your Digital Bitbox.')) 129 if reply['ping'] == 'password': 130 return True 131 return False 132 133 134 def stretch_key(self, key: bytes): 135 return to_hexstr(hashlib.pbkdf2_hmac('sha512', key, b'Digital Bitbox', iterations = 20480)) 136 137 138 def backup_password_dialog(self): 139 msg = _("Enter the password used when the backup was created:") 140 while True: 141 password = self.handler.get_passphrase(msg, False) 142 if password is None: 143 return None 144 if len(password) < 4: 145 msg = _("Password must have at least 4 characters.") \ 146 + "\n\n" + _("Enter password:") 147 elif len(password) > 64: 148 msg = _("Password must have less than 64 characters.") \ 149 + "\n\n" + _("Enter password:") 150 else: 151 return password.encode('utf8') 152 153 154 def password_dialog(self, msg): 155 while True: 156 password = self.handler.get_passphrase(msg, False) 157 if password is None: 158 return False 159 if len(password) < 4: 160 msg = _("Password must have at least 4 characters.") + \ 161 "\n\n" + _("Enter password:") 162 elif len(password) > 64: 163 msg = _("Password must have less than 64 characters.") + \ 164 "\n\n" + _("Enter password:") 165 else: 166 self.password = password.encode('utf8') 167 return True 168 169 def check_device_dialog(self): 170 match = re.search(r'v([0-9])+\.[0-9]+\.[0-9]+', 171 run_in_hwd_thread(self.dbb_hid.get_serial_number_string)) 172 if match is None: 173 raise Exception("error detecting firmware version") 174 major_version = int(match.group(1)) 175 if major_version < MIN_MAJOR_VERSION: 176 raise Exception("Please upgrade to the newest firmware using the BitBox Desktop app: https://shiftcrypto.ch/start") 177 # Set password if fresh device 178 if self.password is None and not self.dbb_has_password(): 179 if not self.setupRunning: 180 return False # A fresh device cannot connect to an existing wallet 181 msg = _("An uninitialized Digital Bitbox is detected.") + " " + \ 182 _("Enter a new password below.") + "\n\n" + \ 183 _("REMEMBER THE PASSWORD!") + "\n\n" + \ 184 _("You cannot access your coins or a backup without the password.") + "\n" + \ 185 _("A backup is saved automatically when generating a new wallet.") 186 if self.password_dialog(msg): 187 reply = self.hid_send_plain(b'{"password":"' + self.password + b'"}') 188 else: 189 return False 190 191 # Get password from user if not yet set 192 msg = _("Enter your Digital Bitbox password:") 193 while self.password is None: 194 if not self.password_dialog(msg): 195 raise UserCancelled() 196 reply = self.hid_send_encrypt(b'{"led":"blink"}') 197 if 'error' in reply: 198 self.password = None 199 if reply['error']['code'] == 109: 200 msg = _("Incorrect password entered.") + "\n\n" + \ 201 reply['error']['message'] + "\n\n" + \ 202 _("Enter your Digital Bitbox password:") 203 else: 204 # Should never occur 205 msg = _("Unexpected error occurred.") + "\n\n" + \ 206 reply['error']['message'] + "\n\n" + \ 207 _("Enter your Digital Bitbox password:") 208 209 # Initialize device if not yet initialized 210 if not self.setupRunning: 211 self.isInitialized = True # Wallet exists. Electrum code later checks if the device matches the wallet 212 elif not self.isInitialized: 213 reply = self.hid_send_encrypt(b'{"device":"info"}') 214 if reply['device']['id'] != "": 215 self.recover_or_erase_dialog() # Already seeded 216 else: 217 self.seed_device_dialog() # Seed if not initialized 218 self.mobile_pairing_dialog() 219 return self.isInitialized 220 221 222 def recover_or_erase_dialog(self): 223 msg = _("The Digital Bitbox is already seeded. Choose an option:") + "\n" 224 choices = [ 225 (_("Create a wallet using the current seed")), 226 (_("Load a wallet from the micro SD card (the current seed is overwritten)")), 227 (_("Erase the Digital Bitbox")) 228 ] 229 reply = self.handler.query_choice(msg, choices) 230 if reply is None: 231 return # user cancelled 232 if reply == 2: 233 self.dbb_erase() 234 elif reply == 1: 235 if not self.dbb_load_backup(): 236 return 237 else: 238 if self.hid_send_encrypt(b'{"device":"info"}')['device']['lock']: 239 raise UserFacingException(_("Full 2FA enabled. This is not supported yet.")) 240 # Use existing seed 241 self.isInitialized = True 242 243 244 def seed_device_dialog(self): 245 msg = _("Choose how to initialize your Digital Bitbox:") + "\n" 246 choices = [ 247 (_("Generate a new random wallet")), 248 (_("Load a wallet from the micro SD card")) 249 ] 250 reply = self.handler.query_choice(msg, choices) 251 if reply is None: 252 return # user cancelled 253 if reply == 0: 254 self.dbb_generate_wallet() 255 else: 256 if not self.dbb_load_backup(show_msg=False): 257 return 258 self.isInitialized = True 259 260 def mobile_pairing_dialog(self): 261 dbb_user_dir = None 262 if sys.platform == 'darwin': 263 dbb_user_dir = os.path.join(os.environ.get("HOME", ""), "Library", "Application Support", "DBB") 264 elif sys.platform == 'win32': 265 dbb_user_dir = os.path.join(os.environ["APPDATA"], "DBB") 266 else: 267 dbb_user_dir = os.path.join(os.environ["HOME"], ".dbb") 268 269 if not dbb_user_dir: 270 return 271 272 try: 273 # Python 3.5+ 274 jsonDecodeError = json.JSONDecodeError 275 except AttributeError: 276 jsonDecodeError = ValueError 277 try: 278 with open(os.path.join(dbb_user_dir, "config.dat")) as f: 279 dbb_config = json.load(f) 280 except (FileNotFoundError, jsonDecodeError): 281 return 282 283 if ENCRYPTION_PRIVKEY_KEY not in dbb_config or CHANNEL_ID_KEY not in dbb_config: 284 return 285 286 choices = [ 287 _('Do not pair'), 288 _('Import pairing from the Digital Bitbox desktop app'), 289 ] 290 reply = self.handler.query_choice(_('Mobile pairing options'), choices) 291 if reply is None: 292 return # user cancelled 293 294 if reply == 0: 295 if self.plugin.is_mobile_paired(): 296 del self.plugin.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY] 297 del self.plugin.digitalbitbox_config[CHANNEL_ID_KEY] 298 elif reply == 1: 299 # import pairing from dbb app 300 self.plugin.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY] = dbb_config[ENCRYPTION_PRIVKEY_KEY] 301 self.plugin.digitalbitbox_config[CHANNEL_ID_KEY] = dbb_config[CHANNEL_ID_KEY] 302 self.plugin.config.set_key('digitalbitbox', self.plugin.digitalbitbox_config) 303 304 def dbb_generate_wallet(self): 305 key = self.stretch_key(self.password) 306 filename = ("Electrum-" + time.strftime("%Y-%m-%d-%H-%M-%S") + ".pdf") 307 msg = ('{"seed":{"source": "create", "key": "%s", "filename": "%s", "entropy": "%s"}}' % (key, filename, to_hexstr(os.urandom(32)))).encode('utf8') 308 reply = self.hid_send_encrypt(msg) 309 if 'error' in reply: 310 raise UserFacingException(reply['error']['message']) 311 312 313 def dbb_erase(self): 314 self.handler.show_message(_("Are you sure you want to erase the Digital Bitbox?") + "\n\n" + 315 _("To continue, touch the Digital Bitbox's light for 3 seconds.") + "\n\n" + 316 _("To cancel, briefly touch the light or wait for the timeout.")) 317 hid_reply = self.hid_send_encrypt(b'{"reset":"__ERASE__"}') 318 self.handler.finished() 319 if 'error' in hid_reply: 320 raise UserFacingException(hid_reply['error']['message']) 321 else: 322 self.password = None 323 raise UserFacingException('Device erased') 324 325 326 def dbb_load_backup(self, show_msg=True): 327 backups = self.hid_send_encrypt(b'{"backup":"list"}') 328 if 'error' in backups: 329 raise UserFacingException(backups['error']['message']) 330 f = self.handler.query_choice(_("Choose a backup file:"), backups['backup']) 331 if f is None: 332 return False # user cancelled 333 key = self.backup_password_dialog() 334 if key is None: 335 raise Exception('Canceled by user') 336 key = self.stretch_key(key) 337 if show_msg: 338 self.handler.show_message(_("Loading backup...") + "\n\n" + 339 _("To continue, touch the Digital Bitbox's light for 3 seconds.") + "\n\n" + 340 _("To cancel, briefly touch the light or wait for the timeout.")) 341 msg = ('{"seed":{"source": "backup", "key": "%s", "filename": "%s"}}' % (key, backups['backup'][f])).encode('utf8') 342 hid_reply = self.hid_send_encrypt(msg) 343 self.handler.finished() 344 if 'error' in hid_reply: 345 raise UserFacingException(hid_reply['error']['message']) 346 return True 347 348 @runs_in_hwd_thread 349 def hid_send_frame(self, data): 350 HWW_CID = 0xFF000000 351 HWW_CMD = 0x80 + 0x40 + 0x01 352 data_len = len(data) 353 seq = 0; 354 idx = 0; 355 write = [] 356 while idx < data_len: 357 if idx == 0: 358 # INIT frame 359 write = data[idx : idx + min(data_len, self.usbReportSize - 7)] 360 self.dbb_hid.write(b'\0' + struct.pack(">IBH", HWW_CID, HWW_CMD, data_len & 0xFFFF) + write + b'\xEE' * (self.usbReportSize - 7 - len(write))) 361 else: 362 # CONT frame 363 write = data[idx : idx + min(data_len, self.usbReportSize - 5)] 364 self.dbb_hid.write(b'\0' + struct.pack(">IB", HWW_CID, seq) + write + b'\xEE' * (self.usbReportSize - 5 - len(write))) 365 seq += 1 366 idx += len(write) 367 368 @runs_in_hwd_thread 369 def hid_read_frame(self): 370 # INIT response 371 read = bytearray(self.dbb_hid.read(self.usbReportSize)) 372 cid = ((read[0] * 256 + read[1]) * 256 + read[2]) * 256 + read[3] 373 cmd = read[4] 374 data_len = read[5] * 256 + read[6] 375 data = read[7:] 376 idx = len(read) - 7; 377 while idx < data_len: 378 # CONT response 379 read = bytearray(self.dbb_hid.read(self.usbReportSize)) 380 data += read[5:] 381 idx += len(read) - 5 382 return data 383 384 @runs_in_hwd_thread 385 def hid_send_plain(self, msg): 386 reply = "" 387 try: 388 serial_number = self.dbb_hid.get_serial_number_string() 389 if "v2.0." in serial_number or "v1." in serial_number: 390 hidBufSize = 4096 391 self.dbb_hid.write('\0' + msg + '\0' * (hidBufSize - len(msg))) 392 r = bytearray() 393 while len(r) < hidBufSize: 394 r += bytearray(self.dbb_hid.read(hidBufSize)) 395 else: 396 self.hid_send_frame(msg) 397 r = self.hid_read_frame() 398 r = r.rstrip(b' \t\r\n\0') 399 r = r.replace(b"\0", b'') 400 r = to_string(r, 'utf8') 401 reply = json.loads(r) 402 except Exception as e: 403 _logger.info(f'Exception caught {repr(e)}') 404 return reply 405 406 @runs_in_hwd_thread 407 def hid_send_encrypt(self, msg): 408 sha256_byte_len = 32 409 reply = "" 410 try: 411 encryption_key, authentication_key = derive_keys(self.password) 412 msg = EncodeAES_bytes(encryption_key, msg) 413 hmac_digest = hmac_oneshot(authentication_key, msg, hashlib.sha256) 414 authenticated_msg = base64.b64encode(msg + hmac_digest) 415 reply = self.hid_send_plain(authenticated_msg) 416 if 'ciphertext' in reply: 417 b64_unencoded = bytes(base64.b64decode(''.join(reply["ciphertext"]))) 418 reply_hmac = b64_unencoded[-sha256_byte_len:] 419 hmac_calculated = hmac_oneshot(authentication_key, b64_unencoded[:-sha256_byte_len], hashlib.sha256) 420 if not hmac.compare_digest(reply_hmac, hmac_calculated): 421 raise Exception("Failed to validate HMAC") 422 reply = DecodeAES_bytes(encryption_key, b64_unencoded[:-sha256_byte_len]) 423 reply = to_string(reply, 'utf8') 424 reply = json.loads(reply) 425 if 'error' in reply: 426 self.password = None 427 except Exception as e: 428 _logger.info(f'Exception caught {repr(e)}') 429 return reply 430 431 432 433 # ---------------------------------------------------------------------------------- 434 # 435 # 436 437 class DigitalBitbox_KeyStore(Hardware_KeyStore): 438 hw_type = 'digitalbitbox' 439 device = 'DigitalBitbox' 440 441 plugin: 'DigitalBitboxPlugin' 442 443 def __init__(self, d): 444 Hardware_KeyStore.__init__(self, d) 445 self.force_watching_only = False 446 self.maxInputs = 14 # maximum inputs per single sign command 447 448 def give_error(self, message, clear_client = False): 449 if clear_client: 450 self.client = None 451 raise Exception(message) 452 453 454 def decrypt_message(self, pubkey, message, password): 455 raise RuntimeError(_('Encryption and decryption are currently not supported for {}').format(self.device)) 456 457 458 def sign_message(self, sequence, message, password): 459 sig = None 460 try: 461 message = message.encode('utf8') 462 inputPath = self.get_derivation_prefix() + "/%d/%d" % sequence 463 msg_hash = sha256d(msg_magic(message)) 464 inputHash = to_hexstr(msg_hash) 465 hasharray = [] 466 hasharray.append({'hash': inputHash, 'keypath': inputPath}) 467 hasharray = json.dumps(hasharray) 468 469 msg = ('{"sign":{"meta":"sign message", "data":%s}}' % hasharray).encode('utf8') 470 471 dbb_client = self.plugin.get_client(self) 472 473 if not dbb_client.is_paired(): 474 raise Exception(_("Could not sign message.")) 475 476 reply = dbb_client.hid_send_encrypt(msg) 477 self.handler.show_message(_("Signing message ...") + "\n\n" + 478 _("To continue, touch the Digital Bitbox's blinking light for 3 seconds.") + "\n\n" + 479 _("To cancel, briefly touch the blinking light or wait for the timeout.")) 480 reply = dbb_client.hid_send_encrypt(msg) # Send twice, first returns an echo for smart verification (not implemented) 481 self.handler.finished() 482 483 if 'error' in reply: 484 raise Exception(reply['error']['message']) 485 486 if 'sign' not in reply: 487 raise Exception(_("Could not sign message.")) 488 489 if 'recid' in reply['sign'][0]: 490 # firmware > v2.1.1 491 sig_string = binascii.unhexlify(reply['sign'][0]['sig']) 492 recid = int(reply['sign'][0]['recid'], 16) 493 sig = ecc.construct_sig65(sig_string, recid, True) 494 pubkey, compressed = ecc.ECPubkey.from_signature65(sig, msg_hash) 495 addr = public_key_to_p2pkh(pubkey.get_public_key_bytes(compressed=compressed)) 496 if ecc.verify_message_with_address(addr, sig, message) is False: 497 raise Exception(_("Could not sign message")) 498 elif 'pubkey' in reply['sign'][0]: 499 # firmware <= v2.1.1 500 for recid in range(4): 501 sig_string = binascii.unhexlify(reply['sign'][0]['sig']) 502 sig = ecc.construct_sig65(sig_string, recid, True) 503 try: 504 addr = public_key_to_p2pkh(binascii.unhexlify(reply['sign'][0]['pubkey'])) 505 if ecc.verify_message_with_address(addr, sig, message): 506 break 507 except Exception: 508 continue 509 else: 510 raise Exception(_("Could not sign message")) 511 512 513 except BaseException as e: 514 self.give_error(e) 515 return sig 516 517 518 def sign_transaction(self, tx, password): 519 if tx.is_complete(): 520 return 521 522 try: 523 p2pkhTransaction = True 524 inputhasharray = [] 525 hasharray = [] 526 pubkeyarray = [] 527 528 # Build hasharray from inputs 529 for i, txin in enumerate(tx.inputs()): 530 if txin.is_coinbase_input(): 531 self.give_error("Coinbase not supported") # should never happen 532 533 if txin.script_type != 'p2pkh': 534 p2pkhTransaction = False 535 536 my_pubkey, inputPath = self.find_my_pubkey_in_txinout(txin) 537 if not inputPath: 538 self.give_error("No matching pubkey for sign_transaction") # should never happen 539 inputPath = convert_bip32_intpath_to_strpath(inputPath) 540 inputHash = sha256d(bfh(tx.serialize_preimage(i))) 541 hasharray_i = {'hash': to_hexstr(inputHash), 'keypath': inputPath} 542 hasharray.append(hasharray_i) 543 inputhasharray.append(inputHash) 544 545 # Build pubkeyarray from outputs 546 for txout in tx.outputs(): 547 assert txout.address 548 if txout.is_change: 549 changePubkey, changePath = self.find_my_pubkey_in_txinout(txout) 550 assert changePath 551 changePath = convert_bip32_intpath_to_strpath(changePath) 552 changePubkey = changePubkey.hex() 553 pubkeyarray_i = {'pubkey': changePubkey, 'keypath': changePath} 554 pubkeyarray.append(pubkeyarray_i) 555 556 # Special serialization of the unsigned transaction for 557 # the mobile verification app. 558 # At the moment, verification only works for p2pkh transactions. 559 if p2pkhTransaction: 560 tx_copy = copy.deepcopy(tx) 561 # monkey-patch method of tx_copy instance to change serialization 562 def input_script(self, txin: PartialTxInput, *, estimate_size=False): 563 if txin.script_type == 'p2pkh': 564 return Transaction.get_preimage_script(txin) 565 raise Exception("unsupported type %s" % txin.script_type) 566 tx_copy.input_script = input_script.__get__(tx_copy, PartialTransaction) 567 tx_dbb_serialized = tx_copy.serialize_to_network() 568 else: 569 # We only need this for the signing echo / verification. 570 tx_dbb_serialized = None 571 572 # Build sign command 573 dbb_signatures = [] 574 steps = math.ceil(1.0 * len(hasharray) / self.maxInputs) 575 for step in range(int(steps)): 576 hashes = hasharray[step * self.maxInputs : (step + 1) * self.maxInputs] 577 578 msg = { 579 "sign": { 580 "data": hashes, 581 "checkpub": pubkeyarray, 582 }, 583 } 584 if tx_dbb_serialized is not None: 585 msg["sign"]["meta"] = to_hexstr(sha256d(tx_dbb_serialized)) 586 msg = json.dumps(msg).encode('ascii') 587 dbb_client = self.plugin.get_client(self) 588 589 if not dbb_client.is_paired(): 590 raise Exception("Could not sign transaction.") 591 592 reply = dbb_client.hid_send_encrypt(msg) 593 if 'error' in reply: 594 raise Exception(reply['error']['message']) 595 596 if 'echo' not in reply: 597 raise Exception("Could not sign transaction.") 598 599 if self.plugin.is_mobile_paired() and tx_dbb_serialized is not None: 600 reply['tx'] = tx_dbb_serialized 601 self.plugin.comserver_post_notification(reply) 602 603 if steps > 1: 604 self.handler.show_message(_("Signing large transaction. Please be patient ...") + "\n\n" + 605 _("To continue, touch the Digital Bitbox's blinking light for 3 seconds.") + " " + 606 _("(Touch {} of {})").format((step + 1), steps) + "\n\n" + 607 _("To cancel, briefly touch the blinking light or wait for the timeout.") + "\n\n") 608 else: 609 self.handler.show_message(_("Signing transaction...") + "\n\n" + 610 _("To continue, touch the Digital Bitbox's blinking light for 3 seconds.") + "\n\n" + 611 _("To cancel, briefly touch the blinking light or wait for the timeout.")) 612 613 # Send twice, first returns an echo for smart verification 614 reply = dbb_client.hid_send_encrypt(msg) 615 self.handler.finished() 616 617 if 'error' in reply: 618 if reply["error"].get('code') in (600, 601): 619 # aborted via LED short touch or timeout 620 raise UserCancelled() 621 raise Exception(reply['error']['message']) 622 623 if 'sign' not in reply: 624 raise Exception("Could not sign transaction.") 625 626 dbb_signatures.extend(reply['sign']) 627 628 # Fill signatures 629 if len(dbb_signatures) != len(tx.inputs()): 630 raise Exception("Incorrect number of transactions signed.") # Should never occur 631 for i, txin in enumerate(tx.inputs()): 632 for pubkey_bytes in txin.pubkeys: 633 if txin.is_complete(): 634 break 635 signed = dbb_signatures[i] 636 if 'recid' in signed: 637 # firmware > v2.1.1 638 recid = int(signed['recid'], 16) 639 s = binascii.unhexlify(signed['sig']) 640 h = inputhasharray[i] 641 pk = ecc.ECPubkey.from_sig_string(s, recid, h) 642 pk = pk.get_public_key_hex(compressed=True) 643 elif 'pubkey' in signed: 644 # firmware <= v2.1.1 645 pk = signed['pubkey'] 646 if pk != pubkey_bytes.hex(): 647 continue 648 sig_r = int(signed['sig'][:64], 16) 649 sig_s = int(signed['sig'][64:], 16) 650 sig = ecc.der_sig_from_r_and_s(sig_r, sig_s) 651 sig = to_hexstr(sig) + '01' 652 tx.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_bytes.hex(), sig=sig) 653 except UserCancelled: 654 raise 655 except BaseException as e: 656 self.give_error(e, True) 657 else: 658 _logger.info(f"Transaction is_complete {tx.is_complete()}") 659 660 661 class DigitalBitboxPlugin(HW_PluginBase): 662 663 libraries_available = DIGIBOX 664 keystore_class = DigitalBitbox_KeyStore 665 client = None 666 DEVICE_IDS = [ 667 (0x03eb, 0x2402) # Digital Bitbox 668 ] 669 SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') 670 671 def __init__(self, parent, config, name): 672 HW_PluginBase.__init__(self, parent, config, name) 673 if self.libraries_available: 674 self.device_manager().register_devices(self.DEVICE_IDS, plugin=self) 675 676 self.digitalbitbox_config = self.config.get('digitalbitbox', {}) 677 678 @runs_in_hwd_thread 679 def get_dbb_device(self, device): 680 dev = hid.device() 681 dev.open_path(device.path) 682 return dev 683 684 685 def create_client(self, device, handler): 686 if device.interface_number == 0 or device.usage_page == 0xffff: 687 if handler: 688 self.handler = handler 689 client = self.get_dbb_device(device) 690 if client is not None: 691 client = DigitalBitbox_Client(self, client) 692 return client 693 else: 694 return None 695 696 697 def setup_device(self, device_info, wizard, purpose): 698 device_id = device_info.device.id_ 699 client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) 700 if purpose == HWD_SETUP_NEW_WALLET: 701 client.setupRunning = True 702 wizard.run_task_without_blocking_gui( 703 task=lambda: client.get_xpub("m/44'/0'", 'standard')) 704 return client 705 706 707 def is_mobile_paired(self): 708 return ENCRYPTION_PRIVKEY_KEY in self.digitalbitbox_config 709 710 711 def comserver_post_notification(self, payload): 712 assert self.is_mobile_paired(), "unexpected mobile pairing error" 713 url = 'https://digitalbitbox.com/smartverification/index.php' 714 key_s = base64.b64decode(self.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY]) 715 args = 'c=data&s=0&dt=0&uuid=%s&pl=%s' % ( 716 self.digitalbitbox_config[CHANNEL_ID_KEY], 717 EncodeAES_base64(key_s, json.dumps(payload).encode('ascii')).decode('ascii'), 718 ) 719 try: 720 text = Network.send_http_on_proxy('post', url, body=args.encode('ascii'), headers={'content-type': 'application/x-www-form-urlencoded'}) 721 _logger.info(f'digitalbitbox reply from server {text}') 722 except Exception as e: 723 self.handler.show_error(repr(e)) # repr because str(Exception()) == '' 724 725 726 def get_xpub(self, device_id, derivation, xtype, wizard): 727 if xtype not in self.SUPPORTED_XTYPES: 728 raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device)) 729 if is_all_public_derivation(derivation): 730 raise Exception(f"The {self.device} does not reveal xpubs corresponding to non-hardened paths. (path: {derivation})") 731 client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) 732 client.check_device_dialog() 733 xpub = client.get_xpub(derivation, xtype) 734 return xpub 735 736 737 def get_client(self, keystore, force_pair=True, *, 738 devices=None, allow_user_interaction=True): 739 client = super().get_client(keystore, force_pair, 740 devices=devices, 741 allow_user_interaction=allow_user_interaction) 742 if client is not None: 743 client.check_device_dialog() 744 return client 745 746 def show_address(self, wallet, address, keystore=None): 747 if keystore is None: 748 keystore = wallet.get_keystore() 749 if not self.show_address_helper(wallet, address, keystore): 750 return 751 if type(wallet) is not Standard_Wallet: 752 keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device)) 753 return 754 if not self.is_mobile_paired(): 755 keystore.handler.show_error(_('This function is only available after pairing your {} with a mobile device.').format(self.device)) 756 return 757 if wallet.get_txin_type(address) != 'p2pkh': 758 keystore.handler.show_error(_('This function is only available for p2pkh keystores when using {}.').format(self.device)) 759 return 760 change, index = wallet.get_address_index(address) 761 keypath = '%s/%d/%d' % (keystore.get_derivation_prefix(), change, index) 762 xpub = self.get_client(keystore)._get_xpub(keypath) 763 verify_request_payload = { 764 "type": 'p2pkh', 765 "echo": xpub['echo'], 766 } 767 self.comserver_post_notification(verify_request_payload)