commit 90d32038faf618031241e2ca4ea206ee7504051c
parent 0c37009cdb3cb276096cf14b5dca18b9338f7eaa
Author: ThomasV <thomasv@gitorious>
Date: Thu, 2 Jul 2015 09:25:43 +0200
Merge branch 'kyuupichan-synchronizer-unthread'
Diffstat:
3 files changed, 160 insertions(+), 160 deletions(-)
diff --git a/lib/network_proxy.py b/lib/network_proxy.py
@@ -61,10 +61,13 @@ class NetworkProxy(util.DaemonThread):
self.blockchain_height = 0
self.server_height = 0
self.interfaces = []
+ self.jobs = []
def run(self):
while self.is_running():
+ for job in self.jobs:
+ job()
try:
response = self.pipe.get()
except util.timeout:
diff --git a/lib/synchronizer.py b/lib/synchronizer.py
@@ -17,174 +17,170 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import threading
-import time
-import Queue
+from threading import Lock
-import bitcoin
-import util
+from bitcoin import Hash, hash_encode
from transaction import Transaction
+from util import print_error, print_msg
-class WalletSynchronizer(util.DaemonThread):
+class WalletSynchronizer():
+ '''The synchronizer keeps the wallet up-to-date with its set of
+ addresses and their transactions. It subscribes over the network
+ to wallet addresses, gets the wallet to generate new addresses
+ when necessary, requests the transaction history of any addresses
+ we don't have the full history of, and requests binary transaction
+ data of any transactions the wallet doesn't have.
+
+ External interface: __init__() and add() member functions.
+ '''
def __init__(self, wallet, network):
- util.DaemonThread.__init__(self)
self.wallet = wallet
self.network = network
- self.was_updated = True
- self.queue = Queue.Queue()
- self.address_queue = Queue.Queue()
+ self.new_addresses = set()
+ # Entries are (tx_hash, tx_height) tuples
+ self.requested_tx = set()
+ self.requested_histories = {}
+ self.requested_addrs = set()
+ self.lock = Lock()
+ self.initialize()
+
+ def print_error(self, *msg):
+ print_error("[Synchronizer]", *msg)
+
+ def print_msg(self, *msg):
+ print_msg("[Synchronizer]", *msg)
+
+ def parse_response(self, response):
+ if response.get('error'):
+ self.print_error("response error:", response)
+ return None, None
+ return response['params'], response['result']
+
+ def is_up_to_date(self):
+ return (not self.requested_tx and not self.requested_histories
+ and not self.requested_addrs)
def add(self, address):
- self.address_queue.put(address)
+ '''This can be called from the proxy or GUI threads.'''
+ with self.lock:
+ self.new_addresses.add(address)
def subscribe_to_addresses(self, addresses):
- messages = []
- for addr in addresses:
- messages.append(('blockchain.address.subscribe', [addr]))
- self.network.send(messages, self.queue.put)
-
- def run(self):
- while self.is_running():
- if not self.network.is_connected():
- time.sleep(0.1)
- continue
- self.run_interface()
- self.print_error("stopped")
-
- def run_interface(self):
- #print_error("synchronizer: connected to", self.network.get_parameters())
-
- requested_tx = []
- missing_tx = []
- requested_histories = {}
-
- # request any missing transactions
+ if addresses:
+ self.requested_addrs |= addresses
+ msgs = map(lambda addr: ('blockchain.address.subscribe', [addr]),
+ addresses)
+ self.network.send(msgs, self.addr_subscription_response)
+
+ def addr_subscription_response(self, response):
+ params, result = self.parse_response(response)
+ if not params:
+ return
+ addr = params[0]
+ if addr in self.requested_addrs: # Notifications won't be in
+ self.requested_addrs.remove(addr)
+ history = self.wallet.get_address_history(addr)
+ if self.wallet.get_status(history) != result:
+ if self.requested_histories.get(addr) is None:
+ self.network.send([('blockchain.address.get_history', [addr])],
+ self.addr_history_response)
+ self.requested_histories[addr] = result
+
+ def addr_history_response(self, response):
+ params, result = self.parse_response(response)
+ if not params:
+ return
+ addr = params[0]
+ self.print_error("receiving history", addr, len(result))
+ server_status = self.requested_histories.pop(addr)
+
+ # Check that txids are unique
+ hashes = set(map(lambda item: item['tx_hash'], result))
+ if len(hashes) != len(result):
+ self.print_error("error: server history has non-unique txids: %s"% addr)
+ return
+
+ # Check that the status corresponds to what was announced
+ hist = map(lambda item: (item['tx_hash'], item['height']), result)
+ if self.wallet.get_status(hist) != server_status:
+ self.print_error("error: status mismatch: %s" % addr)
+ return
+
+ # Store received history
+ self.wallet.receive_history_callback(addr, hist)
+
+ # Request transactions we don't have
+ self.request_missing_txs(hist)
+
+ def tx_response(self, response):
+ params, result = self.parse_response(response)
+ if not params:
+ return
+ tx_hash, tx_height = params
+ assert tx_hash == hash_encode(Hash(result.decode('hex')))
+ tx = Transaction(result)
+ try:
+ tx.deserialize()
+ except Exception:
+ self.print_msg("cannot deserialize transaction, skipping", tx_hash)
+ return
+
+ self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
+ self.requested_tx.remove((tx_hash, tx_height))
+ self.print_error("received tx:", tx_hash, len(tx.raw))
+ if not self.requested_tx:
+ self.network.trigger_callback('updated')
+ # Updated gets called too many times from other places as
+ # well; if we used that signal we get the notification
+ # three times
+ self.network.trigger_callback("new_transaction")
+
+ def request_missing_txs(self, hist):
+ # "hist" is a list of [tx_hash, tx_height] lists
+ missing = set()
+ for tx_hash, tx_height in hist:
+ if self.wallet.transactions.get(tx_hash) is None:
+ missing.add((tx_hash, tx_height))
+ missing -= self.requested_tx
+ if missing:
+ requests = [('blockchain.transaction.get', tx) for tx in missing]
+ self.network.send(requests, self.tx_response)
+ self.requested_tx |= missing
+
+ def initialize(self):
+ '''Check the initial state of the wallet. Subscribe to all its
+ addresses, and request any transactions in its address history
+ we don't have.
+ '''
for history in self.wallet.history.values():
- if history == ['*']: continue
- for tx_hash, tx_height in history:
- if self.wallet.transactions.get(tx_hash) is None and (tx_hash, tx_height) not in missing_tx:
- missing_tx.append( (tx_hash, tx_height) )
-
- if missing_tx:
- self.print_error("missing tx", missing_tx)
-
- # subscriptions
- self.subscribe_to_addresses(self.wallet.addresses(True))
-
- while self.is_running():
-
- # 1. create new addresses
- self.wallet.synchronize()
-
- # request missing addresses
- new_addresses = []
- while True:
- try:
- addr = self.address_queue.get(block=False)
- except Queue.Empty:
- break
- new_addresses.append(addr)
- if new_addresses:
- self.subscribe_to_addresses(new_addresses)
-
- # request missing transactions
- for tx_hash, tx_height in missing_tx:
- if (tx_hash, tx_height) not in requested_tx:
- self.network.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], self.queue.put)
- requested_tx.append( (tx_hash, tx_height) )
- missing_tx = []
-
- # detect if situation has changed
- if self.network.is_up_to_date() and self.queue.empty():
- if not self.wallet.is_up_to_date():
- self.wallet.set_up_to_date(True)
- self.was_updated = True
- self.wallet.save_transactions()
- else:
- if self.wallet.is_up_to_date():
- self.wallet.set_up_to_date(False)
- self.was_updated = True
-
- if self.was_updated:
- self.network.trigger_callback('updated')
- self.was_updated = False
-
- # 2. get a response
- try:
- r = self.queue.get(timeout=0.1)
- except Queue.Empty:
- continue
-
- # 3. process response
- method = r['method']
- params = r['params']
- result = r.get('result')
- error = r.get('error')
- if error:
- self.print_error("error", r)
+ # Old electrum servers returned ['*'] when all history for
+ # the address was pruned. This no longer happens but may
+ # remain in old wallets.
+ if history == ['*']:
continue
-
- if method == 'blockchain.address.subscribe':
- addr = params[0]
- if self.wallet.get_status(self.wallet.get_address_history(addr)) != result:
- if requested_histories.get(addr) is None:
- self.network.send([('blockchain.address.get_history', [addr])], self.queue.put)
- requested_histories[addr] = result
-
- elif method == 'blockchain.address.get_history':
- addr = params[0]
- self.print_error("receiving history", addr, len(result))
- hist = []
- # check that txids are unique
- txids = []
- for item in result:
- tx_hash = item['tx_hash']
- if tx_hash not in txids:
- txids.append(tx_hash)
- hist.append( (tx_hash, item['height']) )
-
- if len(hist) != len(result):
- self.print_error("error: server sent history with non-unique txid", result)
- continue
-
- # check that the status corresponds to what was announced
- rs = requested_histories.pop(addr)
- if self.wallet.get_status(hist) != rs:
- self.print_error("error: status mismatch: %s" % addr)
- continue
-
- # store received history
- self.wallet.receive_history_callback(addr, hist)
-
- # request transactions that we don't have
- for tx_hash, tx_height in hist:
- if self.wallet.transactions.get(tx_hash) is None:
- if (tx_hash, tx_height) not in requested_tx and (tx_hash, tx_height) not in missing_tx:
- missing_tx.append( (tx_hash, tx_height) )
-
- elif method == 'blockchain.transaction.get':
- tx_hash = params[0]
- tx_height = params[1]
- assert tx_hash == bitcoin.hash_encode(bitcoin.Hash(result.decode('hex')))
- tx = Transaction(result)
- try:
- tx.deserialize()
- except Exception:
- self.print_msg("Warning: Cannot deserialize transactions. skipping")
- continue
-
- self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
- self.was_updated = True
- requested_tx.remove( (tx_hash, tx_height) )
- self.print_error("received tx:", tx_hash, len(tx.raw))
-
- else:
- self.print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) )
-
- if self.was_updated and not requested_tx:
- self.network.trigger_callback('updated')
- # Updated gets called too many times from other places as well; if we use that signal we get the notification three times
- self.network.trigger_callback("new_transaction")
- self.was_updated = False
+ self.request_missing_txs(history)
+
+ if self.requested_tx:
+ self.print_error("missing tx", self.requested_tx)
+ self.subscribe_to_addresses(set(self.wallet.addresses(True)))
+
+ def main_loop(self):
+ '''Called from the network proxy thread main loop.'''
+ # 1. Create new addresses
+ self.wallet.synchronize()
+
+ # 2. Subscribe to new addresses
+ with self.lock:
+ addresses = self.new_addresses
+ self.new_addresses = set()
+ self.subscribe_to_addresses(addresses)
+
+ # 3. Detect if situation has changed
+ up_to_date = self.is_up_to_date()
+ if up_to_date != self.wallet.is_up_to_date():
+ self.wallet.set_up_to_date(up_to_date)
+ if up_to_date:
+ self.wallet.save_transactions()
+ self.network.trigger_callback('updated')
diff --git a/lib/wallet.py b/lib/wallet.py
@@ -1107,15 +1107,16 @@ class Abstract_Wallet(object):
self.verifier.start()
self.set_verifier(self.verifier)
self.synchronizer = WalletSynchronizer(self, network)
- self.synchronizer.start()
+ network.jobs.append(self.synchronizer.main_loop)
else:
self.verifier = None
- self.synchronizer =None
+ self.synchronizer = None
def stop_threads(self):
if self.network:
self.verifier.stop()
- self.synchronizer.stop()
+ self.network.jobs = []
+ self.synchronizer = None
self.storage.put('stored_height', self.get_local_height(), True)
def restore(self, cb):