coldcard.py (24379B)
1 # 2 # Coldcard Electrum plugin main code. 3 # 4 # 5 import os, time, io 6 import traceback 7 from typing import TYPE_CHECKING, Optional 8 import struct 9 10 from electrum import bip32 11 from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes 12 from electrum.i18n import _ 13 from electrum.plugin import Device, hook, runs_in_hwd_thread 14 from electrum.keystore import Hardware_KeyStore, KeyStoreWithMPK 15 from electrum.transaction import PartialTransaction 16 from electrum.wallet import Standard_Wallet, Multisig_Wallet, Abstract_Wallet 17 from electrum.util import bfh, bh2u, versiontuple, UserFacingException 18 from electrum.base_wizard import ScriptTypeNotSupported 19 from electrum.logging import get_logger 20 21 from ..hw_wallet import HW_PluginBase, HardwareClientBase 22 from ..hw_wallet.plugin import LibraryFoundButUnusable, only_hook_if_libraries_available 23 24 25 _logger = get_logger(__name__) 26 27 28 try: 29 import hid 30 from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker 31 from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError 32 from ckcc.constants import (MAX_MSG_LEN, MAX_BLK_LEN, MSG_SIGNING_MAX_LENGTH, MAX_TXN_LEN, 33 AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH) 34 35 from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID, CKCC_SIMULATOR_PATH 36 37 requirements_ok = True 38 39 40 class ElectrumColdcardDevice(ColdcardDevice): 41 # avoid use of pycoin for MiTM message signature test 42 def mitm_verify(self, sig, expect_xpub): 43 # verify a signature (65 bytes) over the session key, using the master bip32 node 44 # - customized to use specific EC library of Electrum. 45 pubkey = BIP32Node.from_xkey(expect_xpub).eckey 46 try: 47 pubkey.verify_message_hash(sig[1:65], self.session_key) 48 return True 49 except: 50 return False 51 52 except ImportError as e: 53 if not (isinstance(e, ModuleNotFoundError) and e.name == 'ckcc'): 54 _logger.exception('error importing coldcard plugin deps') 55 requirements_ok = False 56 57 COINKITE_VID = 0xd13e 58 CKCC_PID = 0xcc10 59 60 CKCC_SIMULATED_PID = CKCC_PID ^ 0x55aa 61 62 63 class CKCCClient(HardwareClientBase): 64 65 def __init__(self, plugin, handler, dev_path, *, is_simulator=False): 66 HardwareClientBase.__init__(self, plugin=plugin) 67 self.device = plugin.device 68 self.handler = handler 69 70 # if we know what the (xfp, xpub) "should be" then track it here 71 self._expected_device = None 72 73 if is_simulator: 74 self.dev = ElectrumColdcardDevice(dev_path, encrypt=True) 75 else: 76 # open the real HID device 77 hd = hid.device(path=dev_path) 78 hd.open_path(dev_path) 79 80 self.dev = ElectrumColdcardDevice(dev=hd, encrypt=True) 81 82 # NOTE: MiTM test is delayed until we have a hint as to what XPUB we 83 # should expect. It's also kinda slow. 84 85 def __repr__(self): 86 return '<CKCCClient: xfp=%s label=%r>' % (xfp2str(self.dev.master_fingerprint), 87 self.label()) 88 89 @runs_in_hwd_thread 90 def verify_connection(self, expected_xfp: int, expected_xpub=None): 91 ex = (expected_xfp, expected_xpub) 92 93 if self._expected_device == ex: 94 # all is as expected 95 return 96 97 if expected_xpub is None: 98 expected_xpub = self.dev.master_xpub 99 100 if ( (self._expected_device is not None) 101 or (self.dev.master_fingerprint != expected_xfp) 102 or (self.dev.master_xpub != expected_xpub)): 103 # probably indicating programing error, not hacking 104 _logger.info(f"xpubs. reported by device: {self.dev.master_xpub}. " 105 f"stored in file: {expected_xpub}") 106 raise RuntimeError("Expecting %s but that's not what's connected?!" % 107 xfp2str(expected_xfp)) 108 109 # check signature over session key 110 # - mitm might have lied about xfp and xpub up to here 111 # - important that we use value capture at wallet creation time, not some value 112 # we read over USB today 113 self.dev.check_mitm(expected_xpub=expected_xpub) 114 115 self._expected_device = ex 116 117 if not getattr(self, 'ckcc_xpub', None): 118 self.ckcc_xpub = expected_xpub 119 120 _logger.info("Successfully verified against MiTM") 121 122 def is_pairable(self): 123 # can't do anything w/ devices that aren't setup (this code not normally reachable) 124 return bool(self.dev.master_xpub) 125 126 @runs_in_hwd_thread 127 def close(self): 128 # close the HID device (so can be reused) 129 self.dev.close() 130 self.dev = None 131 132 def is_initialized(self): 133 return bool(self.dev.master_xpub) 134 135 def label(self): 136 # 'label' of this Coldcard. Warning: gets saved into wallet file, which might 137 # not be encrypted, so better for privacy if based on xpub/fingerprint rather than 138 # USB serial number. 139 if self.dev.is_simulator: 140 lab = 'Coldcard Simulator ' + xfp2str(self.dev.master_fingerprint) 141 elif not self.dev.master_fingerprint: 142 # failback; not expected 143 lab = 'Coldcard #' + self.dev.serial 144 else: 145 lab = 'Coldcard ' + xfp2str(self.dev.master_fingerprint) 146 147 return lab 148 149 def manipulate_keystore_dict_during_wizard_setup(self, d: dict): 150 master_xpub = self.dev.master_xpub 151 if master_xpub is not None: 152 try: 153 node = BIP32Node.from_xkey(master_xpub) 154 except InvalidMasterKeyVersionBytes: 155 raise UserFacingException( 156 _('Invalid xpub magic. Make sure your {} device is set to the correct chain.').format(self.device) + ' ' + 157 _('You might have to unplug and plug it in again.') 158 ) from None 159 d['ckcc_xpub'] = master_xpub 160 161 @runs_in_hwd_thread 162 def has_usable_connection_with_device(self): 163 # Do end-to-end ping test 164 try: 165 self.ping_check() 166 return True 167 except: 168 return False 169 170 @runs_in_hwd_thread 171 def get_xpub(self, bip32_path, xtype): 172 assert xtype in ColdcardPlugin.SUPPORTED_XTYPES 173 _logger.info('Derive xtype = %r' % xtype) 174 xpub = self.dev.send_recv(CCProtocolPacker.get_xpub(bip32_path), timeout=5000) 175 # TODO handle timeout? 176 # change type of xpub to the requested type 177 try: 178 node = BIP32Node.from_xkey(xpub) 179 except InvalidMasterKeyVersionBytes: 180 raise UserFacingException(_('Invalid xpub magic. Make sure your {} device is set to the correct chain.') 181 .format(self.device)) from None 182 if xtype != 'standard': 183 xpub = node._replace(xtype=xtype).to_xpub() 184 return xpub 185 186 @runs_in_hwd_thread 187 def ping_check(self): 188 # check connection is working 189 assert self.dev.session_key, 'not encrypted?' 190 req = b'1234 Electrum Plugin 4321' # free up to 59 bytes 191 try: 192 echo = self.dev.send_recv(CCProtocolPacker.ping(req)) 193 assert echo == req 194 except: 195 raise RuntimeError("Communication trouble with Coldcard") 196 197 @runs_in_hwd_thread 198 def show_address(self, path, addr_fmt): 199 # prompt user w/ address, also returns it immediately. 200 return self.dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None) 201 202 @runs_in_hwd_thread 203 def show_p2sh_address(self, *args, **kws): 204 # prompt user w/ p2sh address, also returns it immediately. 205 return self.dev.send_recv(CCProtocolPacker.show_p2sh_address(*args, **kws), timeout=None) 206 207 @runs_in_hwd_thread 208 def get_version(self): 209 # gives list of strings 210 return self.dev.send_recv(CCProtocolPacker.version(), timeout=1000).split('\n') 211 212 @runs_in_hwd_thread 213 def sign_message_start(self, path, msg): 214 # this starts the UX experience. 215 self.dev.send_recv(CCProtocolPacker.sign_message(msg, path), timeout=None) 216 217 @runs_in_hwd_thread 218 def sign_message_poll(self): 219 # poll device... if user has approved, will get tuple: (addr, sig) else None 220 return self.dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None) 221 222 @runs_in_hwd_thread 223 def sign_transaction_start(self, raw_psbt: bytes, *, finalize: bool = False): 224 # Multiple steps to sign: 225 # - upload binary 226 # - start signing UX 227 # - wait for coldcard to complete process, or have it refused. 228 # - download resulting txn 229 assert 20 <= len(raw_psbt) < MAX_TXN_LEN, 'PSBT is too big' 230 dlen, chk = self.dev.upload_file(raw_psbt) 231 232 resp = self.dev.send_recv(CCProtocolPacker.sign_transaction(dlen, chk, finalize=finalize), 233 timeout=None) 234 235 if resp != None: 236 raise ValueError(resp) 237 238 @runs_in_hwd_thread 239 def sign_transaction_poll(self): 240 # poll device... if user has approved, will get tuple: (legnth, checksum) else None 241 return self.dev.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None) 242 243 @runs_in_hwd_thread 244 def download_file(self, length, checksum, file_number=1): 245 # get a file 246 return self.dev.download_file(length, checksum, file_number=file_number) 247 248 249 250 class Coldcard_KeyStore(Hardware_KeyStore): 251 hw_type = 'coldcard' 252 device = 'Coldcard' 253 254 plugin: 'ColdcardPlugin' 255 256 def __init__(self, d): 257 Hardware_KeyStore.__init__(self, d) 258 # Errors and other user interaction is done through the wallet's 259 # handler. The handler is per-window and preserved across 260 # device reconnects 261 self.force_watching_only = False 262 self.ux_busy = False 263 264 # we need to know at least the fingerprint of the master xpub to verify against MiTM 265 # - device reports these value during encryption setup process 266 # - full xpub value now optional 267 self.ckcc_xpub = d.get('ckcc_xpub', None) 268 269 def dump(self): 270 # our additions to the stored data about keystore -- only during creation? 271 d = Hardware_KeyStore.dump(self) 272 d['ckcc_xpub'] = self.ckcc_xpub 273 return d 274 275 def get_xfp_int(self) -> int: 276 xfp = self.get_root_fingerprint() 277 assert xfp is not None 278 return xfp_int_from_xfp_bytes(bfh(xfp)) 279 280 def get_client(self): 281 # called when user tries to do something like view address, sign somthing. 282 # - not called during probing/setup 283 # - will fail if indicated device can't produce the xpub (at derivation) expected 284 rv = self.plugin.get_client(self) 285 if rv: 286 xfp_int = self.get_xfp_int() 287 rv.verify_connection(xfp_int, self.ckcc_xpub) 288 289 return rv 290 291 def give_error(self, message, clear_client=False): 292 self.logger.info(message) 293 if not self.ux_busy: 294 self.handler.show_error(message) 295 else: 296 self.ux_busy = False 297 if clear_client: 298 self.client = None 299 raise UserFacingException(message) 300 301 def wrap_busy(func): 302 # decorator: function takes over the UX on the device. 303 def wrapper(self, *args, **kwargs): 304 try: 305 self.ux_busy = True 306 return func(self, *args, **kwargs) 307 finally: 308 self.ux_busy = False 309 return wrapper 310 311 def decrypt_message(self, pubkey, message, password): 312 raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device)) 313 314 @wrap_busy 315 def sign_message(self, sequence, message, password): 316 # Sign a message on device. Since we have big screen, of course we 317 # have to show the message unabiguously there first! 318 try: 319 msg = message.encode('ascii', errors='strict') 320 assert 1 <= len(msg) <= MSG_SIGNING_MAX_LENGTH 321 except (UnicodeError, AssertionError): 322 # there are other restrictions on message content, 323 # but let the device enforce and report those 324 self.handler.show_error('Only short (%d max) ASCII messages can be signed.' 325 % MSG_SIGNING_MAX_LENGTH) 326 return b'' 327 328 path = self.get_derivation_prefix() + ("/%d/%d" % sequence) 329 try: 330 cl = self.get_client() 331 try: 332 self.handler.show_message("Signing message (using %s)..." % path) 333 334 cl.sign_message_start(path, msg) 335 336 while 1: 337 # How to kill some time, without locking UI? 338 time.sleep(0.250) 339 340 resp = cl.sign_message_poll() 341 if resp is not None: 342 break 343 344 finally: 345 self.handler.finished() 346 347 assert len(resp) == 2 348 addr, raw_sig = resp 349 350 # already encoded in Bitcoin fashion, binary. 351 assert 40 < len(raw_sig) <= 65 352 353 return raw_sig 354 355 except (CCUserRefused, CCBusyError) as exc: 356 self.handler.show_error(str(exc)) 357 except CCProtoError as exc: 358 self.logger.exception('Error showing address') 359 self.handler.show_error('{}\n\n{}'.format( 360 _('Error showing address') + ':', str(exc))) 361 except Exception as e: 362 self.give_error(e, True) 363 364 # give empty bytes for error cases; it seems to clear the old signature box 365 return b'' 366 367 @wrap_busy 368 def sign_transaction(self, tx, password): 369 # Upload PSBT for signing. 370 # - we can also work offline (without paired device present) 371 if tx.is_complete(): 372 return 373 374 client = self.get_client() 375 376 assert client.dev.master_fingerprint == self.get_xfp_int() 377 378 raw_psbt = tx.serialize_as_bytes() 379 380 try: 381 try: 382 self.handler.show_message("Authorize Transaction...") 383 384 client.sign_transaction_start(raw_psbt) 385 386 while 1: 387 # How to kill some time, without locking UI? 388 time.sleep(0.250) 389 390 resp = client.sign_transaction_poll() 391 if resp is not None: 392 break 393 394 rlen, rsha = resp 395 396 # download the resulting txn. 397 raw_resp = client.download_file(rlen, rsha) 398 399 finally: 400 self.handler.finished() 401 402 except (CCUserRefused, CCBusyError) as exc: 403 self.logger.info(f'Did not sign: {exc}') 404 self.handler.show_error(str(exc)) 405 return 406 except BaseException as e: 407 self.logger.exception('') 408 self.give_error(e, True) 409 return 410 411 tx2 = PartialTransaction.from_raw_psbt(raw_resp) 412 # apply partial signatures back into txn 413 tx.combine_with_other_psbt(tx2) 414 # caller's logic looks at tx now and if it's sufficiently signed, 415 # will send it if that's the user's intent. 416 417 @staticmethod 418 def _encode_txin_type(txin_type): 419 # Map from Electrum code names to our code numbers. 420 return {'standard': AF_CLASSIC, 'p2pkh': AF_CLASSIC, 421 'p2sh': AF_P2SH, 422 'p2wpkh-p2sh': AF_P2WPKH_P2SH, 423 'p2wpkh': AF_P2WPKH, 424 'p2wsh-p2sh': AF_P2WSH_P2SH, 425 'p2wsh': AF_P2WSH, 426 }[txin_type] 427 428 @wrap_busy 429 def show_address(self, sequence, txin_type): 430 client = self.get_client() 431 address_path = self.get_derivation_prefix()[2:] + "/%d/%d"%sequence 432 addr_fmt = self._encode_txin_type(txin_type) 433 try: 434 try: 435 self.handler.show_message(_("Showing address ...")) 436 dev_addr = client.show_address(address_path, addr_fmt) 437 # we could double check address here 438 finally: 439 self.handler.finished() 440 except CCProtoError as exc: 441 self.logger.exception('Error showing address') 442 self.handler.show_error('{}\n\n{}'.format( 443 _('Error showing address') + ':', str(exc))) 444 except BaseException as exc: 445 self.logger.exception('') 446 self.handler.show_error(exc) 447 448 @wrap_busy 449 def show_p2sh_address(self, M, script, xfp_paths, txin_type): 450 client = self.get_client() 451 addr_fmt = self._encode_txin_type(txin_type) 452 try: 453 try: 454 self.handler.show_message(_("Showing address ...")) 455 dev_addr = client.show_p2sh_address(M, xfp_paths, script, addr_fmt=addr_fmt) 456 # we could double check address here 457 finally: 458 self.handler.finished() 459 except CCProtoError as exc: 460 self.logger.exception('Error showing address') 461 self.handler.show_error('{}.\n{}\n\n{}'.format( 462 _('Error showing address'), 463 _('Make sure you have imported the correct wallet description ' 464 'file on the device for this multisig wallet.'), 465 str(exc))) 466 except BaseException as exc: 467 self.logger.exception('') 468 self.handler.show_error(exc) 469 470 471 class ColdcardPlugin(HW_PluginBase): 472 keystore_class = Coldcard_KeyStore 473 minimum_library = (0, 7, 7) 474 475 DEVICE_IDS = [ 476 (COINKITE_VID, CKCC_PID), 477 (COINKITE_VID, CKCC_SIMULATED_PID) 478 ] 479 480 SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') 481 482 def __init__(self, parent, config, name): 483 HW_PluginBase.__init__(self, parent, config, name) 484 485 self.libraries_available = self.check_libraries_available() 486 if not self.libraries_available: 487 return 488 489 self.device_manager().register_devices(self.DEVICE_IDS, plugin=self) 490 self.device_manager().register_enumerate_func(self.detect_simulator) 491 492 def get_library_version(self): 493 import ckcc 494 try: 495 version = ckcc.__version__ 496 except AttributeError: 497 version = 'unknown' 498 if requirements_ok: 499 return version 500 else: 501 raise LibraryFoundButUnusable(library_version=version) 502 503 def detect_simulator(self): 504 # if there is a simulator running on this machine, 505 # return details about it so it's offered as a pairing choice 506 fn = CKCC_SIMULATOR_PATH 507 508 if os.path.exists(fn): 509 return [Device(path=fn, 510 interface_number=-1, 511 id_=fn, 512 product_key=(COINKITE_VID, CKCC_SIMULATED_PID), 513 usage_page=0, 514 transport_ui_string='simulator')] 515 516 return [] 517 518 @runs_in_hwd_thread 519 def create_client(self, device, handler): 520 if handler: 521 self.handler = handler 522 523 # We are given a HID device, or at least some details about it. 524 # Not sure why not we aren't just given a HID library handle, but 525 # the 'path' is unabiguous, so we'll use that. 526 try: 527 rv = CKCCClient(self, handler, device.path, 528 is_simulator=(device.product_key[1] == CKCC_SIMULATED_PID)) 529 return rv 530 except Exception as e: 531 self.logger.exception('late failure connecting to device?') 532 return None 533 534 def setup_device(self, device_info, wizard, purpose): 535 device_id = device_info.device.id_ 536 client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) 537 return client 538 539 def get_xpub(self, device_id, derivation, xtype, wizard): 540 # this seems to be part of the pairing process only, not during normal ops? 541 # base_wizard:on_hw_derivation 542 if xtype not in self.SUPPORTED_XTYPES: 543 raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device)) 544 client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) 545 client.ping_check() 546 547 xpub = client.get_xpub(derivation, xtype) 548 return xpub 549 550 @runs_in_hwd_thread 551 def get_client(self, keystore, force_pair=True, *, 552 devices=None, allow_user_interaction=True) -> Optional['CKCCClient']: 553 # Acquire a connection to the hardware device (via USB) 554 client = super().get_client(keystore, force_pair, 555 devices=devices, 556 allow_user_interaction=allow_user_interaction) 557 558 if client is not None: 559 client.ping_check() 560 561 return client 562 563 @staticmethod 564 def export_ms_wallet(wallet: Multisig_Wallet, fp, name): 565 # Build the text file Coldcard needs to understand the multisig wallet 566 # it is participating in. All involved Coldcards can share same file. 567 assert isinstance(wallet, Multisig_Wallet) 568 569 print('# Exported from Electrum', file=fp) 570 print(f'Name: {name:.20s}', file=fp) 571 print(f'Policy: {wallet.m} of {wallet.n}', file=fp) 572 print(f'Format: {wallet.txin_type.upper()}' , file=fp) 573 574 xpubs = [] 575 for xpub, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()): # type: str, KeyStoreWithMPK 576 fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[], only_der_suffix=False) 577 fp_hex = fp_bytes.hex().upper() 578 der_prefix_str = bip32.convert_bip32_intpath_to_strpath(der_full) 579 xpubs.append( (fp_hex, xpub, der_prefix_str) ) 580 581 # Before v3.2.1 derivation didn't matter too much to the Coldcard, since it 582 # could use key path data from PSBT or USB request as needed. However, 583 # derivation data is now required. 584 585 print('', file=fp) 586 587 assert len(xpubs) == wallet.n 588 for xfp, xpub, der_prefix in xpubs: 589 print(f'Derivation: {der_prefix}', file=fp) 590 print(f'{xfp}: {xpub}\n', file=fp) 591 592 def show_address(self, wallet, address, keystore: 'Coldcard_KeyStore' = None): 593 if keystore is None: 594 keystore = wallet.get_keystore() 595 if not self.show_address_helper(wallet, address, keystore): 596 return 597 598 txin_type = wallet.get_txin_type(address) 599 600 # Standard_Wallet => not multisig, must be bip32 601 if type(wallet) is Standard_Wallet: 602 sequence = wallet.get_address_index(address) 603 keystore.show_address(sequence, txin_type) 604 elif type(wallet) is Multisig_Wallet: 605 assert isinstance(wallet, Multisig_Wallet) # only here for type-hints in IDE 606 # More involved for P2SH/P2WSH addresses: need M, and all public keys, and their 607 # derivation paths. Must construct script, and track fingerprints+paths for 608 # all those keys 609 610 pubkey_deriv_info = wallet.get_public_keys_with_deriv_info(address) 611 pubkey_hexes = sorted([pk.hex() for pk in list(pubkey_deriv_info)]) 612 xfp_paths = [] 613 for pubkey_hex in pubkey_hexes: 614 pubkey = bytes.fromhex(pubkey_hex) 615 ks, der_suffix = pubkey_deriv_info[pubkey] 616 fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix, only_der_suffix=False) 617 xfp_int = xfp_int_from_xfp_bytes(fp_bytes) 618 xfp_paths.append([xfp_int] + list(der_full)) 619 620 script = bfh(wallet.pubkeys_to_scriptcode(pubkey_hexes)) 621 622 keystore.show_p2sh_address(wallet.m, script, xfp_paths, txin_type) 623 624 else: 625 keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device)) 626 return 627 628 629 def xfp_int_from_xfp_bytes(fp_bytes: bytes) -> int: 630 return int.from_bytes(fp_bytes, byteorder="little", signed=False) 631 632 633 def xfp2str(xfp: int) -> str: 634 # Standardized way to show an xpub's fingerprint... it's a 4-byte string 635 # and not really an integer. Used to show as '0x%08x' but that's wrong endian. 636 return struct.pack('<I', xfp).hex().lower() 637 638 # EOF