electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

storage.py (7860B)


      1 #!/usr/bin/env python
      2 #
      3 # Electrum - lightweight Bitcoin client
      4 # Copyright (C) 2015 Thomas Voegtlin
      5 #
      6 # Permission is hereby granted, free of charge, to any person
      7 # obtaining a copy of this software and associated documentation files
      8 # (the "Software"), to deal in the Software without restriction,
      9 # including without limitation the rights to use, copy, modify, merge,
     10 # publish, distribute, sublicense, and/or sell copies of the Software,
     11 # and to permit persons to whom the Software is furnished to do so,
     12 # subject to the following conditions:
     13 #
     14 # The above copyright notice and this permission notice shall be
     15 # included in all copies or substantial portions of the Software.
     16 #
     17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     24 # SOFTWARE.
     25 import os
     26 import threading
     27 import stat
     28 import hashlib
     29 import base64
     30 import zlib
     31 from enum import IntEnum
     32 
     33 from . import ecc
     34 from .util import (profiler, InvalidPassword, WalletFileException, bfh, standardize_path,
     35                    test_read_write_permissions)
     36 
     37 from .wallet_db import WalletDB
     38 from .logging import Logger
     39 
     40 
     41 def get_derivation_used_for_hw_device_encryption():
     42     return ("m"
     43             "/4541509'"      # ascii 'ELE'  as decimal ("BIP43 purpose")
     44             "/1112098098'")  # ascii 'BIE2' as decimal
     45 
     46 
     47 class StorageEncryptionVersion(IntEnum):
     48     PLAINTEXT = 0
     49     USER_PASSWORD = 1
     50     XPUB_PASSWORD = 2
     51 
     52 
     53 class StorageReadWriteError(Exception): pass
     54 
     55 
     56 # TODO: Rename to Storage
     57 class WalletStorage(Logger):
     58 
     59     def __init__(self, path):
     60         Logger.__init__(self)
     61         self.path = standardize_path(path)
     62         self._file_exists = bool(self.path and os.path.exists(self.path))
     63         self.logger.info(f"wallet path {self.path}")
     64         self.pubkey = None
     65         self.decrypted = ''
     66         try:
     67             test_read_write_permissions(self.path)
     68         except IOError as e:
     69             raise StorageReadWriteError(e) from e
     70         if self.file_exists():
     71             with open(self.path, "r", encoding='utf-8') as f:
     72                 self.raw = f.read()
     73             self._encryption_version = self._init_encryption_version()
     74         else:
     75             self.raw = ''
     76             self._encryption_version = StorageEncryptionVersion.PLAINTEXT
     77 
     78     def read(self):
     79         return self.decrypted if self.is_encrypted() else self.raw
     80 
     81     def write(self, data: str) -> None:
     82         s = self.encrypt_before_writing(data)
     83         temp_path = "%s.tmp.%s" % (self.path, os.getpid())
     84         with open(temp_path, "w", encoding='utf-8') as f:
     85             f.write(s)
     86             f.flush()
     87             os.fsync(f.fileno())
     88 
     89         try:
     90             mode = os.stat(self.path).st_mode
     91         except FileNotFoundError:
     92             mode = stat.S_IREAD | stat.S_IWRITE
     93 
     94         # assert that wallet file does not exist, to prevent wallet corruption (see issue #5082)
     95         if not self.file_exists():
     96             assert not os.path.exists(self.path)
     97         os.replace(temp_path, self.path)
     98         os.chmod(self.path, mode)
     99         self._file_exists = True
    100         self.logger.info(f"saved {self.path}")
    101 
    102     def file_exists(self) -> bool:
    103         return self._file_exists
    104 
    105     def is_past_initial_decryption(self):
    106         """Return if storage is in a usable state for normal operations.
    107 
    108         The value is True exactly
    109             if encryption is disabled completely (self.is_encrypted() == False),
    110             or if encryption is enabled but the contents have already been decrypted.
    111         """
    112         return not self.is_encrypted() or bool(self.pubkey)
    113 
    114     def is_encrypted(self):
    115         """Return if storage encryption is currently enabled."""
    116         return self.get_encryption_version() != StorageEncryptionVersion.PLAINTEXT
    117 
    118     def is_encrypted_with_user_pw(self):
    119         return self.get_encryption_version() == StorageEncryptionVersion.USER_PASSWORD
    120 
    121     def is_encrypted_with_hw_device(self):
    122         return self.get_encryption_version() == StorageEncryptionVersion.XPUB_PASSWORD
    123 
    124     def get_encryption_version(self):
    125         """Return the version of encryption used for this storage.
    126 
    127         0: plaintext / no encryption
    128 
    129         ECIES, private key derived from a password,
    130         1: password is provided by user
    131         2: password is derived from an xpub; used with hw wallets
    132         """
    133         return self._encryption_version
    134 
    135     def _init_encryption_version(self):
    136         try:
    137             magic = base64.b64decode(self.raw)[0:4]
    138             if magic == b'BIE1':
    139                 return StorageEncryptionVersion.USER_PASSWORD
    140             elif magic == b'BIE2':
    141                 return StorageEncryptionVersion.XPUB_PASSWORD
    142             else:
    143                 return StorageEncryptionVersion.PLAINTEXT
    144         except:
    145             return StorageEncryptionVersion.PLAINTEXT
    146 
    147     @staticmethod
    148     def get_eckey_from_password(password):
    149         secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
    150         ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)
    151         return ec_key
    152 
    153     def _get_encryption_magic(self):
    154         v = self._encryption_version
    155         if v == StorageEncryptionVersion.USER_PASSWORD:
    156             return b'BIE1'
    157         elif v == StorageEncryptionVersion.XPUB_PASSWORD:
    158             return b'BIE2'
    159         else:
    160             raise WalletFileException('no encryption magic for version: %s' % v)
    161 
    162     def decrypt(self, password) -> None:
    163         if self.is_past_initial_decryption():
    164             return
    165         ec_key = self.get_eckey_from_password(password)
    166         if self.raw:
    167             enc_magic = self._get_encryption_magic()
    168             s = zlib.decompress(ec_key.decrypt_message(self.raw, enc_magic))
    169             s = s.decode('utf8')
    170         else:
    171             s = ''
    172         self.pubkey = ec_key.get_public_key_hex()
    173         self.decrypted = s
    174 
    175     def encrypt_before_writing(self, plaintext: str) -> str:
    176         s = plaintext
    177         if self.pubkey:
    178             s = bytes(s, 'utf8')
    179             c = zlib.compress(s, level=zlib.Z_BEST_SPEED)
    180             enc_magic = self._get_encryption_magic()
    181             public_key = ecc.ECPubkey(bfh(self.pubkey))
    182             s = public_key.encrypt_message(c, enc_magic)
    183             s = s.decode('utf8')
    184         return s
    185 
    186     def check_password(self, password) -> None:
    187         """Raises an InvalidPassword exception on invalid password"""
    188         if not self.is_encrypted():
    189             return
    190         if not self.is_past_initial_decryption():
    191             self.decrypt(password)  # this sets self.pubkey
    192         assert self.pubkey is not None
    193         if self.pubkey != self.get_eckey_from_password(password).get_public_key_hex():
    194             raise InvalidPassword()
    195 
    196     def set_password(self, password, enc_version=None):
    197         """Set a password to be used for encrypting this storage."""
    198         if not self.is_past_initial_decryption():
    199             raise Exception("storage needs to be decrypted before changing password")
    200         if enc_version is None:
    201             enc_version = self._encryption_version
    202         if password and enc_version != StorageEncryptionVersion.PLAINTEXT:
    203             ec_key = self.get_eckey_from_password(password)
    204             self.pubkey = ec_key.get_public_key_hex()
    205             self._encryption_version = enc_version
    206         else:
    207             self.pubkey = None
    208             self._encryption_version = StorageEncryptionVersion.PLAINTEXT
    209 
    210     def basename(self) -> str:
    211         return os.path.basename(self.path)
    212