obelisk

Electrum server using libbitcoin as its backend
git clone https://git.parazyd.org/obelisk
Log | Files | Refs | README | LICENSE

commit a4c9fd4118e22f7318cc7ac59be9cf4ef3f60572
parent 1007fde5ad844e4ce2c8b517f132f7e45d270000
Author: parazyd <parazyd@dyne.org>
Date:   Tue, 13 Apr 2021 00:01:24 +0200

Rename electrumobelisk to obelisk.

Diffstat:
Delectrumobelisk/merkle.py | 57---------------------------------------------------------
Delectrumobelisk/protocol.py | 665-------------------------------------------------------------------------------
Delectrumobelisk/zeromq.py | 477-------------------------------------------------------------------------------
Dobelisk.py | 109-------------------------------------------------------------------------------
Aobelisk/__init__.py | 0
Relectrumobelisk/errors.py -> obelisk/errors.py | 0
Relectrumobelisk/libbitcoin_errors.py -> obelisk/libbitcoin_errors.py | 0
Aobelisk/merkle.py | 57+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aobelisk/obelisk | 2++
Aobelisk/protocol.py | 665+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Relectrumobelisk/util.py -> obelisk/util.py | 0
Aobelisk/zeromq.py | 477+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Arun_obelisk | 109+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
13 files changed, 1310 insertions(+), 1308 deletions(-)

