synchronizer.py (13823B)
#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2014 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import hashlib
from typing import Dict, List, TYPE_CHECKING, Tuple, Set
from collections import defaultdict
import logging

from aiorpcx import TaskGroup, run_in_thread, RPCError

from . import util
from .transaction import Transaction, PartialTransaction
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy
from .bitcoin import address_to_scripthash, is_address
from .logging import Logger
from .interface import GracefulDisconnect, NetworkTimeout

if TYPE_CHECKING:
    from .network import Network
    from .address_synchronizer import AddressSynchronizer


class SynchronizerFailure(Exception): pass


def history_status(h):
    """Return the electrum-protocol "status" of an address history.

    h is an iterable of (tx_hash, height) pairs.  Returns None for an
    empty history, otherwise the hex-encoded sha256 of the concatenated
    "txid:height:" entries — the same status string the server announces
    for blockchain.scripthash.subscribe.
    """
    if not h:
        return None
    # single-pass join instead of quadratic string +=
    status = ''.join('%s:%d:' % (tx_hash, height) for tx_hash, height in h)
    return bh2u(hashlib.sha256(status.encode('ascii')).digest())


class SynchronizerBase(NetworkJobOnDefaultServer):
    """Subscribe over the network to a set of addresses, and monitor their statuses.
    Every time a status changes, run a coroutine provided by the subclass.
    """
    def __init__(self, network: 'Network'):
        self.asyncio_loop = network.asyncio_loop
        self._reset_request_counters()

        NetworkJobOnDefaultServer.__init__(self, network)

    def _reset(self):
        """Clear all subscription state; called on (re)connect by the base class."""
        super()._reset()
        self.requested_addrs = set()
        self.scripthash_to_address = {}
        self._processed_some_notifications = False  # so that we don't miss them
        self._reset_request_counters()
        # Queues
        self.add_queue = asyncio.Queue()       # addresses waiting to be subscribed
        self.status_queue = asyncio.Queue()    # (scripthash, status) notifications

    async def _run_tasks(self, *, taskgroup):
        await super()._run_tasks(taskgroup=taskgroup)
        try:
            async with taskgroup as group:
                await group.spawn(self.send_subscriptions())
                await group.spawn(self.handle_status())
                await group.spawn(self.main())
        finally:
            # we are being cancelled now
            # TODO: libbitcoin (this would be handled by zeromq.Client.stop())
            # NOTE(review): was a bare print(); library code should not write
            # to stdout, so the placeholder message goes to the debug log.
            self.logger.debug("self.session.unsubscribe(self.status_queue)")

    def _reset_request_counters(self):
        self._requests_sent = 0
        self._requests_answered = 0

    def add(self, addr):
        """Request a subscription to addr.  Thread-safe entry point."""
        asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)

    async def _add_address(self, addr: str):
        # note: this method is async as add_queue.put_nowait is not thread-safe.
        if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}")
        if addr in self.requested_addrs: return
        self.requested_addrs.add(addr)
        self.add_queue.put_nowait(addr)

    async def _on_address_status(self, addr, status):
        """Handle the change of the status of an address."""
        raise NotImplementedError()  # implemented by subclasses

    async def send_subscriptions(self):
        """Drain add_queue forever, spawning one subscription task per address."""
        async def subscribe_to_address(addr):
            h = address_to_scripthash(addr)
            self.scripthash_to_address[h] = addr
            self._requests_sent += 1
            async with self._network_request_semaphore:
                # TODO: libbitcoin -- electrum equivalent was:
                #   await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
                # TODO: libbitcoin XXX: Review this, it's probably incorrect
                # NOTE(review): was a bare debug print(); routed through the logger.
                self.logger.debug(f"network: subscribe_to_address: {h}")
                await self.interface.client._subscribe_to_scripthash(h, self.status_queue)
            self._requests_answered += 1
            self.requested_addrs.remove(addr)

        while True:
            addr = await self.add_queue.get()
            await self.taskgroup.spawn(subscribe_to_address, addr)

    async def handle_status(self):
        """Dispatch each (scripthash, status) notification to _on_address_status."""
        while True:
            h, status = await self.status_queue.get()
            addr = self.scripthash_to_address[h]
            await self.taskgroup.spawn(self._on_address_status, addr, status)
            self._processed_some_notifications = True

    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
        return self._requests_sent, self._requests_answered

    async def main(self):
        raise NotImplementedError()  # implemented by subclasses
