commit 25e8f42479f13b252cb2923e44919dcac38f2950
parent 95697b92c9c29b27f006815b62dc8eb27522e2b5
Author: parazyd <parazyd@dyne.org>
Date: Fri, 9 Apr 2021 01:23:20 +0200
Implement scripthash subscriptions.
In theory this works, but in practice and v4 - it doesn't.
Needs more investigation.
Diffstat:
2 files changed, 43 insertions(+), 6 deletions(-)
diff --git a/electrumobelisk/protocol.py b/electrumobelisk/protocol.py
@@ -63,7 +63,6 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
# Consider renaming bx to something else
self.bx = Client(log, endpoints, self.loop)
self.block_queue = None
- self.tx_queue = None
# TODO: Clean up on client disconnect
self.tasks = []
self.sh_subscriptions = {}
@@ -115,6 +114,9 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
"""Destructor function"""
self.log.debug("ElectrumProtocol.stop()")
if self.bx:
+ for i in self.sh_subscriptions:
+ self.log.debug("bx.unsubscribe %s", i)
+ await self.bx.unsubscribe_scripthash(i)
await self.bx.stop()
self.stopped = True
@@ -352,6 +354,18 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
})
return {"result": ret}
+ async def scripthash_notifier(self, writer, scripthash):
+ # TODO: Figure out how this actually works
+ _ec, sh_queue = await self.bx.subscribe_scripthash(scripthash)
+ if _ec and ec != 0:
+ self.log.error("bx.subscribe_scripthash failed:", repr(_ec))
+ return
+
+ while True:
+ # item = (seq, height, block_data)
+ item = await sh_queue.get()
+ self.log.debug("sh_subscription item: %s", item)
+
async def blockchain_scripthash_subscribe(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.scripthash.subscribe
Subscribe to a script hash.
@@ -367,8 +381,9 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
if _ec and _ec != 0:
return {"error": "request corrupted"}
- # TODO: task for tx subscription
- self.sh_subscriptions[scripthash] = "foo"
+ task = asyncio.create_task(self.scripthash_notifier(
+ writer, scripthash))
+ self.sh_subscriptions[scripthash] = {"task": task}
if len(history) < 1:
return {"result": None}
@@ -380,10 +395,13 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
status.append(safe_hexlify(i[kind]["hash"][::-1]))
status.append(str(i[kind]["height"])) # str because of join
+ self.sh_subscriptions[scripthash]["status"] = status
+ return {"result": self.scripthash_status(status)}
+
+ def scripthash_status(self, status):
# TODO: Check if trailing colon is necessary
concat = ":".join(status) + ":"
- res = hash_to_hex_str(sha256(concat.encode()))
- return {"result": res}
+ return hash_to_hex_str(sha256(concat.encode()))
async def blockchain_scripthash_unsubscribe(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.scripthash.unsubscribe
@@ -398,8 +416,11 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
return {"error": "invalid scripthash"}
if scripthash in self.sh_subscriptions:
- self.sh_subscriptions[scripthash].cancel()
+ self.sh_subscriptions[scripthash]["task"].cancel()
+ await self.bx.unsubscribe_scripthash(scripthash)
+ del self.sh_subscriptions[scripthash]
return {"result": True}
+
return {"result": False}
async def blockchain_transaction_broadcast(self, writer, query): # pylint: disable=W0613
diff --git a/electrumobelisk/zeromq.py b/electrumobelisk/zeromq.py
@@ -337,6 +337,22 @@ class Client:
return error_code, None
return error_code, bh2u(data)
+ async def subscribe_scripthash(self, scripthash):
+ """Subscribe to scripthash"""
+ command = b"subscribe.key"
+ decoded_address = unhexlify(scripthash)
+ return await self._subscription_request(command, decoded_address)
+
+ async def unsubscribe_scripthash(self, scripthash):
+ """Unsubscribe scripthash"""
+ # TODO: This call should ideally also remove the subscription
+ # request from the RequestCollection.
+ # This call solicits a final call from the server with an
+ # `error::service_stopped` error code.
+ command = b"unsubscribe.key"
+ decoded_address = unhexlify(scripthash)
+ return await self._simple_request(command, decoded_address)
+
async def fetch_history4(self, scripthash, height=0):
"""Fetch history for given scripthash"""
# BUG: There is something strange happening sometimes, for example