address_synchronizer.py (38895B)
1 # Electrum - lightweight Bitcoin client 2 # Copyright (C) 2018 The Electrum Developers 3 # 4 # Permission is hereby granted, free of charge, to any person 5 # obtaining a copy of this software and associated documentation files 6 # (the "Software"), to deal in the Software without restriction, 7 # including without limitation the rights to use, copy, modify, merge, 8 # publish, distribute, sublicense, and/or sell copies of the Software, 9 # and to permit persons to whom the Software is furnished to do so, 10 # subject to the following conditions: 11 # 12 # The above copyright notice and this permission notice shall be 13 # included in all copies or substantial portions of the Software. 14 # 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 22 # SOFTWARE. 23 24 import asyncio 25 import threading 26 import asyncio 27 import itertools 28 from collections import defaultdict 29 from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List 30 31 from aiorpcx import TaskGroup 32 33 from . import bitcoin, util 34 from .bitcoin import COINBASE_MATURITY 35 from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException 36 from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction 37 from .synchronizer import Synchronizer 38 from .verifier import SPV 39 from .blockchain import hash_header 40 from .i18n import _ 41 from .logging import Logger 42 43 if TYPE_CHECKING: 44 from .network import Network 45 from .wallet_db import WalletDB 46 47 48 TX_HEIGHT_FUTURE = -3 49 TX_HEIGHT_LOCAL = -2 50 TX_HEIGHT_UNCONF_PARENT = -1 51 TX_HEIGHT_UNCONFIRMED = 0 52 53 54 class HistoryItem(NamedTuple): 55 txid: str 56 tx_mined_status: TxMinedInfo 57 delta: int 58 fee: Optional[int] 59 balance: int 60 61 62 class TxWalletDelta(NamedTuple): 63 is_relevant: bool # "related to wallet?" 64 is_any_input_ismine: bool 65 is_all_input_ismine: bool 66 delta: int 67 fee: Optional[int] 68 69 70 class AddressSynchronizer(Logger): 71 """ 72 inherited by wallet 73 """ 74 75 network: Optional['Network'] 76 synchronizer: Optional['Synchronizer'] 77 verifier: Optional['SPV'] 78 79 def __init__(self, db: 'WalletDB'): 80 self.db = db 81 self.network = None 82 Logger.__init__(self) 83 # verifier (SPV) and synchronizer are started in start_network 84 self.synchronizer = None 85 self.verifier = None 86 # locks: if you need to take multiple ones, acquire them in the order they are defined here! 87 self.lock = threading.RLock() 88 self.transaction_lock = threading.RLock() 89 self.future_tx = {} # type: Dict[str, int] # txid -> blocks remaining 90 # Transactions pending verification. txid -> tx_height. Access with self.lock. 91 self.unverified_tx = defaultdict(int) 92 # true when synchronized 93 self.up_to_date = False 94 # thread local storage for caching stuff 95 self.threadlocal_cache = threading.local() 96 97 self._get_addr_balance_cache = {} 98 99 self.load_and_cleanup() 100 101 def with_lock(func): 102 def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs): 103 with self.lock: 104 return func(self, *args, **kwargs) 105 return func_wrapper 106 107 def with_transaction_lock(func): 108 def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs): 109 with self.transaction_lock: 110 return func(self, *args, **kwargs) 111 return func_wrapper 112 113 def load_and_cleanup(self): 114 self.load_local_history() 115 self.check_history() 116 self.load_unverified_transactions() 117 self.remove_local_transactions_we_dont_have() 118 119 def is_mine(self, address: Optional[str]) -> bool: 120 if not address: return False 121 return self.db.is_addr_in_history(address) 122 123 def get_addresses(self): 124 return sorted(self.db.get_history()) 125 126 def get_address_history(self, addr: str) -> Sequence[Tuple[str, int]]: 127 """Returns the history for the address, in the format that would be returned by a server. 128 129 Note: The difference between db.get_addr_history and this method is that 130 db.get_addr_history stores the response from a server, so it only includes txns 131 a server sees, i.e. that does not contain local and future txns. 132 """ 133 h = [] 134 # we need self.transaction_lock but get_tx_height will take self.lock 135 # so we need to take that too here, to enforce order of locks 136 with self.lock, self.transaction_lock: 137 related_txns = self._history_local.get(addr, set()) 138 for tx_hash in related_txns: 139 tx_height = self.get_tx_height(tx_hash).height 140 h.append((tx_hash, tx_height)) 141 return h 142 143 def get_address_history_len(self, addr: str) -> int: 144 """Return number of transactions where address is involved.""" 145 return len(self._history_local.get(addr, ())) 146 147 def get_txin_address(self, txin: TxInput) -> Optional[str]: 148 if isinstance(txin, PartialTxInput): 149 if txin.address: 150 return txin.address 151 prevout_hash = txin.prevout.txid.hex() 152 prevout_n = txin.prevout.out_idx 153 for addr in self.db.get_txo_addresses(prevout_hash): 154 d = self.db.get_txo_addr(prevout_hash, addr) 155 if prevout_n in d: 156 return addr 157 tx = self.db.get_transaction(prevout_hash) 158 if tx: 159 return tx.outputs()[prevout_n].address 160 return None 161 162 def get_txin_value(self, txin: TxInput, *, address: str = None) -> Optional[int]: 163 if txin.value_sats() is not None: 164 return txin.value_sats() 165 prevout_hash = txin.prevout.txid.hex() 166 prevout_n = txin.prevout.out_idx 167 if address is None: 168 address = self.get_txin_address(txin) 169 if address: 170 d = self.db.get_txo_addr(prevout_hash, address) 171 try: 172 v, cb = d[prevout_n] 173 return v 174 except KeyError: 175 pass 176 tx = self.db.get_transaction(prevout_hash) 177 if tx: 178 return tx.outputs()[prevout_n].value 179 return None 180 181 def get_txout_address(self, txo: TxOutput) -> Optional[str]: 182 return txo.address 183 184 def load_unverified_transactions(self): 185 # review transactions that are in the history 186 for addr in self.db.get_history(): 187 hist = self.db.get_addr_history(addr) 188 for tx_hash, tx_height in hist: 189 # add it in case it was previously unconfirmed 190 self.add_unverified_tx(tx_hash, tx_height) 191 192 def start_network(self, network: Optional['Network']) -> None: 193 self.network = network 194 if self.network is not None: 195 self.synchronizer = Synchronizer(self) 196 self.verifier = SPV(self.network, self) 197 util.register_callback(self.on_blockchain_updated, ['blockchain_updated']) 198 199 def on_blockchain_updated(self, event, *args): 200 self._get_addr_balance_cache = {} # invalidate cache 201 202 async def stop(self): 203 if self.network: 204 try: 205 async with TaskGroup() as group: 206 if self.synchronizer: 207 await group.spawn(self.synchronizer.stop()) 208 if self.verifier: 209 await group.spawn(self.verifier.stop()) 210 finally: # even if we get cancelled 211 self.synchronizer = None 212 self.verifier = None 213 util.unregister_callback(self.on_blockchain_updated) 214 self.db.put('stored_height', self.get_local_height()) 215 216 def add_address(self, address): 217 if not self.db.get_addr_history(address): 218 self.db.history[address] = [] 219 self.set_up_to_date(False) 220 if self.synchronizer: 221 self.synchronizer.add(address) 222 223 def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False): 224 """Returns a set of transaction hashes from the wallet history that are 225 directly conflicting with tx, i.e. they have common outpoints being 226 spent with tx. 227 228 include_self specifies whether the tx itself should be reported as a 229 conflict (if already in wallet history) 230 """ 231 conflicting_txns = set() 232 with self.transaction_lock: 233 for txin in tx.inputs(): 234 if txin.is_coinbase_input(): 235 continue 236 prevout_hash = txin.prevout.txid.hex() 237 prevout_n = txin.prevout.out_idx 238 spending_tx_hash = self.db.get_spent_outpoint(prevout_hash, prevout_n) 239 if spending_tx_hash is None: 240 continue 241 # this outpoint has already been spent, by spending_tx 242 # annoying assert that has revealed several bugs over time: 243 assert self.db.get_transaction(spending_tx_hash), "spending tx not in wallet db" 244 conflicting_txns |= {spending_tx_hash} 245 if tx_hash in conflicting_txns: 246 # this tx is already in history, so it conflicts with itself 247 if len(conflicting_txns) > 1: 248 raise Exception('Found conflicting transactions already in wallet history.') 249 if not include_self: 250 conflicting_txns -= {tx_hash} 251 return conflicting_txns 252 253 def add_transaction(self, tx: Transaction, *, allow_unrelated=False) -> bool: 254 """Returns whether the tx was successfully added to the wallet history.""" 255 assert tx, tx 256 # note: tx.is_complete() is not necessarily True; tx might be partial 257 # but it *needs* to have a txid: 258 tx_hash = tx.txid() 259 if tx_hash is None: 260 raise Exception("cannot add tx without txid to wallet history") 261 # we need self.transaction_lock but get_tx_height will take self.lock 262 # so we need to take that too here, to enforce order of locks 263 with self.lock, self.transaction_lock: 264 # NOTE: returning if tx in self.transactions might seem like a good idea 265 # BUT we track is_mine inputs in a txn, and during subsequent calls 266 # of add_transaction tx, we might learn of more-and-more inputs of 267 # being is_mine, as we roll the gap_limit forward 268 is_coinbase = tx.inputs()[0].is_coinbase_input() 269 tx_height = self.get_tx_height(tx_hash).height 270 if not allow_unrelated: 271 # note that during sync, if the transactions are not properly sorted, 272 # it could happen that we think tx is unrelated but actually one of the inputs is is_mine. 273 # this is the main motivation for allow_unrelated 274 is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()]) 275 is_for_me = any([self.is_mine(self.get_txout_address(txo)) for txo in tx.outputs()]) 276 if not is_mine and not is_for_me: 277 raise UnrelatedTransactionException() 278 # Find all conflicting transactions. 279 # In case of a conflict, 280 # 1. confirmed > mempool > local 281 # 2. this new txn has priority over existing ones 282 # When this method exits, there must NOT be any conflict, so 283 # either keep this txn and remove all conflicting (along with dependencies) 284 # or drop this txn 285 conflicting_txns = self.get_conflicting_transactions(tx_hash, tx) 286 if conflicting_txns: 287 existing_mempool_txn = any( 288 self.get_tx_height(tx_hash2).height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT) 289 for tx_hash2 in conflicting_txns) 290 existing_confirmed_txn = any( 291 self.get_tx_height(tx_hash2).height > 0 292 for tx_hash2 in conflicting_txns) 293 if existing_confirmed_txn and tx_height <= 0: 294 # this is a non-confirmed tx that conflicts with confirmed txns; drop. 295 return False 296 if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL: 297 # this is a local tx that conflicts with non-local txns; drop. 298 return False 299 # keep this txn and remove all conflicting 300 for tx_hash2 in conflicting_txns: 301 self.remove_transaction(tx_hash2) 302 # add inputs 303 def add_value_from_prev_output(): 304 # note: this takes linear time in num is_mine outputs of prev_tx 305 addr = self.get_txin_address(txi) 306 if addr and self.is_mine(addr): 307 outputs = self.db.get_txo_addr(prevout_hash, addr) 308 try: 309 v, is_cb = outputs[prevout_n] 310 except KeyError: 311 pass 312 else: 313 self.db.add_txi_addr(tx_hash, addr, ser, v) 314 self._get_addr_balance_cache.pop(addr, None) # invalidate cache 315 for txi in tx.inputs(): 316 if txi.is_coinbase_input(): 317 continue 318 prevout_hash = txi.prevout.txid.hex() 319 prevout_n = txi.prevout.out_idx 320 ser = txi.prevout.to_str() 321 self.db.set_spent_outpoint(prevout_hash, prevout_n, tx_hash) 322 add_value_from_prev_output() 323 # add outputs 324 for n, txo in enumerate(tx.outputs()): 325 v = txo.value 326 ser = tx_hash + ':%d'%n 327 scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex()) 328 self.db.add_prevout_by_scripthash(scripthash, prevout=TxOutpoint.from_str(ser), value=v) 329 addr = self.get_txout_address(txo) 330 if addr and self.is_mine(addr): 331 self.db.add_txo_addr(tx_hash, addr, n, v, is_coinbase) 332 self._get_addr_balance_cache.pop(addr, None) # invalidate cache 333 # give v to txi that spends me 334 next_tx = self.db.get_spent_outpoint(tx_hash, n) 335 if next_tx is not None: 336 self.db.add_txi_addr(next_tx, addr, ser, v) 337 self._add_tx_to_local_history(next_tx) 338 # add to local history 339 self._add_tx_to_local_history(tx_hash) 340 # save 341 self.db.add_transaction(tx_hash, tx) 342 self.db.add_num_inputs_to_tx(tx_hash, len(tx.inputs())) 343 return True 344 345 def remove_transaction(self, tx_hash: str) -> None: 346 """Removes a transaction AND all its dependents/children 347 from the wallet history. 348 """ 349 with self.lock, self.transaction_lock: 350 to_remove = {tx_hash} 351 to_remove |= self.get_depending_transactions(tx_hash) 352 for txid in to_remove: 353 self._remove_transaction(txid) 354 355 def _remove_transaction(self, tx_hash: str) -> None: 356 """Removes a single transaction from the wallet history, and attempts 357 to undo all effects of the tx (spending inputs, creating outputs, etc). 358 """ 359 def remove_from_spent_outpoints(): 360 # undo spends in spent_outpoints 361 if tx is not None: 362 # if we have the tx, this branch is faster 363 for txin in tx.inputs(): 364 if txin.is_coinbase_input(): 365 continue 366 prevout_hash = txin.prevout.txid.hex() 367 prevout_n = txin.prevout.out_idx 368 self.db.remove_spent_outpoint(prevout_hash, prevout_n) 369 else: 370 # expensive but always works 371 for prevout_hash, prevout_n in self.db.list_spent_outpoints(): 372 spending_txid = self.db.get_spent_outpoint(prevout_hash, prevout_n) 373 if spending_txid == tx_hash: 374 self.db.remove_spent_outpoint(prevout_hash, prevout_n) 375 376 with self.lock, self.transaction_lock: 377 self.logger.info(f"removing tx from history {tx_hash}") 378 tx = self.db.remove_transaction(tx_hash) 379 remove_from_spent_outpoints() 380 self._remove_tx_from_local_history(tx_hash) 381 for addr in itertools.chain(self.db.get_txi_addresses(tx_hash), self.db.get_txo_addresses(tx_hash)): 382 self._get_addr_balance_cache.pop(addr, None) # invalidate cache 383 self.db.remove_txi(tx_hash) 384 self.db.remove_txo(tx_hash) 385 self.db.remove_tx_fee(tx_hash) 386 self.db.remove_verified_tx(tx_hash) 387 self.unverified_tx.pop(tx_hash, None) 388 if tx: 389 for idx, txo in enumerate(tx.outputs()): 390 scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex()) 391 prevout = TxOutpoint(bfh(tx_hash), idx) 392 self.db.remove_prevout_by_scripthash(scripthash, prevout=prevout, value=txo.value) 393 394 def get_depending_transactions(self, tx_hash: str) -> Set[str]: 395 """Returns all (grand-)children of tx_hash in this wallet.""" 396 with self.transaction_lock: 397 children = set() 398 for n in self.db.get_spent_outpoints(tx_hash): 399 other_hash = self.db.get_spent_outpoint(tx_hash, n) 400 children.add(other_hash) 401 children |= self.get_depending_transactions(other_hash) 402 return children 403 404 def receive_tx_callback(self, tx_hash: str, tx: Transaction, tx_height: int) -> None: 405 self.add_unverified_tx(tx_hash, tx_height) 406 self.add_transaction(tx, allow_unrelated=True) 407 408 def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]): 409 with self.lock: 410 old_hist = self.get_address_history(addr) 411 for tx_hash, height in old_hist: 412 if (tx_hash, height) not in hist: 413 # make tx local 414 self.unverified_tx.pop(tx_hash, None) 415 self.db.remove_verified_tx(tx_hash) 416 if self.verifier: 417 self.verifier.remove_spv_proof_for_tx(tx_hash) 418 self.db.set_addr_history(addr, hist) 419 420 for tx_hash, tx_height in hist: 421 # add it in case it was previously unconfirmed 422 self.add_unverified_tx(tx_hash, tx_height) 423 # if addr is new, we have to recompute txi and txo 424 tx = self.db.get_transaction(tx_hash) 425 if tx is None: 426 continue 427 self.add_transaction(tx, allow_unrelated=True) 428 429 # Store fees 430 for tx_hash, fee_sat in tx_fees.items(): 431 self.db.add_tx_fee_from_server(tx_hash, fee_sat) 432 433 @profiler 434 def load_local_history(self): 435 self._history_local = {} # type: Dict[str, Set[str]] # address -> set(txid) 436 self._address_history_changed_events = defaultdict(asyncio.Event) # address -> Event 437 for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()): 438 self._add_tx_to_local_history(txid) 439 440 @profiler 441 def check_history(self): 442 hist_addrs_mine = list(filter(lambda k: self.is_mine(k), self.db.get_history())) 443 hist_addrs_not_mine = list(filter(lambda k: not self.is_mine(k), self.db.get_history())) 444 for addr in hist_addrs_not_mine: 445 self.db.remove_addr_history(addr) 446 for addr in hist_addrs_mine: 447 hist = self.db.get_addr_history(addr) 448 for tx_hash, tx_height in hist: 449 if self.db.get_txi_addresses(tx_hash) or self.db.get_txo_addresses(tx_hash): 450 continue 451 tx = self.db.get_transaction(tx_hash) 452 if tx is not None: 453 self.add_transaction(tx, allow_unrelated=True) 454 455 def remove_local_transactions_we_dont_have(self): 456 for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()): 457 tx_height = self.get_tx_height(txid).height 458 if tx_height == TX_HEIGHT_LOCAL and not self.db.get_transaction(txid): 459 self.remove_transaction(txid) 460 461 def clear_history(self): 462 with self.lock: 463 with self.transaction_lock: 464 self.db.clear_history() 465 self._history_local.clear() 466 self._get_addr_balance_cache = {} # invalidate cache 467 468 def get_txpos(self, tx_hash): 469 """Returns (height, txpos) tuple, even if the tx is unverified.""" 470 with self.lock: 471 verified_tx_mined_info = self.db.get_verified_tx(tx_hash) 472 if verified_tx_mined_info: 473 return verified_tx_mined_info.height, verified_tx_mined_info.txpos 474 elif tx_hash in self.unverified_tx: 475 height = self.unverified_tx[tx_hash] 476 return (height, -1) if height > 0 else ((1e9 - height), -1) 477 else: 478 return (1e9+1, -1) 479 480 def with_local_height_cached(func): 481 # get local height only once, as it's relatively expensive. 482 # take care that nested calls work as expected 483 def f(self, *args, **kwargs): 484 orig_val = getattr(self.threadlocal_cache, 'local_height', None) 485 self.threadlocal_cache.local_height = orig_val or self.get_local_height() 486 try: 487 return func(self, *args, **kwargs) 488 finally: 489 self.threadlocal_cache.local_height = orig_val 490 return f 491 492 @with_lock 493 @with_transaction_lock 494 @with_local_height_cached 495 def get_history(self, *, domain=None) -> Sequence[HistoryItem]: 496 # get domain 497 if domain is None: 498 domain = self.get_addresses() 499 domain = set(domain) 500 # 1. Get the history of each address in the domain, maintain the 501 # delta of a tx as the sum of its deltas on domain addresses 502 tx_deltas = defaultdict(int) # type: Dict[str, int] 503 for addr in domain: 504 h = self.get_address_history(addr) 505 for tx_hash, height in h: 506 tx_deltas[tx_hash] += self.get_tx_delta(tx_hash, addr) 507 # 2. create sorted history 508 history = [] 509 for tx_hash in tx_deltas: 510 delta = tx_deltas[tx_hash] 511 tx_mined_status = self.get_tx_height(tx_hash) 512 fee = self.get_tx_fee(tx_hash) 513 history.append((tx_hash, tx_mined_status, delta, fee)) 514 history.sort(key = lambda x: self.get_txpos(x[0]), reverse=True) 515 # 3. add balance 516 c, u, x = self.get_balance(domain) 517 balance = c + u + x 518 h2 = [] 519 for tx_hash, tx_mined_status, delta, fee in history: 520 h2.append(HistoryItem(txid=tx_hash, 521 tx_mined_status=tx_mined_status, 522 delta=delta, 523 fee=fee, 524 balance=balance)) 525 balance -= delta 526 h2.reverse() 527 528 if balance != 0: 529 raise Exception("wallet.get_history() failed balance sanity-check") 530 531 return h2 532 533 def _add_tx_to_local_history(self, txid): 534 with self.transaction_lock: 535 for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)): 536 cur_hist = self._history_local.get(addr, set()) 537 cur_hist.add(txid) 538 self._history_local[addr] = cur_hist 539 self._mark_address_history_changed(addr) 540 541 def _remove_tx_from_local_history(self, txid): 542 with self.transaction_lock: 543 for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)): 544 cur_hist = self._history_local.get(addr, set()) 545 try: 546 cur_hist.remove(txid) 547 except KeyError: 548 pass 549 else: 550 self._history_local[addr] = cur_hist 551 552 def _mark_address_history_changed(self, addr: str) -> None: 553 # history for this address changed, wake up coroutines: 554 self._address_history_changed_events[addr].set() 555 # clear event immediately so that coroutines can wait() for the next change: 556 self._address_history_changed_events[addr].clear() 557 558 async def wait_for_address_history_to_change(self, addr: str) -> None: 559 """Wait until the server tells us about a new transaction related to addr. 560 561 Unconfirmed and confirmed transactions are not distinguished, and so e.g. SPV 562 is not taken into account. 563 """ 564 assert self.is_mine(addr), "address needs to be is_mine to be watched" 565 await self._address_history_changed_events[addr].wait() 566 567 def add_unverified_tx(self, tx_hash, tx_height): 568 if self.db.is_in_verified_tx(tx_hash): 569 if tx_height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT): 570 with self.lock: 571 self.db.remove_verified_tx(tx_hash) 572 if self.verifier: 573 self.verifier.remove_spv_proof_for_tx(tx_hash) 574 else: 575 with self.lock: 576 # tx will be verified only if height > 0 577 self.unverified_tx[tx_hash] = tx_height 578 579 def remove_unverified_tx(self, tx_hash, tx_height): 580 with self.lock: 581 new_height = self.unverified_tx.get(tx_hash) 582 if new_height == tx_height: 583 self.unverified_tx.pop(tx_hash, None) 584 585 def add_verified_tx(self, tx_hash: str, info: TxMinedInfo): 586 # Remove from the unverified map and add to the verified map 587 with self.lock: 588 self.unverified_tx.pop(tx_hash, None) 589 self.db.add_verified_tx(tx_hash, info) 590 tx_mined_status = self.get_tx_height(tx_hash) 591 util.trigger_callback('verified', self, tx_hash, tx_mined_status) 592 593 def get_unverified_txs(self): 594 '''Returns a map from tx hash to transaction height''' 595 with self.lock: 596 return dict(self.unverified_tx) # copy 597 598 def undo_verifications(self, blockchain, above_height): 599 '''Used by the verifier when a reorg has happened''' 600 txs = set() 601 with self.lock: 602 for tx_hash in self.db.list_verified_tx(): 603 info = self.db.get_verified_tx(tx_hash) 604 tx_height = info.height 605 if tx_height > above_height: 606 header = blockchain.read_header(tx_height) 607 if not header or hash_header(header) != info.header_hash: 608 self.db.remove_verified_tx(tx_hash) 609 # NOTE: we should add these txns to self.unverified_tx, 610 # but with what height? 611 # If on the new fork after the reorg, the txn is at the 612 # same height, we will not get a status update for the 613 # address. If the txn is not mined or at a diff height, 614 # we should get a status update. Unless we put tx into 615 # unverified_tx, it will turn into local. So we put it 616 # into unverified_tx with the old height, and if we get 617 # a status update, that will overwrite it. 618 self.unverified_tx[tx_hash] = tx_height 619 txs.add(tx_hash) 620 return txs 621 622 def get_local_height(self) -> int: 623 """ return last known height if we are offline """ 624 cached_local_height = getattr(self.threadlocal_cache, 'local_height', None) 625 if cached_local_height is not None: 626 return cached_local_height 627 return self.network.get_local_height() if self.network else self.db.get('stored_height', 0) 628 629 def add_future_tx(self, tx: Transaction, num_blocks: int) -> bool: 630 assert num_blocks > 0, num_blocks 631 with self.lock: 632 tx_was_added = self.add_transaction(tx) 633 if tx_was_added: 634 self.future_tx[tx.txid()] = num_blocks 635 return tx_was_added 636 637 def get_tx_height(self, tx_hash: str) -> TxMinedInfo: 638 if tx_hash is None: # ugly backwards compat... 639 return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0) 640 with self.lock: 641 verified_tx_mined_info = self.db.get_verified_tx(tx_hash) 642 if verified_tx_mined_info: 643 conf = max(self.get_local_height() - verified_tx_mined_info.height + 1, 0) 644 return verified_tx_mined_info._replace(conf=conf) 645 elif tx_hash in self.unverified_tx: 646 height = self.unverified_tx[tx_hash] 647 return TxMinedInfo(height=height, conf=0) 648 elif tx_hash in self.future_tx: 649 num_blocks_remainining = self.future_tx[tx_hash] 650 assert num_blocks_remainining > 0, num_blocks_remainining 651 return TxMinedInfo(height=TX_HEIGHT_FUTURE, conf=-num_blocks_remainining) 652 else: 653 # local transaction 654 return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0) 655 656 def set_up_to_date(self, up_to_date): 657 with self.lock: 658 status_changed = self.up_to_date != up_to_date 659 self.up_to_date = up_to_date 660 if self.network: 661 self.network.notify('status') 662 if status_changed: 663 self.logger.info(f'set_up_to_date: {up_to_date}') 664 665 def is_up_to_date(self): 666 with self.lock: return self.up_to_date 667 668 def get_history_sync_state_details(self) -> Tuple[int, int]: 669 if self.synchronizer: 670 return self.synchronizer.num_requests_sent_and_answered() 671 else: 672 return 0, 0 673 674 @with_transaction_lock 675 def get_tx_delta(self, tx_hash: str, address: str) -> int: 676 """effect of tx on address""" 677 delta = 0 678 # subtract the value of coins sent from address 679 d = self.db.get_txi_addr(tx_hash, address) 680 for n, v in d: 681 delta -= v 682 # add the value of the coins received at address 683 d = self.db.get_txo_addr(tx_hash, address) 684 for n, (v, cb) in d.items(): 685 delta += v 686 return delta 687 688 def get_wallet_delta(self, tx: Transaction) -> TxWalletDelta: 689 """effect of tx on wallet""" 690 is_relevant = False # "related to wallet?" 691 num_input_ismine = 0 692 v_in = v_in_mine = v_out = v_out_mine = 0 693 with self.lock, self.transaction_lock: 694 for txin in tx.inputs(): 695 addr = self.get_txin_address(txin) 696 value = self.get_txin_value(txin, address=addr) 697 if self.is_mine(addr): 698 num_input_ismine += 1 699 is_relevant = True 700 assert value is not None 701 v_in_mine += value 702 if value is None: 703 v_in = None 704 elif v_in is not None: 705 v_in += value 706 for txout in tx.outputs(): 707 v_out += txout.value 708 if self.is_mine(txout.address): 709 v_out_mine += txout.value 710 is_relevant = True 711 delta = v_out_mine - v_in_mine 712 if v_in is not None: 713 fee = v_in - v_out 714 else: 715 fee = None 716 if fee is None and isinstance(tx, PartialTransaction): 717 fee = tx.get_fee() 718 return TxWalletDelta( 719 is_relevant=is_relevant, 720 is_any_input_ismine=num_input_ismine > 0, 721 is_all_input_ismine=num_input_ismine == len(tx.inputs()), 722 delta=delta, 723 fee=fee, 724 ) 725 726 def get_tx_fee(self, txid: str) -> Optional[int]: 727 """ Returns tx_fee or None. Use server fee only if tx is unconfirmed and not mine""" 728 # check if stored fee is available 729 fee = self.db.get_tx_fee(txid, trust_server=False) 730 if fee is not None: 731 return fee 732 # delete server-sent fee for confirmed txns 733 confirmed = self.get_tx_height(txid).conf > 0 734 if confirmed: 735 self.db.add_tx_fee_from_server(txid, None) 736 # if all inputs are ismine, try to calc fee now; 737 # otherwise, return stored value 738 num_all_inputs = self.db.get_num_all_inputs_of_tx(txid) 739 if num_all_inputs is not None: 740 # check if tx is mine 741 num_ismine_inputs = self.db.get_num_ismine_inputs_of_tx(txid) 742 assert num_ismine_inputs <= num_all_inputs, (num_ismine_inputs, num_all_inputs) 743 # trust server if tx is unconfirmed and not mine 744 if num_ismine_inputs < num_all_inputs: 745 return None if confirmed else self.db.get_tx_fee(txid, trust_server=True) 746 # lookup tx and deserialize it. 747 # note that deserializing is expensive, hence above hacks 748 tx = self.db.get_transaction(txid) 749 if not tx: 750 return None 751 fee = self.get_wallet_delta(tx).fee 752 # save result 753 self.db.add_tx_fee_we_calculated(txid, fee) 754 self.db.add_num_inputs_to_tx(txid, len(tx.inputs())) 755 return fee 756 757 def get_addr_io(self, address): 758 with self.lock, self.transaction_lock: 759 h = self.get_address_history(address) 760 received = {} 761 sent = {} 762 for tx_hash, height in h: 763 d = self.db.get_txo_addr(tx_hash, address) 764 for n, (v, is_cb) in d.items(): 765 received[tx_hash + ':%d'%n] = (height, v, is_cb) 766 for tx_hash, height in h: 767 l = self.db.get_txi_addr(tx_hash, address) 768 for txi, v in l: 769 sent[txi] = height 770 return received, sent 771 772 773 def get_addr_outputs(self, address: str) -> Dict[TxOutpoint, PartialTxInput]: 774 coins, spent = self.get_addr_io(address) 775 out = {} 776 for prevout_str, v in coins.items(): 777 tx_height, value, is_cb = v 778 prevout = TxOutpoint.from_str(prevout_str) 779 utxo = PartialTxInput(prevout=prevout, is_coinbase_output=is_cb) 780 utxo._trusted_address = address 781 utxo._trusted_value_sats = value 782 utxo.block_height = tx_height 783 utxo.spent_height = spent.get(prevout_str, None) 784 out[prevout] = utxo 785 return out 786 787 def get_addr_utxo(self, address: str) -> Dict[TxOutpoint, PartialTxInput]: 788 out = self.get_addr_outputs(address) 789 for k, v in list(out.items()): 790 if v.spent_height is not None: 791 out.pop(k) 792 return out 793 794 # return the total amount ever received by an address 795 def get_addr_received(self, address): 796 received, sent = self.get_addr_io(address) 797 return sum([v for height, v, is_cb in received.values()]) 798 799 @with_local_height_cached 800 def get_addr_balance(self, address, *, excluded_coins: Set[str] = None) -> Tuple[int, int, int]: 801 """Return the balance of a bitcoin address: 802 confirmed and matured, unconfirmed, unmatured 803 """ 804 if not excluded_coins: # cache is only used if there are no excluded_coins 805 cached_value = self._get_addr_balance_cache.get(address) 806 if cached_value: 807 return cached_value 808 if excluded_coins is None: 809 excluded_coins = set() 810 assert isinstance(excluded_coins, set), f"excluded_coins should be set, not {type(excluded_coins)}" 811 received, sent = self.get_addr_io(address) 812 c = u = x = 0 813 mempool_height = self.get_local_height() + 1 # height of next block 814 for txo, (tx_height, v, is_cb) in received.items(): 815 if txo in excluded_coins: 816 continue 817 if is_cb and tx_height + COINBASE_MATURITY > mempool_height: 818 x += v 819 elif tx_height > 0: 820 c += v 821 else: 822 u += v 823 if txo in sent: 824 if sent[txo] > 0: 825 c -= v 826 else: 827 u -= v 828 result = c, u, x 829 # cache result. 830 if not excluded_coins: 831 # Cache needs to be invalidated if a transaction is added to/ 832 # removed from history; or on new blocks (maturity...) 833 self._get_addr_balance_cache[address] = result 834 return result 835 836 @with_local_height_cached 837 def get_utxos(self, domain=None, *, excluded_addresses=None, 838 mature_only: bool = False, confirmed_only: bool = False, 839 nonlocal_only: bool = False) -> Sequence[PartialTxInput]: 840 coins = [] 841 if domain is None: 842 domain = self.get_addresses() 843 domain = set(domain) 844 if excluded_addresses: 845 domain = set(domain) - set(excluded_addresses) 846 mempool_height = self.get_local_height() + 1 # height of next block 847 for addr in domain: 848 utxos = self.get_addr_utxo(addr) 849 for utxo in utxos.values(): 850 if confirmed_only and utxo.block_height <= 0: 851 continue 852 if nonlocal_only and utxo.block_height == TX_HEIGHT_LOCAL: 853 continue 854 if (mature_only and utxo.is_coinbase_output() 855 and utxo.block_height + COINBASE_MATURITY > mempool_height): 856 continue 857 coins.append(utxo) 858 continue 859 return coins 860 861 def get_balance(self, domain=None, *, excluded_addresses: Set[str] = None, 862 excluded_coins: Set[str] = None) -> Tuple[int, int, int]: 863 if domain is None: 864 domain = self.get_addresses() 865 if excluded_addresses is None: 866 excluded_addresses = set() 867 assert isinstance(excluded_addresses, set), f"excluded_addresses should be set, not {type(excluded_addresses)}" 868 domain = set(domain) - excluded_addresses 869 cc = uu = xx = 0 870 for addr in domain: 871 c, u, x = self.get_addr_balance(addr, excluded_coins=excluded_coins) 872 cc += c 873 uu += u 874 xx += x 875 return cc, uu, xx 876 877 def is_used(self, address: str) -> bool: 878 return self.get_address_history_len(address) != 0 879 880 def is_empty(self, address: str) -> bool: 881 c, u, x = self.get_addr_balance(address) 882 return c+u+x == 0 883 884 def synchronize(self): 885 pass