watch_address.py (1249B)
#!/usr/bin/env python3
"""Watch a single address and print the status reported for it.

Usage: watch_address <bitcoin_address>
"""

import sys
import asyncio

from electrum.network import Network
from electrum.util import print_msg, create_and_start_event_loop
from electrum.synchronizer import SynchronizerBase
from electrum.simple_config import SimpleConfig


# The address to watch is the single required command-line argument.
try:
    addr = sys.argv[1]
except IndexError:
    # Indexing the argv list can only fail because the argument is missing.
    print("usage: watch_address <bitcoin_address>")
    sys.exit(1)

config = SimpleConfig()

# start network
# Only the first element of the returned value is used: the event loop that
# coroutines are scheduled on further down.
loop = create_and_start_event_loop()[0]
network = Network(config)
network.start()


class Notifier(SynchronizerBase):
    """Feed queued addresses to the synchronizer and print their status.

    Addresses are handed over through ``watch_queue``; every address taken
    from the queue is remembered in ``watched_addresses`` and passed to the
    inherited ``_add_address`` (presumably subscribing to its status updates;
    see SynchronizerBase to confirm).
    """

    def __init__(self, network):
        """Create the notifier on top of *network* with an empty watch list."""
        super().__init__(network)
        # Addresses already taken from the queue; replayed when main() starts.
        self.watched_addresses = set()
        # Incoming addresses that main() has not picked up yet.
        self.watch_queue = asyncio.Queue()

    async def main(self) -> None:
        """Replay known addresses, then add new ones from the queue forever."""
        # resend existing subscriptions if we were restarted
        for known_addr in self.watched_addresses:
            await self._add_address(known_addr)
        # main loop
        while True:
            new_addr = await self.watch_queue.get()
            # Record the address before adding it so it is replayed on restart.
            self.watched_addresses.add(new_addr)
            await self._add_address(new_addr)

    async def _on_address_status(self, addr, status) -> None:
        """Print *addr* together with its reported *status*.

        NOTE(review): presumably invoked by SynchronizerBase whenever a status
        notification arrives for an added address; confirm against the base
        class.
        """
        print_msg(f"addr {addr}, status {status}")


notifier = Notifier(network)
# The put() coroutine is scheduled with the thread-safe helper because it is
# submitted from this (main) thread to `loop` rather than from inside it.
asyncio.run_coroutine_threadsafe(notifier.watch_queue.put(addr), loop)