electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

address_synchronizer.py (38895B)


      1 # Electrum - lightweight Bitcoin client
      2 # Copyright (C) 2018 The Electrum Developers
      3 #
      4 # Permission is hereby granted, free of charge, to any person
      5 # obtaining a copy of this software and associated documentation files
      6 # (the "Software"), to deal in the Software without restriction,
      7 # including without limitation the rights to use, copy, modify, merge,
      8 # publish, distribute, sublicense, and/or sell copies of the Software,
      9 # and to permit persons to whom the Software is furnished to do so,
     10 # subject to the following conditions:
     11 #
     12 # The above copyright notice and this permission notice shall be
     13 # included in all copies or substantial portions of the Software.
     14 #
     15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     22 # SOFTWARE.
     23 
     24 import asyncio
     25 import threading
     26 import asyncio
     27 import itertools
     28 from collections import defaultdict
     29 from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List
     30 
     31 from aiorpcx import TaskGroup
     32 
     33 from . import bitcoin, util
     34 from .bitcoin import COINBASE_MATURITY
     35 from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException
     36 from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction
     37 from .synchronizer import Synchronizer
     38 from .verifier import SPV
     39 from .blockchain import hash_header
     40 from .i18n import _
     41 from .logging import Logger
     42 
     43 if TYPE_CHECKING:
     44     from .network import Network
     45     from .wallet_db import WalletDB
     46 
     47 
     48 TX_HEIGHT_FUTURE = -3
     49 TX_HEIGHT_LOCAL = -2
     50 TX_HEIGHT_UNCONF_PARENT = -1
     51 TX_HEIGHT_UNCONFIRMED = 0
     52 
     53 
     54 class HistoryItem(NamedTuple):
     55     txid: str
     56     tx_mined_status: TxMinedInfo
     57     delta: int
     58     fee: Optional[int]
     59     balance: int
     60 
     61 
     62 class TxWalletDelta(NamedTuple):
     63     is_relevant: bool  # "related to wallet?"
     64     is_any_input_ismine: bool
     65     is_all_input_ismine: bool
     66     delta: int
     67     fee: Optional[int]
     68 
     69 
     70 class AddressSynchronizer(Logger):
     71     """
     72     inherited by wallet
     73     """
     74 
     75     network: Optional['Network']
     76     synchronizer: Optional['Synchronizer']
     77     verifier: Optional['SPV']
     78 
     79     def __init__(self, db: 'WalletDB'):
     80         self.db = db
     81         self.network = None
     82         Logger.__init__(self)
     83         # verifier (SPV) and synchronizer are started in start_network
     84         self.synchronizer = None
     85         self.verifier = None
     86         # locks: if you need to take multiple ones, acquire them in the order they are defined here!
     87         self.lock = threading.RLock()
     88         self.transaction_lock = threading.RLock()
     89         self.future_tx = {}  # type: Dict[str, int]  # txid -> blocks remaining
     90         # Transactions pending verification.  txid -> tx_height. Access with self.lock.
     91         self.unverified_tx = defaultdict(int)
     92         # true when synchronized
     93         self.up_to_date = False
     94         # thread local storage for caching stuff
     95         self.threadlocal_cache = threading.local()
     96 
     97         self._get_addr_balance_cache = {}
     98 
     99         self.load_and_cleanup()
    100 
    101     def with_lock(func):
    102         def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs):
    103             with self.lock:
    104                 return func(self, *args, **kwargs)
    105         return func_wrapper
    106 
    107     def with_transaction_lock(func):
    108         def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs):
    109             with self.transaction_lock:
    110                 return func(self, *args, **kwargs)
    111         return func_wrapper
    112 
    113     def load_and_cleanup(self):
    114         self.load_local_history()
    115         self.check_history()
    116         self.load_unverified_transactions()
    117         self.remove_local_transactions_we_dont_have()
    118 
    119     def is_mine(self, address: Optional[str]) -> bool:
    120         if not address: return False
    121         return self.db.is_addr_in_history(address)
    122 
    123     def get_addresses(self):
    124         return sorted(self.db.get_history())
    125 
    126     def get_address_history(self, addr: str) -> Sequence[Tuple[str, int]]:
    127         """Returns the history for the address, in the format that would be returned by a server.
    128 
    129         Note: The difference between db.get_addr_history and this method is that
    130         db.get_addr_history stores the response from a server, so it only includes txns
    131         a server sees, i.e. that does not contain local and future txns.
    132         """
    133         h = []
    134         # we need self.transaction_lock but get_tx_height will take self.lock
    135         # so we need to take that too here, to enforce order of locks
    136         with self.lock, self.transaction_lock:
    137             related_txns = self._history_local.get(addr, set())
    138             for tx_hash in related_txns:
    139                 tx_height = self.get_tx_height(tx_hash).height
    140                 h.append((tx_hash, tx_height))
    141         return h
    142 
    143     def get_address_history_len(self, addr: str) -> int:
    144         """Return number of transactions where address is involved."""
    145         return len(self._history_local.get(addr, ()))
    146 
    147     def get_txin_address(self, txin: TxInput) -> Optional[str]:
    148         if isinstance(txin, PartialTxInput):
    149             if txin.address:
    150                 return txin.address
    151         prevout_hash = txin.prevout.txid.hex()
    152         prevout_n = txin.prevout.out_idx
    153         for addr in self.db.get_txo_addresses(prevout_hash):
    154             d = self.db.get_txo_addr(prevout_hash, addr)
    155             if prevout_n in d:
    156                 return addr
    157         tx = self.db.get_transaction(prevout_hash)
    158         if tx:
    159             return tx.outputs()[prevout_n].address
    160         return None
    161 
    162     def get_txin_value(self, txin: TxInput, *, address: str = None) -> Optional[int]:
    163         if txin.value_sats() is not None:
    164             return txin.value_sats()
    165         prevout_hash = txin.prevout.txid.hex()
    166         prevout_n = txin.prevout.out_idx
    167         if address is None:
    168             address = self.get_txin_address(txin)
    169         if address:
    170             d = self.db.get_txo_addr(prevout_hash, address)
    171             try:
    172                 v, cb = d[prevout_n]
    173                 return v
    174             except KeyError:
    175                 pass
    176         tx = self.db.get_transaction(prevout_hash)
    177         if tx:
    178             return tx.outputs()[prevout_n].value
    179         return None
    180 
    181     def get_txout_address(self, txo: TxOutput) -> Optional[str]:
    182         return txo.address
    183 
    184     def load_unverified_transactions(self):
    185         # review transactions that are in the history
    186         for addr in self.db.get_history():
    187             hist = self.db.get_addr_history(addr)
    188             for tx_hash, tx_height in hist:
    189                 # add it in case it was previously unconfirmed
    190                 self.add_unverified_tx(tx_hash, tx_height)
    191 
    192     def start_network(self, network: Optional['Network']) -> None:
    193         self.network = network
    194         if self.network is not None:
    195             self.synchronizer = Synchronizer(self)
    196             self.verifier = SPV(self.network, self)
    197             util.register_callback(self.on_blockchain_updated, ['blockchain_updated'])
    198 
    199     def on_blockchain_updated(self, event, *args):
    200         self._get_addr_balance_cache = {}  # invalidate cache
    201 
    202     async def stop(self):
    203         if self.network:
    204             try:
    205                 async with TaskGroup() as group:
    206                     if self.synchronizer:
    207                         await group.spawn(self.synchronizer.stop())
    208                     if self.verifier:
    209                         await group.spawn(self.verifier.stop())
    210             finally:  # even if we get cancelled
    211                 self.synchronizer = None
    212                 self.verifier = None
    213                 util.unregister_callback(self.on_blockchain_updated)
    214                 self.db.put('stored_height', self.get_local_height())
    215 
    216     def add_address(self, address):
    217         if not self.db.get_addr_history(address):
    218             self.db.history[address] = []
    219             self.set_up_to_date(False)
    220         if self.synchronizer:
    221             self.synchronizer.add(address)
    222 
    223     def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False):
    224         """Returns a set of transaction hashes from the wallet history that are
    225         directly conflicting with tx, i.e. they have common outpoints being
    226         spent with tx.
    227 
    228         include_self specifies whether the tx itself should be reported as a
    229         conflict (if already in wallet history)
    230         """
    231         conflicting_txns = set()
    232         with self.transaction_lock:
    233             for txin in tx.inputs():
    234                 if txin.is_coinbase_input():
    235                     continue
    236                 prevout_hash = txin.prevout.txid.hex()
    237                 prevout_n = txin.prevout.out_idx
    238                 spending_tx_hash = self.db.get_spent_outpoint(prevout_hash, prevout_n)
    239                 if spending_tx_hash is None:
    240                     continue
    241                 # this outpoint has already been spent, by spending_tx
    242                 # annoying assert that has revealed several bugs over time:
    243                 assert self.db.get_transaction(spending_tx_hash), "spending tx not in wallet db"
    244                 conflicting_txns |= {spending_tx_hash}
    245             if tx_hash in conflicting_txns:
    246                 # this tx is already in history, so it conflicts with itself
    247                 if len(conflicting_txns) > 1:
    248                     raise Exception('Found conflicting transactions already in wallet history.')
    249                 if not include_self:
    250                     conflicting_txns -= {tx_hash}
    251             return conflicting_txns
    252 
    253     def add_transaction(self, tx: Transaction, *, allow_unrelated=False) -> bool:
    254         """Returns whether the tx was successfully added to the wallet history."""
    255         assert tx, tx
    256         # note: tx.is_complete() is not necessarily True; tx might be partial
    257         # but it *needs* to have a txid:
    258         tx_hash = tx.txid()
    259         if tx_hash is None:
    260             raise Exception("cannot add tx without txid to wallet history")
    261         # we need self.transaction_lock but get_tx_height will take self.lock
    262         # so we need to take that too here, to enforce order of locks
    263         with self.lock, self.transaction_lock:
    264             # NOTE: returning if tx in self.transactions might seem like a good idea
    265             # BUT we track is_mine inputs in a txn, and during subsequent calls
    266             # of add_transaction tx, we might learn of more-and-more inputs of
    267             # being is_mine, as we roll the gap_limit forward
    268             is_coinbase = tx.inputs()[0].is_coinbase_input()
    269             tx_height = self.get_tx_height(tx_hash).height
    270             if not allow_unrelated:
    271                 # note that during sync, if the transactions are not properly sorted,
    272                 # it could happen that we think tx is unrelated but actually one of the inputs is is_mine.
    273                 # this is the main motivation for allow_unrelated
    274                 is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()])
    275                 is_for_me = any([self.is_mine(self.get_txout_address(txo)) for txo in tx.outputs()])
    276                 if not is_mine and not is_for_me:
    277                     raise UnrelatedTransactionException()
    278             # Find all conflicting transactions.
    279             # In case of a conflict,
    280             #     1. confirmed > mempool > local
    281             #     2. this new txn has priority over existing ones
    282             # When this method exits, there must NOT be any conflict, so
    283             # either keep this txn and remove all conflicting (along with dependencies)
    284             #     or drop this txn
    285             conflicting_txns = self.get_conflicting_transactions(tx_hash, tx)
    286             if conflicting_txns:
    287                 existing_mempool_txn = any(
    288                     self.get_tx_height(tx_hash2).height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
    289                     for tx_hash2 in conflicting_txns)
    290                 existing_confirmed_txn = any(
    291                     self.get_tx_height(tx_hash2).height > 0
    292                     for tx_hash2 in conflicting_txns)
    293                 if existing_confirmed_txn and tx_height <= 0:
    294                     # this is a non-confirmed tx that conflicts with confirmed txns; drop.
    295                     return False
    296                 if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL:
    297                     # this is a local tx that conflicts with non-local txns; drop.
    298                     return False
    299                 # keep this txn and remove all conflicting
    300                 for tx_hash2 in conflicting_txns:
    301                     self.remove_transaction(tx_hash2)
    302             # add inputs
    303             def add_value_from_prev_output():
    304                 # note: this takes linear time in num is_mine outputs of prev_tx
    305                 addr = self.get_txin_address(txi)
    306                 if addr and self.is_mine(addr):
    307                     outputs = self.db.get_txo_addr(prevout_hash, addr)
    308                     try:
    309                         v, is_cb = outputs[prevout_n]
    310                     except KeyError:
    311                         pass
    312                     else:
    313                         self.db.add_txi_addr(tx_hash, addr, ser, v)
    314                         self._get_addr_balance_cache.pop(addr, None)  # invalidate cache
    315             for txi in tx.inputs():
    316                 if txi.is_coinbase_input():
    317                     continue
    318                 prevout_hash = txi.prevout.txid.hex()
    319                 prevout_n = txi.prevout.out_idx
    320                 ser = txi.prevout.to_str()
    321                 self.db.set_spent_outpoint(prevout_hash, prevout_n, tx_hash)
    322                 add_value_from_prev_output()
    323             # add outputs
    324             for n, txo in enumerate(tx.outputs()):
    325                 v = txo.value
    326                 ser = tx_hash + ':%d'%n
    327                 scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex())
    328                 self.db.add_prevout_by_scripthash(scripthash, prevout=TxOutpoint.from_str(ser), value=v)
    329                 addr = self.get_txout_address(txo)
    330                 if addr and self.is_mine(addr):
    331                     self.db.add_txo_addr(tx_hash, addr, n, v, is_coinbase)
    332                     self._get_addr_balance_cache.pop(addr, None)  # invalidate cache
    333                     # give v to txi that spends me
    334                     next_tx = self.db.get_spent_outpoint(tx_hash, n)
    335                     if next_tx is not None:
    336                         self.db.add_txi_addr(next_tx, addr, ser, v)
    337                         self._add_tx_to_local_history(next_tx)
    338             # add to local history
    339             self._add_tx_to_local_history(tx_hash)
    340             # save
    341             self.db.add_transaction(tx_hash, tx)
    342             self.db.add_num_inputs_to_tx(tx_hash, len(tx.inputs()))
    343             return True
    344 
    345     def remove_transaction(self, tx_hash: str) -> None:
    346         """Removes a transaction AND all its dependents/children
    347         from the wallet history.
    348         """
    349         with self.lock, self.transaction_lock:
    350             to_remove = {tx_hash}
    351             to_remove |= self.get_depending_transactions(tx_hash)
    352             for txid in to_remove:
    353                 self._remove_transaction(txid)
    354 
    355     def _remove_transaction(self, tx_hash: str) -> None:
    356         """Removes a single transaction from the wallet history, and attempts
    357          to undo all effects of the tx (spending inputs, creating outputs, etc).
    358         """
    359         def remove_from_spent_outpoints():
    360             # undo spends in spent_outpoints
    361             if tx is not None:
    362                 # if we have the tx, this branch is faster
    363                 for txin in tx.inputs():
    364                     if txin.is_coinbase_input():
    365                         continue
    366                     prevout_hash = txin.prevout.txid.hex()
    367                     prevout_n = txin.prevout.out_idx
    368                     self.db.remove_spent_outpoint(prevout_hash, prevout_n)
    369             else:
    370                 # expensive but always works
    371                 for prevout_hash, prevout_n in self.db.list_spent_outpoints():
    372                     spending_txid = self.db.get_spent_outpoint(prevout_hash, prevout_n)
    373                     if spending_txid == tx_hash:
    374                         self.db.remove_spent_outpoint(prevout_hash, prevout_n)
    375 
    376         with self.lock, self.transaction_lock:
    377             self.logger.info(f"removing tx from history {tx_hash}")
    378             tx = self.db.remove_transaction(tx_hash)
    379             remove_from_spent_outpoints()
    380             self._remove_tx_from_local_history(tx_hash)
    381             for addr in itertools.chain(self.db.get_txi_addresses(tx_hash), self.db.get_txo_addresses(tx_hash)):
    382                 self._get_addr_balance_cache.pop(addr, None)  # invalidate cache
    383             self.db.remove_txi(tx_hash)
    384             self.db.remove_txo(tx_hash)
    385             self.db.remove_tx_fee(tx_hash)
    386             self.db.remove_verified_tx(tx_hash)
    387             self.unverified_tx.pop(tx_hash, None)
    388             if tx:
    389                 for idx, txo in enumerate(tx.outputs()):
    390                     scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex())
    391                     prevout = TxOutpoint(bfh(tx_hash), idx)
    392                     self.db.remove_prevout_by_scripthash(scripthash, prevout=prevout, value=txo.value)
    393 
    394     def get_depending_transactions(self, tx_hash: str) -> Set[str]:
    395         """Returns all (grand-)children of tx_hash in this wallet."""
    396         with self.transaction_lock:
    397             children = set()
    398             for n in self.db.get_spent_outpoints(tx_hash):
    399                 other_hash = self.db.get_spent_outpoint(tx_hash, n)
    400                 children.add(other_hash)
    401                 children |= self.get_depending_transactions(other_hash)
    402             return children
    403 
    404     def receive_tx_callback(self, tx_hash: str, tx: Transaction, tx_height: int) -> None:
    405         self.add_unverified_tx(tx_hash, tx_height)
    406         self.add_transaction(tx, allow_unrelated=True)
    407 
    408     def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]):
    409         with self.lock:
    410             old_hist = self.get_address_history(addr)
    411             for tx_hash, height in old_hist:
    412                 if (tx_hash, height) not in hist:
    413                     # make tx local
    414                     self.unverified_tx.pop(tx_hash, None)
    415                     self.db.remove_verified_tx(tx_hash)
    416                     if self.verifier:
    417                         self.verifier.remove_spv_proof_for_tx(tx_hash)
    418             self.db.set_addr_history(addr, hist)
    419 
    420         for tx_hash, tx_height in hist:
    421             # add it in case it was previously unconfirmed
    422             self.add_unverified_tx(tx_hash, tx_height)
    423             # if addr is new, we have to recompute txi and txo
    424             tx = self.db.get_transaction(tx_hash)
    425             if tx is None:
    426                 continue
    427             self.add_transaction(tx, allow_unrelated=True)
    428 
    429         # Store fees
    430         for tx_hash, fee_sat in tx_fees.items():
    431             self.db.add_tx_fee_from_server(tx_hash, fee_sat)
    432 
    433     @profiler
    434     def load_local_history(self):
    435         self._history_local = {}  # type: Dict[str, Set[str]]  # address -> set(txid)
    436         self._address_history_changed_events = defaultdict(asyncio.Event)  # address -> Event
    437         for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()):
    438             self._add_tx_to_local_history(txid)
    439 
    440     @profiler
    441     def check_history(self):
    442         hist_addrs_mine = list(filter(lambda k: self.is_mine(k), self.db.get_history()))
    443         hist_addrs_not_mine = list(filter(lambda k: not self.is_mine(k), self.db.get_history()))
    444         for addr in hist_addrs_not_mine:
    445             self.db.remove_addr_history(addr)
    446         for addr in hist_addrs_mine:
    447             hist = self.db.get_addr_history(addr)
    448             for tx_hash, tx_height in hist:
    449                 if self.db.get_txi_addresses(tx_hash) or self.db.get_txo_addresses(tx_hash):
    450                     continue
    451                 tx = self.db.get_transaction(tx_hash)
    452                 if tx is not None:
    453                     self.add_transaction(tx, allow_unrelated=True)
    454 
    455     def remove_local_transactions_we_dont_have(self):
    456         for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()):
    457             tx_height = self.get_tx_height(txid).height
    458             if tx_height == TX_HEIGHT_LOCAL and not self.db.get_transaction(txid):
    459                 self.remove_transaction(txid)
    460 
    461     def clear_history(self):
    462         with self.lock:
    463             with self.transaction_lock:
    464                 self.db.clear_history()
    465                 self._history_local.clear()
    466                 self._get_addr_balance_cache = {}  # invalidate cache
    467 
    468     def get_txpos(self, tx_hash):
    469         """Returns (height, txpos) tuple, even if the tx is unverified."""
    470         with self.lock:
    471             verified_tx_mined_info = self.db.get_verified_tx(tx_hash)
    472             if verified_tx_mined_info:
    473                 return verified_tx_mined_info.height, verified_tx_mined_info.txpos
    474             elif tx_hash in self.unverified_tx:
    475                 height = self.unverified_tx[tx_hash]
    476                 return (height, -1) if height > 0 else ((1e9 - height), -1)
    477             else:
    478                 return (1e9+1, -1)
    479 
    480     def with_local_height_cached(func):
    481         # get local height only once, as it's relatively expensive.
    482         # take care that nested calls work as expected
    483         def f(self, *args, **kwargs):
    484             orig_val = getattr(self.threadlocal_cache, 'local_height', None)
    485             self.threadlocal_cache.local_height = orig_val or self.get_local_height()
    486             try:
    487                 return func(self, *args, **kwargs)
    488             finally:
    489                 self.threadlocal_cache.local_height = orig_val
    490         return f
    491 
    492     @with_lock
    493     @with_transaction_lock
    494     @with_local_height_cached
    495     def get_history(self, *, domain=None) -> Sequence[HistoryItem]:
    496         # get domain
    497         if domain is None:
    498             domain = self.get_addresses()
    499         domain = set(domain)
    500         # 1. Get the history of each address in the domain, maintain the
    501         #    delta of a tx as the sum of its deltas on domain addresses
    502         tx_deltas = defaultdict(int)  # type: Dict[str, int]
    503         for addr in domain:
    504             h = self.get_address_history(addr)
    505             for tx_hash, height in h:
    506                 tx_deltas[tx_hash] += self.get_tx_delta(tx_hash, addr)
    507         # 2. create sorted history
    508         history = []
    509         for tx_hash in tx_deltas:
    510             delta = tx_deltas[tx_hash]
    511             tx_mined_status = self.get_tx_height(tx_hash)
    512             fee = self.get_tx_fee(tx_hash)
    513             history.append((tx_hash, tx_mined_status, delta, fee))
    514         history.sort(key = lambda x: self.get_txpos(x[0]), reverse=True)
    515         # 3. add balance
    516         c, u, x = self.get_balance(domain)
    517         balance = c + u + x
    518         h2 = []
    519         for tx_hash, tx_mined_status, delta, fee in history:
    520             h2.append(HistoryItem(txid=tx_hash,
    521                                   tx_mined_status=tx_mined_status,
    522                                   delta=delta,
    523                                   fee=fee,
    524                                   balance=balance))
    525             balance -= delta
    526         h2.reverse()
    527 
    528         if balance != 0:
    529             raise Exception("wallet.get_history() failed balance sanity-check")
    530 
    531         return h2
    532 
    533     def _add_tx_to_local_history(self, txid):
    534         with self.transaction_lock:
    535             for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)):
    536                 cur_hist = self._history_local.get(addr, set())
    537                 cur_hist.add(txid)
    538                 self._history_local[addr] = cur_hist
    539                 self._mark_address_history_changed(addr)
    540 
    541     def _remove_tx_from_local_history(self, txid):
    542         with self.transaction_lock:
    543             for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)):
    544                 cur_hist = self._history_local.get(addr, set())
    545                 try:
    546                     cur_hist.remove(txid)
    547                 except KeyError:
    548                     pass
    549                 else:
    550                     self._history_local[addr] = cur_hist
    551 
    552     def _mark_address_history_changed(self, addr: str) -> None:
    553         # history for this address changed, wake up coroutines:
    554         self._address_history_changed_events[addr].set()
    555         # clear event immediately so that coroutines can wait() for the next change:
    556         self._address_history_changed_events[addr].clear()
    557 
    558     async def wait_for_address_history_to_change(self, addr: str) -> None:
    559         """Wait until the server tells us about a new transaction related to addr.
    560 
    561         Unconfirmed and confirmed transactions are not distinguished, and so e.g. SPV
    562         is not taken into account.
    563         """
    564         assert self.is_mine(addr), "address needs to be is_mine to be watched"
    565         await self._address_history_changed_events[addr].wait()
    566 
    567     def add_unverified_tx(self, tx_hash, tx_height):
    568         if self.db.is_in_verified_tx(tx_hash):
    569             if tx_height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
    570                 with self.lock:
    571                     self.db.remove_verified_tx(tx_hash)
    572                 if self.verifier:
    573                     self.verifier.remove_spv_proof_for_tx(tx_hash)
    574         else:
    575             with self.lock:
    576                 # tx will be verified only if height > 0
    577                 self.unverified_tx[tx_hash] = tx_height
    578 
    579     def remove_unverified_tx(self, tx_hash, tx_height):
    580         with self.lock:
    581             new_height = self.unverified_tx.get(tx_hash)
    582             if new_height == tx_height:
    583                 self.unverified_tx.pop(tx_hash, None)
    584 
    585     def add_verified_tx(self, tx_hash: str, info: TxMinedInfo):
    586         # Remove from the unverified map and add to the verified map
    587         with self.lock:
    588             self.unverified_tx.pop(tx_hash, None)
    589             self.db.add_verified_tx(tx_hash, info)
    590         tx_mined_status = self.get_tx_height(tx_hash)
    591         util.trigger_callback('verified', self, tx_hash, tx_mined_status)
    592 
    593     def get_unverified_txs(self):
    594         '''Returns a map from tx hash to transaction height'''
    595         with self.lock:
    596             return dict(self.unverified_tx)  # copy
    597 
    598     def undo_verifications(self, blockchain, above_height):
    599         '''Used by the verifier when a reorg has happened'''
    600         txs = set()
    601         with self.lock:
    602             for tx_hash in self.db.list_verified_tx():
    603                 info = self.db.get_verified_tx(tx_hash)
    604                 tx_height = info.height
    605                 if tx_height > above_height:
    606                     header = blockchain.read_header(tx_height)
    607                     if not header or hash_header(header) != info.header_hash:
    608                         self.db.remove_verified_tx(tx_hash)
    609                         # NOTE: we should add these txns to self.unverified_tx,
    610                         # but with what height?
    611                         # If on the new fork after the reorg, the txn is at the
    612                         # same height, we will not get a status update for the
    613                         # address. If the txn is not mined or at a diff height,
    614                         # we should get a status update. Unless we put tx into
    615                         # unverified_tx, it will turn into local. So we put it
    616                         # into unverified_tx with the old height, and if we get
    617                         # a status update, that will overwrite it.
    618                         self.unverified_tx[tx_hash] = tx_height
    619                         txs.add(tx_hash)
    620         return txs
    621 
    622     def get_local_height(self) -> int:
    623         """ return last known height if we are offline """
    624         cached_local_height = getattr(self.threadlocal_cache, 'local_height', None)
    625         if cached_local_height is not None:
    626             return cached_local_height
    627         return self.network.get_local_height() if self.network else self.db.get('stored_height', 0)
    628 
    629     def add_future_tx(self, tx: Transaction, num_blocks: int) -> bool:
    630         assert num_blocks > 0, num_blocks
    631         with self.lock:
    632             tx_was_added = self.add_transaction(tx)
    633             if tx_was_added:
    634                 self.future_tx[tx.txid()] = num_blocks
    635             return tx_was_added
    636 
    637     def get_tx_height(self, tx_hash: str) -> TxMinedInfo:
    638         if tx_hash is None:  # ugly backwards compat...
    639             return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0)
    640         with self.lock:
    641             verified_tx_mined_info = self.db.get_verified_tx(tx_hash)
    642             if verified_tx_mined_info:
    643                 conf = max(self.get_local_height() - verified_tx_mined_info.height + 1, 0)
    644                 return verified_tx_mined_info._replace(conf=conf)
    645             elif tx_hash in self.unverified_tx:
    646                 height = self.unverified_tx[tx_hash]
    647                 return TxMinedInfo(height=height, conf=0)
    648             elif tx_hash in self.future_tx:
    649                 num_blocks_remainining = self.future_tx[tx_hash]
    650                 assert num_blocks_remainining > 0, num_blocks_remainining
    651                 return TxMinedInfo(height=TX_HEIGHT_FUTURE, conf=-num_blocks_remainining)
    652             else:
    653                 # local transaction
    654                 return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0)
    655 
    656     def set_up_to_date(self, up_to_date):
    657         with self.lock:
    658             status_changed = self.up_to_date != up_to_date
    659             self.up_to_date = up_to_date
    660         if self.network:
    661             self.network.notify('status')
    662         if status_changed:
    663             self.logger.info(f'set_up_to_date: {up_to_date}')
    664 
    665     def is_up_to_date(self):
    666         with self.lock: return self.up_to_date
    667 
    668     def get_history_sync_state_details(self) -> Tuple[int, int]:
    669         if self.synchronizer:
    670             return self.synchronizer.num_requests_sent_and_answered()
    671         else:
    672             return 0, 0
    673 
    674     @with_transaction_lock
    675     def get_tx_delta(self, tx_hash: str, address: str) -> int:
    676         """effect of tx on address"""
    677         delta = 0
    678         # subtract the value of coins sent from address
    679         d = self.db.get_txi_addr(tx_hash, address)
    680         for n, v in d:
    681             delta -= v
    682         # add the value of the coins received at address
    683         d = self.db.get_txo_addr(tx_hash, address)
    684         for n, (v, cb) in d.items():
    685             delta += v
    686         return delta
    687 
    688     def get_wallet_delta(self, tx: Transaction) -> TxWalletDelta:
    689         """effect of tx on wallet"""
    690         is_relevant = False  # "related to wallet?"
    691         num_input_ismine = 0
    692         v_in = v_in_mine = v_out = v_out_mine = 0
    693         with self.lock, self.transaction_lock:
    694             for txin in tx.inputs():
    695                 addr = self.get_txin_address(txin)
    696                 value = self.get_txin_value(txin, address=addr)
    697                 if self.is_mine(addr):
    698                     num_input_ismine += 1
    699                     is_relevant = True
    700                     assert value is not None
    701                     v_in_mine += value
    702                 if value is None:
    703                     v_in = None
    704                 elif v_in is not None:
    705                     v_in += value
    706             for txout in tx.outputs():
    707                 v_out += txout.value
    708                 if self.is_mine(txout.address):
    709                     v_out_mine += txout.value
    710                     is_relevant = True
    711         delta = v_out_mine - v_in_mine
    712         if v_in is not None:
    713             fee = v_in - v_out
    714         else:
    715             fee = None
    716         if fee is None and isinstance(tx, PartialTransaction):
    717             fee = tx.get_fee()
    718         return TxWalletDelta(
    719             is_relevant=is_relevant,
    720             is_any_input_ismine=num_input_ismine > 0,
    721             is_all_input_ismine=num_input_ismine == len(tx.inputs()),
    722             delta=delta,
    723             fee=fee,
    724         )
    725 
    726     def get_tx_fee(self, txid: str) -> Optional[int]:
    727         """ Returns tx_fee or None. Use server fee only if tx is unconfirmed and not mine"""
    728         # check if stored fee is available
    729         fee = self.db.get_tx_fee(txid, trust_server=False)
    730         if fee is not None:
    731             return fee
    732         # delete server-sent fee for confirmed txns
    733         confirmed = self.get_tx_height(txid).conf > 0
    734         if confirmed:
    735             self.db.add_tx_fee_from_server(txid, None)
    736         # if all inputs are ismine, try to calc fee now;
    737         # otherwise, return stored value
    738         num_all_inputs = self.db.get_num_all_inputs_of_tx(txid)
    739         if num_all_inputs is not None:
    740             # check if tx is mine
    741             num_ismine_inputs = self.db.get_num_ismine_inputs_of_tx(txid)
    742             assert num_ismine_inputs <= num_all_inputs, (num_ismine_inputs, num_all_inputs)
    743             # trust server if tx is unconfirmed and not mine
    744             if num_ismine_inputs < num_all_inputs:
    745                 return None if confirmed else self.db.get_tx_fee(txid, trust_server=True)
    746         # lookup tx and deserialize it.
    747         # note that deserializing is expensive, hence above hacks
    748         tx = self.db.get_transaction(txid)
    749         if not tx:
    750             return None
    751         fee = self.get_wallet_delta(tx).fee
    752         # save result
    753         self.db.add_tx_fee_we_calculated(txid, fee)
    754         self.db.add_num_inputs_to_tx(txid, len(tx.inputs()))
    755         return fee
    756 
    757     def get_addr_io(self, address):
    758         with self.lock, self.transaction_lock:
    759             h = self.get_address_history(address)
    760             received = {}
    761             sent = {}
    762             for tx_hash, height in h:
    763                 d = self.db.get_txo_addr(tx_hash, address)
    764                 for n, (v, is_cb) in d.items():
    765                     received[tx_hash + ':%d'%n] = (height, v, is_cb)
    766             for tx_hash, height in h:
    767                 l = self.db.get_txi_addr(tx_hash, address)
    768                 for txi, v in l:
    769                     sent[txi] = height
    770         return received, sent
    771 
    772 
    773     def get_addr_outputs(self, address: str) -> Dict[TxOutpoint, PartialTxInput]:
    774         coins, spent = self.get_addr_io(address)
    775         out = {}
    776         for prevout_str, v in coins.items():
    777             tx_height, value, is_cb = v
    778             prevout = TxOutpoint.from_str(prevout_str)
    779             utxo = PartialTxInput(prevout=prevout, is_coinbase_output=is_cb)
    780             utxo._trusted_address = address
    781             utxo._trusted_value_sats = value
    782             utxo.block_height = tx_height
    783             utxo.spent_height = spent.get(prevout_str, None)
    784             out[prevout] = utxo
    785         return out
    786 
    787     def get_addr_utxo(self, address: str) -> Dict[TxOutpoint, PartialTxInput]:
    788         out = self.get_addr_outputs(address)
    789         for k, v in list(out.items()):
    790             if v.spent_height is not None:
    791                 out.pop(k)
    792         return out
    793 
    794     # return the total amount ever received by an address
    795     def get_addr_received(self, address):
    796         received, sent = self.get_addr_io(address)
    797         return sum([v for height, v, is_cb in received.values()])
    798 
    799     @with_local_height_cached
    800     def get_addr_balance(self, address, *, excluded_coins: Set[str] = None) -> Tuple[int, int, int]:
    801         """Return the balance of a bitcoin address:
    802         confirmed and matured, unconfirmed, unmatured
    803         """
    804         if not excluded_coins:  # cache is only used if there are no excluded_coins
    805             cached_value = self._get_addr_balance_cache.get(address)
    806             if cached_value:
    807                 return cached_value
    808         if excluded_coins is None:
    809             excluded_coins = set()
    810         assert isinstance(excluded_coins, set), f"excluded_coins should be set, not {type(excluded_coins)}"
    811         received, sent = self.get_addr_io(address)
    812         c = u = x = 0
    813         mempool_height = self.get_local_height() + 1  # height of next block
    814         for txo, (tx_height, v, is_cb) in received.items():
    815             if txo in excluded_coins:
    816                 continue
    817             if is_cb and tx_height + COINBASE_MATURITY > mempool_height:
    818                 x += v
    819             elif tx_height > 0:
    820                 c += v
    821             else:
    822                 u += v
    823             if txo in sent:
    824                 if sent[txo] > 0:
    825                     c -= v
    826                 else:
    827                     u -= v
    828         result = c, u, x
    829         # cache result.
    830         if not excluded_coins:
    831             # Cache needs to be invalidated if a transaction is added to/
    832             # removed from history; or on new blocks (maturity...)
    833             self._get_addr_balance_cache[address] = result
    834         return result
    835 
    836     @with_local_height_cached
    837     def get_utxos(self, domain=None, *, excluded_addresses=None,
    838                   mature_only: bool = False, confirmed_only: bool = False,
    839                   nonlocal_only: bool = False) -> Sequence[PartialTxInput]:
    840         coins = []
    841         if domain is None:
    842             domain = self.get_addresses()
    843         domain = set(domain)
    844         if excluded_addresses:
    845             domain = set(domain) - set(excluded_addresses)
    846         mempool_height = self.get_local_height() + 1  # height of next block
    847         for addr in domain:
    848             utxos = self.get_addr_utxo(addr)
    849             for utxo in utxos.values():
    850                 if confirmed_only and utxo.block_height <= 0:
    851                     continue
    852                 if nonlocal_only and utxo.block_height == TX_HEIGHT_LOCAL:
    853                     continue
    854                 if (mature_only and utxo.is_coinbase_output()
    855                         and utxo.block_height + COINBASE_MATURITY > mempool_height):
    856                     continue
    857                 coins.append(utxo)
    858                 continue
    859         return coins
    860 
    861     def get_balance(self, domain=None, *, excluded_addresses: Set[str] = None,
    862                     excluded_coins: Set[str] = None) -> Tuple[int, int, int]:
    863         if domain is None:
    864             domain = self.get_addresses()
    865         if excluded_addresses is None:
    866             excluded_addresses = set()
    867         assert isinstance(excluded_addresses, set), f"excluded_addresses should be set, not {type(excluded_addresses)}"
    868         domain = set(domain) - excluded_addresses
    869         cc = uu = xx = 0
    870         for addr in domain:
    871             c, u, x = self.get_addr_balance(addr, excluded_coins=excluded_coins)
    872             cc += c
    873             uu += u
    874             xx += x
    875         return cc, uu, xx
    876 
    877     def is_used(self, address: str) -> bool:
    878         return self.get_address_history_len(address) != 0
    879 
    880     def is_empty(self, address: str) -> bool:
    881         c, u, x = self.get_addr_balance(address)
    882         return c+u+x == 0
    883 
    884     def synchronize(self):
    885         pass