commit e3888752d6508ebbb9b4e9a8a09555914cd68fd2
parent 69dc762a5ab0b47540c162fde787652dbdac109d
Author: ThomasV <>
Date: Wed, 18 Jul 2018 11:18:57 +0200
separate address synchronizer from wallet
7 files changed, 508 insertions(+), 480 deletions(-)
diff --git a/electrum/ b/electrum/
@@ -1,6 +1,6 @@
from .version import ELECTRUM_VERSION
from .util import format_satoshis, print_msg, print_error, set_verbosity
-from .wallet import Synchronizer, Wallet
+from .wallet import Wallet
from .storage import WalletStorage
from .coinchooser import COIN_CHOOSERS
from .network import Network, pick_random_server
diff --git a/electrum/ b/electrum/
@@ -0,0 +1,494 @@
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2018 The Electrum Developers
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+import threading
+import itertools
+from collections import defaultdict
+from .util import PrintError, profiler
+from .transaction import Transaction
+from .synchronizer import Synchronizer
+from .verifier import SPV
+class AddTransactionException(Exception):
+ pass
+class UnrelatedTransactionException(AddTransactionException):
+ def __str__(self):
+ return _("Transaction is unrelated to this wallet.")
+class AddressSynchronizer(PrintError):
+ """
+ inherited by wallet
+ """
+ def __init__(self, storage):
+ = storage
+ = None
+ # verifier (SPV) and synchronizer are started in start_threads
+ self.synchronizer = None
+ self.verifier = None
+ # locks: if you need to take multiple ones, acquire them in the order they are defined here!
+ self.lock = threading.RLock()
+ self.transaction_lock = threading.RLock()
+ # address -> list(txid, height)
+ self.history = storage.get('addr_history',{})
+ # Verified transactions. txid -> (height, timestamp, block_pos). Access with self.lock.
+ self.verified_tx = storage.get('verified_tx3', {})
+ # Transactions pending verification. txid -> tx_height. Access with self.lock.
+ self.unverified_tx = defaultdict(int)
+ # true when synchronized
+ self.up_to_date = False
+ self.load_transactions()
+ self.load_local_history()
+ self.load_unverified_transactions()
+ self.remove_local_transactions_we_dont_have()
+ def load_unverified_transactions(self):
+ # review transactions that are in the history
+ for addr, hist in self.history.items():
+ for tx_hash, tx_height in hist:
+ # add it in case it was previously unconfirmed
+ self.add_unverified_tx(tx_hash, tx_height)
+ def start_threads(self, network):
+ = network
+ if is not None:
+ self.verifier = SPV(, self)
+ self.synchronizer = Synchronizer(self, network)
+ network.add_jobs([self.verifier, self.synchronizer])
+ else:
+ self.verifier = None
+ self.synchronizer = None
+ def stop_threads(self):
+ if
+[self.synchronizer, self.verifier])
+ self.synchronizer.release()
+ self.synchronizer = None
+ self.verifier = None
+ # Now no references to the synchronizer or verifier
+ # remain so they will be GC-ed
+'stored_height', self.get_local_height())
+ self.save_transactions()
+ self.save_verified_tx()
+ def add_address(self, address):
+ if address not in self.history:
+ self.history[address] = []
+ self.set_up_to_date(False)
+ if self.synchronizer:
+ self.synchronizer.add(address)
+ def get_conflicting_transactions(self, tx):
+ """Returns a set of transaction hashes from the wallet history that are
+ directly conflicting with tx, i.e. they have common outpoints being
+ spent with tx. If the tx is already in wallet history, that will not be
+ reported as a conflict.
+ """
+ conflicting_txns = set()
+ with self.transaction_lock:
+ for txin in tx.inputs():
+ if txin['type'] == 'coinbase':
+ continue
+ prevout_hash = txin['prevout_hash']
+ prevout_n = txin['prevout_n']
+ spending_tx_hash = self.spent_outpoints[prevout_hash].get(prevout_n)
+ if spending_tx_hash is None:
+ continue
+ # this outpoint has already been spent, by spending_tx
+ assert spending_tx_hash in self.transactions
+ conflicting_txns |= {spending_tx_hash}
+ txid = tx.txid()
+ if txid in conflicting_txns:
+ # this tx is already in history, so it conflicts with itself
+ if len(conflicting_txns) > 1:
+ raise Exception('Found conflicting transactions already in wallet history.')
+ conflicting_txns -= {txid}
+ return conflicting_txns
+ def add_transaction(self, tx_hash, tx, allow_unrelated=False):
+ assert tx_hash, tx_hash
+ assert tx, tx
+ assert tx.is_complete()
+ # we need self.transaction_lock but get_tx_height will take self.lock
+ # so we need to take that too here, to enforce order of locks
+ with self.lock, self.transaction_lock:
+ # NOTE: returning if tx in self.transactions might seem like a good idea
+ # BUT we track is_mine inputs in a txn, and during subsequent calls
+ # of add_transaction tx, we might learn of more-and-more inputs of
+ # being is_mine, as we roll the gap_limit forward
+ is_coinbase = tx.inputs()[0]['type'] == 'coinbase'
+ tx_height = self.get_tx_height(tx_hash)[0]
+ if not allow_unrelated:
+ # note that during sync, if the transactions are not properly sorted,
+ # it could happen that we think tx is unrelated but actually one of the inputs is is_mine.
+ # this is the main motivation for allow_unrelated
+ is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()])
+ is_for_me = any([self.is_mine(self.get_txout_address(txo)) for txo in tx.outputs()])
+ if not is_mine and not is_for_me:
+ raise UnrelatedTransactionException()
+ # Find all conflicting transactions.
+ # In case of a conflict,
+ # 1. confirmed > mempool > local
+ # 2. this new txn has priority over existing ones
+ # When this method exits, there must NOT be any conflict, so
+ # either keep this txn and remove all conflicting (along with dependencies)
+ # or drop this txn
+ conflicting_txns = self.get_conflicting_transactions(tx)
+ if conflicting_txns:
+ existing_mempool_txn = any(
+ self.get_tx_height(tx_hash2)[0] in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
+ for tx_hash2 in conflicting_txns)
+ existing_confirmed_txn = any(
+ self.get_tx_height(tx_hash2)[0] > 0
+ for tx_hash2 in conflicting_txns)
+ if existing_confirmed_txn and tx_height <= 0:
+ # this is a non-confirmed tx that conflicts with confirmed txns; drop.
+ return False
+ if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL:
+ # this is a local tx that conflicts with non-local txns; drop.
+ return False
+ # keep this txn and remove all conflicting
+ to_remove = set()
+ to_remove |= conflicting_txns
+ for conflicting_tx_hash in conflicting_txns:
+ to_remove |= self.get_depending_transactions(conflicting_tx_hash)
+ for tx_hash2 in to_remove:
+ self.remove_transaction(tx_hash2)
+ # add inputs
+ def add_value_from_prev_output():
+ dd = self.txo.get(prevout_hash, {})
+ # note: this nested loop takes linear time in num is_mine outputs of prev_tx
+ for addr, outputs in dd.items():
+ # note: instead of [(n, v, is_cb), ...]; we could store: {n -> (v, is_cb)}
+ for n, v, is_cb in outputs:
+ if n == prevout_n:
+ if addr and self.is_mine(addr):
+ if d.get(addr) is None:
+ d[addr] = set()
+ d[addr].add((ser, v))
+ return
+ self.txi[tx_hash] = d = {}
+ for txi in tx.inputs():
+ if txi['type'] == 'coinbase':
+ continue
+ prevout_hash = txi['prevout_hash']
+ prevout_n = txi['prevout_n']
+ ser = prevout_hash + ':%d' % prevout_n
+ self.spent_outpoints[prevout_hash][prevout_n] = tx_hash
+ add_value_from_prev_output()
+ # add outputs
+ self.txo[tx_hash] = d = {}
+ for n, txo in enumerate(tx.outputs()):
+ v = txo[2]
+ ser = tx_hash + ':%d'%n
+ addr = self.get_txout_address(txo)
+ if addr and self.is_mine(addr):
+ if d.get(addr) is None:
+ d[addr] = []
+ d[addr].append((n, v, is_coinbase))
+ # give v to txi that spends me
+ next_tx = self.spent_outpoints[tx_hash].get(n)
+ if next_tx is not None:
+ dd = self.txi.get(next_tx, {})
+ if dd.get(addr) is None:
+ dd[addr] = set()
+ if (ser, v) not in dd[addr]:
+ dd[addr].add((ser, v))
+ self._add_tx_to_local_history(next_tx)
+ # add to local history
+ self._add_tx_to_local_history(tx_hash)
+ # save
+ self.transactions[tx_hash] = tx
+ return True
+ def remove_transaction(self, tx_hash):
+ def remove_from_spent_outpoints():
+ # undo spends in spent_outpoints
+ if tx is not None: # if we have the tx, this branch is faster
+ for txin in tx.inputs():
+ if txin['type'] == 'coinbase':
+ continue
+ prevout_hash = txin['prevout_hash']
+ prevout_n = txin['prevout_n']
+ self.spent_outpoints[prevout_hash].pop(prevout_n, None)
+ if not self.spent_outpoints[prevout_hash]:
+ self.spent_outpoints.pop(prevout_hash)
+ else: # expensive but always works
+ for prevout_hash, d in list(self.spent_outpoints.items()):
+ for prevout_n, spending_txid in d.items():
+ if spending_txid == tx_hash:
+ self.spent_outpoints[prevout_hash].pop(prevout_n, None)
+ if not self.spent_outpoints[prevout_hash]:
+ self.spent_outpoints.pop(prevout_hash)
+ # Remove this tx itself; if nothing spends from it.
+ # It is not so clear what to do if other txns spend from it, but it will be
+ # removed when those other txns are removed.
+ if not self.spent_outpoints[tx_hash]:
+ self.spent_outpoints.pop(tx_hash)
+ with self.transaction_lock:
+ self.print_error("removing tx from history", tx_hash)
+ tx = self.transactions.pop(tx_hash, None)
+ remove_from_spent_outpoints()
+ self._remove_tx_from_local_history(tx_hash)
+ self.txi.pop(tx_hash, None)
+ self.txo.pop(tx_hash, None)
+ def receive_tx_callback(self, tx_hash, tx, tx_height):
+ self.add_unverified_tx(tx_hash, tx_height)
+ self.add_transaction(tx_hash, tx, allow_unrelated=True)
+ def receive_history_callback(self, addr, hist, tx_fees):
+ with self.lock:
+ old_hist = self.get_address_history(addr)
+ for tx_hash, height in old_hist:
+ if (tx_hash, height) not in hist:
+ # make tx local
+ self.unverified_tx.pop(tx_hash, None)
+ self.verified_tx.pop(tx_hash, None)
+ if self.verifier:
+ self.verifier.remove_spv_proof_for_tx(tx_hash)
+ self.history[addr] = hist
+ for tx_hash, tx_height in hist:
+ # add it in case it was previously unconfirmed
+ self.add_unverified_tx(tx_hash, tx_height)
+ # if addr is new, we have to recompute txi and txo
+ tx = self.transactions.get(tx_hash)
+ if tx is None:
+ continue
+ self.add_transaction(tx_hash, tx, allow_unrelated=True)
+ # Store fees
+ self.tx_fees.update(tx_fees)
+ @profiler
+ def load_transactions(self):
+ # load txi, txo, tx_fees
+ self.txi ='txi', {})
+ for txid, d in list(self.txi.items()):
+ for addr, lst in d.items():
+ self.txi[txid][addr] = set([tuple(x) for x in lst])
+ self.txo ='txo', {})
+ self.tx_fees ='tx_fees', {})
+ tx_list ='transactions', {})
+ # load transactions
+ self.transactions = {}
+ for tx_hash, raw in tx_list.items():
+ tx = Transaction(raw)
+ self.transactions[tx_hash] = tx
+ if self.txi.get(tx_hash) is None and self.txo.get(tx_hash) is None:
+ self.print_error("removing unreferenced tx", tx_hash)
+ self.transactions.pop(tx_hash)
+ # load spent_outpoints
+ _spent_outpoints ='spent_outpoints', {})
+ self.spent_outpoints = defaultdict(dict)
+ for prevout_hash, d in _spent_outpoints.items():
+ for prevout_n_str, spending_txid in d.items():
+ prevout_n = int(prevout_n_str)
+ self.spent_outpoints[prevout_hash][prevout_n] = spending_txid
+ @profiler
+ def load_local_history(self):
+ self._history_local = {} # address -> set(txid)
+ for txid in itertools.chain(self.txi, self.txo):
+ self._add_tx_to_local_history(txid)
+ def remove_local_transactions_we_dont_have(self):
+ txid_set = set(self.txi) | set(self.txo)
+ for txid in txid_set:
+ tx_height = self.get_tx_height(txid)[0]
+ if tx_height == TX_HEIGHT_LOCAL and txid not in self.transactions:
+ self.remove_transaction(txid)
+ @profiler
+ def save_transactions(self, write=False):
+ with self.transaction_lock:
+ tx = {}
+ for k,v in self.transactions.items():
+ tx[k] = str(v)
+'transactions', tx)
+'txi', self.txi)
+'txo', self.txo)
+'tx_fees', self.tx_fees)
+'addr_history', self.history)
+'spent_outpoints', self.spent_outpoints)
+ if write:
+ def save_verified_tx(self, write=False):
+ with self.lock:
+'verified_tx3', self.verified_tx)
+ if write:
+ def clear_history(self):
+ with self.lock:
+ with self.transaction_lock:
+ self.txi = {}
+ self.txo = {}
+ self.tx_fees = {}
+ self.spent_outpoints = defaultdict(dict)
+ self.history = {}
+ self.verified_tx = {}
+ self.transactions = {}
+ self.save_transactions()
+ def get_history(self, domain=None):
+ # get domain
+ if domain is None:
+ domain = self.get_addresses()
+ domain = set(domain)
+ # 1. Get the history of each address in the domain, maintain the
+ # delta of a tx as the sum of its deltas on domain addresses
+ tx_deltas = defaultdict(int)
+ for addr in domain:
+ h = self.get_address_history(addr)
+ for tx_hash, height in h:
+ delta = self.get_tx_delta(tx_hash, addr)
+ if delta is None or tx_deltas[tx_hash] is None:
+ tx_deltas[tx_hash] = None
+ else:
+ tx_deltas[tx_hash] += delta
+ # 2. create sorted history
+ history = []
+ for tx_hash in tx_deltas:
+ delta = tx_deltas[tx_hash]
+ height, conf, timestamp = self.get_tx_height(tx_hash)
+ history.append((tx_hash, height, conf, timestamp, delta))
+ history.sort(key = lambda x: self.get_txpos(x[0]))
+ history.reverse()
+ # 3. add balance
+ c, u, x = self.get_balance(domain)
+ balance = c + u + x
+ h2 = []
+ for tx_hash, height, conf, timestamp, delta in history:
+ h2.append((tx_hash, height, conf, timestamp, delta, balance))
+ if balance is None or delta is None:
+ balance = None
+ else:
+ balance -= delta
+ h2.reverse()
+ # fixme: this may happen if history is incomplete
+ if balance not in [None, 0]:
+ self.print_error("Error: history not synchronized")
+ return []
+ return h2
+ def _add_tx_to_local_history(self, txid):
+ with self.transaction_lock:
+ for addr in itertools.chain(self.txi.get(txid, []), self.txo.get(txid, [])):
+ cur_hist = self._history_local.get(addr, set())
+ cur_hist.add(txid)
+ self._history_local[addr] = cur_hist
+ def _remove_tx_from_local_history(self, txid):
+ with self.transaction_lock:
+ for addr in itertools.chain(self.txi.get(txid, []), self.txo.get(txid, [])):
+ cur_hist = self._history_local.get(addr, set())
+ try:
+ cur_hist.remove(txid)
+ except KeyError:
+ pass
+ else:
+ self._history_local[addr] = cur_hist
+ def add_unverified_tx(self, tx_hash, tx_height):
+ and tx_hash in self.verified_tx:
+ with self.lock:
+ self.verified_tx.pop(tx_hash)
+ if self.verifier:
+ self.verifier.remove_spv_proof_for_tx(tx_hash)
+ # tx will be verified only if height > 0
+ if tx_hash not in self.verified_tx:
+ with self.lock:
+ self.unverified_tx[tx_hash] = tx_height
+ def add_verified_tx(self, tx_hash, info):
+ # Remove from the unverified map and add to the verified map
+ with self.lock:
+ self.unverified_tx.pop(tx_hash, None)
+ self.verified_tx[tx_hash] = info # (tx_height, timestamp, pos)
+ height, conf, timestamp = self.get_tx_height(tx_hash)
+'verified', tx_hash, height, conf, timestamp)
+ def get_unverified_txs(self):
+ '''Returns a map from tx hash to transaction height'''
+ with self.lock:
+ return dict(self.unverified_tx) # copy
+ def undo_verifications(self, blockchain, height):
+ '''Used by the verifier when a reorg has happened'''
+ txs = set()
+ with self.lock:
+ for tx_hash, item in list(self.verified_tx.items()):
+ tx_height, timestamp, pos = item
+ if tx_height >= height:
+ header = blockchain.read_header(tx_height)
+ # fixme: use block hash, not timestamp
+ if not header or header.get('timestamp') != timestamp:
+ self.verified_tx.pop(tx_hash, None)
+ txs.add(tx_hash)
+ return txs
+ def get_local_height(self):
+ """ return last known height if we are offline """
+ return if else'stored_height', 0)
+ def get_tx_height(self, tx_hash):
+ """ Given a transaction, returns (height, conf, timestamp) """
+ with self.lock:
+ if tx_hash in self.verified_tx:
+ height, timestamp, pos = self.verified_tx[tx_hash]
+ conf = max(self.get_local_height() - height + 1, 0)
+ return height, conf, timestamp
+ elif tx_hash in self.unverified_tx:
+ height = self.unverified_tx[tx_hash]
+ return height, 0, None
+ else:
+ # local transaction
+ return TX_HEIGHT_LOCAL, 0, None
+ def set_up_to_date(self, up_to_date):
+ with self.lock:
+ self.up_to_date = up_to_date
+ if up_to_date:
+ self.save_transactions(write=True)
+ # if the verifier is also up to date, persist that too;
+ # otherwise it will persist its results when it finishes
+ if self.verifier and self.verifier.is_up_to_date():
+ self.save_verified_tx(write=True)
+ def is_up_to_date(self):
+ with self.lock: return self.up_to_date
diff --git a/electrum/gui/qt/ b/electrum/gui/qt/
@@ -26,7 +26,7 @@
import webbrowser
import datetime
-from electrum.wallet import AddTransactionException, TX_HEIGHT_LOCAL
+from electrum.address_synchronizer import TX_HEIGHT_LOCAL
from .util import *
from electrum.i18n import _
from electrum.util import block_explorer_URL, profiler, print_error
diff --git a/electrum/gui/qt/ b/electrum/gui/qt/
@@ -51,7 +51,8 @@ from electrum.util import (format_time, format_satoshis, format_fee_satoshis,
base_units, base_units_list, base_unit_name_to_decimal_point,
decimal_point_to_base_unit_name, quantize_feerate)
from electrum.transaction import Transaction
-from electrum.wallet import Multisig_Wallet, AddTransactionException, CannotBumpFee
+from electrum.address_synchronizer import AddTransactionException
+from electrum.wallet import Multisig_Wallet, CannotBumpFee
from .amountedit import AmountEdit, BTCAmountEdit, MyLineEdit, FeerateEdit
from .qrcodewidget import QRCodeWidget, QRDialog
diff --git a/electrum/gui/qt/ b/electrum/gui/qt/
@@ -37,7 +37,6 @@ from electrum.plugin import run_hook
from electrum import simple_config
from electrum.util import bfh
-from electrum.wallet import AddTransactionException
from electrum.transaction import SerializationError
from .util import *
diff --git a/electrum/tests/ b/electrum/tests/
@@ -7,7 +7,8 @@ from typing import Sequence
from electrum import storage, bitcoin, keystore, constants
from electrum import Transaction
from electrum import SimpleConfig
-from electrum.wallet import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT, sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet
+from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
+from electrum.wallet import sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet
from electrum.util import bfh, bh2u
from electrum.plugins.trustedcoin import trustedcoin
diff --git a/electrum/ b/electrum/
@@ -28,7 +28,7 @@
import os
-import threading
+import sys
import random
import time
import json
@@ -36,12 +36,8 @@ import copy
import errno
import traceback
from functools import partial
-from collections import defaultdict
from numbers import Number
from decimal import Decimal
-import itertools
-import sys
from .i18n import _
from .util import (NotEnoughFunds, PrintError, UserCancelled, profiler,
@@ -57,8 +53,7 @@ from .storage import multisig_type, STO_EV_PLAINTEXT, STO_EV_USER_PW, STO_EV_XPU
from . import transaction, bitcoin, coinchooser, paymentrequest, contacts
from .transaction import Transaction
from .plugin import run_hook
-from .synchronizer import Synchronizer
-from .verifier import SPV
+from .address_synchronizer import AddressSynchronizer
from .paymentrequest import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED
from .paymentrequest import InvoiceStore
@@ -71,9 +66,6 @@ TX_STATUS = [
def relayfee(network):
@@ -158,65 +150,37 @@ def sweep(privkeys, network, config, recipient, fee=None, imax=100):
return tx
-class AddTransactionException(Exception):
- pass
-class UnrelatedTransactionException(AddTransactionException):
- def __str__(self):
- return _("Transaction is unrelated to this wallet.")
+class CannotBumpFee(Exception): pass
-class CannotBumpFee(Exception): pass
-class Abstract_Wallet(PrintError):
+class Abstract_Wallet(AddressSynchronizer):
Wallet classes are created to handle various address generation methods.
Completion states (watching-only, single account, no seed, etc) are handled inside classes.
max_change_outputs = 3
+ gap_limit_for_change = 6
def __init__(self, storage):
+ AddressSynchronizer.__init__(self, storage)
self.electrum_version = ELECTRUM_VERSION
- = storage
- = None
- # verifier (SPV) and synchronizer are started in start_threads
- self.synchronizer = None
- self.verifier = None
- self.gap_limit_for_change = 6 # constant
- # locks: if you need to take multiple ones, acquire them in the order they are defined here!
- self.lock = threading.RLock()
- self.transaction_lock = threading.RLock()
# saved fields
self.use_change = storage.get('use_change', True)
self.multiple_change = storage.get('multiple_change', False)
self.labels = storage.get('labels', {})
self.frozen_addresses = set(storage.get('frozen_addresses',[]))
- self.history = storage.get('addr_history',{}) # address -> list(txid, height)
self.fiat_value = storage.get('fiat_value', {})
self.receive_requests = storage.get('payment_requests', {})
- # Verified transactions. txid -> (height, timestamp, block_pos). Access with self.lock.
- self.verified_tx = storage.get('verified_tx3', {})
- # Transactions pending verification. txid -> tx_height. Access with self.lock.
- self.unverified_tx = defaultdict(int)
- self.load_transactions()
- self.load_local_history()
- self.check_history()
- self.load_unverified_transactions()
- self.remove_local_transactions_we_dont_have()
- # wallet.up_to_date is true when the wallet is synchronized
- self.up_to_date = False
+ self.check_history()
# save wallet type the first time
if'wallet_type') is None:
@@ -228,7 +192,6 @@ class Abstract_Wallet(PrintError):
self.coin_price_cache = {}
def diagnostic_name(self):
return self.basename()
@@ -239,91 +202,15 @@ class Abstract_Wallet(PrintError):
return None
- def load_transactions(self):
- # load txi, txo, tx_fees
- self.txi ='txi', {})
- for txid, d in list(self.txi.items()):
- for addr, lst in d.items():
- self.txi[txid][addr] = set([tuple(x) for x in lst])
- self.txo ='txo', {})
- self.tx_fees ='tx_fees', {})
- tx_list ='transactions', {})
- # load transactions
- self.transactions = {}
- for tx_hash, raw in tx_list.items():
- tx = Transaction(raw)
- self.transactions[tx_hash] = tx
- if self.txi.get(tx_hash) is None and self.txo.get(tx_hash) is None:
- self.print_error("removing unreferenced tx", tx_hash)
- self.transactions.pop(tx_hash)
- # load spent_outpoints
- _spent_outpoints ='spent_outpoints', {})
- self.spent_outpoints = defaultdict(dict)
- for prevout_hash, d in _spent_outpoints.items():
- for prevout_n_str, spending_txid in d.items():
- prevout_n = int(prevout_n_str)
- self.spent_outpoints[prevout_hash][prevout_n] = spending_txid
- @profiler
- def load_local_history(self):
- self._history_local = {} # address -> set(txid)
- for txid in itertools.chain(self.txi, self.txo):
- self._add_tx_to_local_history(txid)
- def remove_local_transactions_we_dont_have(self):
- txid_set = set(self.txi) | set(self.txo)
- for txid in txid_set:
- tx_height = self.get_tx_height(txid)[0]
- if tx_height == TX_HEIGHT_LOCAL and txid not in self.transactions:
- self.remove_transaction(txid)
- @profiler
- def save_transactions(self, write=False):
- with self.transaction_lock:
- tx = {}
- for k,v in self.transactions.items():
- tx[k] = str(v)
-'transactions', tx)
-'txi', self.txi)
-'txo', self.txo)
-'tx_fees', self.tx_fees)
-'addr_history', self.history)
-'spent_outpoints', self.spent_outpoints)
- if write:
- def save_verified_tx(self, write=False):
- with self.lock:
-'verified_tx3', self.verified_tx)
- if write:
- def clear_history(self):
- with self.lock:
- with self.transaction_lock:
- self.txi = {}
- self.txo = {}
- self.tx_fees = {}
- self.spent_outpoints = defaultdict(dict)
- self.history = {}
- self.verified_tx = {}
- self.transactions = {}
- self.save_transactions()
- @profiler
def check_history(self):
save = False
hist_addrs_mine = list(filter(lambda k: self.is_mine(k), self.history.keys()))
hist_addrs_not_mine = list(filter(lambda k: not self.is_mine(k), self.history.keys()))
for addr in hist_addrs_not_mine:
save = True
for addr in hist_addrs_mine:
hist = self.history[addr]
for tx_hash, tx_height in hist:
if self.txi.get(tx_hash) or self.txo.get(tx_hash):
@@ -358,19 +245,6 @@ class Abstract_Wallet(PrintError):
def is_deterministic(self):
return self.keystore.is_deterministic()
- def set_up_to_date(self, up_to_date):
- with self.lock:
- self.up_to_date = up_to_date
- if up_to_date:
- self.save_transactions(write=True)
- # if the verifier is also up to date, persist that too;
- # otherwise it will persist its results when it finishes
- if self.verifier and self.verifier.is_up_to_date():
- self.save_verified_tx(write=True)
- def is_up_to_date(self):
- with self.lock: return self.up_to_date
def set_label(self, name, text = None):
changed = False
old_text = self.labels.get(name)
@@ -441,64 +315,6 @@ class Abstract_Wallet(PrintError):
def get_public_keys(self, address):
return [self.get_public_key(address)]
- def add_unverified_tx(self, tx_hash, tx_height):
- and tx_hash in self.verified_tx:
- with self.lock:
- self.verified_tx.pop(tx_hash)
- if self.verifier:
- self.verifier.remove_spv_proof_for_tx(tx_hash)
- # tx will be verified only if height > 0
- if tx_hash not in self.verified_tx:
- with self.lock:
- self.unverified_tx[tx_hash] = tx_height
- def add_verified_tx(self, tx_hash, info):
- # Remove from the unverified map and add to the verified map
- with self.lock:
- self.unverified_tx.pop(tx_hash, None)
- self.verified_tx[tx_hash] = info # (tx_height, timestamp, pos)
- height, conf, timestamp = self.get_tx_height(tx_hash)
-'verified', tx_hash, height, conf, timestamp)
- def get_unverified_txs(self):
- '''Returns a map from tx hash to transaction height'''
- with self.lock:
- return dict(self.unverified_tx) # copy
- def undo_verifications(self, blockchain, height):
- '''Used by the verifier when a reorg has happened'''
- txs = set()
- with self.lock:
- for tx_hash, item in list(self.verified_tx.items()):
- tx_height, timestamp, pos = item
- if tx_height >= height:
- header = blockchain.read_header(tx_height)
- # fixme: use block hash, not timestamp
- if not header or header.get('timestamp') != timestamp:
- self.verified_tx.pop(tx_hash, None)
- txs.add(tx_hash)
- return txs
- def get_local_height(self):
- """ return last known height if we are offline """
- return if else'stored_height', 0)
- def get_tx_height(self, tx_hash):
- """ Given a transaction, returns (height, conf, timestamp) """
- with self.lock:
- if tx_hash in self.verified_tx:
- height, timestamp, pos = self.verified_tx[tx_hash]
- conf = max(self.get_local_height() - height + 1, 0)
- return height, conf, timestamp
- elif tx_hash in self.unverified_tx:
- height = self.unverified_tx[tx_hash]
- return height, 0, None
- else:
- # local transaction
- return TX_HEIGHT_LOCAL, 0, None
def get_txpos(self, tx_hash):
"return position, even if the tx is unverified"
with self.lock:
@@ -757,24 +573,6 @@ class Abstract_Wallet(PrintError):
h.append((tx_hash, tx_height))
return h
- def _add_tx_to_local_history(self, txid):
- with self.transaction_lock:
- for addr in itertools.chain(self.txi.get(txid, []), self.txo.get(txid, [])):
- cur_hist = self._history_local.get(addr, set())
- cur_hist.add(txid)
- self._history_local[addr] = cur_hist
- def _remove_tx_from_local_history(self, txid):
- with self.transaction_lock:
- for addr in itertools.chain(self.txi.get(txid, []), self.txo.get(txid, [])):
- cur_hist = self._history_local.get(addr, set())
- try:
- cur_hist.remove(txid)
- except KeyError:
- pass
- else:
- self._history_local[addr] = cur_hist
def get_txin_address(self, txi):
addr = txi.get('address')
if addr and addr != "(pubkey)":
@@ -798,235 +596,6 @@ class Abstract_Wallet(PrintError):
addr = None
return addr
- def get_conflicting_transactions(self, tx):
- """Returns a set of transaction hashes from the wallet history that are
- directly conflicting with tx, i.e. they have common outpoints being
- spent with tx. If the tx is already in wallet history, that will not be
- reported as a conflict.
- """
- conflicting_txns = set()
- with self.transaction_lock:
- for txin in tx.inputs():
- if txin['type'] == 'coinbase':
- continue
- prevout_hash = txin['prevout_hash']
- prevout_n = txin['prevout_n']
- spending_tx_hash = self.spent_outpoints[prevout_hash].get(prevout_n)
- if spending_tx_hash is None:
- continue
- # this outpoint has already been spent, by spending_tx
- assert spending_tx_hash in self.transactions
- conflicting_txns |= {spending_tx_hash}
- txid = tx.txid()
- if txid in conflicting_txns:
- # this tx is already in history, so it conflicts with itself
- if len(conflicting_txns) > 1:
- raise Exception('Found conflicting transactions already in wallet history.')
- conflicting_txns -= {txid}
- return conflicting_txns
- def add_transaction(self, tx_hash, tx, allow_unrelated=False):
- assert tx_hash, tx_hash
- assert tx, tx
- assert tx.is_complete()
- # we need self.transaction_lock but get_tx_height will take self.lock
- # so we need to take that too here, to enforce order of locks
- with self.lock, self.transaction_lock:
- # NOTE: returning if tx in self.transactions might seem like a good idea
- # BUT we track is_mine inputs in a txn, and during subsequent calls
- # of add_transaction tx, we might learn of more-and-more inputs of
- # being is_mine, as we roll the gap_limit forward
- is_coinbase = tx.inputs()[0]['type'] == 'coinbase'
- tx_height = self.get_tx_height(tx_hash)[0]
- if not allow_unrelated:
- # note that during sync, if the transactions are not properly sorted,
- # it could happen that we think tx is unrelated but actually one of the inputs is is_mine.
- # this is the main motivation for allow_unrelated
- is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()])
- is_for_me = any([self.is_mine(self.get_txout_address(txo)) for txo in tx.outputs()])
- if not is_mine and not is_for_me:
- raise UnrelatedTransactionException()
- # Find all conflicting transactions.
- # In case of a conflict,
- # 1. confirmed > mempool > local
- # 2. this new txn has priority over existing ones
- # When this method exits, there must NOT be any conflict, so
- # either keep this txn and remove all conflicting (along with dependencies)
- # or drop this txn
- conflicting_txns = self.get_conflicting_transactions(tx)
- if conflicting_txns:
- existing_mempool_txn = any(
- self.get_tx_height(tx_hash2)[0] in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
- for tx_hash2 in conflicting_txns)
- existing_confirmed_txn = any(
- self.get_tx_height(tx_hash2)[0] > 0
- for tx_hash2 in conflicting_txns)
- if existing_confirmed_txn and tx_height <= 0:
- # this is a non-confirmed tx that conflicts with confirmed txns; drop.
- return False
- if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL:
- # this is a local tx that conflicts with non-local txns; drop.
- return False
- # keep this txn and remove all conflicting
- to_remove = set()
- to_remove |= conflicting_txns
- for conflicting_tx_hash in conflicting_txns:
- to_remove |= self.get_depending_transactions(conflicting_tx_hash)
- for tx_hash2 in to_remove:
- self.remove_transaction(tx_hash2)
- # add inputs
- def add_value_from_prev_output():
- dd = self.txo.get(prevout_hash, {})
- # note: this nested loop takes linear time in num is_mine outputs of prev_tx
- for addr, outputs in dd.items():
- # note: instead of [(n, v, is_cb), ...]; we could store: {n -> (v, is_cb)}
- for n, v, is_cb in outputs:
- if n == prevout_n:
- if addr and self.is_mine(addr):
- if d.get(addr) is None:
- d[addr] = set()
- d[addr].add((ser, v))
- return
- self.txi[tx_hash] = d = {}
- for txi in tx.inputs():
- if txi['type'] == 'coinbase':
- continue
- prevout_hash = txi['prevout_hash']
- prevout_n = txi['prevout_n']
- ser = prevout_hash + ':%d' % prevout_n
- self.spent_outpoints[prevout_hash][prevout_n] = tx_hash
- add_value_from_prev_output()
- # add outputs
- self.txo[tx_hash] = d = {}
- for n, txo in enumerate(tx.outputs()):
- v = txo[2]
- ser = tx_hash + ':%d'%n
- addr = self.get_txout_address(txo)
- if addr and self.is_mine(addr):
- if d.get(addr) is None:
- d[addr] = []
- d[addr].append((n, v, is_coinbase))
- # give v to txi that spends me
- next_tx = self.spent_outpoints[tx_hash].get(n)
- if next_tx is not None:
- dd = self.txi.get(next_tx, {})
- if dd.get(addr) is None:
- dd[addr] = set()
- if (ser, v) not in dd[addr]:
- dd[addr].add((ser, v))
- self._add_tx_to_local_history(next_tx)
- # add to local history
- self._add_tx_to_local_history(tx_hash)
- # save
- self.transactions[tx_hash] = tx
- return True
- def remove_transaction(self, tx_hash):
- def remove_from_spent_outpoints():
- # undo spends in spent_outpoints
- if tx is not None: # if we have the tx, this branch is faster
- for txin in tx.inputs():
- if txin['type'] == 'coinbase':
- continue
- prevout_hash = txin['prevout_hash']
- prevout_n = txin['prevout_n']
- self.spent_outpoints[prevout_hash].pop(prevout_n, None)
- if not self.spent_outpoints[prevout_hash]:
- self.spent_outpoints.pop(prevout_hash)
- else: # expensive but always works
- for prevout_hash, d in list(self.spent_outpoints.items()):
- for prevout_n, spending_txid in d.items():
- if spending_txid == tx_hash:
- self.spent_outpoints[prevout_hash].pop(prevout_n, None)
- if not self.spent_outpoints[prevout_hash]:
- self.spent_outpoints.pop(prevout_hash)
- # Remove this tx itself; if nothing spends from it.
- # It is not so clear what to do if other txns spend from it, but it will be
- # removed when those other txns are removed.
- if not self.spent_outpoints[tx_hash]:
- self.spent_outpoints.pop(tx_hash)
- with self.transaction_lock:
- self.print_error("removing tx from history", tx_hash)
- tx = self.transactions.pop(tx_hash, None)
- remove_from_spent_outpoints()
- self._remove_tx_from_local_history(tx_hash)
- self.txi.pop(tx_hash, None)
- self.txo.pop(tx_hash, None)
- def receive_tx_callback(self, tx_hash, tx, tx_height):
- self.add_unverified_tx(tx_hash, tx_height)
- self.add_transaction(tx_hash, tx, allow_unrelated=True)
- def receive_history_callback(self, addr, hist, tx_fees):
- with self.lock:
- old_hist = self.get_address_history(addr)
- for tx_hash, height in old_hist:
- if (tx_hash, height) not in hist:
- # make tx local
- self.unverified_tx.pop(tx_hash, None)
- self.verified_tx.pop(tx_hash, None)
- if self.verifier:
- self.verifier.remove_spv_proof_for_tx(tx_hash)
- self.history[addr] = hist
- for tx_hash, tx_height in hist:
- # add it in case it was previously unconfirmed
- self.add_unverified_tx(tx_hash, tx_height)
- # if addr is new, we have to recompute txi and txo
- tx = self.transactions.get(tx_hash)
- if tx is None:
- continue
- self.add_transaction(tx_hash, tx, allow_unrelated=True)
- # Store fees
- self.tx_fees.update(tx_fees)
- def get_history(self, domain=None):
- # get domain
- if domain is None:
- domain = self.get_addresses()
- domain = set(domain)
- # 1. Get the history of each address in the domain, maintain the
- # delta of a tx as the sum of its deltas on domain addresses
- tx_deltas = defaultdict(int)
- for addr in domain:
- h = self.get_address_history(addr)
- for tx_hash, height in h:
- delta = self.get_tx_delta(tx_hash, addr)
- if delta is None or tx_deltas[tx_hash] is None:
- tx_deltas[tx_hash] = None
- else:
- tx_deltas[tx_hash] += delta
- # 2. create sorted history
- history = []
- for tx_hash in tx_deltas:
- delta = tx_deltas[tx_hash]
- height, conf, timestamp = self.get_tx_height(tx_hash)
- history.append((tx_hash, height, conf, timestamp, delta))
- history.sort(key = lambda x: self.get_txpos(x[0]))
- history.reverse()
- # 3. add balance
- c, u, x = self.get_balance(domain)
- balance = c + u + x
- h2 = []
- for tx_hash, height, conf, timestamp, delta in history:
- h2.append((tx_hash, height, conf, timestamp, delta, balance))
- if balance is None or delta is None:
- balance = None
- else:
- balance -= delta
- h2.reverse()
- # fixme: this may happen if history is incomplete
- if balance not in [None, 0]:
- self.print_error("Error: history not synchronized")
- return []
- return h2
def balance_at_timestamp(self, domain, target_timestamp):
h = self.get_history(domain)
for tx_hash, height, conf, timestamp, value, balance in h:
@@ -1285,36 +854,6 @@ class Abstract_Wallet(PrintError):
return True
return False
- def load_unverified_transactions(self):
- # review transactions that are in the history
- for addr, hist in self.history.items():
- for tx_hash, tx_height in hist:
- # add it in case it was previously unconfirmed
- self.add_unverified_tx(tx_hash, tx_height)
- def start_threads(self, network):
- = network
- if is not None:
- self.verifier = SPV(, self)
- self.synchronizer = Synchronizer(self, network)
- network.add_jobs([self.verifier, self.synchronizer])
- else:
- self.verifier = None
- self.synchronizer = None
- def stop_threads(self):
- if
-[self.synchronizer, self.verifier])
- self.synchronizer.release()
- self.synchronizer = None
- self.verifier = None
- # Now no references to the synchronizer or verifier
- # remain so they will be GC-ed
-'stored_height', self.get_local_height())
- self.save_transactions()
- self.save_verified_tx()
def wait_until_synchronized(self, callback=None):
def wait_for_wallet():
@@ -1605,7 +1144,7 @@ class Abstract_Wallet(PrintError):
expiration = 0
conf = None
if amount:
- if self.up_to_date:
+ if self.is_up_to_date():
paid, conf = self.get_payment_status(address, amount)
status = PR_PAID if paid else PR_UNPAID
if status == PR_UNPAID and expiration is not None and time.time() > timestamp + expiration:
@@ -1700,12 +1239,6 @@ class Abstract_Wallet(PrintError):
def can_delete_address(self):
return False
- def add_address(self, address):
- if address not in self.history:
- self.history[address] = []
- if self.synchronizer:
- self.synchronizer.add(address)
def has_password(self):
return self.has_keystore_encryption() or self.has_storage_encryption()