electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum

synchronizer.py (13823B)


#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2014 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import hashlib
from typing import Dict, List, TYPE_CHECKING, Tuple, Set
from collections import defaultdict
import logging

from aiorpcx import TaskGroup, run_in_thread, RPCError

from . import util
from .transaction import Transaction, PartialTransaction
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy
from .bitcoin import address_to_scripthash, is_address
from .logging import Logger
from .interface import GracefulDisconnect, NetworkTimeout

if TYPE_CHECKING:
    from .network import Network
    from .address_synchronizer import AddressSynchronizer


class SynchronizerFailure(Exception): pass


def history_status(h):
    if not h:
        return None
    status = ''
    for tx_hash, height in h:
        status += tx_hash + ':%d:' % height
    return bh2u(hashlib.sha256(status.encode('ascii')).digest())
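
# Illustrative example (hypothetical values, not real transactions): for
# h = [("ab" * 32, 1), ("cd" * 32, 2)] the hashed preimage is the ASCII string
# "abab...ab:1:cdcd...cd:2:" and the return value is the hex-encoded sha256 of
# that string. This matches the status format defined by the Electrum
# protocol's blockchain.scripthash.subscribe notifications, which
# _on_address_status() compares against below.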


class SynchronizerBase(NetworkJobOnDefaultServer):
    """Subscribe over the network to a set of addresses, and monitor their statuses.
    Every time a status changes, run a coroutine provided by the subclass.
    """
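    # Rough flow, as implemented below: add() / _add_address() enqueue addresses on
    # add_queue; send_subscriptions() drains that queue and subscribes each address's
    # scripthash with the server; notifications then arrive on status_queue, and
    # handle_status() dispatches them to the subclass's _on_address_status().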
    def __init__(self, network: 'Network'):
        self.asyncio_loop = network.asyncio_loop
        self._reset_request_counters()

        NetworkJobOnDefaultServer.__init__(self, network)

    def _reset(self):
        super()._reset()
        self.requested_addrs = set()
        self.scripthash_to_address = {}
        self._processed_some_notifications = False  # so that we don't miss them
        self._reset_request_counters()
        # Queues
        self.add_queue = asyncio.Queue()
        self.status_queue = asyncio.Queue()

    async def _run_tasks(self, *, taskgroup):
        await super()._run_tasks(taskgroup=taskgroup)
        try:
            async with taskgroup as group:
                await group.spawn(self.send_subscriptions())
                await group.spawn(self.handle_status())
                await group.spawn(self.main())
        finally:
            # we are being cancelled now
            # TODO: libbitcoin (this would be handled by zeromq.Client.stop())
            print("self.session.unsubscribe(self.status_queue)")

    def _reset_request_counters(self):
        self._requests_sent = 0
        self._requests_answered = 0

    def add(self, addr):
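        # Thread-safe entry point: schedules _add_address() on the network's event
        # loop, so callers outside that loop (e.g. wallet code) can register addresses.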
        asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)

    async def _add_address(self, addr: str):
        # note: this method is async as add_queue.put_nowait is not thread-safe.
        if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}")
        if addr in self.requested_addrs: return
        self.requested_addrs.add(addr)
        self.add_queue.put_nowait(addr)

    async def _on_address_status(self, addr, status):
        """Handle the change of the status of an address."""
        raise NotImplementedError()  # implemented by subclasses

    async def send_subscriptions(self):
        async def subscribe_to_address(addr):
            h = address_to_scripthash(addr)
            self.scripthash_to_address[h] = addr
            self._requests_sent += 1
            async with self._network_request_semaphore:
                # TODO: libbitcoin
                print("await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)")
                # TODO: libbitcoin XXX: Review this, it's probably incorrect
                print(f"DEBUG: network: subscribe_to_address: {h}")
                await self.interface.client._subscribe_to_scripthash(h, self.status_queue)
            self._requests_answered += 1
            self.requested_addrs.remove(addr)

        while True:
            addr = await self.add_queue.get()
            await self.taskgroup.spawn(subscribe_to_address, addr)

    async def handle_status(self):
        while True:
            h, status = await self.status_queue.get()
            addr = self.scripthash_to_address[h]
            await self.taskgroup.spawn(self._on_address_status, addr, status)
            self._processed_some_notifications = True

    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
        return self._requests_sent, self._requests_answered

    async def main(self):
        raise NotImplementedError()  # implemented by subclasses


class Synchronizer(SynchronizerBase):
    '''The synchronizer keeps the wallet up-to-date with its set of
    addresses and their transactions.  It subscribes over the network
    to wallet addresses, gets the wallet to generate new addresses
    when necessary, requests the transaction history of any addresses
    we don't have the full history of, and requests binary transaction
    data of any transactions the wallet doesn't have.
    '''
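    # Usage sketch (hedged; the wiring lives outside this file): the wallet-side
    # AddressSynchronizer is expected to create one instance per wallet, roughly
    # `synchronizer = Synchronizer(wallet)` once a Network is available, and to call
    # the inherited add() for addresses generated later. Everything else is driven
    # by main() and the incoming status notifications.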
    def __init__(self, wallet: 'AddressSynchronizer'):
        self.wallet = wallet
        SynchronizerBase.__init__(self, wallet.network)

    def _reset(self):
        super()._reset()
        self.requested_tx = {}
        self.requested_histories = set()
        self._stale_histories = dict()  # type: Dict[str, asyncio.Task]

    def diagnostic_name(self):
        return self.wallet.diagnostic_name()

    def is_up_to_date(self):
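        # True only when nothing is pending: no addresses awaiting subscription, no
        # in-flight history or transaction requests, and no histories known to be stale.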
        return (not self.requested_addrs
                and not self.requested_histories
                and not self.requested_tx
                and not self._stale_histories)

    async def _on_address_status(self, addr, status):
        history = self.wallet.db.get_addr_history(addr)
        if history_status(history) == status:
            return
        # No point in requesting history twice for the same announced status.
        # However, if a new status was announced, we should request the history again:
        if (addr, status) in self.requested_histories:
            return
        # request address history
        self.requested_histories.add((addr, status))
        # cancel any pending stale-history watchdog for this address; the dummy
        # Future is just a no-op placeholder for when none is pending
        self._stale_histories.pop(addr, asyncio.Future()).cancel()
        h = address_to_scripthash(addr)
        self._requests_sent += 1
        async with self._network_request_semaphore:
            result = await self.interface.get_history_for_scripthash(h)
        self._requests_answered += 1
        self.logger.info(f"receiving history {addr} {len(result)}")
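        # Expected shape of each item, following the Electrum protocol's
        # get_history result (illustrative; field availability is an assumption):
        # {'tx_hash': '<hex txid>', 'height': <int>, 'fee': <sats>}, where 'fee'
        # appears only for unconfirmed (mempool) entries.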
        hist = list(map(lambda item: (item['tx_hash'], item['height']), result))
        # tx_fees
        tx_fees = [(item['tx_hash'], item.get('fee')) for item in result]
        tx_fees = dict(filter(lambda x: x[1] is not None, tx_fees))
        # Check that the status corresponds to what was announced
        if history_status(hist) != status:
            # could happen naturally if history changed between getting status and history (race)
            self.logger.info(f"error: status mismatch: {addr}. we'll wait a bit for status update.")
            # The server is supposed to send a new status notification, which will trigger a new
            # get_history. We shall wait a bit for this to happen, otherwise we disconnect.
            async def disconnect_if_still_stale():
                timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic)
                await asyncio.sleep(timeout)
                raise SynchronizerFailure(f"timeout reached waiting for addr {addr}: history still stale")
            self._stale_histories[addr] = await self.taskgroup.spawn(disconnect_if_still_stale)
        else:
            self._stale_histories.pop(addr, asyncio.Future()).cancel()
            # Store received history
            self.wallet.receive_history_callback(addr, hist, tx_fees)
            # Request transactions we don't have
            await self._request_missing_txs(hist)

        # Remove request; this allows up_to_date to be True
        self.requested_histories.discard((addr, status))

    async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False):
        # "hist" is a list of [tx_hash, tx_height] lists
        transaction_hashes = []
        for tx_hash, tx_height in hist:
            if tx_hash in self.requested_tx:
                continue
            tx = self.wallet.db.get_transaction(tx_hash)
            if tx and not isinstance(tx, PartialTransaction):
                continue  # already have complete tx
            transaction_hashes.append(tx_hash)
            self.requested_tx[tx_hash] = tx_height

        if not transaction_hashes: return
        async with TaskGroup() as group:
            for tx_hash in transaction_hashes:
                await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx))

    async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False):
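        # Fetch the raw transaction from the server, verify that it hashes to the
        # expected txid, then hand it to the wallet and fire the 'new_transaction' callback.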
        self._requests_sent += 1
        try:
            async with self._network_request_semaphore:
                raw_tx = await self.interface.get_transaction(tx_hash)
        except RPCError as e:
            # most likely, "No such mempool or blockchain transaction"
            if allow_server_not_finding_tx:
                self.requested_tx.pop(tx_hash)
                return
            else:
                raise
        finally:
            self._requests_answered += 1
        tx = Transaction(raw_tx)
        if tx_hash != tx.txid():
            raise SynchronizerFailure(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})")
        tx_height = self.requested_tx.pop(tx_hash)
        self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
        self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}")
        # callbacks
        util.trigger_callback('new_transaction', self.wallet, tx)

    async def main(self):
        self.wallet.set_up_to_date(False)
        # request missing txns, if any
        for addr in random_shuffled_copy(self.wallet.db.get_history()):
            history = self.wallet.db.get_addr_history(addr)
            # Old electrum servers returned ['*'] when all history for the address
            # was pruned. This no longer happens but may remain in old wallets.
            if history == ['*']: continue
            await self._request_missing_txs(history, allow_server_not_finding_tx=True)
        # add addresses to bootstrap
        for addr in random_shuffled_copy(self.wallet.get_addresses()):
            await self._add_address(addr)
        # main loop
        while True:
            await asyncio.sleep(0.1)
            await run_in_thread(self.wallet.synchronize)
            up_to_date = self.is_up_to_date()
            # Update the wallet either when our up-to-date state differs from the
            # wallet's flag, or when we stayed up to date but processed new notifications.
            if (up_to_date != self.wallet.is_up_to_date()
                    or (up_to_date and self._processed_some_notifications)):
                self._processed_some_notifications = False
                if up_to_date:
                    self._reset_request_counters()
                self.wallet.set_up_to_date(up_to_date)
                util.trigger_callback('wallet_updated', self.wallet)


class Notifier(SynchronizerBase):
    """Watch addresses. Every time the status of an address changes,
    an HTTP POST is sent to the corresponding URL.
    """
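    # Usage sketch (the URL is hypothetical, for illustration only):
    #   notifier = Notifier(network)
    #   await notifier.start_watching_addr(addr, "https://example.com/electrum-hook")
    # Every subsequent status change for `addr` then triggers one POST per registered
    # URL; see _on_address_status() below for the payload.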
    def __init__(self, network):
        SynchronizerBase.__init__(self, network)
        self.watched_addresses = defaultdict(list)  # type: Dict[str, List[str]]
        self._start_watching_queue = asyncio.Queue()  # type: asyncio.Queue[Tuple[str, str]]

    async def main(self):
        # resend existing subscriptions if we were restarted
        for addr in self.watched_addresses:
            await self._add_address(addr)
        # main loop
        while True:
            addr, url = await self._start_watching_queue.get()
            self.watched_addresses[addr].append(url)
            await self._add_address(addr)

    async def start_watching_addr(self, addr: str, url: str):
        await self._start_watching_queue.put((addr, url))

    async def stop_watching_addr(self, addr: str):
        self.watched_addresses.pop(addr, None)
        # TODO blockchain.scripthash.unsubscribe

    async def _on_address_status(self, addr, status):
        if addr not in self.watched_addresses:
            return
        self.logger.info(f'new status for addr {addr}')
        headers = {'content-type': 'application/json'}
        data = {'address': addr, 'status': status}
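        # Example body (illustrative values): {"address": "bc1q...", "status": "<64-hex status>"};
        # 'status' may be null when the server reports no history for the address.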
        for url in self.watched_addresses[addr]:
            try:
                async with make_aiohttp_session(proxy=self.network.proxy, headers=headers) as session:
                    async with session.post(url, json=data, headers=headers) as resp:
                        await resp.text()
            except Exception as e:
                self.logger.info(repr(e))
            else:
                self.logger.info(f'Got Response for {addr}')