class Synchronizer(SynchronizerBase):
    '''The synchronizer keeps the wallet up-to-date with its set of
    addresses and their transactions.  It subscribes over the network
    to wallet addresses, gets the wallet to generate new addresses
    when necessary, requests the transaction history of any addresses
    we don't have the full history of, and requests binary transaction
    data of any transactions the wallet doesn't have.
    '''
    def __init__(self, wallet: 'AddressSynchronizer'):
        self.wallet = wallet
        SynchronizerBase.__init__(self, wallet.network)

    def _reset(self):
        super()._reset()
        self.requested_tx = {}
        self.requested_histories = set()
        self._stale_histories = dict()  # type: Dict[str, asyncio.Task]

    def diagnostic_name(self):
        return self.wallet.diagnostic_name()

    def is_up_to_date(self):
        # up-to-date means nothing is outstanding: no pending address
        # subscriptions, history requests, tx fetches, or stale histories
        return not (self.requested_addrs
                    or self.requested_histories
                    or self.requested_tx
                    or self._stale_histories)

    async def _on_address_status(self, addr, status):
        """Fetch and store the history of addr if the announced status is new."""
        history = self.wallet.db.get_addr_history(addr)
        if history_status(history) == status:
            return
        # No point in requesting history twice for the same announced status.
        # However if we got announced a new status, we should request history again:
        if (addr, status) in self.requested_histories:
            return
        # request address history
        self.requested_histories.add((addr, status))
        stale_task = self._stale_histories.pop(addr, None)
        if stale_task is not None:
            stale_task.cancel()
        scripthash = address_to_scripthash(addr)
        self._requests_sent += 1
        async with self._network_request_semaphore:
            result = await self.interface.get_history_for_scripthash(scripthash)
        self._requests_answered += 1
        self.logger.info(f"receiving history {addr} {len(result)}")
        hist = [(item['tx_hash'], item['height']) for item in result]
        # tx_fees: only keep entries the server actually reported a fee for
        tx_fees = {item['tx_hash']: item['fee']
                   for item in result if item.get('fee') is not None}
        # Check that the status corresponds to what was announced
        if history_status(hist) != status:
            # could happen naturally if history changed between getting status and history (race)
            self.logger.info(f"error: status mismatch: {addr}. we'll wait a bit for status update.")
            # The server is supposed to send a new status notification, which will trigger a new
            # get_history. We shall wait a bit for this to happen, otherwise we disconnect.
            async def disconnect_if_still_stale():
                timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic)
                await asyncio.sleep(timeout)
                raise SynchronizerFailure(f"timeout reached waiting for addr {addr}: history still stale")
            self._stale_histories[addr] = await self.taskgroup.spawn(disconnect_if_still_stale)
        else:
            stale_task = self._stale_histories.pop(addr, None)
            if stale_task is not None:
                stale_task.cancel()
            # Store received history
            self.wallet.receive_history_callback(addr, hist, tx_fees)
            # Request transactions we don't have
            await self._request_missing_txs(hist)

        # Remove request; this allows up_to_date to be True
        self.requested_histories.discard((addr, status))

    async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False):
        # "hist" is a list of [tx_hash, tx_height] lists
        to_fetch = []
        for tx_hash, tx_height in hist:
            if tx_hash in self.requested_tx:
                continue
            tx = self.wallet.db.get_transaction(tx_hash)
            if tx and not isinstance(tx, PartialTransaction):
                continue  # already have complete tx
            to_fetch.append(tx_hash)
            self.requested_tx[tx_hash] = tx_height

        if not to_fetch:
            return
        async with TaskGroup() as group:
            for tx_hash in to_fetch:
                await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx))

    async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False):
        """Fetch one raw tx from the server, verify its txid, and hand it to the wallet."""
        self._requests_sent += 1
        try:
            async with self._network_request_semaphore:
                raw_tx = await self.interface.get_transaction(tx_hash)
        except RPCError:
            # most likely, "No such mempool or blockchain transaction"
            if not allow_server_not_finding_tx:
                raise
            self.requested_tx.pop(tx_hash)
            return
        finally:
            self._requests_answered += 1
        tx = Transaction(raw_tx)
        # defend against a malicious/buggy server sending a different tx
        if tx_hash != tx.txid():
            raise SynchronizerFailure(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})")
        tx_height = self.requested_tx.pop(tx_hash)
        self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
        self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}")
        # callbacks
        util.trigger_callback('new_transaction', self.wallet, tx)

    async def main(self):
        self.wallet.set_up_to_date(False)
        # request missing txns, if any
        for addr in random_shuffled_copy(self.wallet.db.get_history()):
            history = self.wallet.db.get_addr_history(addr)
            # Old electrum servers returned ['*'] when all history for the address
            # was pruned. This no longer happens but may remain in old wallets.
            if history == ['*']:
                continue
            await self._request_missing_txs(history, allow_server_not_finding_tx=True)
        # add addresses to bootstrap
        for addr in random_shuffled_copy(self.wallet.get_addresses()):
            await self._add_address(addr)
        # main loop
        while True:
            await asyncio.sleep(0.1)
            await run_in_thread(self.wallet.synchronize)
            up_to_date = self.is_up_to_date()
            state_changed = up_to_date != self.wallet.is_up_to_date()
            if state_changed or (up_to_date and self._processed_some_notifications):
                self._processed_some_notifications = False
                if up_to_date:
                    self._reset_request_counters()
                self.wallet.set_up_to_date(up_to_date)
                util.trigger_callback('wallet_updated', self.wallet)
