
Electrum server using libbitcoin as its backend
git clone https://git.parazyd.org/obelisk
Log | Files | Refs | README | LICENSE

protocol.py (31114B)

      1 #!/usr/bin/env python3
      2 # Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
      3 #
      4 # This file is part of obelisk
      5 #
      6 # This program is free software: you can redistribute it and/or modify
      7 # it under the terms of the GNU Affero General Public License version 3
      8 # as published by the Free Software Foundation.
      9 #
     10 # This program is distributed in the hope that it will be useful,
     11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 # GNU Affero General Public License for more details.
     14 #
     15 # You should have received a copy of the GNU Affero General Public License
     16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 """Implementation of the Electrum protocol as found on
     18 https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html
     19 """
     20 import asyncio
     21 import json
     22 import struct
     23 from binascii import unhexlify
     24 from traceback import print_exc
     26 from obelisk.errors_jsonrpc import JsonRPCError
     27 from obelisk.errors_libbitcoin import ZMQError
     28 from obelisk.mempool_api import get_mempool_fee_estimates
     29 from obelisk.merkle import merkle_branch, merkle_branch_and_root
     30 from obelisk.util import (
     31     bh2u,
     32     block_to_header,
     33     is_boolean,
     34     is_hash256_str,
     35     is_hex_str,
     36     is_non_negative_integer,
     37     safe_hexlify,
     38     sha256,
     39     double_sha256,
     40     hash_to_hex_str,
     41 )
     42 from obelisk.zeromq import Client
     44 VERSION = "0.0"  # obelisk version string; interpolated into the server banner
     45 SERVER_PROTO_MIN = "1.4"  # presumably the lowest Electrum protocol version served -- confirm at use site
     46 SERVER_PROTO_MAX = "1.4.2"  # presumably the highest Electrum protocol version served -- confirm at use site
     47 DONATION_ADDR = "bc1q7an9p5pz6pjwjk4r48zke2yfaevafzpglg26mz"  # returned by server.donation_address
     49 BANNER = ("""
     50 Welcome to obelisk
     52 "Tools for the people"
     54 obelisk is a server that uses libbitcoin-server as its backend.
     55 Source code can be found at: https://github.com/parazyd/obelisk
     57 Please consider donating: %s
     58 """ % DONATION_ADDR)  # static banner text; the %s placeholder receives DONATION_ADDR
     61 class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
     62     """Class implementing the Electrum protocol, with async support"""
     64     def __init__(self, log, chain, endpoints, server_cfg):
     65         self.log = log
     66         self.stopped = False
     67         self.endpoints = endpoints
     68         self.server_cfg = server_cfg
     69         self.loop = asyncio.get_event_loop()
     70         self.bx = Client(log, endpoints, self.loop)
     71         self.block_queue = None
     72         self.peers = {}
     74         self.chain = chain
     75         if self.chain == "mainnet":  # pragma: no cover
     76             self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
     77         elif self.chain == "testnet":
     78             self.genesis = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
     79         else:
     80             raise ValueError(f"Invalid chain '{chain}'")  # pragma: no cover
     82         # Here we map available methods to their respective functions
     83         self.methodmap = {
     84             "blockchain.block.header": self.block_header,
     85             "blockchain.block.headers": self.block_headers,
     86             "blockchain.estimatefee": self.estimatefee,
     87             "blockchain.headers.subscribe": self.headers_subscribe,
     88             "blockchain.relayfee": self.relayfee,
     89             "blockchain.scripthash.get_balance": self.scripthash_get_balance,
     90             "blockchain.scripthash.get_history": self.scripthash_get_history,
     91             "blockchain.scripthash.get_mempool": self.scripthash_get_mempool,
     92             "blockchain.scripthash.listunspent": self.scripthash_listunspent,
     93             "blockchain.scripthash.subscribe": self.scripthash_subscribe,
     94             "blockchain.scripthash.unsubscribe": self.scripthash_unsubscribe,
     95             "blockchain.transaction.broadcast": self.transaction_broadcast,
     96             "blockchain.transaction.get": self.transaction_get,
     97             "blockchain.transaction.get_merkle": self.transaction_get_merkle,
     98             "blockchain.transaction.id_from_pos": self.transaction_id_from_pos,
     99             "mempool.get_fee_histogram": self.get_fee_histogram,
    100             "server_add_peer": self.add_peer,
    101             "server.banner": self.banner,
    102             "server.donation_address": self.donation_address,
    103             "server.features": self.server_features,
    104             "server.peers.subscribe": self.peers_subscribe,
    105             "server.ping": self.ping,
    106             "server.version": self.server_version,
    107         }
    109     async def stop(self):
    110         """Destructor function"""
    111         self.log.debug("ElectrumProtocol.stop()")
    112         self.stopped = True
    113         if self.bx:
    114             for i in self.peers:
    115                 await self._peer_cleanup(i)
    116             await self.bx.stop()
    118     async def _peer_cleanup(self, peer):
    119         """Cleanup tasks and data for peer"""
    120         self.log.debug("Cleaning up data for %s", peer)
    121         for i in self.peers[peer]["tasks"]:
    122             i.cancel()
    123         for i in self.peers[peer]["sh"]:
    124             self.peers[peer]["sh"][i]["task"].cancel()
    126     @staticmethod
    127     def _get_peer(writer):
    128         peer_t = writer._transport.get_extra_info("peername")  # pylint: disable=W0212
    129         return f"{peer_t[0]}:{peer_t[1]}"
    131     async def recv(self, reader, writer):
    132         """Loop ran upon a connection which acts as a JSON-RPC handler"""
    133         recv_buf = bytearray()
    134         self.peers[self._get_peer(writer)] = {"tasks": [], "sh": {}}
    136         while not self.stopped:
    137             data = await reader.read(4096)
    138             if not data or len(data) == 0:
    139                 await self._peer_cleanup(self._get_peer(writer))
    140                 return
    141             recv_buf.extend(data)
    142             lb = recv_buf.find(b"\n")
    143             if lb == -1:
    144                 continue
    145             while lb != -1:
    146                 line = recv_buf[:lb].rstrip()
    147                 recv_buf = recv_buf[lb + 1:]
    148                 lb = recv_buf.find(b"\n")
    149                 try:
    150                     line = line.decode("utf-8")
    151                     query = json.loads(line)
    152                 except (UnicodeDecodeError, json.JSONDecodeError) as err:
    153                     self.log.debug("%s", print_exc)
    154                     self.log.debug("Decode error: %s", repr(err))
    155                     break
    156                 self.log.debug("=> %s", line)
    157                 await self.handle_query(writer, query)
    159     async def _send_notification(self, writer, method, params):
    160         """Send JSON-RPC notification to given writer"""
    161         response = {"jsonrpc": "2.0", "method": method, "params": params}
    162         self.log.debug("<= %s", response)
    163         writer.write(json.dumps(response).encode("utf-8") + b"\n")
    164         await writer.drain()
    166     async def _send_response(self, writer, result, nid):
    167         """Send successful JSON-RPC response to given writer"""
    168         response = {"jsonrpc": "2.0", "result": result, "id": nid}
    169         self.log.debug("<= %s", response)
    170         writer.write(json.dumps(response).encode("utf-8") + b"\n")
    171         await writer.drain()
    173     async def _send_error(self, writer, error):
    174         """Send JSON-RPC error to given writer"""
    175         response = {"jsonrpc": "2.0", "error": error, "id": None}
    176         self.log.debug("<= %s", response)
    177         writer.write(json.dumps(response).encode("utf-8") + b"\n")
    178         await writer.drain()
    180     async def _send_reply(self, writer, resp, query):
    181         """Wrap function for sending replies"""
    182         if "error" in resp:
    183             return await self._send_error(writer, resp["error"])
    184         return await self._send_response(writer, resp["result"], query["id"])
    async def handle_query(self, writer, query):  # pylint: disable=R0915,R0912,R0911
        """Electrum protocol method handler mapper

        Validates the decoded request, looks its method up in the method
        map, runs the handler and sends the handler's reply.

        Args:
            writer: Stream of the requesting client.
            query: Decoded JSON value received from the client.

        A request that is not a JSON object, lacks "method" or "id", or
        whose method is not a string is answered with an invalid-request
        error instead of raising inside the connection loop.
        """
        # json.loads may yield a list, number, string or null; only an
        # object can be a request.
        if (not isinstance(query, dict) or "method" not in query or
                "id" not in query):
            return await self._send_reply(writer, JsonRPCError.invalidrequest(),
                                          None)

        method = query["method"]
        if not isinstance(method, str):
            # An unhashable method (list/object) would raise in the lookup.
            return await self._send_reply(writer, JsonRPCError.invalidrequest(),
                                          query)

        func = self.methodmap.get(method)
        if not func:
            self.log.error("Unhandled method %s, query=%s", method, query)
            return await self._send_reply(writer, JsonRPCError.methodnotfound(),
                                          query)
        resp = await func(writer, query)
        return await self._send_reply(writer, resp, query)
    async def _merkle_proof_for_headers(self, height, idx):
        """Build a merkle proof for the header at ``idx`` over the headers
        of heights 0 through ``height``.

        Every header in that range is fetched one by one, which is very
        slow; a proper solution would have to live in libbitcoin-server.

        Returns a dict with "branch", "header" and "root", or the JSON-RPC
        internal-error dict when a header fetch fails.
        """
        raw_headers = []

        for block_height in range(height + 1):
            err, raw = await self.bx.fetch_block_header(block_height)
            if err and err != ZMQError.success:
                self.log.error("bx.fetch_block_header: %s", err.name)
                return JsonRPCError.internalerror()
            raw_headers.append(raw)

        leaves = [double_sha256(hdr) for hdr in raw_headers]
        branch, root = merkle_branch_and_root(leaves, idx)

        proof = {
            "branch": [hash_to_hex_str(node) for node in branch],
            "header": safe_hexlify(raw_headers[idx]),
            "root": hash_to_hex_str(root),
        }
        return proof
    async def block_header(self, writer, query):  # pylint: disable=W0613,R0911
        """Method: blockchain.block.header
        Return the block header at the given height.

        Params are ``[height]`` or ``[height, cp_height]``. With a
        non-zero ``cp_height`` the result is a merkle proof dict
        ("branch", "header", "root"); otherwise it is the hex header.
        Returns a JSON-RPC error dict on invalid params or backend failure.
        """
        if "params" not in query or len(query["params"]) < 1:
            return JsonRPCError.invalidparams()
        params = query["params"]
        index = params[0]
        cp_height = params[1] if len(params) == 2 else 0

        if not is_non_negative_integer(index):
            return JsonRPCError.invalidparams()
        if not is_non_negative_integer(cp_height):
            return JsonRPCError.invalidparams()
        if cp_height != 0 and not index <= cp_height:
            return JsonRPCError.invalidparams()

        if cp_height == 0:
            _ec, header = await self.bx.fetch_block_header(index)
            if _ec and _ec != ZMQError.success:
                self.log.error("bx.fetch_block_header: %s", _ec.name)
                return JsonRPCError.internalerror()
            return {"result": safe_hexlify(header)}

        res = await self._merkle_proof_for_headers(cp_height, index)
        if "error" in res:
            # The helper hands back an error dict when a fetch fails; pass
            # it on as an error rather than wrapping it as a result.
            return res
        return {"result": res}
    async def block_headers(self, writer, query):  # pylint: disable=W0613,R0911
        """Method: blockchain.block.headers
        Return a concatenated chunk of block headers from the main chain.

        Params are ``[start_height, count]`` or
        ``[start_height, count, cp_height]``. The result holds "hex",
        "count" and "max"; with a non-zero ``cp_height`` and at least one
        returned header it also holds "branch" and "root" proving the last
        returned header. Returns a JSON-RPC error dict on invalid params
        or backend failure.
        """
        if "params" not in query or len(query["params"]) < 2:
            return JsonRPCError.invalidparams()
        params = query["params"]
        # Electrum doesn't allow max_chunk_size to be less than 2016
        # gopher://bitreich.org/9/memecache/convenience-store.mkv
        max_chunk_size = 2016
        start_height = params[0]
        count = params[1]
        cp_height = params[2] if len(params) == 3 else 0

        if not is_non_negative_integer(start_height):
            return JsonRPCError.invalidparams()
        if not is_non_negative_integer(count):
            return JsonRPCError.invalidparams()
        # Validate before comparing: a non-integer cp_height would raise.
        if not is_non_negative_integer(cp_height):
            return JsonRPCError.invalidparams()
        # The protocol requires start_height + (count - 1) <= cp_height.
        if cp_height != 0 and not start_height + (count - 1) <= cp_height:
            return JsonRPCError.invalidparams()

        count = min(count, max_chunk_size)
        headers = bytearray()
        for i in range(count):
            _ec, data = await self.bx.fetch_block_header(start_height + i)
            if _ec and _ec != ZMQError.success:
                self.log.error("bx.fetch_block_header: %s", _ec.name)
                return JsonRPCError.internalerror()
            headers.extend(data)

        # Headers are 80 bytes each.
        n_headers = len(headers) // 80
        resp = {
            "hex": safe_hexlify(headers),
            "count": n_headers,
            "max": max_chunk_size,
        }

        # No proof for an empty chunk: there is no last header to prove.
        if cp_height > 0 and n_headers > 0:
            proof = await self._merkle_proof_for_headers(
                cp_height, start_height + n_headers - 1)
            if "error" in proof:
                return proof
            resp["branch"] = proof["branch"]
            resp["root"] = proof["root"]

        return {"result": resp}
    async def estimatefee(self, writer, query):  # pylint: disable=W0613,disable=R0911
        """Method: blockchain.estimatefee
        Return the estimated transaction fee per kilobyte for a transaction
        to be confirmed within a certain number of blocks.

        Params are ``[number_of_blocks]``. The result is always a number:
        a fixed 0.00001 on testnet, -1 when no estimate is available, and
        otherwise the matching estimate divided by 100000 and rounded to
        8 decimals. Returns a JSON-RPC error dict on invalid params.
        """
        # NOTE: This solution is using the mempool.space API.
        # Let's try to eventually solve it with some internal logic.
        if "params" not in query or len(query["params"]) != 1:
            return JsonRPCError.invalidparams()

        num_blocks = query["params"][0]
        if not is_non_negative_integer(num_blocks):
            return JsonRPCError.invalidparams()

        if self.chain == "testnet":
            return {"result": 0.00001}

        fee_dict = get_mempool_fee_estimates()
        if not fee_dict:
            return {"result": -1}

        # Good enough.
        if num_blocks < 3:
            key = "fastestFee"
        elif num_blocks < 6:
            key = "halfHourFee"
        elif num_blocks < 10:
            key = "hourFee"
        else:
            key = "minimumFee"

        # Return a float, as the protocol specifies and as the testnet
        # branch already does; a formatted string breaks clients that do
        # arithmetic on the result.
        return {"result": round(fee_dict[key] / 100000, 8)}
    async def header_notifier(self, writer):
        """Forward new-block announcements to ``writer`` forever.

        Subscribes a fresh queue to block announcements and sends a
        "blockchain.headers.subscribe" notification for every item of
        length 3 taken from it. Runs until its task is cancelled.
        """
        # Keep a local reference: ``self.block_queue`` is overwritten by the
        # next subscriber, and reading through the attribute made all
        # notifiers compete for items on the newest queue.
        queue = asyncio.Queue()
        self.block_queue = queue
        await self.bx.subscribe_to_blocks(queue)
        while True:
            item = await queue.get()
            if len(item) != 3:
                self.log.debug("error: item from block queue len != 3")
                continue

            # item[1] is sent as the height; item[2] is the block from which
            # the header is extracted.
            header = block_to_header(item[2])
            params = [{"height": item[1], "hex": safe_hexlify(header)}]
            await self._send_notification(writer,
                                          "blockchain.headers.subscribe",
                                          params)
    async def headers_subscribe(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.headers.subscribe
        Subscribe to receive block headers when a new block is found.

        Replies with the current tip (height and hex header) and starts a
        notifier task that is tracked under the calling peer.
        """
        # The current tip is part of the reply, so fetch it first.
        err, tip_height = await self.bx.fetch_last_height()
        if err and err != ZMQError.success:
            self.log.error("bx.fetch_last_height: %s", err.name)
            return JsonRPCError.internalerror()

        err, tip_header = await self.bx.fetch_block_header(tip_height)
        if err and err != ZMQError.success:
            self.log.error("bx.fetch_block_header: %s", err.name)
            return JsonRPCError.internalerror()

        peer_tasks = self.peers[self._get_peer(writer)]["tasks"]
        peer_tasks.append(asyncio.create_task(self.header_notifier(writer)))

        tip = {"height": tip_height, "hex": safe_hexlify(tip_header)}
        return {"result": tip}
    361     async def relayfee(self, writer, query):  # pylint: disable=W0613
    362         """Method: blockchain.relayfee
    363         Return the minimum fee a low-priority transaction must pay in order
    364         to be accepted to the daemon’s memory pool.
    365         """
    366         return {"result": 0.00001}
    async def scripthash_get_balance(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.get_balance
        Return the confirmed and unconfirmed balances of a script hash.

        Params are ``[scripthash]``. Returns a JSON-RPC error dict on
        invalid params or backend failure.
        """
        if "params" not in query:
            return JsonRPCError.invalidparams()
        params = query["params"]
        if len(params) != 1:
            return JsonRPCError.invalidparams()

        scripthash = params[0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        err, balance = await self.bx.fetch_balance(scripthash)
        if err and err != ZMQError.success:
            self.log.error("bx.fetch_balance: %s", err.name)
            return JsonRPCError.internalerror()

        return {
            "result": {
                "confirmed": balance[0],
                "unconfirmed": balance[1],
            }
        }
    async def scripthash_get_history(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.get_history
        Return the confirmed and unconfirmed history of a script hash.

        Params are ``[scripthash]``. Each history row contributes one entry
        for its "received" part and one for its "spent" part, when present.
        Returns a JSON-RPC error dict on invalid params or backend failure.
        """
        if "params" not in query:
            return JsonRPCError.invalidparams()
        params = query["params"]
        if len(params) != 1:
            return JsonRPCError.invalidparams()

        scripthash = params[0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        err, rows = await self.bx.fetch_history4(scripthash)
        if err and err != ZMQError.success:
            self.log.error("bx.fetch_history4: %s", err.name)
            return JsonRPCError.internalerror()

        self.log.debug("hist: %s", rows)
        history = []
        # TODO: mempool
        for row in rows:
            for side in ("received", "spent"):
                if side in row:
                    history.append({
                        "height": row[side]["height"],
                        "tx_hash": hash_to_hex_str(row[side]["hash"]),
                    })

        return {"result": history}
    async def scripthash_get_mempool(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.get_mempool
        Return the unconfirmed transactions of a script hash.

        Not implemented yet: every call is answered with an
        invalid-request error.
        """
        # TODO: Implement
        not_implemented = JsonRPCError.invalidrequest()
        return not_implemented
    async def scripthash_listunspent(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.listunspent
        Return an ordered list of UTXOs sent to a script hash.

        Params are ``[scripthash]``. Returns a JSON-RPC error dict on
        invalid params or backend failure.
        """
        if "params" not in query:
            return JsonRPCError.invalidparams()
        params = query["params"]
        if len(params) != 1:
            return JsonRPCError.invalidparams()

        scripthash = params[0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        err, utxo = await self.bx.fetch_utxo(scripthash)
        if err and err != ZMQError.success:
            self.log.error("bx.fetch_utxo: %s", err.name)
            return JsonRPCError.internalerror()

        # A height of 0xFFFFFFFF is reported as 0 (presumably the backend's
        # marker for an unconfirmed output -- confirm).
        max_u32 = 4294967295
        unspent = [
            {
                "tx_pos": entry["received"]["index"],
                "value": entry["value"],
                "tx_hash": hash_to_hex_str(entry["received"]["hash"]),
                "height": (entry["received"]["height"]
                           if entry["received"]["height"] != max_u32 else 0),
            }
            for entry in utxo
        ]

        return {"result": unspent}
    async def scripthash_renewer(self, scripthash, queue):
        """Re-issue the backend subscription for ``scripthash`` every 60
        seconds until the task is cancelled.

        A failed subscription attempt is logged and retried on the next
        round. Cancellation is logged and ends the coroutine quietly.
        """
        try:
            while True:
                self.log.debug("scriphash renewer: %s", scripthash)
                err = await self.bx.subscribe_scripthash(scripthash, queue)
                if err and err != ZMQError.success:
                    self.log.error("bx.subscribe_scripthash: %s", err.name)
                await asyncio.sleep(60)
        except asyncio.CancelledError:
            self.log.debug("subscription cancelled: %s", scripthash)
    async def scripthash_notifier(self, writer, scripthash):
        """Notify ``writer`` whenever the backend reports activity for
        ``scripthash``.

        Starts a renewer task that keeps the backend subscription alive,
        then for every queued update appends ``(txid, height)`` to the
        status list stored for this peer and scripthash and sends a
        "blockchain.scripthash.subscribe" notification with the re-encoded
        status. Runs until cancelled; the renewer is cancelled on any exit.
        """
        # TODO: Mempool
        # TODO: This is still flaky and not always notified. Investigate.
        self.log.debug("notifier")
        method = "blockchain.scripthash.subscribe"
        queue = asyncio.Queue()
        renew_task = asyncio.create_task(
            self.scripthash_renewer(scripthash, queue))

        try:
            while True:
                item = await queue.get()
                _ec, height, txid = struct.unpack("<HI32s", item)

                self.log.debug("shnotifier: _ec: %d", _ec)
                self.log.debug("shnotifier: height: %d", height)
                self.log.debug("shnotifier: txid: %s", hash_to_hex_str(txid))

                if (_ec == ZMQError.service_stopped.value and height == 0 and
                        not self.stopped):
                    self.log.debug("subscription expired: %s", scripthash)
                    # Subscription expired
                    continue

                # The status list lives under the scripthash itself, where
                # scripthash_subscribe stores it. The old lookups used
                # ["sh"]["status"] and the literal key "scripthash", which
                # raised KeyError and killed the notifier.
                status = self.peers[self._get_peer(
                    writer)]["sh"][scripthash]["status"]
                status.append((hash_to_hex_str(txid), height))

                params = [
                    scripthash,
                    ElectrumProtocol.__scripthash_status_encode(status),
                ]
                await self._send_notification(writer, method, params)
        except asyncio.CancelledError:
            # Cancellation is the normal way to stop this notifier.
            pass
        finally:
            # Also stop the renewer when the loop dies from an error.
            renew_task.cancel()
    async def scripthash_subscribe(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.subscribe
        Subscribe to a script hash.

        Params are ``[scripthash]``. Stores the current status under the
        calling peer, starts a notifier task for the scripthash and replies
        with the encoded status, or None when there is no history. Returns
        a JSON-RPC error dict on invalid params or backend failure.
        """
        if "params" not in query or len(query["params"]) != 1:
            return JsonRPCError.invalidparams()

        scripthash = query["params"][0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        _ec, history = await self.bx.fetch_history4(scripthash)
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.fetch_history4: %s", _ec.name)
            return JsonRPCError.internalerror()

        # TODO: Check how history4 acts for mempool/unconfirmed
        status = ElectrumProtocol.__scripthash_status_from_history(history)
        subscriptions = self.peers[self._get_peer(writer)]["sh"]

        # A repeated subscribe replaces the entry; cancel the old notifier
        # so it neither leaks nor sends duplicate notifications.
        previous = subscriptions.get(scripthash)
        if previous is not None and "task" in previous:
            previous["task"].cancel()

        subscriptions[scripthash] = {"status": status}
        task = asyncio.create_task(self.scripthash_notifier(writer, scripthash))
        subscriptions[scripthash]["task"] = task

        if len(history) < 1:
            return {"result": None}
        return {"result": ElectrumProtocol.__scripthash_status_encode(status)}
    @staticmethod
    def __scripthash_status_from_history(history):
        """Flatten history rows into a list of ``(tx_hash_hex, height)``
        tuples, taking the "received" part before the "spent" part of each
        row."""
        pairs = []
        for row in history:
            for side in ("received", "spent"):
                if side in row:
                    pairs.append((
                        hash_to_hex_str(row[side]["hash"]),
                        row[side]["height"],
                    ))
        return pairs
    @staticmethod
    def __scripthash_status_encode(status):
        """Encode a status list as the hex sha256 of the concatenated
        "txid:height:" strings of its entries."""
        concat = "".join(txid + ":%d:" % height for txid, height in status)
        digest = sha256(concat.encode("ascii"))
        return bh2u(digest)
    async def scripthash_unsubscribe(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.unsubscribe
        Unsubscribe from a script hash, preventing future notifications
        if its status changes.

        Params are ``[scripthash]``. The result is True when the peer was
        subscribed and False otherwise.
        """
        if "params" not in query:
            return JsonRPCError.invalidparams()
        params = query["params"]
        if len(params) != 1:
            return JsonRPCError.invalidparams()

        scripthash = params[0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        subscriptions = self.peers[self._get_peer(writer)]["sh"]
        if scripthash not in subscriptions:
            return {"result": False}

        # Only the local notifier task is stopped; no unsubscribe request
        # is sent to the backend.
        subscriptions[scripthash]["task"].cancel()
        del subscriptions[scripthash]
        return {"result": True}
    async def transaction_broadcast(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.transaction.broadcast
        Broadcast a transaction to the network.

        Params are ``[raw_tx_hex]``. On success the result is the hex
        transaction id computed from the raw bytes.
        """
        # Note: Not yet implemented in bs v4
        if "params" not in query:
            return JsonRPCError.invalidparams()
        params = query["params"]
        if len(params) != 1:
            return JsonRPCError.invalidparams()

        hextx = params[0]
        if not is_hex_str(hextx):
            return JsonRPCError.invalidparams()

        rawtx = unhexlify(hextx)
        # NOTE(review): the raw bytes are handed to the backend reversed;
        # confirm this is what broadcast_transaction expects.
        err, _ = await self.bx.broadcast_transaction(rawtx[::-1])
        if err and err != ZMQError.success:
            self.log.error("bx.broadcast_transaction: %s", err.name)
            return JsonRPCError.internalerror()

        return {"result": hash_to_hex_str(double_sha256(rawtx))}
    async def transaction_get(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.transaction.get
        Return a raw transaction.

        Params are ``[tx_hash]`` or ``[tx_hash, verbose]``. Verbose output
        is not implemented and is answered with an invalid-request error.
        A transaction the backend cannot supply yields an internal error.
        """
        if "params" not in query:
            return JsonRPCError.invalidparams()
        params = query["params"]
        if len(params) < 1:
            return JsonRPCError.invalidparams()

        tx_hash = params[0]
        verbose = False
        if len(params) > 1:
            verbose = params[1]

        if not is_hex_str(tx_hash):
            return JsonRPCError.invalidparams()

        # The mempool fetch is used instead of the blockchain-only fetch.
        err, rawtx = await self.bx.fetch_mempool_transaction(tx_hash)
        if err and err != ZMQError.success and err != ZMQError.not_found:
            self.log.error("fetch_mempool_transaction: %s", err.name)
            return JsonRPCError.internalerror()

        # Behaviour is undefined in spec
        if not rawtx:
            return JsonRPCError.internalerror()

        if verbose:
            # TODO: Help needed
            return JsonRPCError.invalidrequest()

        return {"result": bh2u(rawtx)}
    async def transaction_get_merkle(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.transaction.get_merkle
        Return the merkle branch to a confirmed transaction given its
        hash and height.

        Params are ``[tx_hash, height]``. The result holds "block_height",
        "pos" and "merkle". Returns a JSON-RPC error dict on invalid
        params, on backend failure, or when the transaction is not part of
        the block at ``height``.
        """
        if "params" not in query or len(query["params"]) != 2:
            return JsonRPCError.invalidparams()

        tx_hash = query["params"][0]
        height = query["params"][1]

        if not is_hash256_str(tx_hash):
            return JsonRPCError.invalidparams()
        if not is_non_negative_integer(height):
            return JsonRPCError.invalidparams()

        _ec, hashes = await self.bx.fetch_block_transaction_hashes(height)
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.fetch_block_transaction_hashes: %s", _ec.name)
            return JsonRPCError.internalerror()

        # Decouple from tuples
        hashes = [i[0] for i in hashes]
        try:
            # The block's hashes are in reversed byte order relative to the
            # hex string supplied by the client.
            tx_pos = hashes.index(unhexlify(tx_hash)[::-1])
        except ValueError:
            # The transaction is not in this block: answer with an error
            # instead of letting the exception kill the connection handler.
            return JsonRPCError.invalidparams()
        branch = merkle_branch(hashes, tx_pos)

        res = {
            "block_height": int(height),
            "pos": int(tx_pos),
            "merkle": branch,
        }
        return {"result": res}
    async def transaction_id_from_pos(self, writer, query):  # pylint: disable=R0911,W0613
        """Method: blockchain.transaction.id_from_pos
        Return a transaction hash and optionally a merkle proof, given a
        block height and a position in the block.

        Params are ``[height, tx_pos]`` or ``[height, tx_pos, merkle]``.
        Without ``merkle`` the result is the hex transaction id; with it,
        a dict holding "tx_hash" and "merkle".
        """
        if "params" not in query:
            return JsonRPCError.invalidparams()
        params = query["params"]
        if len(params) < 2:
            return JsonRPCError.invalidparams()

        height = params[0]
        tx_pos = params[1]
        merkle = False
        if len(params) > 2:
            merkle = params[2]

        if not is_non_negative_integer(height):
            return JsonRPCError.invalidparams()
        if not is_non_negative_integer(tx_pos):
            return JsonRPCError.invalidparams()
        if not is_boolean(merkle):
            return JsonRPCError.invalidparams()

        err, entries = await self.bx.fetch_block_transaction_hashes(height)
        if err and err != ZMQError.success:
            self.log.error("bx.fetch_block_transaction_hashes: %s", err.name)
            return JsonRPCError.internalerror()

        # The requested position lies past the last transaction.
        if tx_pos >= len(entries):
            return JsonRPCError.internalerror()

        # Each entry is a tuple whose first element is the hash.
        tx_hashes = [entry[0] for entry in entries]
        txid = hash_to_hex_str(tx_hashes[tx_pos])

        if merkle:
            branch = merkle_branch(tx_hashes, tx_pos)
            return {"result": {"tx_hash": txid, "merkle": branch}}
        return {"result": txid}
    async def get_fee_histogram(self, writer, query):  # pylint: disable=W0613
        """Method: mempool.get_fee_histogram
        Meant to report a histogram of mempool fee rates weighted by
        transaction size. Not implemented yet: a fixed single-bucket
        placeholder is always returned.
        """
        # TODO: Help wanted
        placeholder = [[0, 0]]
        return {"result": placeholder}
    async def add_peer(self, writer, query):  # pylint: disable=W0613
        """Method: server.add_peer
        Intended for a freshly started server announcing itself to
        other servers; wallet clients should not call it. Not
        implemented yet: the request is always answered with False.
        """
        # TODO: Help wanted
        accepted = False
        return {"result": accepted}
    async def banner(self, writer, query):  # pylint: disable=W0613
        """Method: server.banner
        Return a banner to be shown in the Electrum console.

        The banner is the static BANNER text followed by the obelisk
        version and the version reported by the libbitcoin backend.
        If the backend version request fails, the error is logged and a
        JSON-RPC internal error is returned instead.
        """
        _ec, bsversion = await self.bx.server_version()
        # Same error handling as the other backend calls in this file.
        # Without it a failed request would reach .decode() with whatever
        # payload came back alongside the error (presumably None).
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.server_version: %s", _ec.name)
            return JsonRPCError.internalerror()

        banner = "%s\nobelisk version: %s\nlibbitcoin-server version: %s" % (
            BANNER,
            VERSION,
            bsversion.decode(),
        )
        return {"result": banner}
    async def donation_address(self, writer, query):  # pylint: disable=W0613
        """Method: server.donation_address
        Report the donation address configured for this server, taken
        from the module-level DONATION_ADDR constant.
        """
        address = DONATION_ADDR
        return {"result": address}
    async def server_features(self, writer, query):  # pylint: disable=W0613 # pragma: no cover
        """Method: server.features
        Describe this server: genesis hash, reachable hosts, supported
        protocol range, pruning status, server version and hash function.

        Every name in the "server_hostnames" config entry is advertised
        with the configured "server_port" as its TCP port.
        """
        cfg = self.server_cfg
        hosts = {
            hostname: {"tcp_port": cfg["server_port"]}
            for hostname in cfg["server_hostnames"]
        }

        features = {
            "genesis_hash": self.genesis,
            "hosts": hosts,
            "protocol_max": SERVER_PROTO_MAX,
            "protocol_min": SERVER_PROTO_MIN,
            "pruning": None,
            "server_version": f"obelisk {VERSION}",
            "hash_function": "sha256",
        }
        return {"result": features}
    async def peers_subscribe(self, writer, query):  # pylint: disable=W0613
        """Method: server.peers.subscribe
        Meant to list known peer servers. In spite of its name, this is
        a plain request and never produces notifications. Not
        implemented yet: an empty list is always returned.
        """
        # TODO: Help wanted
        peers = []
        return {"result": peers}
    async def ping(self, writer, query):  # pylint: disable=W0613
        """Method: server.ping
        Keep-alive request: lets a client check that the server still
        responds and keeps its session from idling out. The reply
        carries a null result.
        """
        reply = {"result": None}
        return reply
    async def server_version(self, writer, query):  # pylint: disable=W0613
        """Method: server.version
        Identify the client to the server and negotiate the protocol version.

        query["params"] must hold exactly two items: the client name
        (not used here) and the protocol version, given either as one
        version string or as a [min, max] pair of version strings.

        Returns {"result": [server_name, negotiated_version]} on success,
        an invalid-params error when the request is malformed, and a
        protocol-not-supported error when the client's range does not
        overlap the range supported by this server.
        """
        if "params" not in query or len(query["params"]) != 2:
            return JsonRPCError.invalidparams()

        def as_tuple(ver):
            """Parse "1.4.2" into (1, 4, 2); return None if malformed."""
            if not isinstance(ver, str):
                return None
            parts = ver.split(".")
            if not all(part.isdecimal() for part in parts):
                return None
            return tuple(int(part) for part in parts)

        client_ver = query["params"][1]

        if isinstance(client_ver, list):
            # A range must be exactly [min, max]
            if len(client_ver) != 2:
                return JsonRPCError.invalidparams()
            client_min, client_max = client_ver
        else:
            client_min = client_max = client_ver

        if as_tuple(client_min) is None or as_tuple(client_max) is None:
            return JsonRPCError.invalidparams()

        # Compare numerically, component by component. Plain string
        # comparison would rank "1.10" below "1.4".
        version = min(client_max, SERVER_PROTO_MAX, key=as_tuple)
        lowest_ok = max(as_tuple(client_min), as_tuple(SERVER_PROTO_MIN))

        if as_tuple(version) < lowest_ok:
            return JsonRPCError.protonotsupported()

        return {"result": [f"obelisk {VERSION}", version]}