commit b6b8e2b59c752401bbae754be7a380cbd203255c
parent bd0f5497d09c336ca58688672f4b941c5ac9c688
Author: parazyd <parazyd@dyne.org>
Date: Thu, 8 Apr 2021 14:50:40 +0200
Implement blockchain.headers.subscribe
Diffstat:
3 files changed, 104 insertions(+), 26 deletions(-)
diff --git a/electrumobelisk/protocol.py b/electrumobelisk/protocol.py
@@ -24,6 +24,7 @@ from binascii import unhexlify
from electrumobelisk.hashes import double_sha256, hash_to_hex_str
from electrumobelisk.merkle import merkle_branch
from electrumobelisk.util import (
+ block_to_header,
is_boolean,
is_hash256_str,
is_hex_str,
@@ -60,6 +61,9 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
self.version_called = False
# Consider renaming bx to something else
self.bx = Client(log, endpoints, self.loop)
+ self.block_queue = None
+ # TODO: Clean up on client disconnect
+ self.tasks = []
if chain == "mainnet":
self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
@@ -117,6 +121,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
data = await reader.read(4096)
if not data or len(data) == 0:
self.log.debug("Received EOF, disconnect")
+ # TODO: cancel asyncio tasks for this client here?
return
recv_buf.extend(data)
lb = recv_buf.find(b"\n")
@@ -135,6 +140,13 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
self.log.debug("=> " + line)
await self.handle_query(writer, query)
+ async def _send_notification(self, writer, method, params):
+ """Send JSON-RPC notification to given writer"""
+ response = {"jsonrpc": "2.0", "method": method, "params": params}
+ self.log.debug("<= %s", response)
+ writer.write(json.dumps(response).encode("utf-8") + b"\n")
+ await writer.drain()
+
async def _send_response(self, writer, result, nid):
"""Send successful JSON-RPC response to given writer"""
response = {"jsonrpc": "2.0", "result": result, "id": nid}
@@ -169,10 +181,10 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
if not func:
self.log.error("Unhandled method %s, query=%s", method, query)
return
- resp = await func(query)
+ resp = await func(writer, query)
return await self._send_reply(writer, resp, query)
- async def blockchain_block_header(self, query):
+ async def blockchain_block_header(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.block.header
Return the block header at the given height.
"""
@@ -193,7 +205,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
return {"error": "request corrupted"}
return {"result": safe_hexlify(data)}
- async def blockchain_block_headers(self, query):
+ async def blockchain_block_headers(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.block.headers
Return a concatenated chunk of block headers from the main chain.
"""
@@ -227,7 +239,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
}
return {"result": resp}
- async def blockchain_estimatefee(self, query): # pylint: disable=W0613
+ async def blockchain_estimatefee(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.estimatefee
Return the estimated transaction fee per kilobyte for a transaction
to be confirmed within a certain number of blocks.
@@ -235,13 +247,41 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
# TODO: Help wanted
return {"result": -1}
- async def blockchain_headers_subscribe(self, query):
+ async def header_notifier(self, writer):
+ self.block_queue = asyncio.Queue()
+ await self.bx.subscribe_to_blocks(self.block_queue)
+ while True:
+ # item = (seq, height, block_data)
+ item = await self.block_queue.get()
+ if len(item) != 3:
+ self.log.debug("error: item from block queue len != 3")
+ continue
+
+ header = block_to_header(item[2])
+ params = [{"height": item[1], "hex": safe_hexlify(header)}]
+ await self._send_notification(writer,
+ "blockchain.headers.subscribe",
+ params)
+
+ async def blockchain_headers_subscribe(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.headers.subscribe
Subscribe to receive block headers when a new block is found.
"""
- return
+ # Tip height and header are returned upon request
+ _ec, height = await self.bx.fetch_last_height()
+ if _ec and _ec != 0:
+ self.log.debug("Got error: %s", repr(_ec))
+ return {"error": "internal error"}
+ _ec, tip_header = await self.bx.fetch_block_header(height)
+ if _ec and _ec != 0:
+ self.log.debug("Got error: %s", repr(_ec))
+ return {"error": "internal error"}
+
+ self.tasks.append(asyncio.create_task(self.header_notifier(writer)))
+ ret = {"height": height, "hex": safe_hexlify(tip_header)}
+ return {"result": ret}
- async def blockchain_relayfee(self, query): # pylint: disable=W0613
+ async def blockchain_relayfee(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.relayfee
Return the minimum fee a low-priority transaction must pay in order
to be accepted to the daemon’s memory pool.
@@ -249,7 +289,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
# TODO: Help wanted
return {"result": 0.00001}
- async def blockchain_scripthash_get_balance(self, query):
+ async def blockchain_scripthash_get_balance(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.scripthash.get_balance
Return the confirmed and unconfirmed balances of a script hash.
"""
@@ -268,38 +308,38 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
ret = {"confirmed": data, "unconfirmed": 0}
return {"result": ret}
- async def blockchain_scripthash_get_history(self, query):
+ async def blockchain_scripthash_get_history(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.scripthash.get_history
Return the confirmed and unconfirmed history of a script hash.
"""
return
- async def blockchain_scripthash_get_mempool(self, query):
+ async def blockchain_scripthash_get_mempool(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.scripthash.get_mempool
Return the unconfirmed transactions of a script hash.
"""
return
- async def blockchain_scripthash_listunspent(self, query):
+ async def blockchain_scripthash_listunspent(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.scripthash.listunspent
Return an ordered list of UTXOs sent to a script hash.
"""
return
- async def blockchain_scripthash_subscribe(self, query):
+ async def blockchain_scripthash_subscribe(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.scripthash.subscribe
Subscribe to a script hash.
"""
return
- async def blockchain_scripthash_unsubscribe(self, query):
+ async def blockchain_scripthash_unsubscribe(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.scripthash.unsubscribe
Unsubscribe from a script hash, preventing future notifications
if its status changes.
"""
return
- async def blockchain_transaction_broadcast(self, query):
+ async def blockchain_transaction_broadcast(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.transaction.broadcast
Broadcast a transaction to the network.
"""
@@ -319,7 +359,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
txid = double_sha256(rawtx)
return {"result": hash_to_hex_str(txid)}
- async def blockchain_transaction_get(self, query):
+ async def blockchain_transaction_get(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.transaction.get
Return a raw transaction.
"""
@@ -342,7 +382,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
return {"result", safe_hexlify(rawtx)}
- async def blockchain_transaction_get_merkle(self, query):
+ async def blockchain_transaction_get_merkle(self, writer, query): # pylint: disable=W0613
"""Method: blockchain.transaction.get_merkle
Return the merkle branch to a confirmed transaction given its
hash and height.
@@ -374,7 +414,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
}
return {"result": res}
- async def blockchain_transaction_from_pos(self, query): # pylint: disable=R0911
+ async def blockchain_transaction_from_pos(self, writer, query): # pylint: disable=R0911,W0613
"""Method: blockchain.transaction.id_from_pos
Return a transaction hash and optionally a merkle proof, given a
block height and a position in the block.
@@ -409,7 +449,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
branch = merkle_branch(hashes, tx_pos)
return {"result": {"tx_hash": txid, "merkle": branch}}
- async def mempool_get_fee_histogram(self, query): # pylint: disable=W0613
+ async def mempool_get_fee_histogram(self, writer, query): # pylint: disable=W0613
"""Method: mempool.get_fee_histogram
Return a histogram of the fee rates paid by transactions in the
memory pool, weighted by transaction size.
@@ -417,7 +457,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
# TODO: Help wanted
return {"result": [[0, 0]]}
- async def server_add_peer(self, query): # pylint: disable=W0613
+ async def server_add_peer(self, writer, query): # pylint: disable=W0613
"""Method: server.add_peer
A newly-started server uses this call to get itself into other
servers’ peers lists. It should not be used by wallet clients.
@@ -425,19 +465,19 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
# TODO: Help wanted
return {"result": False}
- async def server_banner(self, query): # pylint: disable=W0613
+ async def server_banner(self, writer, query): # pylint: disable=W0613
"""Method: server.banner
Return a banner to be shown in the Electrum console.
"""
return {"result": BANNER}
- async def server_donation_address(self, query): # pylint: disable=W0613
+ async def server_donation_address(self, writer, query): # pylint: disable=W0613
"""Method: server.donation_address
Return a server donation address.
"""
return {"result": DONATION_ADDR}
- async def server_features(self, query):
+ async def server_features(self, writer, query): # pylint: disable=W0613
"""Method: server.features
Return a list of features and services supported by the server.
"""
@@ -461,7 +501,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
}
}
- async def server_peers_subscribe(self, query): # pylint: disable=W0613
+ async def server_peers_subscribe(self, writer, query): # pylint: disable=W0613
"""Method: server.peers.subscribe
Return a list of peer servers. Despite the name this is not a
subscription and the server must send no notifications.
@@ -469,7 +509,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
# TODO: Help wanted
return {"result": []}
- async def server_ping(self, query): # pylint: disable=W0613
+ async def server_ping(self, writer, query): # pylint: disable=W0613
"""Method: server.ping
Ping the server to ensure it is responding, and to keep the session
alive. The server may disconnect clients that have sent no requests
@@ -477,7 +517,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902
"""
return {"result": None}
- async def server_version(self, query):
+ async def server_version(self, writer, query): # pylint: disable=W0613
"""Method: server.version
Identify the client to the server and negotiate the protocol version.
Only the first server.version() message is accepted.
diff --git a/electrumobelisk/util.py b/electrumobelisk/util.py
@@ -72,3 +72,18 @@ def bh2u(val):
'01020A'
"""
return val.hex()
+
+
+def block_to_header(block):
+ """Return block header from raw block"""
+ if not isinstance(block, (bytes, bytearray)):
+ raise ValueError("block is not of type bytes/bytearray")
+ # TODO: check endianness
+ block_header = block[:80]
+ # version = block_header[:4]
+ # prev_merkle_root = block_header[4:36]
+ # merkle_root = block_header[36:68]
+ # timestamp = block_header[68:72]
+ # bits = block_header[72:76]
+ # nonce = block_header[76:80]
+ return block_header
diff --git a/electrumobelisk/zeromq.py b/electrumobelisk/zeromq.py
@@ -205,7 +205,7 @@ class RequestCollection:
self._handle_response(response)
else:
print(
- f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}"
+ f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}" # pylint: disable=C0301
)
def _handle_response(self, response):
@@ -294,6 +294,14 @@ class Client:
assert response.request_id == request.id_
return response.error_code, response.data
+ async def fetch_last_height(self):
+ """Fetch the blockchain tip and return integer height"""
+ command = b"blockchain.fetch_last_height"
+ error_code, data = await self._simple_request(command, b"")
+ if error_code:
+ return error_code, None
+ return error_code, struct.unpack("<I", data)[0]
+
async def fetch_block_header(self, index):
"""Fetch a block header by its height or integer index"""
command = b"blockchain.fetch_block_header"
@@ -373,6 +381,21 @@ class Client:
return error_code, functools.reduce(
lambda accumulator, point: accumulator + point["value"], utxo, 0)
+ async def subscribe_to_blocks(self, queue):
+ asyncio.ensure_future(self._listen_for_blocks(queue))
+ return queue
+
+ async def _listen_for_blocks(self, queue):
+ """Infinite loop for block subscription.
+ Returns raw blocks as they're received.
+ """
+ while True:
+ frame = await self._block_socket.recv_multipart()
+ seq = struct.unpack("<H", frame[0])[0]
+ height = struct.unpack("<I", frame[1])[0]
+ block_data = frame[2]
+ queue.put_nowait((seq, height, block_data))
+
@staticmethod
def __receives_without_spends(history):
return (point for point in history if "spent" not in point)