storage.py (7860B)
#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2015 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import threading
import stat
import hashlib
import base64
import zlib
from enum import IntEnum

from . import ecc
from .util import (profiler, InvalidPassword, WalletFileException, bfh, standardize_path,
                   test_read_write_permissions)

from .wallet_db import WalletDB
from .logging import Logger


def get_derivation_used_for_hw_device_encryption():
    """Return the fixed derivation path string used for hw-device storage encryption."""
    return ("m"
            "/4541509'"  # ascii 'ELE' as decimal ("BIP43 purpose")
            "/1112098098'")  # ascii 'BIE2' as decimal


class StorageEncryptionVersion(IntEnum):
    """Encryption scheme of a storage file (see WalletStorage.get_encryption_version)."""
    PLAINTEXT = 0
    USER_PASSWORD = 1  # file content starts with magic b'BIE1'
    XPUB_PASSWORD = 2  # file content starts with magic b'BIE2'


class StorageReadWriteError(Exception):
    """Raised by WalletStorage.__init__ when the read/write permission check fails."""


# TODO: Rename to Storage
class WalletStorage(Logger):
    """File-backed storage for wallet data, optionally encrypted.

    The file content is read once in the constructor into ``self.raw``.
    For an encrypted file, ``decrypt()`` must be called before ``read()``
    returns anything other than the empty string.
    """

    def __init__(self, path):
        """Open the storage at *path* and detect its encryption version.

        Raises:
            StorageReadWriteError: if ``test_read_write_permissions`` raises
                an IOError for the (standardized) path.
        """
        Logger.__init__(self)
        self.path = standardize_path(path)
        # Cached at construction time; only flipped to True by write().
        self._file_exists = bool(self.path and os.path.exists(self.path))
        self.logger.info(f"wallet path {self.path}")
        # Hex public key used for encryption; None until decrypt()/set_password().
        self.pubkey = None
        # Decrypted file contents; stays '' until decrypt() succeeds.
        self.decrypted = ''
        try:
            test_read_write_permissions(self.path)
        except IOError as e:
            raise StorageReadWriteError(e) from e
        if self.file_exists():
            with open(self.path, "r", encoding='utf-8') as f:
                self.raw = f.read()
            self._encryption_version = self._init_encryption_version()
        else:
            self.raw = ''
            self._encryption_version = StorageEncryptionVersion.PLAINTEXT

    def read(self):
        """Return the decrypted contents if encryption is enabled, else the raw file contents.

        For an encrypted storage that has not been decrypted yet this is ''.
        """
        return self.decrypted if self.is_encrypted() else self.raw

    def write(self, data: str) -> None:
        """Encrypt *data* if a pubkey is set and replace the storage file with it.

        The data is first written (and fsynced) to a temporary file next to
        the target, which is then moved over the target with ``os.replace``.
        The previous file mode is preserved; a new file gets owner
        read/write only.
        """
        s = self.encrypt_before_writing(data)
        temp_path = "%s.tmp.%s" % (self.path, os.getpid())
        with open(temp_path, "w", encoding='utf-8') as f:
            f.write(s)
            f.flush()
            # make sure the bytes hit the disk before the rename below
            os.fsync(f.fileno())

        try:
            mode = os.stat(self.path).st_mode
        except FileNotFoundError:
            mode = stat.S_IREAD | stat.S_IWRITE

        # assert that wallet file does not exist, to prevent wallet corruption (see issue #5082)
        # NOTE(review): this guard is an ``assert`` and is therefore skipped under ``python -O``.
        if not self.file_exists():
            assert not os.path.exists(self.path)
        os.replace(temp_path, self.path)
        os.chmod(self.path, mode)
        self._file_exists = True
        self.logger.info(f"saved {self.path}")

    def file_exists(self) -> bool:
        """Return the cached file-existence flag (set in __init__, set to True by write())."""
        return self._file_exists

    def is_past_initial_decryption(self):
        """Return if storage is in a usable state for normal operations.

        The value is True exactly
            if encryption is disabled completely (self.is_encrypted() == False),
            or if encryption is enabled but the contents have already been decrypted.
        """
        return not self.is_encrypted() or bool(self.pubkey)

    def is_encrypted(self):
        """Return if storage encryption is currently enabled."""
        return self.get_encryption_version() != StorageEncryptionVersion.PLAINTEXT

    def is_encrypted_with_user_pw(self):
        """Return True if the encryption version is USER_PASSWORD."""
        return self.get_encryption_version() == StorageEncryptionVersion.USER_PASSWORD

    def is_encrypted_with_hw_device(self):
        """Return True if the encryption version is XPUB_PASSWORD."""
        return self.get_encryption_version() == StorageEncryptionVersion.XPUB_PASSWORD

    def get_encryption_version(self):
        """Return the version of encryption used for this storage.

        0: plaintext / no encryption

        ECIES, private key derived from a password,
        1: password is provided by user
        2: password is derived from an xpub; used with hw wallets
        """
        return self._encryption_version

    def _init_encryption_version(self):
        """Detect the encryption version from the first 4 base64-decoded bytes of self.raw.

        Anything that does not decode to a known magic is treated as plaintext.
        """
        try:
            magic = base64.b64decode(self.raw)[0:4]
            if magic == b'BIE1':
                return StorageEncryptionVersion.USER_PASSWORD
            elif magic == b'BIE2':
                return StorageEncryptionVersion.XPUB_PASSWORD
            else:
                return StorageEncryptionVersion.PLAINTEXT
        except Exception:
            # Best effort: content that is not valid base64 is plaintext.
            # Deliberately not a bare ``except:`` so that KeyboardInterrupt
            # and SystemExit are not swallowed here.
            return StorageEncryptionVersion.PLAINTEXT

    @staticmethod
    def get_eckey_from_password(password):
        """Derive an EC private key from *password*.

        The secret is PBKDF2-HMAC-SHA512 of the UTF-8 encoded password with
        an empty salt and 1024 iterations.
        """
        secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
        ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)
        return ec_key

    def _get_encryption_magic(self):
        """Return the magic bytes for the current encryption version.

        Raises:
            WalletFileException: if the current version has no magic (e.g. PLAINTEXT).
        """
        v = self._encryption_version
        if v == StorageEncryptionVersion.USER_PASSWORD:
            return b'BIE1'
        elif v == StorageEncryptionVersion.XPUB_PASSWORD:
            return b'BIE2'
        else:
            raise WalletFileException('no encryption magic for version: %s' % v)

    def decrypt(self, password) -> None:
        """Decrypt self.raw with *password* and store the result in self.decrypted.

        Does nothing if the storage is already past initial decryption.
        On success self.pubkey is set to the public key derived from *password*.
        With empty raw content nothing is decrypted, but the pubkey is still set.
        """
        if self.is_past_initial_decryption():
            return
        ec_key = self.get_eckey_from_password(password)
        if self.raw:
            enc_magic = self._get_encryption_magic()
            # NOTE(review): a wrong password presumably makes decrypt_message
            # raise (check_password documents InvalidPassword) - confirm in ecc.
            s = zlib.decompress(ec_key.decrypt_message(self.raw, enc_magic))
            s = s.decode('utf8')
        else:
            s = ''
        self.pubkey = ec_key.get_public_key_hex()
        self.decrypted = s

    def encrypt_before_writing(self, plaintext: str) -> str:
        """Return *plaintext* in the form that should be written to disk.

        If a pubkey is set, the text is zlib-compressed, encrypted to that
        pubkey and returned as a str; otherwise it is returned unchanged.
        """
        s = plaintext
        if self.pubkey:
            s = bytes(s, 'utf8')
            c = zlib.compress(s, level=zlib.Z_BEST_SPEED)
            enc_magic = self._get_encryption_magic()
            public_key = ecc.ECPubkey(bfh(self.pubkey))
            s = public_key.encrypt_message(c, enc_magic)
            s = s.decode('utf8')
        return s

    def check_password(self, password) -> None:
        """Raises an InvalidPassword exception on invalid password"""
        if not self.is_encrypted():
            return
        if not self.is_past_initial_decryption():
            self.decrypt(password)  # this sets self.pubkey
        assert self.pubkey is not None
        if self.pubkey != self.get_eckey_from_password(password).get_public_key_hex():
            raise InvalidPassword()

    def set_password(self, password, enc_version=None):
        """Set a password to be used for encrypting this storage.

        If *enc_version* is None the current encryption version is kept.
        A falsy *password* or a PLAINTEXT version disables encryption.

        Raises:
            Exception: if the storage has not been decrypted yet.
        """
        if not self.is_past_initial_decryption():
            raise Exception("storage needs to be decrypted before changing password")
        if enc_version is None:
            enc_version = self._encryption_version
        if password and enc_version != StorageEncryptionVersion.PLAINTEXT:
            ec_key = self.get_eckey_from_password(password)
            self.pubkey = ec_key.get_public_key_hex()
            self._encryption_version = enc_version
        else:
            self.pubkey = None
            self._encryption_version = StorageEncryptionVersion.PLAINTEXT

    def basename(self) -> str:
        """Return the final path component of the storage path."""
        return os.path.basename(self.path)