class Notifier(SynchronizerBase):
    """Watch addresses. Every time the status of an address changes,
    an HTTP POST is sent to the corresponding URL.
    """
    def __init__(self, network):
        SynchronizerBase.__init__(self, network)
        self.watched_addresses = defaultdict(list)  # type: Dict[str, List[str]]
        self._start_watching_queue = asyncio.Queue()  # type: asyncio.Queue[Tuple[str, str]]

    async def main(self):
        # resend existing subscriptions if we were restarted
        for watched in self.watched_addresses:
            await self._add_address(watched)
        # main loop
        while True:
            addr, url = await self._start_watching_queue.get()
            self.watched_addresses[addr].append(url)
            await self._add_address(addr)

    async def start_watching_addr(self, addr: str, url: str):
        """Queue addr for watching; its status changes will be POSTed to url."""
        await self._start_watching_queue.put((addr, url))

    async def stop_watching_addr(self, addr: str):
        """Drop all URLs registered for addr (no-op if addr was not watched)."""
        self.watched_addresses.pop(addr, None)
        # TODO blockchain.scripthash.unsubscribe

    async def _on_address_status(self, addr, status):
        """POST the new status of addr to every registered URL, best-effort."""
        if addr not in self.watched_addresses:
            return
        self.logger.info(f'new status for addr {addr}')
        headers = {'content-type': 'application/json'}
        payload = {'address': addr, 'status': status}
        for url in self.watched_addresses[addr]:
            try:
                async with make_aiohttp_session(proxy=self.network.proxy, headers=headers) as session:
                    async with session.post(url, json=payload, headers=headers) as resp:
                        await resp.text()
            except Exception as exc:
                # best-effort delivery: log and keep notifying the other URLs
                self.logger.info(repr(exc))
            else:
                self.logger.info(f'Got Response for {addr}')