diff --git a/electrumobelisk/merkle.py b/electrumobelisk/merkle.py @@ -1,57 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org> -# -# This file is part of obelisk -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License version 3 -# as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -"""Module for calculating merkle branches""" -from math import ceil, log - -from electrumobelisk.util import double_sha256 - - -def branch_length(hash_count): - """Return the length of a merkle branch given the number of hashes""" - return ceil(log(hash_count, 2)) - - -def merkle_branch_and_root(hashes, index): - """Return a (merkle branch, merkle_root) pair given hashes, and the - index of one of those hashes. - """ - hashes = list(hashes) - if not isinstance(index, int): - raise TypeError("index must be an integer") - # This also asserts hashes is not empty - if not 0 <= index < len(hashes): - raise ValueError("index out of range") - length = branch_length(len(hashes)) - - branch = [] - for _ in range(length): - if len(hashes) & 1: - hashes.append(hashes[-1]) - branch.append(hashes[index ^ 1]) - index >>= 1 - hashes = [ - double_sha256(hashes[n] + hashes[n + 1]) - for n in range(0, len(hashes), 2) - ] - return branch, hashes[0] - - -def merkle_branch(tx_hashes, tx_pos): - """Return a merkle branch given hashes and the tx position""" - branch, _root = merkle_branch_and_root(tx_hashes, tx_pos) - branch = [bytes(reversed(h)).hex() for h in branch] - return branch diff --git a/electrumobelisk/protocol.py b/electrumobelisk/protocol.py @@ -1,665 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org> -# -# This file is part of obelisk -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License version 3 -# as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -"""Implementation of the Electrum protocol as found on -https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html -""" -import asyncio -import json -from binascii import unhexlify - -from electrumobelisk.errors import ERRORS -from electrumobelisk.merkle import merkle_branch -from electrumobelisk.util import ( - bh2u, - block_to_header, - is_boolean, - is_hash256_str, - is_hex_str, - is_non_negative_integer, - safe_hexlify, - sha256, - double_sha256, - hash_to_hex_str, -) -from electrumobelisk.zeromq import Client - -VERSION = "0.0" -SERVER_PROTO_MIN = "1.4" -SERVER_PROTO_MAX = "1.4.2" -DONATION_ADDR = "bc1q7an9p5pz6pjwjk4r48zke2yfaevafzpglg26mz" - -BANNER = (""" -Welcome to obelisk - -"Tools for the people" - -obelisk is a server that uses libbitcoin-server as its backend. -Source code can be found at: https://github.com/parazyd/obelisk - -Please consider donating: %s -""" % DONATION_ADDR) - - -class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 - """Class implementing the Electrum protocol, with async support""" - def __init__(self, log, chain, endpoints, server_cfg): - self.log = log - self.stopped = False - self.endpoints = endpoints - self.server_cfg = server_cfg - self.loop = asyncio.get_event_loop() - # Consider renaming bx to something else - self.bx = Client(log, endpoints, self.loop) - self.block_queue = None - # TODO: Clean up on client disconnect - self.tasks = [] - self.sh_subscriptions = {} - - if chain == "mainnet": - self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" - elif chain == "testnet": - self.genesis = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943" - else: - raise ValueError(f"Invalid chain '{chain}'") - - # Here we map available methods to their respective functions - self.methodmap = { - "blockchain.block.header": self.blockchain_block_header, - "blockchain.block.headers": self.blockchain_block_headers, - "blockchain.estimatefee": self.blockchain_estimatefee, - "blockchain.headers.subscribe": self.blockchain_headers_subscribe, - "blockchain.relayfee": self.blockchain_relayfee, - "blockchain.scripthash.get_balance": - self.blockchain_scripthash_get_balance, - "blockchain.scripthash.get_history": - self.blockchain_scripthash_get_history, - "blockchain.scripthash.get_mempool": - self.blockchain_scripthash_get_mempool, - "blockchain.scripthash.listunspent": - self.blockchain_scripthash_listunspent, - "blockchain.scripthash.subscribe": - self.blockchain_scripthash_subscribe, - "blockchain.scripthash.unsubscribe": - self.blockchain_scripthash_unsubscribe, - "blockchain.transaction.broadcast": - self.blockchain_transaction_broadcast, - "blockchain.transaction.get": self.blockchain_transaction_get, - "blockchain.transaction.get_merkle": - self.blockchain_transaction_get_merkle, - "blockchain.transaction.id_from_pos": - self.blockchain_transaction_from_pos, - "mempool.get_fee_histogram": self.mempool_get_fee_histogram, - "server_add_peer": self.server_add_peer, - "server.banner": self.server_banner, - "server.donation_address": self.server_donation_address, - "server.features": self.server_features, - "server.peers.subscribe": self.server_peers_subscribe, - "server.ping": self.server_ping, - "server.version": self.server_version, - } - - async def stop(self): - """Destructor function""" - self.log.debug("ElectrumProtocol.stop()") - if self.bx: - unsub_pool = [] - for i in self.sh_subscriptions: - self.log.debug("bx.unsubscribe %s", i) - unsub_pool.append(self.bx.unsubscribe_scripthash(i)) - await asyncio.gather(*unsub_pool, return_exceptions=True) - await self.bx.stop() - - # idxs = [] - # for task in self.tasks: - # idxs.append(self.tasks.index(task)) - # task.cancel() - # for i in idxs: - # del self.tasks[i] - - self.stopped = True - - async def recv(self, reader, writer): - """Loop ran upon a connection which acts as a JSON-RPC handler""" - recv_buf = bytearray() - while not self.stopped: - data = await reader.read(4096) - if not data or len(data) == 0: - self.log.debug("Received EOF, disconnect") - # TODO: cancel asyncio tasks for this client here? - return - recv_buf.extend(data) - lb = recv_buf.find(b"\n") - if lb == -1: - continue - while lb != -1: - line = recv_buf[:lb].rstrip() - recv_buf = recv_buf[lb + 1:] - lb = recv_buf.find(b"\n") - try: - line = line.decode("utf-8") - query = json.loads(line) - except (UnicodeDecodeError, json.JSONDecodeError) as err: - self.log.debug("Got error: %s", repr(err)) - break - self.log.debug("=> " + line) - await self.handle_query(writer, query) - - async def _send_notification(self, writer, method, params): - """Send JSON-RPC notification to given writer""" - response = {"jsonrpc": "2.0", "method": method, "params": params} - self.log.debug("<= %s", response) - writer.write(json.dumps(response).encode("utf-8") + b"\n") - await writer.drain() - - async def _send_response(self, writer, result, nid): - """Send successful JSON-RPC response to given writer""" - response = {"jsonrpc": "2.0", "result": result, "id": nid} - self.log.debug("<= %s", response) - writer.write(json.dumps(response).encode("utf-8") + b"\n") - await writer.drain() - - async def _send_error(self, writer, error, nid): - """Send JSON-RPC error to given writer""" - response = {"jsonrpc": "2.0", "error": error, "id": nid} - self.log.debug("<= %s", response) - writer.write(json.dumps(response).encode("utf-8") + b"\n") - await writer.drain() - - async def _send_reply(self, writer, resp, query): - """Wrap function for sending replies""" - if "error" in resp: - return await self._send_error(writer, resp["error"], query["id"]) - return await self._send_response(writer, resp["result"], query["id"]) - - async def handle_query(self, writer, query): # pylint: disable=R0915,R0912,R0911 - """Electrum protocol method handler mapper""" - if "method" not in query: - self.log.debug("No 'method' in query: %s", query) - return - if "id" not in query: - self.log.debug("No 'id' in query: %s", query) - return - - method = query["method"] - func = self.methodmap.get(method) - if not func: - self.log.error("Unhandled method %s, query=%s", method, query) - return await self._send_reply(writer, ERRORS["nomethod"], query) - resp = await func(writer, query) - return await self._send_reply(writer, resp, query) - - async def blockchain_block_header(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.block.header - Return the block header at the given height. - """ - if "params" not in query or len(query["params"]) < 1: - return ERRORS["invalidparams"] - # TODO: cp_height - index = query["params"][0] - cp_height = query["params"][1] if len(query["params"]) == 2 else 0 - - if not is_non_negative_integer(index): - return ERRORS["invalidparams"] - if not is_non_negative_integer(cp_height): - return ERRORS["invalidparams"] - - _ec, data = await self.bx.fetch_block_header(index) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - return {"result": safe_hexlify(data)} - - async def blockchain_block_headers(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.block.headers - Return a concatenated chunk of block headers from the main chain. - """ - if "params" not in query or len(query["params"]) < 2: - return ERRORS["invalidparams"] - # Electrum doesn't allow max_chunk_size to be less than 2016 - # gopher://bitreich.org/9/memecache/convenience-store.mkv - # TODO: cp_height - max_chunk_size = 2016 - start_height = query["params"][0] - count = query["params"][1] - - if not is_non_negative_integer(start_height): - return ERRORS["invalidparams"] - if not is_non_negative_integer(count): - return ERRORS["invalidparams"] - - count = min(count, max_chunk_size) - headers = bytearray() - for i in range(count): - _ec, data = await self.bx.fetch_block_header(i) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - headers.extend(data) - - resp = { - "hex": safe_hexlify(headers), - "count": len(headers) // 80, - "max": max_chunk_size, - } - return {"result": resp} - - async def blockchain_estimatefee(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.estimatefee - Return the estimated transaction fee per kilobyte for a transaction - to be confirmed within a certain number of blocks. - """ - # TODO: Help wanted - return {"result": -1} - - async def header_notifier(self, writer): - self.block_queue = asyncio.Queue() - await self.bx.subscribe_to_blocks(self.block_queue) - while True: - # item = (seq, height, block_data) - item = await self.block_queue.get() - if len(item) != 3: - self.log.debug("error: item from block queue len != 3") - continue - - header = block_to_header(item[2]) - params = [{"height": item[1], "hex": safe_hexlify(header)}] - await self._send_notification(writer, - "blockchain.headers.subscribe", - params) - - async def blockchain_headers_subscribe(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.headers.subscribe - Subscribe to receive block headers when a new block is found. - """ - # Tip height and header are returned upon request - _ec, height = await self.bx.fetch_last_height() - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - _ec, tip_header = await self.bx.fetch_block_header(height) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - - self.tasks.append(asyncio.create_task(self.header_notifier(writer))) - ret = {"height": height, "hex": safe_hexlify(tip_header)} - return {"result": ret} - - async def blockchain_relayfee(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.relayfee - Return the minimum fee a low-priority transaction must pay in order - to be accepted to the daemon’s memory pool. - """ - # TODO: Help wanted - return {"result": 0.00001} - - async def blockchain_scripthash_get_balance(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.scripthash.get_balance - Return the confirmed and unconfirmed balances of a script hash. - """ - if "params" not in query or len(query["params"]) != 1: - return ERRORS["invalidparams"] - - if not is_hash256_str(query["params"][0]): - return ERRORS["invalidparams"] - - _ec, data = await self.bx.fetch_balance(query["params"][0]) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - - # TODO: confirmed/unconfirmed, see what's happening in libbitcoin - ret = {"confirmed": data, "unconfirmed": 0} - return {"result": ret} - - async def blockchain_scripthash_get_history(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.scripthash.get_history - Return the confirmed and unconfirmed history of a script hash. - """ - if "params" not in query or len(query["params"]) != 1: - return ERRORS["invalidparams"] - - if not is_hash256_str(query["params"][0]): - return ERRORS["invalidparams"] - - _ec, data = await self.bx.fetch_history4(query["params"][0]) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - - self.log.debug("hist: %s", data) - ret = [] - # TODO: mempool - for i in data: - if "received" in i: - ret.append({ - "height": i["received"]["height"], - "tx_hash": hash_to_hex_str(i["received"]["hash"]), - }) - if "spent" in i: - ret.append({ - "height": i["spent"]["height"], - "tx_hash": hash_to_hex_str(i["spent"]["hash"]), - }) - - return {"result": ret} - - async def blockchain_scripthash_get_mempool(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.scripthash.get_mempool - Return the unconfirmed transactions of a script hash. - """ - return - - async def blockchain_scripthash_listunspent(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.scripthash.listunspent - Return an ordered list of UTXOs sent to a script hash. - """ - if "params" not in query or len(query["params"]) != 1: - return ERRORS["invalidparams"] - - scripthash = query["params"][0] - if not is_hash256_str(scripthash): - return ERRORS["invalidparams"] - - _ec, utxo = await self.bx.fetch_utxo(scripthash) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - - # TODO: Check mempool - ret = [] - for i in utxo: - rec = i["received"] - ret.append({ - "tx_pos": rec["index"], - "value": i["value"], - "tx_hash": hash_to_hex_str(rec["hash"]), - "height": rec["height"], - }) - return {"result": ret} - - async def scripthash_notifier(self, writer, scripthash): - # TODO: Figure out how this actually works - _ec, sh_queue = await self.bx.subscribe_scripthash(scripthash) - if _ec and _ec != 0: - self.log.error("bx.subscribe_scripthash failed:", repr(_ec)) - return - - while True: - # item = (seq, height, block_data) - item = await sh_queue.get() - self.log.debug("sh_subscription item: %s", item) - - async def blockchain_scripthash_subscribe(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.scripthash.subscribe - Subscribe to a script hash. - """ - if "params" not in query or len(query["params"]) != 1: - return ERRORS["invalidparamas"] - - scripthash = query["params"][0] - if not is_hash256_str(scripthash): - return ERRORS["invalidparams"] - - _ec, history = await self.bx.fetch_history4(scripthash) - if _ec and _ec != 0: - return ERRORS["internalerror"] - - task = asyncio.create_task(self.scripthash_notifier( - writer, scripthash)) - self.sh_subscriptions[scripthash] = {"task": task} - - if len(history) < 1: - return {"result": None} - - # TODO: Check how history4 acts for mempool/unconfirmed - status = [] - for i in history: - if "received" in i: - status.append(( - hash_to_hex_str(i["received"]["hash"]), - i["received"]["height"], - )) - if "spent" in i: - status.append(( - hash_to_hex_str(i["spent"]["hash"]), - i["spent"]["height"], - )) - - self.sh_subscriptions[scripthash]["status"] = status - return {"result": ElectrumProtocol.__scripthash_status(status)} - - @staticmethod - def __scripthash_status(status): - concat = "" - for txid, height in status: - concat += txid + ":%d:" % height - return bh2u(sha256(concat.encode("ascii"))) - - async def blockchain_scripthash_unsubscribe(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.scripthash.unsubscribe - Unsubscribe from a script hash, preventing future notifications - if its status changes. - """ - if "params" not in query or len(query["params"]) != 1: - return ERRORS["invalidparams"] - - scripthash = query["params"][0] - if not is_hash256_str(scripthash): - return ERRORS["invalidparams"] - - if scripthash in self.sh_subscriptions: - self.sh_subscriptions[scripthash]["task"].cancel() - await self.bx.unsubscribe_scripthash(scripthash) - del self.sh_subscriptions[scripthash] - return {"result": True} - - return {"result": False} - - async def blockchain_transaction_broadcast(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.transaction.broadcast - Broadcast a transaction to the network. - """ - # Note: Not yet implemented in bs v4 - if "params" not in query or len(query["params"]) != 1: - return ERRORS["invalidparams"] - - hextx = query["params"][0] - if not is_hex_str(hextx): - return ERRORS["invalidparams"] - - _ec, _ = await self.bx.broadcast_transaction(hextx) - if _ec and _ec != 0: - return ERRORS["internalerror"] - - rawtx = unhexlify(hextx) - txid = double_sha256(rawtx) - return {"result": hash_to_hex_str(txid)} - - async def blockchain_transaction_get(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.transaction.get - Return a raw transaction. - """ - if "params" not in query or len(query["params"]) < 1: - return ERRORS["invalidparams"] - tx_hash = query["params"][0] - verbose = query["params"][1] if len(query["params"]) > 1 else False - - # _ec, rawtx = await self.bx.fetch_blockchain_transaction(tx_hash) - _ec, rawtx = await self.bx.fetch_mempool_transaction(tx_hash) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - - # Behaviour is undefined in spec - if not rawtx: - return {"result": None} - - if verbose: - # TODO: Help needed - return ERRORS["invalidrequest"] - - return {"result": bh2u(rawtx)} - - async def blockchain_transaction_get_merkle(self, writer, query): # pylint: disable=W0613 - """Method: blockchain.transaction.get_merkle - Return the merkle branch to a confirmed transaction given its - hash and height. - """ - if "params" not in query or len(query["params"]) != 2: - return ERRORS["invalidparams"] - tx_hash = query["params"][0] - height = query["params"][1] - - if not is_hash256_str(tx_hash): - return ERRORS["invalidparams"] - if not is_non_negative_integer(height): - return ERRORS["invalidparams"] - - _ec, hashes = await self.bx.fetch_block_transaction_hashes(height) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - - # Decouple from tuples - hashes = [i[0] for i in hashes] - tx_pos = hashes.index(unhexlify(tx_hash)[::-1]) - branch = merkle_branch(hashes, tx_pos) - - res = { - "block_height": int(height), - "pos": int(tx_pos), - "merkle": branch, - } - return {"result": res} - - async def blockchain_transaction_from_pos(self, writer, query): # pylint: disable=R0911,W0613 - """Method: blockchain.transaction.id_from_pos - Return a transaction hash and optionally a merkle proof, given a - block height and a position in the block. - """ - if "params" not in query or len(query["params"]) < 2: - return ERRORS["invalidparams"] - height = query["params"][0] - tx_pos = query["params"][1] - merkle = query["params"][2] if len(query["params"]) > 2 else False - - if not is_non_negative_integer(height): - return ERRORS["invalidparams"] - if not is_non_negative_integer(tx_pos): - return ERRORS["invalidparams"] - if not is_boolean(merkle): - return ERRORS["invalidparams"] - - _ec, hashes = await self.bx.fetch_block_transaction_hashes(height) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) - return ERRORS["internalerror"] - - if len(hashes) - 1 < tx_pos: - return ERRORS["internalerror"] - - # Decouple from tuples - hashes = [i[0] for i in hashes] - txid = hash_to_hex_str(hashes[tx_pos]) - - if not merkle: - return {"result": txid} - branch = merkle_branch(hashes, tx_pos) - return {"result": {"tx_hash": txid, "merkle": branch}} - - async def mempool_get_fee_histogram(self, writer, query): # pylint: disable=W0613 - """Method: mempool.get_fee_histogram - Return a histogram of the fee rates paid by transactions in the - memory pool, weighted by transaction size. - """ - # TODO: Help wanted - return {"result": [[0, 0]]} - - async def server_add_peer(self, writer, query): # pylint: disable=W0613 - """Method: server.add_peer - A newly-started server uses this call to get itself into other - servers’ peers lists. It should not be used by wallet clients. - """ - # TODO: Help wanted - return {"result": False} - - async def server_banner(self, writer, query): # pylint: disable=W0613 - """Method: server.banner - Return a banner to be shown in the Electrum console. - """ - return {"result": BANNER} - - async def server_donation_address(self, writer, query): # pylint: disable=W0613 - """Method: server.donation_address - Return a server donation address. - """ - return {"result": DONATION_ADDR} - - async def server_features(self, writer, query): # pylint: disable=W0613 - """Method: server.features - Return a list of features and services supported by the server. - """ - cfg = self.server_cfg - return { - "result": { - "genesis_hash": self.genesis, - "hosts": { - cfg["server_hostname"]: { - "tcp_port": cfg["server_port"], - "ssl_port": None, - }, - }, - "protocol_max": SERVER_PROTO_MAX, - "protocol_min": SERVER_PROTO_MIN, - "pruning": None, - "server_version": f"obelisk {VERSION}", - "hash_function": "sha256", - } - } - - async def server_peers_subscribe(self, writer, query): # pylint: disable=W0613 - """Method: server.peers.subscribe - Return a list of peer servers. Despite the name this is not a - subscription and the server must send no notifications. - """ - # TODO: Help wanted - return {"result": []} - - async def server_ping(self, writer, query): # pylint: disable=W0613 - """Method: server.ping - Ping the server to ensure it is responding, and to keep the session - alive. The server may disconnect clients that have sent no requests - for roughly 10 minutes. - """ - return {"result": None} - - async def server_version(self, writer, query): # pylint: disable=W0613 - """Method: server.version - Identify the client to the server and negotiate the protocol version. - """ - if "params" not in query or len(query["params"]) != 2: - return ERRORS["invalidparams"] - client_ver = query["params"][1] - if isinstance(client_ver, list): - client_min, client_max = client_ver[0], client_ver[1] - else: - client_min = client_max = client_ver - version = min(client_max, SERVER_PROTO_MAX) - if version < max(client_min, SERVER_PROTO_MIN): - return ERRORS["protonotsupported"] - return {"result": [f"obelisk {VERSION}", version]} diff --git a/electrumobelisk/zeromq.py b/electrumobelisk/zeromq.py @@ -1,477 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org> -# -# This file is part of obelisk -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License version 3 -# as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -"""ZeroMQ implementation for libbitcoin""" -import asyncio -import functools -import struct -from binascii import unhexlify -from random import randint - -import zmq -import zmq.asyncio - -from electrumobelisk.libbitcoin_errors import make_error_code, ErrorCode -from electrumobelisk.util import bh2u - - -def create_random_id(): - """Generate a random request ID""" - max_uint32 = 4294967295 - return randint(0, max_uint32) - - -def pack_block_index(index): - """struct.pack given index""" - if isinstance(index, str): - index = unhexlify(index) - assert len(index) == 32 - return index - if isinstance(index, int): - return struct.pack("<I", index) - - raise ValueError( - f"Unknown index type {type(index)} v:{index}, should be int or bytearray" - ) - - -def to_int(xbytes): - """Make little-endian integer from given bytes""" - return int.from_bytes(xbytes, byteorder="little") - - -def checksum(xhash, index): - """ - This method takes a transaction hash and an index and returns a checksum. - - This checksum is based on 49 bits starting from the 12th byte of the - reversed hash. Combined with the last 15 bits of the 4 byte index. - """ - mask = 0xFFFFFFFFFFFF8000 - magic_start_position = 12 - - hash_bytes = bytes.fromhex(xhash)[::-1] - last_20_bytes = hash_bytes[magic_start_position:] - - assert len(hash_bytes) == 32 - assert index < 2**32 - - hash_upper_49_bits = to_int(last_20_bytes) & mask - index_lower_15_bits = index & ~mask - return hash_upper_49_bits | index_lower_15_bits - - -def unpack_table(row_fmt, data): - """Function to unpack table received from libbitcoin""" - # Get the number of rows - row_size = struct.calcsize(row_fmt) - nrows = len(data) // row_size - # Unpack - rows = [] - for idx in range(nrows): - offset = idx * row_size - row = struct.unpack_from(row_fmt, data, offset) - rows.append(row) - return rows - - -class ClientSettings: - """Class implementing ZMQ client settings""" - def __init__(self, timeout=10, context=None, loop=None): - self._timeout = timeout - self._context = context - self._loop = loop - - @property - def context(self): - """ZMQ context property""" - if not self._context: - ctx = zmq.asyncio.Context() - ctx.linger = 500 # in milliseconds - self._context = ctx - return self._context - - @context.setter - def context(self, context): - self._context = context - - @property - def timeout(self): - """Set to None for no timeout""" - return self._timeout - - @timeout.setter - def timeout(self, timeout): - self._timeout = timeout - - -class Request: - """Class implementing a _send_ request. - This is either a simple request/response affair or a subscription. - """ - def __init__(self, socket, command, data): - self.id_ = create_random_id() - self.socket = socket - self.command = command - self.data = data - self.future = asyncio.Future() - self.queue = None - - async def send(self): - """Send the ZMQ request""" - request = [self.command, struct.pack("<I", self.id_), self.data] - await self.socket.send_multipart(request) - - def is_subscription(self): - """If the request is a subscription, then the response to this - request is a notification. - """ - return self.queue is not None - - def __str__(self): - return "Request(command, ID) {}, {:d}".format(self.command, self.id_) - - -class InvalidServerResponseException(Exception): - """Exception for invalid server responses""" - - -class Response: - """Class implementing a request response""" - def __init__(self, frame): - if len(frame) != 3: - raise InvalidServerResponseException( - f"Length of the frame was not 3: {len(frame)}") - - self.command = frame[0] - self.request_id = struct.unpack("<I", frame[1])[0] - error_code = struct.unpack("<I", frame[2][:4])[0] - self.error_code = make_error_code(error_code) - self.data = frame[2][4:] - - def is_bound_for_queue(self): - return len(self.data) > 0 - - def __str__(self): - return ( - "Response(command, request ID, error code, data):" + - f" {self.command}, {self.request_id}, {self.error_code}, {self.data}" - ) - - -class RequestCollection: - """RequestCollection carries a list of Requests and matches incoming - responses to them. - """ - def __init__(self, socket, loop): - self._socket = socket - self._requests = {} - self._task = asyncio.ensure_future(self._run(), loop=loop) - - async def _run(self): - while True: - await self._receive() - - async def stop(self): - """Stops listening for incoming responses (or subscription) messages. - Returns the number of _responses_ expected but which are now dropped - on the floor. - """ - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - return len(self._requests) - - async def _receive(self): - frame = await self._socket.recv_multipart() - response = Response(frame) - - if response.request_id in self._requests: - self._handle_response(response) - else: - print( - f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}" # pylint: disable=C0301 - ) - - def _handle_response(self, response): - request = self._requests[response.request_id] - - if request.is_subscription(): - if response.is_bound_for_queue(): - # TODO: decode the data into something usable - request.queue.put_nowait(response.data) - else: - request.future.set_result(response) - else: - self.delete_request(request) - request.future.set_result(response) - - def add_request(self, request): - # TODO: we should maybe check if the request.id_ is unique - self._requests[request.id_] = request - - def delete_request(self, request): - del self._requests[request.id_] - - -class Client: - """This class represents a connection to a libbitcoin server.""" - def __init__(self, log, endpoints, loop): - self.log = log - self._endpoints = endpoints - self._settings = ClientSettings(loop=loop) - self._query_socket = self._create_query_socket() - self._block_socket = self._create_block_socket() - self._request_collection = RequestCollection(self._query_socket, - self._settings._loop) - - async def stop(self): - self.log.debug("zmq Client.stop()") - self._query_socket.close() - self._block_socket.close() - return await self._request_collection.stop() - - def _create_block_socket(self): - socket = self._settings.context.socket( - zmq.SUB, # pylint: disable=E1101 - io_loop=self._settings._loop, # pylint: disable=W0212 - ) - socket.connect(self._endpoints["block"]) - socket.setsockopt_string(zmq.SUBSCRIBE, "") # pylint: disable=E1101 - return socket - - def _create_query_socket(self): - socket = self._settings.context.socket( - zmq.DEALER, # pylint: disable=E1101 - io_loop=self._settings._loop, # pylint: disable=W0212 - ) - socket.connect(self._endpoints["query"]) - return socket - - async def _subscription_request(self, command, data): - request = await self._request(command, data) - request.queue = asyncio.Queue(loop=self._settings._loop) # pylint: disable=W0212 - error_code, _ = await self._wait_for_response(request) - return error_code, request.queue - - async def _simple_request(self, command, data): - return await self._wait_for_response(await - self._request(command, data)) - - async def _request(self, command, data): - """Make a generic request. Both options are byte objects specified - like b'blockchain.fetch_block_header' as an example. - """ - request = Request(self._query_socket, command, data) - await request.send() - self._request_collection.add_request(request) - return request - - async def _wait_for_response(self, request): - try: - response = await asyncio.wait_for(request.future, - self._settings.timeout) - except asyncio.TimeoutError: - self._request_collection.delete_request(request) - return ErrorCode.channel_timeout, None - - assert response.command == request.command - assert response.request_id == request.id_ - return response.error_code, response.data - - async def fetch_last_height(self): - """Fetch the blockchain tip and return integer height""" - command = b"blockchain.fetch_last_height" - error_code, data = await self._simple_request(command, b"") - if error_code: - return error_code, None - return error_code, struct.unpack("<I", data)[0] - - async def fetch_block_header(self, index): - """Fetch a block header by its height or integer index""" - command = b"blockchain.fetch_block_header" - data = pack_block_index(index) - return await self._simple_request(command, data) - - async def fetch_block_transaction_hashes(self, index): - """Fetch transaction hashes in a block at height index""" - command = b"blockchain.fetch_block_transaction_hashes" - data = pack_block_index(index) - error_code, data = await self._simple_request(command, data) - if error_code: - return error_code, None - return error_code, unpack_table("32s", data) - - async def fetch_blockchain_transaction(self, txid): - """Fetch transaction by txid (not including mempool)""" - command = b"blockchain.fetch_transaction2" - error_code, data = await self._simple_request( - command, - bytes.fromhex(txid)[::-1]) - if error_code: - return error_code, None - return error_code, data - - async def fetch_mempool_transaction(self, txid): - """Fetch transaction by txid (including mempool)""" - command = b"transaction_pool.fetch_transaction2" - error_code, data = await self._simple_request( - command, - bytes.fromhex(txid)[::-1]) - if error_code: - return error_code, None - return error_code, data - - async def subscribe_scripthash(self, scripthash): - """Subscribe to scripthash""" - command = b"subscribe.key" - decoded_address = unhexlify(scripthash) - return await self._subscription_request(command, decoded_address) - - async def unsubscribe_scripthash(self, scripthash): - """Unsubscribe scripthash""" - # TODO: This call should ideally also remove the subscription - # request from the RequestCollection. - # This call solicits a final call from the server with an - # `error::service_stopped` error code. - command = b"unsubscribe.key" - decoded_address = unhexlify(scripthash) - return await self._simple_request(command, decoded_address) - - async def fetch_history4(self, scripthash, height=0): - """Fetch history for given scripthash""" - command = b"blockchain.fetch_history4" - decoded_address = unhexlify(scripthash) - error_code, raw_points = await self._simple_request( - command, decoded_address + struct.pack("<I", height)) - if error_code: - return error_code, None - - def make_tuple(row): - kind, height, tx_hash, index, value = row - return ( - kind, - { - "hash": tx_hash, - "index": index - }, - height, - value, - checksum(tx_hash[::-1].hex(), index), - ) - - rows = unpack_table("<BI32sIQ", raw_points) - points = [make_tuple(row) for row in rows] - correlated_points = Client.__correlate(points) - # self.log.debug("history points: %s", points) - # self.log.debug("history correlated: %s", correlated_points) - return error_code, self._sort_correlated_points(correlated_points) - - @staticmethod - def _sort_correlated_points(points): - """Sort by ascending height""" - if len(points) < 2: - return points - return sorted(points, key=lambda x: list(x.values())[0]["height"]) - - async def broadcast_transaction(self, rawtx): - """Broadcast given raw transaction""" - command = b"transaction_pool.broadcast" - return await self._simple_request(command, rawtx) - - async def fetch_balance(self, scripthash): - """Fetch balance for given scripthash""" - error_code, history = await self.fetch_history4(scripthash) - if error_code: - return error_code, None - - utxo = Client.__receives_without_spends(history) - return error_code, functools.reduce( - lambda accumulator, point: accumulator + point["value"], utxo, 0) - - async def fetch_utxo(self, scripthash): - """Find UTXO for given scripthash""" - error_code, history = await self.fetch_history4(scripthash) - if error_code: - return error_code, None - return error_code, Client.__receives_without_spends(history) - - async def subscribe_to_blocks(self, queue): - asyncio.ensure_future(self._listen_for_blocks(queue)) - return queue - - async def _listen_for_blocks(self, queue): - """Infinite loop for block subscription. - Returns raw blocks as they're received. - """ - while True: - frame = await self._block_socket.recv_multipart() - seq = struct.unpack("<H", frame[0])[0] - height = struct.unpack("<I", frame[1])[0] - block_data = frame[2] - queue.put_nowait((seq, height, block_data)) - - @staticmethod - def __receives_without_spends(history): - return (point for point in history if "spent" not in point) - - @staticmethod - def __correlate(points): - transfers, checksum_to_index = Client.__find_receives(points) - transfers = Client.__correlate_spends_to_receives( - points, transfers, checksum_to_index) - return transfers - - @staticmethod - def __correlate_spends_to_receives(points, transfers, checksum_to_index): - for point in points: - if point[0] == 1: # receive - continue - - spent = { - "hash": point[1]["hash"], - "height": point[2], - "index": point[1]["index"], - } - if point[3] not in checksum_to_index: - transfers.append({"spent": spent}) - else: - transfers[checksum_to_index[point[3]]]["spent"] = spent - - return transfers - - @staticmethod - def __find_receives(points): - transfers = [] - checksum_to_index = {} - - for point in points: - if point[0] == 0: # spent - continue - - transfers.append({ - "received": { - "hash": point[1]["hash"], - "height": point[2], - "index": point[1]["index"], - }, - "value": point[3], - }) - - checksum_to_index[point[4]] = len(transfers) - 1 - - return transfers, checksum_to_index diff --git a/obelisk.py b/obelisk.py @@ -1,109 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org> -# -# This file is part of obelisk -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License version 3 -# as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -import asyncio -import sys -from argparse import ArgumentParser -from configparser import RawConfigParser, NoSectionError -from logging import getLogger, FileHandler, Formatter, StreamHandler -from os import devnull - -from electrumobelisk.protocol import ElectrumProtocol, VERSION - -# Used for destructor/cleanup -PROTOCOL = None - - -def logger_config(log, config): - """Setup logging""" - fmt = Formatter("%(asctime)s\t%(levelname)s\t%(message)s") - logstream = StreamHandler() - logstream.setFormatter(fmt) - debuglevel = config.get("obelisk", "log_level", fallback="INFO") - logstream.setLevel(debuglevel) - log.addHandler(logstream) - filename = config.get("obelisk", "log_file", fallback=devnull) - append_log = config.getboolean("obelisk", "append_log", fallback=False) - logfile = FileHandler(filename, mode=("a" if append_log else "w")) - logfile.setFormatter(fmt) - logfile.setLevel(debuglevel) - log.addHandler(logfile) - log.setLevel(debuglevel) - return log, filename - - -async def run_electrum_server(config, chain): - """Server coroutine""" - log = getLogger("obelisk") - host = config.get("obelisk", "host") - port = int(config.get("obelisk", "port")) - - endpoints = {} - endpoints["query"] = config.get("obelisk", "query") - endpoints["heart"] = config.get("obelisk", "heart") - endpoints["block"] = config.get("obelisk", "block") - endpoints["trans"] = config.get("obelisk", "trans") - - server_cfg = {} - server_cfg["server_hostname"] = "localhost" # TODO: <- should be public? - server_cfg["server_port"] = port - - global PROTOCOL - PROTOCOL = ElectrumProtocol(log, chain, endpoints, server_cfg) - - server = await asyncio.start_server(PROTOCOL.recv, host, port) - async with server: - await server.serve_forever() - - -def main(): - """Main orchestration""" - parser = ArgumentParser(description=f"obelisk {VERSION}") - parser.add_argument("config_file", help="Path to config file") - args = parser.parse_args() - - try: - config = RawConfigParser() - config.read(args.config_file) - config.options("obelisk") - except NoSectionError: - print(f"error: Invalid config file {args.config_file}") - return 1 - - log = getLogger("obelisk") - log, logfilename = logger_config(log, config) - log.info(f"Starting obelisk {VERSION}") - log.info(f"Logging to {logfilename}") - - chain = config.get("obelisk", "chain") - if chain not in ("mainnet", "testnet"): - log.error("chain is not 'mainnet' or 'testnet'") - return 1 - - try: - asyncio.run(run_electrum_server(config, chain)) - except KeyboardInterrupt: - print("\r", end="") - log.debug("Caught KeyboardInterrupt, exiting...") - if PROTOCOL: - asyncio.run(PROTOCOL.stop()) - return 0 - - return 1 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/obelisk/__init__.py b/obelisk/__init__.py diff --git a/electrumobelisk/errors.py b/obelisk/errors.py diff --git a/electrumobelisk/libbitcoin_errors.py b/obelisk/libbitcoin_errors.py diff --git a/obelisk/merkle.py b/obelisk/merkle.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org> +# +# This file is part of obelisk +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License version 3 +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +"""Module for calculating merkle branches""" +from math import ceil, log + +from obelisk.util import double_sha256 + + +def branch_length(hash_count): + """Return the length of a merkle branch given the number of hashes""" + return ceil(log(hash_count, 2)) + + +def merkle_branch_and_root(hashes, index): + """Return a (merkle branch, merkle_root) pair given hashes, and the + index of one of those hashes. + """ + hashes = list(hashes) + if not isinstance(index, int): + raise TypeError("index must be an integer") + # This also asserts hashes is not empty + if not 0 <= index < len(hashes): + raise ValueError("index out of range") + length = branch_length(len(hashes)) + + branch = [] + for _ in range(length): + if len(hashes) & 1: + hashes.append(hashes[-1]) + branch.append(hashes[index ^ 1]) + index >>= 1 + hashes = [ + double_sha256(hashes[n] + hashes[n + 1]) + for n in range(0, len(hashes), 2) + ] + return branch, hashes[0] + + +def merkle_branch(tx_hashes, tx_pos): + """Return a merkle branch given hashes and the tx position""" + branch, _root = merkle_branch_and_root(tx_hashes, tx_pos) + branch = [bytes(reversed(h)).hex() for h in branch] + return branch diff --git a/obelisk/obelisk b/obelisk/obelisk @@ -0,0 +1 @@ +../run_obelisk+ \ No newline at end of file diff --git a/obelisk/protocol.py b/obelisk/protocol.py @@ -0,0 +1,665 @@ +#!/usr/bin/env python3 +# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org> +# +# This file is part of obelisk +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License version 3 +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +"""Implementation of the Electrum protocol as found on +https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html +""" +import asyncio +import json +from binascii import unhexlify + +from obelisk.errors import ERRORS +from obelisk.merkle import merkle_branch +from obelisk.util import ( + bh2u, + block_to_header, + is_boolean, + is_hash256_str, + is_hex_str, + is_non_negative_integer, + safe_hexlify, + sha256, + double_sha256, + hash_to_hex_str, +) +from obelisk.zeromq import Client + +VERSION = "0.0" +SERVER_PROTO_MIN = "1.4" +SERVER_PROTO_MAX = "1.4.2" +DONATION_ADDR = "bc1q7an9p5pz6pjwjk4r48zke2yfaevafzpglg26mz" + +BANNER = (""" +Welcome to obelisk + +"Tools for the people" + +obelisk is a server that uses libbitcoin-server as its backend. +Source code can be found at: https://github.com/parazyd/obelisk + +Please consider donating: %s +""" % DONATION_ADDR) + + +class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 + """Class implementing the Electrum protocol, with async support""" + def __init__(self, log, chain, endpoints, server_cfg): + self.log = log + self.stopped = False + self.endpoints = endpoints + self.server_cfg = server_cfg + self.loop = asyncio.get_event_loop() + # Consider renaming bx to something else + self.bx = Client(log, endpoints, self.loop) + self.block_queue = None + # TODO: Clean up on client disconnect + self.tasks = [] + self.sh_subscriptions = {} + + if chain == "mainnet": + self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" + elif chain == "testnet": + self.genesis = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943" + else: + raise ValueError(f"Invalid chain '{chain}'") + + # Here we map available methods to their respective functions + self.methodmap = { + "blockchain.block.header": self.blockchain_block_header, + "blockchain.block.headers": self.blockchain_block_headers, + "blockchain.estimatefee": self.blockchain_estimatefee, + "blockchain.headers.subscribe": self.blockchain_headers_subscribe, + "blockchain.relayfee": self.blockchain_relayfee, + "blockchain.scripthash.get_balance": + self.blockchain_scripthash_get_balance, + "blockchain.scripthash.get_history": + self.blockchain_scripthash_get_history, + "blockchain.scripthash.get_mempool": + self.blockchain_scripthash_get_mempool, + "blockchain.scripthash.listunspent": + self.blockchain_scripthash_listunspent, + "blockchain.scripthash.subscribe": + self.blockchain_scripthash_subscribe, + "blockchain.scripthash.unsubscribe": + self.blockchain_scripthash_unsubscribe, + "blockchain.transaction.broadcast": + self.blockchain_transaction_broadcast, + "blockchain.transaction.get": self.blockchain_transaction_get, + "blockchain.transaction.get_merkle": + self.blockchain_transaction_get_merkle, + "blockchain.transaction.id_from_pos": + self.blockchain_transaction_from_pos, + "mempool.get_fee_histogram": self.mempool_get_fee_histogram, + "server_add_peer": self.server_add_peer, + "server.banner": self.server_banner, + "server.donation_address": self.server_donation_address, + "server.features": self.server_features, + "server.peers.subscribe": self.server_peers_subscribe, + "server.ping": self.server_ping, + "server.version": self.server_version, + } + + async def stop(self): + """Destructor function""" + self.log.debug("ElectrumProtocol.stop()") + if self.bx: + unsub_pool = [] + for i in self.sh_subscriptions: + self.log.debug("bx.unsubscribe %s", i) + unsub_pool.append(self.bx.unsubscribe_scripthash(i)) + await asyncio.gather(*unsub_pool, return_exceptions=True) + await self.bx.stop() + + # idxs = [] + # for task in self.tasks: + # idxs.append(self.tasks.index(task)) + # task.cancel() + # for i in idxs: + # del self.tasks[i] + + self.stopped = True + + async def recv(self, reader, writer): + """Loop ran upon a connection which acts as a JSON-RPC handler""" + recv_buf = bytearray() + while not self.stopped: + data = await reader.read(4096) + if not data or len(data) == 0: + self.log.debug("Received EOF, disconnect") + # TODO: cancel asyncio tasks for this client here? + return + recv_buf.extend(data) + lb = recv_buf.find(b"\n") + if lb == -1: + continue + while lb != -1: + line = recv_buf[:lb].rstrip() + recv_buf = recv_buf[lb + 1:] + lb = recv_buf.find(b"\n") + try: + line = line.decode("utf-8") + query = json.loads(line) + except (UnicodeDecodeError, json.JSONDecodeError) as err: + self.log.debug("Got error: %s", repr(err)) + break + self.log.debug("=> " + line) + await self.handle_query(writer, query) + + async def _send_notification(self, writer, method, params): + """Send JSON-RPC notification to given writer""" + response = {"jsonrpc": "2.0", "method": method, "params": params} + self.log.debug("<= %s", response) + writer.write(json.dumps(response).encode("utf-8") + b"\n") + await writer.drain() + + async def _send_response(self, writer, result, nid): + """Send successful JSON-RPC response to given writer""" + response = {"jsonrpc": "2.0", "result": result, "id": nid} + self.log.debug("<= %s", response) + writer.write(json.dumps(response).encode("utf-8") + b"\n") + await writer.drain() + + async def _send_error(self, writer, error, nid): + """Send JSON-RPC error to given writer""" + response = {"jsonrpc": "2.0", "error": error, "id": nid} + self.log.debug("<= %s", response) + writer.write(json.dumps(response).encode("utf-8") + b"\n") + await writer.drain() + + async def _send_reply(self, writer, resp, query): + """Wrap function for sending replies""" + if "error" in resp: + return await self._send_error(writer, resp["error"], query["id"]) + return await self._send_response(writer, resp["result"], query["id"]) + + async def handle_query(self, writer, query): # pylint: disable=R0915,R0912,R0911 + """Electrum protocol method handler mapper""" + if "method" not in query: + self.log.debug("No 'method' in query: %s", query) + return + if "id" not in query: + self.log.debug("No 'id' in query: %s", query) + return + + method = query["method"] + func = self.methodmap.get(method) + if not func: + self.log.error("Unhandled method %s, query=%s", method, query) + return await self._send_reply(writer, ERRORS["nomethod"], query) + resp = await func(writer, query) + return await self._send_reply(writer, resp, query) + + async def blockchain_block_header(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.block.header + Return the block header at the given height. + """ + if "params" not in query or len(query["params"]) < 1: + return ERRORS["invalidparams"] + # TODO: cp_height + index = query["params"][0] + cp_height = query["params"][1] if len(query["params"]) == 2 else 0 + + if not is_non_negative_integer(index): + return ERRORS["invalidparams"] + if not is_non_negative_integer(cp_height): + return ERRORS["invalidparams"] + + _ec, data = await self.bx.fetch_block_header(index) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + return {"result": safe_hexlify(data)} + + async def blockchain_block_headers(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.block.headers + Return a concatenated chunk of block headers from the main chain. + """ + if "params" not in query or len(query["params"]) < 2: + return ERRORS["invalidparams"] + # Electrum doesn't allow max_chunk_size to be less than 2016 + # gopher://bitreich.org/9/memecache/convenience-store.mkv + # TODO: cp_height + max_chunk_size = 2016 + start_height = query["params"][0] + count = query["params"][1] + + if not is_non_negative_integer(start_height): + return ERRORS["invalidparams"] + if not is_non_negative_integer(count): + return ERRORS["invalidparams"] + + count = min(count, max_chunk_size) + headers = bytearray() + for i in range(count): + _ec, data = await self.bx.fetch_block_header(i) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + headers.extend(data) + + resp = { + "hex": safe_hexlify(headers), + "count": len(headers) // 80, + "max": max_chunk_size, + } + return {"result": resp} + + async def blockchain_estimatefee(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.estimatefee + Return the estimated transaction fee per kilobyte for a transaction + to be confirmed within a certain number of blocks. + """ + # TODO: Help wanted + return {"result": -1} + + async def header_notifier(self, writer): + self.block_queue = asyncio.Queue() + await self.bx.subscribe_to_blocks(self.block_queue) + while True: + # item = (seq, height, block_data) + item = await self.block_queue.get() + if len(item) != 3: + self.log.debug("error: item from block queue len != 3") + continue + + header = block_to_header(item[2]) + params = [{"height": item[1], "hex": safe_hexlify(header)}] + await self._send_notification(writer, + "blockchain.headers.subscribe", + params) + + async def blockchain_headers_subscribe(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.headers.subscribe + Subscribe to receive block headers when a new block is found. + """ + # Tip height and header are returned upon request + _ec, height = await self.bx.fetch_last_height() + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + _ec, tip_header = await self.bx.fetch_block_header(height) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + + self.tasks.append(asyncio.create_task(self.header_notifier(writer))) + ret = {"height": height, "hex": safe_hexlify(tip_header)} + return {"result": ret} + + async def blockchain_relayfee(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.relayfee + Return the minimum fee a low-priority transaction must pay in order + to be accepted to the daemon’s memory pool. + """ + # TODO: Help wanted + return {"result": 0.00001} + + async def blockchain_scripthash_get_balance(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.scripthash.get_balance + Return the confirmed and unconfirmed balances of a script hash. + """ + if "params" not in query or len(query["params"]) != 1: + return ERRORS["invalidparams"] + + if not is_hash256_str(query["params"][0]): + return ERRORS["invalidparams"] + + _ec, data = await self.bx.fetch_balance(query["params"][0]) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + + # TODO: confirmed/unconfirmed, see what's happening in libbitcoin + ret = {"confirmed": data, "unconfirmed": 0} + return {"result": ret} + + async def blockchain_scripthash_get_history(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.scripthash.get_history + Return the confirmed and unconfirmed history of a script hash. + """ + if "params" not in query or len(query["params"]) != 1: + return ERRORS["invalidparams"] + + if not is_hash256_str(query["params"][0]): + return ERRORS["invalidparams"] + + _ec, data = await self.bx.fetch_history4(query["params"][0]) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + + self.log.debug("hist: %s", data) + ret = [] + # TODO: mempool + for i in data: + if "received" in i: + ret.append({ + "height": i["received"]["height"], + "tx_hash": hash_to_hex_str(i["received"]["hash"]), + }) + if "spent" in i: + ret.append({ + "height": i["spent"]["height"], + "tx_hash": hash_to_hex_str(i["spent"]["hash"]), + }) + + return {"result": ret} + + async def blockchain_scripthash_get_mempool(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.scripthash.get_mempool + Return the unconfirmed transactions of a script hash. + """ + return + + async def blockchain_scripthash_listunspent(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.scripthash.listunspent + Return an ordered list of UTXOs sent to a script hash. + """ + if "params" not in query or len(query["params"]) != 1: + return ERRORS["invalidparams"] + + scripthash = query["params"][0] + if not is_hash256_str(scripthash): + return ERRORS["invalidparams"] + + _ec, utxo = await self.bx.fetch_utxo(scripthash) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + + # TODO: Check mempool + ret = [] + for i in utxo: + rec = i["received"] + ret.append({ + "tx_pos": rec["index"], + "value": i["value"], + "tx_hash": hash_to_hex_str(rec["hash"]), + "height": rec["height"], + }) + return {"result": ret} + + async def scripthash_notifier(self, writer, scripthash): + # TODO: Figure out how this actually works + _ec, sh_queue = await self.bx.subscribe_scripthash(scripthash) + if _ec and _ec != 0: + self.log.error("bx.subscribe_scripthash failed:", repr(_ec)) + return + + while True: + # item = (seq, height, block_data) + item = await sh_queue.get() + self.log.debug("sh_subscription item: %s", item) + + async def blockchain_scripthash_subscribe(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.scripthash.subscribe + Subscribe to a script hash. + """ + if "params" not in query or len(query["params"]) != 1: + return ERRORS["invalidparamas"] + + scripthash = query["params"][0] + if not is_hash256_str(scripthash): + return ERRORS["invalidparams"] + + _ec, history = await self.bx.fetch_history4(scripthash) + if _ec and _ec != 0: + return ERRORS["internalerror"] + + task = asyncio.create_task(self.scripthash_notifier( + writer, scripthash)) + self.sh_subscriptions[scripthash] = {"task": task} + + if len(history) < 1: + return {"result": None} + + # TODO: Check how history4 acts for mempool/unconfirmed + status = [] + for i in history: + if "received" in i: + status.append(( + hash_to_hex_str(i["received"]["hash"]), + i["received"]["height"], + )) + if "spent" in i: + status.append(( + hash_to_hex_str(i["spent"]["hash"]), + i["spent"]["height"], + )) + + self.sh_subscriptions[scripthash]["status"] = status + return {"result": ElectrumProtocol.__scripthash_status(status)} + + @staticmethod + def __scripthash_status(status): + concat = "" + for txid, height in status: + concat += txid + ":%d:" % height + return bh2u(sha256(concat.encode("ascii"))) + + async def blockchain_scripthash_unsubscribe(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.scripthash.unsubscribe + Unsubscribe from a script hash, preventing future notifications + if its status changes. + """ + if "params" not in query or len(query["params"]) != 1: + return ERRORS["invalidparams"] + + scripthash = query["params"][0] + if not is_hash256_str(scripthash): + return ERRORS["invalidparams"] + + if scripthash in self.sh_subscriptions: + self.sh_subscriptions[scripthash]["task"].cancel() + await self.bx.unsubscribe_scripthash(scripthash) + del self.sh_subscriptions[scripthash] + return {"result": True} + + return {"result": False} + + async def blockchain_transaction_broadcast(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.transaction.broadcast + Broadcast a transaction to the network. + """ + # Note: Not yet implemented in bs v4 + if "params" not in query or len(query["params"]) != 1: + return ERRORS["invalidparams"] + + hextx = query["params"][0] + if not is_hex_str(hextx): + return ERRORS["invalidparams"] + + _ec, _ = await self.bx.broadcast_transaction(hextx) + if _ec and _ec != 0: + return ERRORS["internalerror"] + + rawtx = unhexlify(hextx) + txid = double_sha256(rawtx) + return {"result": hash_to_hex_str(txid)} + + async def blockchain_transaction_get(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.transaction.get + Return a raw transaction. + """ + if "params" not in query or len(query["params"]) < 1: + return ERRORS["invalidparams"] + tx_hash = query["params"][0] + verbose = query["params"][1] if len(query["params"]) > 1 else False + + # _ec, rawtx = await self.bx.fetch_blockchain_transaction(tx_hash) + _ec, rawtx = await self.bx.fetch_mempool_transaction(tx_hash) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + + # Behaviour is undefined in spec + if not rawtx: + return {"result": None} + + if verbose: + # TODO: Help needed + return ERRORS["invalidrequest"] + + return {"result": bh2u(rawtx)} + + async def blockchain_transaction_get_merkle(self, writer, query): # pylint: disable=W0613 + """Method: blockchain.transaction.get_merkle + Return the merkle branch to a confirmed transaction given its + hash and height. + """ + if "params" not in query or len(query["params"]) != 2: + return ERRORS["invalidparams"] + tx_hash = query["params"][0] + height = query["params"][1] + + if not is_hash256_str(tx_hash): + return ERRORS["invalidparams"] + if not is_non_negative_integer(height): + return ERRORS["invalidparams"] + + _ec, hashes = await self.bx.fetch_block_transaction_hashes(height) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + + # Decouple from tuples + hashes = [i[0] for i in hashes] + tx_pos = hashes.index(unhexlify(tx_hash)[::-1]) + branch = merkle_branch(hashes, tx_pos) + + res = { + "block_height": int(height), + "pos": int(tx_pos), + "merkle": branch, + } + return {"result": res} + + async def blockchain_transaction_from_pos(self, writer, query): # pylint: disable=R0911,W0613 + """Method: blockchain.transaction.id_from_pos + Return a transaction hash and optionally a merkle proof, given a + block height and a position in the block. + """ + if "params" not in query or len(query["params"]) < 2: + return ERRORS["invalidparams"] + height = query["params"][0] + tx_pos = query["params"][1] + merkle = query["params"][2] if len(query["params"]) > 2 else False + + if not is_non_negative_integer(height): + return ERRORS["invalidparams"] + if not is_non_negative_integer(tx_pos): + return ERRORS["invalidparams"] + if not is_boolean(merkle): + return ERRORS["invalidparams"] + + _ec, hashes = await self.bx.fetch_block_transaction_hashes(height) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return ERRORS["internalerror"] + + if len(hashes) - 1 < tx_pos: + return ERRORS["internalerror"] + + # Decouple from tuples + hashes = [i[0] for i in hashes] + txid = hash_to_hex_str(hashes[tx_pos]) + + if not merkle: + return {"result": txid} + branch = merkle_branch(hashes, tx_pos) + return {"result": {"tx_hash": txid, "merkle": branch}} + + async def mempool_get_fee_histogram(self, writer, query): # pylint: disable=W0613 + """Method: mempool.get_fee_histogram + Return a histogram of the fee rates paid by transactions in the + memory pool, weighted by transaction size. + """ + # TODO: Help wanted + return {"result": [[0, 0]]} + + async def server_add_peer(self, writer, query): # pylint: disable=W0613 + """Method: server.add_peer + A newly-started server uses this call to get itself into other + servers’ peers lists. It should not be used by wallet clients. + """ + # TODO: Help wanted + return {"result": False} + + async def server_banner(self, writer, query): # pylint: disable=W0613 + """Method: server.banner + Return a banner to be shown in the Electrum console. + """ + return {"result": BANNER} + + async def server_donation_address(self, writer, query): # pylint: disable=W0613 + """Method: server.donation_address + Return a server donation address. + """ + return {"result": DONATION_ADDR} + + async def server_features(self, writer, query): # pylint: disable=W0613 + """Method: server.features + Return a list of features and services supported by the server. + """ + cfg = self.server_cfg + return { + "result": { + "genesis_hash": self.genesis, + "hosts": { + cfg["server_hostname"]: { + "tcp_port": cfg["server_port"], + "ssl_port": None, + }, + }, + "protocol_max": SERVER_PROTO_MAX, + "protocol_min": SERVER_PROTO_MIN, + "pruning": None, + "server_version": f"obelisk {VERSION}", + "hash_function": "sha256", + } + } + + async def server_peers_subscribe(self, writer, query): # pylint: disable=W0613 + """Method: server.peers.subscribe + Return a list of peer servers. Despite the name this is not a + subscription and the server must send no notifications. + """ + # TODO: Help wanted + return {"result": []} + + async def server_ping(self, writer, query): # pylint: disable=W0613 + """Method: server.ping + Ping the server to ensure it is responding, and to keep the session + alive. The server may disconnect clients that have sent no requests + for roughly 10 minutes. + """ + return {"result": None} + + async def server_version(self, writer, query): # pylint: disable=W0613 + """Method: server.version + Identify the client to the server and negotiate the protocol version. + """ + if "params" not in query or len(query["params"]) != 2: + return ERRORS["invalidparams"] + client_ver = query["params"][1] + if isinstance(client_ver, list): + client_min, client_max = client_ver[0], client_ver[1] + else: + client_min = client_max = client_ver + version = min(client_max, SERVER_PROTO_MAX) + if version < max(client_min, SERVER_PROTO_MIN): + return ERRORS["protonotsupported"] + return {"result": [f"obelisk {VERSION}", version]} diff --git a/electrumobelisk/util.py b/obelisk/util.py diff --git a/obelisk/zeromq.py b/obelisk/zeromq.py @@ -0,0 +1,477 @@ +#!/usr/bin/env python3 +# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org> +# +# This file is part of obelisk +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License version 3 +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +"""ZeroMQ implementation for libbitcoin""" +import asyncio +import functools +import struct +from binascii import unhexlify +from random import randint + +import zmq +import zmq.asyncio + +from obelisk.libbitcoin_errors import make_error_code, ErrorCode +from obelisk.util import bh2u + + +def create_random_id(): + """Generate a random request ID""" + max_uint32 = 4294967295 + return randint(0, max_uint32) + + +def pack_block_index(index): + """struct.pack given index""" + if isinstance(index, str): + index = unhexlify(index) + assert len(index) == 32 + return index + if isinstance(index, int): + return struct.pack("<I", index) + + raise ValueError( + f"Unknown index type {type(index)} v:{index}, should be int or bytearray" + ) + + +def to_int(xbytes): + """Make little-endian integer from given bytes""" + return int.from_bytes(xbytes, byteorder="little") + + +def checksum(xhash, index): + """ + This method takes a transaction hash and an index and returns a checksum. + + This checksum is based on 49 bits starting from the 12th byte of the + reversed hash. Combined with the last 15 bits of the 4 byte index. + """ + mask = 0xFFFFFFFFFFFF8000 + magic_start_position = 12 + + hash_bytes = bytes.fromhex(xhash)[::-1] + last_20_bytes = hash_bytes[magic_start_position:] + + assert len(hash_bytes) == 32 + assert index < 2**32 + + hash_upper_49_bits = to_int(last_20_bytes) & mask + index_lower_15_bits = index & ~mask + return hash_upper_49_bits | index_lower_15_bits + + +def unpack_table(row_fmt, data): + """Function to unpack table received from libbitcoin""" + # Get the number of rows + row_size = struct.calcsize(row_fmt) + nrows = len(data) // row_size + # Unpack + rows = [] + for idx in range(nrows): + offset = idx * row_size + row = struct.unpack_from(row_fmt, data, offset) + rows.append(row) + return rows + + +class ClientSettings: + """Class implementing ZMQ client settings""" + def __init__(self, timeout=10, context=None, loop=None): + self._timeout = timeout + self._context = context + self._loop = loop + + @property + def context(self): + """ZMQ context property""" + if not self._context: + ctx = zmq.asyncio.Context() + ctx.linger = 500 # in milliseconds + self._context = ctx + return self._context + + @context.setter + def context(self, context): + self._context = context + + @property + def timeout(self): + """Set to None for no timeout""" + return self._timeout + + @timeout.setter + def timeout(self, timeout): + self._timeout = timeout + + +class Request: + """Class implementing a _send_ request. + This is either a simple request/response affair or a subscription. + """ + def __init__(self, socket, command, data): + self.id_ = create_random_id() + self.socket = socket + self.command = command + self.data = data + self.future = asyncio.Future() + self.queue = None + + async def send(self): + """Send the ZMQ request""" + request = [self.command, struct.pack("<I", self.id_), self.data] + await self.socket.send_multipart(request) + + def is_subscription(self): + """If the request is a subscription, then the response to this + request is a notification. + """ + return self.queue is not None + + def __str__(self): + return "Request(command, ID) {}, {:d}".format(self.command, self.id_) + + +class InvalidServerResponseException(Exception): + """Exception for invalid server responses""" + + +class Response: + """Class implementing a request response""" + def __init__(self, frame): + if len(frame) != 3: + raise InvalidServerResponseException( + f"Length of the frame was not 3: {len(frame)}") + + self.command = frame[0] + self.request_id = struct.unpack("<I", frame[1])[0] + error_code = struct.unpack("<I", frame[2][:4])[0] + self.error_code = make_error_code(error_code) + self.data = frame[2][4:] + + def is_bound_for_queue(self): + return len(self.data) > 0 + + def __str__(self): + return ( + "Response(command, request ID, error code, data):" + + f" {self.command}, {self.request_id}, {self.error_code}, {self.data}" + ) + + +class RequestCollection: + """RequestCollection carries a list of Requests and matches incoming + responses to them. + """ + def __init__(self, socket, loop): + self._socket = socket + self._requests = {} + self._task = asyncio.ensure_future(self._run(), loop=loop) + + async def _run(self): + while True: + await self._receive() + + async def stop(self): + """Stops listening for incoming responses (or subscription) messages. + Returns the number of _responses_ expected but which are now dropped + on the floor. + """ + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + return len(self._requests) + + async def _receive(self): + frame = await self._socket.recv_multipart() + response = Response(frame) + + if response.request_id in self._requests: + self._handle_response(response) + else: + print( + f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}" # pylint: disable=C0301 + ) + + def _handle_response(self, response): + request = self._requests[response.request_id] + + if request.is_subscription(): + if response.is_bound_for_queue(): + # TODO: decode the data into something usable + request.queue.put_nowait(response.data) + else: + request.future.set_result(response) + else: + self.delete_request(request) + request.future.set_result(response) + + def add_request(self, request): + # TODO: we should maybe check if the request.id_ is unique + self._requests[request.id_] = request + + def delete_request(self, request): + del self._requests[request.id_] + + +class Client: + """This class represents a connection to a libbitcoin server.""" + def __init__(self, log, endpoints, loop): + self.log = log + self._endpoints = endpoints + self._settings = ClientSettings(loop=loop) + self._query_socket = self._create_query_socket() + self._block_socket = self._create_block_socket() + self._request_collection = RequestCollection(self._query_socket, + self._settings._loop) + + async def stop(self): + self.log.debug("zmq Client.stop()") + self._query_socket.close() + self._block_socket.close() + return await self._request_collection.stop() + + def _create_block_socket(self): + socket = self._settings.context.socket( + zmq.SUB, # pylint: disable=E1101 + io_loop=self._settings._loop, # pylint: disable=W0212 + ) + socket.connect(self._endpoints["block"]) + socket.setsockopt_string(zmq.SUBSCRIBE, "") # pylint: disable=E1101 + return socket + + def _create_query_socket(self): + socket = self._settings.context.socket( + zmq.DEALER, # pylint: disable=E1101 + io_loop=self._settings._loop, # pylint: disable=W0212 + ) + socket.connect(self._endpoints["query"]) + return socket + + async def _subscription_request(self, command, data): + request = await self._request(command, data) + request.queue = asyncio.Queue(loop=self._settings._loop) # pylint: disable=W0212 + error_code, _ = await self._wait_for_response(request) + return error_code, request.queue + + async def _simple_request(self, command, data): + return await self._wait_for_response(await + self._request(command, data)) + + async def _request(self, command, data): + """Make a generic request. Both options are byte objects specified + like b'blockchain.fetch_block_header' as an example. + """ + request = Request(self._query_socket, command, data) + await request.send() + self._request_collection.add_request(request) + return request + + async def _wait_for_response(self, request): + try: + response = await asyncio.wait_for(request.future, + self._settings.timeout) + except asyncio.TimeoutError: + self._request_collection.delete_request(request) + return ErrorCode.channel_timeout, None + + assert response.command == request.command + assert response.request_id == request.id_ + return response.error_code, response.data + + async def fetch_last_height(self): + """Fetch the blockchain tip and return integer height""" + command = b"blockchain.fetch_last_height" + error_code, data = await self._simple_request(command, b"") + if error_code: + return error_code, None + return error_code, struct.unpack("<I", data)[0] + + async def fetch_block_header(self, index): + """Fetch a block header by its height or integer index""" + command = b"blockchain.fetch_block_header" + data = pack_block_index(index) + return await self._simple_request(command, data) + + async def fetch_block_transaction_hashes(self, index): + """Fetch transaction hashes in a block at height index""" + command = b"blockchain.fetch_block_transaction_hashes" + data = pack_block_index(index) + error_code, data = await self._simple_request(command, data) + if error_code: + return error_code, None + return error_code, unpack_table("32s", data) + + async def fetch_blockchain_transaction(self, txid): + """Fetch transaction by txid (not including mempool)""" + command = b"blockchain.fetch_transaction2" + error_code, data = await self._simple_request( + command, + bytes.fromhex(txid)[::-1]) + if error_code: + return error_code, None + return error_code, data + + async def fetch_mempool_transaction(self, txid): + """Fetch transaction by txid (including mempool)""" + command = b"transaction_pool.fetch_transaction2" + error_code, data = await self._simple_request( + command, + bytes.fromhex(txid)[::-1]) + if error_code: + return error_code, None + return error_code, data + + async def subscribe_scripthash(self, scripthash): + """Subscribe to scripthash""" + command = b"subscribe.key" + decoded_address = unhexlify(scripthash) + return await self._subscription_request(command, decoded_address) + + async def unsubscribe_scripthash(self, scripthash): + """Unsubscribe scripthash""" + # TODO: This call should ideally also remove the subscription + # request from the RequestCollection. + # This call solicits a final call from the server with an + # `error::service_stopped` error code. + command = b"unsubscribe.key" + decoded_address = unhexlify(scripthash) + return await self._simple_request(command, decoded_address) + + async def fetch_history4(self, scripthash, height=0): + """Fetch history for given scripthash""" + command = b"blockchain.fetch_history4" + decoded_address = unhexlify(scripthash) + error_code, raw_points = await self._simple_request( + command, decoded_address + struct.pack("<I", height)) + if error_code: + return error_code, None + + def make_tuple(row): + kind, height, tx_hash, index, value = row + return ( + kind, + { + "hash": tx_hash, + "index": index + }, + height, + value, + checksum(tx_hash[::-1].hex(), index), + ) + + rows = unpack_table("<BI32sIQ", raw_points) + points = [make_tuple(row) for row in rows] + correlated_points = Client.__correlate(points) + # self.log.debug("history points: %s", points) + # self.log.debug("history correlated: %s", correlated_points) + return error_code, self._sort_correlated_points(correlated_points) + + @staticmethod + def _sort_correlated_points(points): + """Sort by ascending height""" + if len(points) < 2: + return points + return sorted(points, key=lambda x: list(x.values())[0]["height"]) + + async def broadcast_transaction(self, rawtx): + """Broadcast given raw transaction""" + command = b"transaction_pool.broadcast" + return await self._simple_request(command, rawtx) + + async def fetch_balance(self, scripthash): + """Fetch balance for given scripthash""" + error_code, history = await self.fetch_history4(scripthash) + if error_code: + return error_code, None + + utxo = Client.__receives_without_spends(history) + return error_code, functools.reduce( + lambda accumulator, point: accumulator + point["value"], utxo, 0) + + async def fetch_utxo(self, scripthash): + """Find UTXO for given scripthash""" + error_code, history = await self.fetch_history4(scripthash) + if error_code: + return error_code, None + return error_code, Client.__receives_without_spends(history) + + async def subscribe_to_blocks(self, queue): + asyncio.ensure_future(self._listen_for_blocks(queue)) + return queue + + async def _listen_for_blocks(self, queue): + """Infinite loop for block subscription. + Returns raw blocks as they're received. + """ + while True: + frame = await self._block_socket.recv_multipart() + seq = struct.unpack("<H", frame[0])[0] + height = struct.unpack("<I", frame[1])[0] + block_data = frame[2] + queue.put_nowait((seq, height, block_data)) + + @staticmethod + def __receives_without_spends(history): + return (point for point in history if "spent" not in point) + + @staticmethod + def __correlate(points): + transfers, checksum_to_index = Client.__find_receives(points) + transfers = Client.__correlate_spends_to_receives( + points, transfers, checksum_to_index) + return transfers + + @staticmethod + def __correlate_spends_to_receives(points, transfers, checksum_to_index): + for point in points: + if point[0] == 1: # receive + continue + + spent = { + "hash": point[1]["hash"], + "height": point[2], + "index": point[1]["index"], + } + if point[3] not in checksum_to_index: + transfers.append({"spent": spent}) + else: + transfers[checksum_to_index[point[3]]]["spent"] = spent + + return transfers + + @staticmethod + def __find_receives(points): + transfers = [] + checksum_to_index = {} + + for point in points: + if point[0] == 0: # spent + continue + + transfers.append({ + "received": { + "hash": point[1]["hash"], + "height": point[2], + "index": point[1]["index"], + }, + "value": point[3], + }) + + checksum_to_index[point[4]] = len(transfers) - 1 + + return transfers, checksum_to_index diff --git a/run_obelisk b/run_obelisk @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org> +# +# This file is part of obelisk +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License version 3 +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import asyncio +import sys +from argparse import ArgumentParser +from configparser import RawConfigParser, NoSectionError +from logging import getLogger, FileHandler, Formatter, StreamHandler +from os import devnull + +from obelisk.protocol import ElectrumProtocol, VERSION + +# Used for destructor/cleanup +PROTOCOL = None + + +def logger_config(log, config): + """Setup logging""" + fmt = Formatter("%(asctime)s\t%(levelname)s\t%(message)s") + logstream = StreamHandler() + logstream.setFormatter(fmt) + debuglevel = config.get("obelisk", "log_level", fallback="INFO") + logstream.setLevel(debuglevel) + log.addHandler(logstream) + filename = config.get("obelisk", "log_file", fallback=devnull) + append_log = config.getboolean("obelisk", "append_log", fallback=False) + logfile = FileHandler(filename, mode=("a" if append_log else "w")) + logfile.setFormatter(fmt) + logfile.setLevel(debuglevel) + log.addHandler(logfile) + log.setLevel(debuglevel) + return log, filename + + +async def run_electrum_server(config, chain): + """Server coroutine""" + log = getLogger("obelisk") + host = config.get("obelisk", "host") + port = int(config.get("obelisk", "port")) + + endpoints = {} + endpoints["query"] = config.get("obelisk", "query") + endpoints["heart"] = config.get("obelisk", "heart") + endpoints["block"] = config.get("obelisk", "block") + endpoints["trans"] = config.get("obelisk", "trans") + + server_cfg = {} + server_cfg["server_hostname"] = "localhost" # TODO: <- should be public? + server_cfg["server_port"] = port + + global PROTOCOL + PROTOCOL = ElectrumProtocol(log, chain, endpoints, server_cfg) + + server = await asyncio.start_server(PROTOCOL.recv, host, port) + async with server: + await server.serve_forever() + + +def main(): + """Main orchestration""" + parser = ArgumentParser(description=f"obelisk {VERSION}") + parser.add_argument("config_file", help="Path to config file") + args = parser.parse_args() + + try: + config = RawConfigParser() + config.read(args.config_file) + config.options("obelisk") + except NoSectionError: + print(f"error: Invalid config file {args.config_file}") + return 1 + + log = getLogger("obelisk") + log, logfilename = logger_config(log, config) + log.info(f"Starting obelisk {VERSION}") + log.info(f"Logging to {logfilename}") + + chain = config.get("obelisk", "chain") + if chain not in ("mainnet", "testnet"): + log.error("chain is not 'mainnet' or 'testnet'") + return 1 + + try: + asyncio.run(run_electrum_server(config, chain)) + except KeyboardInterrupt: + print("\r", end="") + log.debug("Caught KeyboardInterrupt, exiting...") + if PROTOCOL: + asyncio.run(PROTOCOL.stop()) + return 0 + + return 1 + + +if __name__ == "__main__": + sys.exit(main())