electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

crypto.py (15661B)


      1 # -*- coding: utf-8 -*-
      2 #
      3 # Electrum - lightweight Bitcoin client
      4 # Copyright (C) 2018 The Electrum developers
      5 #
      6 # Permission is hereby granted, free of charge, to any person
      7 # obtaining a copy of this software and associated documentation files
      8 # (the "Software"), to deal in the Software without restriction,
      9 # including without limitation the rights to use, copy, modify, merge,
     10 # publish, distribute, sublicense, and/or sell copies of the Software,
     11 # and to permit persons to whom the Software is furnished to do so,
     12 # subject to the following conditions:
     13 #
     14 # The above copyright notice and this permission notice shall be
     15 # included in all copies or substantial portions of the Software.
     16 #
     17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     24 # SOFTWARE.
     25 
     26 import base64
     27 import os
     28 import sys
     29 import hashlib
     30 import hmac
     31 from typing import Union
     32 
     33 from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFileException, versiontuple
     34 from .i18n import _
     35 from .logging import get_logger
     36 
     37 
     38 _logger = get_logger(__name__)
     39 
     40 
     41 HAS_PYAES = False
     42 try:
     43     import pyaes
     44 except:
     45     pass
     46 else:
     47     HAS_PYAES = True
     48 
     49 HAS_CRYPTODOME = False
     50 MIN_CRYPTODOME_VERSION = "3.7"
     51 try:
     52     import Cryptodome
     53     if versiontuple(Cryptodome.__version__) < versiontuple(MIN_CRYPTODOME_VERSION):
     54         _logger.warning(f"found module 'Cryptodome' but it is too old: {Cryptodome.__version__}<{MIN_CRYPTODOME_VERSION}")
     55         raise Exception()
     56     from Cryptodome.Cipher import ChaCha20_Poly1305 as CD_ChaCha20_Poly1305
     57     from Cryptodome.Cipher import ChaCha20 as CD_ChaCha20
     58     from Cryptodome.Cipher import AES as CD_AES
     59 except:
     60     pass
     61 else:
     62     HAS_CRYPTODOME = True
     63 
     64 HAS_CRYPTOGRAPHY = False
     65 MIN_CRYPTOGRAPHY_VERSION = "2.1"
     66 try:
     67     import cryptography
     68     if versiontuple(cryptography.__version__) < versiontuple(MIN_CRYPTOGRAPHY_VERSION):
     69         _logger.warning(f"found module 'cryptography' but it is too old: {cryptography.__version__}<{MIN_CRYPTOGRAPHY_VERSION}")
     70         raise Exception()
     71     from cryptography import exceptions
     72     from cryptography.hazmat.primitives.ciphers import Cipher as CG_Cipher
     73     from cryptography.hazmat.primitives.ciphers import algorithms as CG_algorithms
     74     from cryptography.hazmat.primitives.ciphers import modes as CG_modes
     75     from cryptography.hazmat.backends import default_backend as CG_default_backend
     76     import cryptography.hazmat.primitives.ciphers.aead as CG_aead
     77 except:
     78     pass
     79 else:
     80     HAS_CRYPTOGRAPHY = True
     81 
     82 
     83 if not (HAS_CRYPTODOME or HAS_CRYPTOGRAPHY):
     84     sys.exit(f"Error: at least one of ('pycryptodomex', 'cryptography') needs to be installed.")
     85 
     86 
     87 class InvalidPadding(Exception):
     88     pass
     89 
     90 
     91 def append_PKCS7_padding(data: bytes) -> bytes:
     92     assert_bytes(data)
     93     padlen = 16 - (len(data) % 16)
     94     return data + bytes([padlen]) * padlen
     95 
     96 
     97 def strip_PKCS7_padding(data: bytes) -> bytes:
     98     assert_bytes(data)
     99     if len(data) % 16 != 0 or len(data) == 0:
    100         raise InvalidPadding("invalid length")
    101     padlen = data[-1]
    102     if not (0 < padlen <= 16):
    103         raise InvalidPadding("invalid padding byte (out of range)")
    104     for i in data[-padlen:]:
    105         if i != padlen:
    106             raise InvalidPadding("invalid padding byte (inconsistent)")
    107     return data[0:-padlen]
    108 
    109 
    110 def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    111     assert_bytes(key, iv, data)
    112     data = append_PKCS7_padding(data)
    113     if HAS_CRYPTODOME:
    114         e = CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(data)
    115     elif HAS_CRYPTOGRAPHY:
    116         cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
    117         encryptor = cipher.encryptor()
    118         e = encryptor.update(data) + encryptor.finalize()
    119     elif HAS_PYAES:
    120         aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
    121         aes = pyaes.Encrypter(aes_cbc, padding=pyaes.PADDING_NONE)
    122         e = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
    123     else:
    124         raise Exception("no AES backend found")
    125     return e
    126 
    127 
    128 def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    129     assert_bytes(key, iv, data)
    130     if HAS_CRYPTODOME:
    131         cipher = CD_AES.new(key, CD_AES.MODE_CBC, iv)
    132         data = cipher.decrypt(data)
    133     elif HAS_CRYPTOGRAPHY:
    134         cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
    135         decryptor = cipher.decryptor()
    136         data = decryptor.update(data) + decryptor.finalize()
    137     elif HAS_PYAES:
    138         aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
    139         aes = pyaes.Decrypter(aes_cbc, padding=pyaes.PADDING_NONE)
    140         data = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
    141     else:
    142         raise Exception("no AES backend found")
    143     try:
    144         return strip_PKCS7_padding(data)
    145     except InvalidPadding:
    146         raise InvalidPassword()
    147 
    148 
    149 def EncodeAES_base64(secret: bytes, msg: bytes) -> bytes:
    150     """Returns base64 encoded ciphertext."""
    151     e = EncodeAES_bytes(secret, msg)
    152     return base64.b64encode(e)
    153 
    154 
    155 def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes:
    156     assert_bytes(msg)
    157     iv = bytes(os.urandom(16))
    158     ct = aes_encrypt_with_iv(secret, iv, msg)
    159     return iv + ct
    160 
    161 
    162 def DecodeAES_base64(secret: bytes, ciphertext_b64: Union[bytes, str]) -> bytes:
    163     ciphertext = bytes(base64.b64decode(ciphertext_b64))
    164     return DecodeAES_bytes(secret, ciphertext)
    165 
    166 
    167 def DecodeAES_bytes(secret: bytes, ciphertext: bytes) -> bytes:
    168     assert_bytes(ciphertext)
    169     iv, e = ciphertext[:16], ciphertext[16:]
    170     s = aes_decrypt_with_iv(secret, iv, e)
    171     return s
    172 
    173 
    174 PW_HASH_VERSION_LATEST = 1
    175 KNOWN_PW_HASH_VERSIONS = (1, 2, )
    176 SUPPORTED_PW_HASH_VERSIONS = (1, )
    177 assert PW_HASH_VERSION_LATEST in KNOWN_PW_HASH_VERSIONS
    178 assert PW_HASH_VERSION_LATEST in SUPPORTED_PW_HASH_VERSIONS
    179 
    180 
    181 class UnexpectedPasswordHashVersion(InvalidPassword, WalletFileException):
    182     def __init__(self, version):
    183         self.version = version
    184 
    185     def __str__(self):
    186         return "{unexpected}: {version}\n{instruction}".format(
    187             unexpected=_("Unexpected password hash version"),
    188             version=self.version,
    189             instruction=_('You are most likely using an outdated version of Electrum. Please update.'))
    190 
    191 
    192 class UnsupportedPasswordHashVersion(InvalidPassword, WalletFileException):
    193     def __init__(self, version):
    194         self.version = version
    195 
    196     def __str__(self):
    197         return "{unsupported}: {version}\n{instruction}".format(
    198             unsupported=_("Unsupported password hash version"),
    199             version=self.version,
    200             instruction=f"To open this wallet, try 'git checkout password_v{self.version}'.\n"
    201                         "Alternatively, restore from seed.")
    202 
    203 
    204 def _hash_password(password: Union[bytes, str], *, version: int) -> bytes:
    205     pw = to_bytes(password, 'utf8')
    206     if version not in SUPPORTED_PW_HASH_VERSIONS:
    207         raise UnsupportedPasswordHashVersion(version)
    208     if version == 1:
    209         return sha256d(pw)
    210     else:
    211         assert version not in KNOWN_PW_HASH_VERSIONS
    212         raise UnexpectedPasswordHashVersion(version)
    213 
    214 
    215 def _pw_encode_raw(data: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    216     if version not in KNOWN_PW_HASH_VERSIONS:
    217         raise UnexpectedPasswordHashVersion(version)
    218     # derive key from password
    219     secret = _hash_password(password, version=version)
    220     # encrypt given data
    221     ciphertext = EncodeAES_bytes(secret, data)
    222     return ciphertext
    223 
    224 
    225 def _pw_decode_raw(data_bytes: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    226     if version not in KNOWN_PW_HASH_VERSIONS:
    227         raise UnexpectedPasswordHashVersion(version)
    228     # derive key from password
    229     secret = _hash_password(password, version=version)
    230     # decrypt given data
    231     try:
    232         d = DecodeAES_bytes(secret, data_bytes)
    233     except Exception as e:
    234         raise InvalidPassword() from e
    235     return d
    236 
    237 
    238 def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
    239     """plaintext bytes -> base64 ciphertext"""
    240     ciphertext = _pw_encode_raw(data, password, version=version)
    241     ciphertext_b64 = base64.b64encode(ciphertext)
    242     return ciphertext_b64.decode('utf8')
    243 
    244 
    245 def pw_decode_bytes(data: str, password: Union[bytes, str], *, version:int) -> bytes:
    246     """base64 ciphertext -> plaintext bytes"""
    247     if version not in KNOWN_PW_HASH_VERSIONS:
    248         raise UnexpectedPasswordHashVersion(version)
    249     data_bytes = bytes(base64.b64decode(data))
    250     return _pw_decode_raw(data_bytes, password, version=version)
    251 
    252 
    253 def pw_encode_with_version_and_mac(data: bytes, password: Union[bytes, str]) -> str:
    254     """plaintext bytes -> base64 ciphertext"""
    255     # https://crypto.stackexchange.com/questions/202/should-we-mac-then-encrypt-or-encrypt-then-mac
    256     # Encrypt-and-MAC. The MAC will be used to detect invalid passwords
    257     version = PW_HASH_VERSION_LATEST
    258     mac = sha256(data)[0:4]
    259     ciphertext = _pw_encode_raw(data, password, version=version)
    260     ciphertext_b64 = base64.b64encode(bytes([version]) + ciphertext + mac)
    261     return ciphertext_b64.decode('utf8')
    262 
    263 
    264 def pw_decode_with_version_and_mac(data: str, password: Union[bytes, str]) -> bytes:
    265     """base64 ciphertext -> plaintext bytes"""
    266     data_bytes = bytes(base64.b64decode(data))
    267     version = int(data_bytes[0])
    268     encrypted = data_bytes[1:-4]
    269     mac = data_bytes[-4:]
    270     if version not in KNOWN_PW_HASH_VERSIONS:
    271         raise UnexpectedPasswordHashVersion(version)
    272     decrypted = _pw_decode_raw(encrypted, password, version=version)
    273     if sha256(decrypted)[0:4] != mac:
    274         raise InvalidPassword()
    275     return decrypted
    276 
    277 
    278 def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    279     """plaintext str -> base64 ciphertext"""
    280     if not password:
    281         return data
    282     plaintext_bytes = to_bytes(data, "utf8")
    283     return pw_encode_bytes(plaintext_bytes, password, version=version)
    284 
    285 
    286 def pw_decode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    287     """base64 ciphertext -> plaintext str"""
    288     if password is None:
    289         return data
    290     plaintext_bytes = pw_decode_bytes(data, password, version=version)
    291     try:
    292         plaintext_str = to_string(plaintext_bytes, "utf8")
    293     except UnicodeDecodeError as e:
    294         raise InvalidPassword() from e
    295     return plaintext_str
    296 
    297 
    298 def sha256(x: Union[bytes, str]) -> bytes:
    299     x = to_bytes(x, 'utf8')
    300     return bytes(hashlib.sha256(x).digest())
    301 
    302 
    303 def sha256d(x: Union[bytes, str]) -> bytes:
    304     x = to_bytes(x, 'utf8')
    305     out = bytes(sha256(sha256(x)))
    306     return out
    307 
    308 
    309 def hash_160(x: bytes) -> bytes:
    310     return ripemd(sha256(x))
    311 
    312 def ripemd(x):
    313     try:
    314         md = hashlib.new('ripemd160')
    315         md.update(x)
    316         return md.digest()
    317     except BaseException:
    318         # ripemd160 is not guaranteed to be available in hashlib on all platforms.
    319         # Historically, our Android builds had hashlib/openssl which did not have it.
    320         # see https://github.com/spesmilo/electrum/issues/7093
    321         # We bundle a pure python implementation as fallback that gets used now:
    322         from . import ripemd
    323         md = ripemd.new(x)
    324         return md.digest()
    325 
    326 def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
    327     if hasattr(hmac, 'digest'):
    328         # requires python 3.7+; faster
    329         return hmac.digest(key, msg, digest)
    330     else:
    331         return hmac.new(key, msg, digest).digest()
    332 
    333 
    334 def chacha20_poly1305_encrypt(
    335         *,
    336         key: bytes,
    337         nonce: bytes,
    338         associated_data: bytes = None,
    339         data: bytes
    340 ) -> bytes:
    341     assert isinstance(key, (bytes, bytearray))
    342     assert isinstance(nonce, (bytes, bytearray))
    343     assert isinstance(associated_data, (bytes, bytearray, type(None)))
    344     assert isinstance(data, (bytes, bytearray))
    345     assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    346     assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    347     if HAS_CRYPTODOME:
    348         cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
    349         if associated_data is not None:
    350             cipher.update(associated_data)
    351         ciphertext, mac = cipher.encrypt_and_digest(plaintext=data)
    352         return ciphertext + mac
    353     if HAS_CRYPTOGRAPHY:
    354         a = CG_aead.ChaCha20Poly1305(key)
    355         return a.encrypt(nonce, data, associated_data)
    356     raise Exception("no chacha20 backend found")
    357 
    358 
    359 def chacha20_poly1305_decrypt(
    360         *,
    361         key: bytes,
    362         nonce: bytes,
    363         associated_data: bytes = None,
    364         data: bytes
    365 ) -> bytes:
    366     assert isinstance(key, (bytes, bytearray))
    367     assert isinstance(nonce, (bytes, bytearray))
    368     assert isinstance(associated_data, (bytes, bytearray, type(None)))
    369     assert isinstance(data, (bytes, bytearray))
    370     assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    371     assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    372     if HAS_CRYPTODOME:
    373         cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
    374         if associated_data is not None:
    375             cipher.update(associated_data)
    376         # raises ValueError if not valid (e.g. incorrect MAC)
    377         return cipher.decrypt_and_verify(ciphertext=data[:-16], received_mac_tag=data[-16:])
    378     if HAS_CRYPTOGRAPHY:
    379         a = CG_aead.ChaCha20Poly1305(key)
    380         try:
    381             return a.decrypt(nonce, data, associated_data)
    382         except cryptography.exceptions.InvalidTag as e:
    383             raise ValueError("invalid tag") from e
    384     raise Exception("no chacha20 backend found")
    385 
    386 
    387 def chacha20_encrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    388     assert isinstance(key, (bytes, bytearray))
    389     assert isinstance(nonce, (bytes, bytearray))
    390     assert isinstance(data, (bytes, bytearray))
    391     assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    392     assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    393     if HAS_CRYPTODOME:
    394         cipher = CD_ChaCha20.new(key=key, nonce=nonce)
    395         return cipher.encrypt(data)
    396     if HAS_CRYPTOGRAPHY:
    397         nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
    398         algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
    399         cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
    400         encryptor = cipher.encryptor()
    401         return encryptor.update(data)
    402     raise Exception("no chacha20 backend found")
    403 
    404 
    405 def chacha20_decrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    406     assert isinstance(key, (bytes, bytearray))
    407     assert isinstance(nonce, (bytes, bytearray))
    408     assert isinstance(data, (bytes, bytearray))
    409     assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    410     assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    411     if HAS_CRYPTODOME:
    412         cipher = CD_ChaCha20.new(key=key, nonce=nonce)
    413         return cipher.decrypt(data)
    414     if HAS_CRYPTOGRAPHY:
    415         nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
    416         algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
    417         cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
    418         decryptor = cipher.decryptor()
    419         return decryptor.update(data)
    420     raise Exception("no chacha20 backend found")