obelisk

Electrum server using libbitcoin as its backend
git clone https://git.parazyd.org/obelisk
Log | Files | Refs | README | LICENSE

protocol.py (31114B)


      1 #!/usr/bin/env python3
      2 # Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
      3 #
      4 # This file is part of obelisk
      5 #
      6 # This program is free software: you can redistribute it and/or modify
      7 # it under the terms of the GNU Affero General Public License version 3
      8 # as published by the Free Software Foundation.
      9 #
     10 # This program is distributed in the hope that it will be useful,
     11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 # GNU Affero General Public License for more details.
     14 #
     15 # You should have received a copy of the GNU Affero General Public License
     16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 """Implementation of the Electrum protocol as found on
     18 https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html
     19 """
     20 import asyncio
     21 import json
     22 import struct
     23 from binascii import unhexlify
     24 from traceback import print_exc
     25 
     26 from obelisk.errors_jsonrpc import JsonRPCError
     27 from obelisk.errors_libbitcoin import ZMQError
     28 from obelisk.mempool_api import get_mempool_fee_estimates
     29 from obelisk.merkle import merkle_branch, merkle_branch_and_root
     30 from obelisk.util import (
     31     bh2u,
     32     block_to_header,
     33     is_boolean,
     34     is_hash256_str,
     35     is_hex_str,
     36     is_non_negative_integer,
     37     safe_hexlify,
     38     sha256,
     39     double_sha256,
     40     hash_to_hex_str,
     41 )
     42 from obelisk.zeromq import Client
     43 
     44 VERSION = "0.0"
     45 SERVER_PROTO_MIN = "1.4"
     46 SERVER_PROTO_MAX = "1.4.2"
     47 DONATION_ADDR = "bc1q7an9p5pz6pjwjk4r48zke2yfaevafzpglg26mz"
     48 
     49 BANNER = ("""
     50 Welcome to obelisk
     51 
     52 "Tools for the people"
     53 
     54 obelisk is a server that uses libbitcoin-server as its backend.
     55 Source code can be found at: https://github.com/parazyd/obelisk
     56 
     57 Please consider donating: %s
     58 """ % DONATION_ADDR)
     59 
     60 
     61 class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
     62     """Class implementing the Electrum protocol, with async support"""
     63 
     64     def __init__(self, log, chain, endpoints, server_cfg):
     65         self.log = log
     66         self.stopped = False
     67         self.endpoints = endpoints
     68         self.server_cfg = server_cfg
     69         self.loop = asyncio.get_event_loop()
     70         self.bx = Client(log, endpoints, self.loop)
     71         self.block_queue = None
     72         self.peers = {}
     73 
     74         self.chain = chain
     75         if self.chain == "mainnet":  # pragma: no cover
     76             self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
     77         elif self.chain == "testnet":
     78             self.genesis = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
     79         else:
     80             raise ValueError(f"Invalid chain '{chain}'")  # pragma: no cover
     81 
     82         # Here we map available methods to their respective functions
     83         self.methodmap = {
     84             "blockchain.block.header": self.block_header,
     85             "blockchain.block.headers": self.block_headers,
     86             "blockchain.estimatefee": self.estimatefee,
     87             "blockchain.headers.subscribe": self.headers_subscribe,
     88             "blockchain.relayfee": self.relayfee,
     89             "blockchain.scripthash.get_balance": self.scripthash_get_balance,
     90             "blockchain.scripthash.get_history": self.scripthash_get_history,
     91             "blockchain.scripthash.get_mempool": self.scripthash_get_mempool,
     92             "blockchain.scripthash.listunspent": self.scripthash_listunspent,
     93             "blockchain.scripthash.subscribe": self.scripthash_subscribe,
     94             "blockchain.scripthash.unsubscribe": self.scripthash_unsubscribe,
     95             "blockchain.transaction.broadcast": self.transaction_broadcast,
     96             "blockchain.transaction.get": self.transaction_get,
     97             "blockchain.transaction.get_merkle": self.transaction_get_merkle,
     98             "blockchain.transaction.id_from_pos": self.transaction_id_from_pos,
     99             "mempool.get_fee_histogram": self.get_fee_histogram,
    100             "server_add_peer": self.add_peer,
    101             "server.banner": self.banner,
    102             "server.donation_address": self.donation_address,
    103             "server.features": self.server_features,
    104             "server.peers.subscribe": self.peers_subscribe,
    105             "server.ping": self.ping,
    106             "server.version": self.server_version,
    107         }
    108 
    109     async def stop(self):
    110         """Destructor function"""
    111         self.log.debug("ElectrumProtocol.stop()")
    112         self.stopped = True
    113         if self.bx:
    114             for i in self.peers:
    115                 await self._peer_cleanup(i)
    116             await self.bx.stop()
    117 
    118     async def _peer_cleanup(self, peer):
    119         """Cleanup tasks and data for peer"""
    120         self.log.debug("Cleaning up data for %s", peer)
    121         for i in self.peers[peer]["tasks"]:
    122             i.cancel()
    123         for i in self.peers[peer]["sh"]:
    124             self.peers[peer]["sh"][i]["task"].cancel()
    125 
    126     @staticmethod
    127     def _get_peer(writer):
    128         peer_t = writer._transport.get_extra_info("peername")  # pylint: disable=W0212
    129         return f"{peer_t[0]}:{peer_t[1]}"
    130 
    131     async def recv(self, reader, writer):
    132         """Loop ran upon a connection which acts as a JSON-RPC handler"""
    133         recv_buf = bytearray()
    134         self.peers[self._get_peer(writer)] = {"tasks": [], "sh": {}}
    135 
    136         while not self.stopped:
    137             data = await reader.read(4096)
    138             if not data or len(data) == 0:
    139                 await self._peer_cleanup(self._get_peer(writer))
    140                 return
    141             recv_buf.extend(data)
    142             lb = recv_buf.find(b"\n")
    143             if lb == -1:
    144                 continue
    145             while lb != -1:
    146                 line = recv_buf[:lb].rstrip()
    147                 recv_buf = recv_buf[lb + 1:]
    148                 lb = recv_buf.find(b"\n")
    149                 try:
    150                     line = line.decode("utf-8")
    151                     query = json.loads(line)
    152                 except (UnicodeDecodeError, json.JSONDecodeError) as err:
    153                     self.log.debug("%s", print_exc)
    154                     self.log.debug("Decode error: %s", repr(err))
    155                     break
    156                 self.log.debug("=> %s", line)
    157                 await self.handle_query(writer, query)
    158 
    159     async def _send_notification(self, writer, method, params):
    160         """Send JSON-RPC notification to given writer"""
    161         response = {"jsonrpc": "2.0", "method": method, "params": params}
    162         self.log.debug("<= %s", response)
    163         writer.write(json.dumps(response).encode("utf-8") + b"\n")
    164         await writer.drain()
    165 
    166     async def _send_response(self, writer, result, nid):
    167         """Send successful JSON-RPC response to given writer"""
    168         response = {"jsonrpc": "2.0", "result": result, "id": nid}
    169         self.log.debug("<= %s", response)
    170         writer.write(json.dumps(response).encode("utf-8") + b"\n")
    171         await writer.drain()
    172 
    173     async def _send_error(self, writer, error):
    174         """Send JSON-RPC error to given writer"""
    175         response = {"jsonrpc": "2.0", "error": error, "id": None}
    176         self.log.debug("<= %s", response)
    177         writer.write(json.dumps(response).encode("utf-8") + b"\n")
    178         await writer.drain()
    179 
    180     async def _send_reply(self, writer, resp, query):
    181         """Wrap function for sending replies"""
    182         if "error" in resp:
    183             return await self._send_error(writer, resp["error"])
    184         return await self._send_response(writer, resp["result"], query["id"])
    185 
    186     async def handle_query(self, writer, query):  # pylint: disable=R0915,R0912,R0911
    187         """Electrum protocol method handler mapper"""
    188         if "method" not in query or "id" not in query:
    189             return await self._send_reply(writer, JsonRPCError.invalidrequest(),
    190                                           None)
    191 
    192         method = query["method"]
    193         func = self.methodmap.get(method)
    194         if not func:
    195             self.log.error("Unhandled method %s, query=%s", method, query)
    196             return await self._send_reply(writer, JsonRPCError.methodnotfound(),
    197                                           query)
    198         resp = await func(writer, query)
    199         return await self._send_reply(writer, resp, query)
    200 
    async def _merkle_proof_for_headers(self, height, idx):
        """Build a merkle proof for the header at ``idx`` (very inefficient).

        Every header from 0 up to and including ``height`` is fetched one by
        one and double-SHA256 hashed to form the tree leaves. Returns a dict
        with "branch", "header" and "root", or a JSON-RPC internal error
        dict as soon as one header fetch fails.
        """
        # This works, but is extremely inefficient. The best solution would
        # be to figure something out in libbitcoin-server.
        fetched = []
        for block_height in range(height + 1):
            _ec, raw_header = await self.bx.fetch_block_header(block_height)
            if _ec and _ec != ZMQError.success:
                self.log.error("bx.fetch_block_header: %s", _ec.name)
                return JsonRPCError.internalerror()
            fetched.append(raw_header)

        leaves = [double_sha256(raw_header) for raw_header in fetched]
        branch, root = merkle_branch_and_root(leaves, idx)

        proof = {
            "branch": [hash_to_hex_str(node) for node in branch],
            "header": safe_hexlify(fetched[idx]),
            "root": hash_to_hex_str(root),
        }
        return proof
    223 
    async def block_header(self, writer, query):  # pylint: disable=W0613,R0911
        """Method: blockchain.block.header
        Return the block header at the given height.

        Params are ``[height]`` or ``[height, cp_height]``. Without a
        non-zero ``cp_height`` the result is the hex header; with one, the
        result is the merkle proof dict from ``_merkle_proof_for_headers``.
        Invalid params yield an invalid-params error, a failed backend fetch
        an internal error.
        """
        if "params" not in query or len(query["params"]) < 1:
            return JsonRPCError.invalidparams()
        index = query["params"][0]
        cp_height = query["params"][1] if len(query["params"]) == 2 else 0

        if not is_non_negative_integer(index):
            return JsonRPCError.invalidparams()
        if not is_non_negative_integer(cp_height):
            return JsonRPCError.invalidparams()
        if cp_height != 0 and not index <= cp_height:
            return JsonRPCError.invalidparams()

        if cp_height == 0:
            _ec, header = await self.bx.fetch_block_header(index)
            if _ec and _ec != ZMQError.success:
                self.log.error("bx.fetch_block_header: %s", _ec.name)
                return JsonRPCError.internalerror()
            return {"result": safe_hexlify(header)}

        res = await self._merkle_proof_for_headers(cp_height, index)
        # The helper hands back a JSON-RPC error dict when a header fetch
        # fails; pass it through instead of wrapping it as a result.
        if "error" in res:
            return res
        return {"result": res}
    249 
    async def block_headers(self, writer, query):  # pylint: disable=W0613,R0911
        """Method: blockchain.block.headers
        Return a concatenated chunk of block headers from the main chain.

        Params are ``[start_height, count]`` or
        ``[start_height, count, cp_height]``. The result holds the
        concatenated headers as "hex", their "count" and the chunk limit
        "max"; with a non-zero ``cp_height`` and at least one header it also
        carries the "branch" and "root" of a merkle proof for the last
        returned header. Invalid params yield an invalid-params error, a
        failed backend fetch an internal error.
        """
        if "params" not in query or len(query["params"]) < 2:
            return JsonRPCError.invalidparams()
        # Electrum doesn't allow max_chunk_size to be less than 2016
        # gopher://bitreich.org/9/memecache/convenience-store.mkv
        max_chunk_size = 2016
        start_height = query["params"][0]
        count = query["params"][1]
        cp_height = query["params"][2] if len(query["params"]) == 3 else 0

        if not is_non_negative_integer(start_height):
            return JsonRPCError.invalidparams()
        if not is_non_negative_integer(count):
            return JsonRPCError.invalidparams()
        # cp_height comes from the client too; validate it before comparing
        if not is_non_negative_integer(cp_height):
            return JsonRPCError.invalidparams()
        # The spec requires start_height + (count - 1) <= cp_height
        if cp_height != 0 and not start_height + (count - 1) <= cp_height:
            return JsonRPCError.invalidparams()

        count = min(count, max_chunk_size)
        headers = bytearray()
        for offset in range(count):
            _ec, data = await self.bx.fetch_block_header(start_height + offset)
            if _ec and _ec != ZMQError.success:
                self.log.error("bx.fetch_block_header: %s", _ec.name)
                return JsonRPCError.internalerror()
            headers.extend(data)

        # Each block header is 80 bytes long
        n_headers = len(headers) // 80
        resp = {
            "hex": safe_hexlify(headers),
            "count": n_headers,
            "max": max_chunk_size,
        }

        # With no headers there is no "last header" to prove; asking for
        # index start_height - 1 would prove the wrong header.
        if cp_height > 0 and n_headers > 0:
            proof = await self._merkle_proof_for_headers(
                cp_height, start_height + n_headers - 1)
            # The helper returns a JSON-RPC error dict on fetch failure
            if "error" in proof:
                return proof
            resp["branch"] = proof["branch"]
            resp["root"] = proof["root"]

        return {"result": resp}
    293 
    async def estimatefee(self, writer, query):  # pylint: disable=W0613,disable=R0911
        """Method: blockchain.estimatefee
        Return the estimated transaction fee per kilobyte for a transaction
        to be confirmed within a certain number of blocks.

        Params are ``[num_blocks]``. On testnet a fixed 0.00001 is returned.
        Otherwise the estimate comes from ``get_mempool_fee_estimates()``;
        when that yields nothing the result is -1. The result is always a
        number, as the protocol specifies (it used to be a formatted string
        on mainnet).
        """
        # NOTE: This solution is using the mempool.space API.
        # Let's try to eventually solve it with some internal logic.
        if "params" not in query or len(query["params"]) != 1:
            return JsonRPCError.invalidparams()

        num_blocks = query["params"][0]
        if not is_non_negative_integer(num_blocks):
            return JsonRPCError.invalidparams()

        if self.chain == "testnet":
            return {"result": 0.00001}

        fee_dict = get_mempool_fee_estimates()
        if not fee_dict:
            return {"result": -1}

        # Good enough.
        if num_blocks < 3:
            fee_key = "fastestFee"
        elif num_blocks < 6:
            fee_key = "halfHourFee"
        elif num_blocks < 10:
            fee_key = "hourFee"
        else:
            fee_key = "minimumFee"

        # NOTE(review): presumably the API reports sat/vB, which makes the
        # division by 100000 a conversion to BTC/kB - confirm.
        return {"result": round(fee_dict[fee_key] / 100000, 8)}
    326 
    async def header_notifier(self, writer):
        """Forward new-block announcements to ``writer`` forever.

        Installs a fresh queue as ``self.block_queue``, subscribes it through
        ``bx.subscribe_to_blocks`` and then, for every 3-item entry taken
        from the queue, sends a "blockchain.headers.subscribe" notification
        with the height (item 1) and the hex header derived from item 2.
        Entries of any other length are logged and dropped.
        """
        self.block_queue = asyncio.Queue()
        await self.bx.subscribe_to_blocks(self.block_queue)
        while True:
            item = await self.block_queue.get()
            if len(item) == 3:
                raw_header = block_to_header(item[2])
                payload = [{"height": item[1], "hex": safe_hexlify(raw_header)}]
                await self._send_notification(
                    writer, "blockchain.headers.subscribe", payload)
            else:
                self.log.debug("error: item from block queue len != 3")
    341 
    async def headers_subscribe(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.headers.subscribe
        Subscribe to receive block headers when a new block is found.

        Replies with the current tip height and its hex header, and starts a
        ``header_notifier`` task that is recorded in the peer's task list.
        A failing backend call yields an internal error.
        """
        # The tip height and header are part of the immediate reply
        err, tip_height = await self.bx.fetch_last_height()
        if err and err != ZMQError.success:
            self.log.error("bx.fetch_last_height: %s", err.name)
            return JsonRPCError.internalerror()

        err, raw_header = await self.bx.fetch_block_header(tip_height)
        if err and err != ZMQError.success:
            self.log.error("bx.fetch_block_header: %s", err.name)
            return JsonRPCError.internalerror()

        peer_tasks = self.peers[self._get_peer(writer)]["tasks"]
        peer_tasks.append(asyncio.create_task(self.header_notifier(writer)))
        return {"result": {"height": tip_height, "hex": safe_hexlify(raw_header)}}
    360 
    361     async def relayfee(self, writer, query):  # pylint: disable=W0613
    362         """Method: blockchain.relayfee
    363         Return the minimum fee a low-priority transaction must pay in order
    364         to be accepted to the daemon’s memory pool.
    365         """
    366         return {"result": 0.00001}
    367 
    async def scripthash_get_balance(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.get_balance
        Return the confirmed and unconfirmed balances of a script hash.

        Params are ``[scripthash]``. The first two items returned by
        ``bx.fetch_balance`` are reported as "confirmed" and "unconfirmed".
        """
        if not ("params" in query and len(query["params"]) == 1):
            return JsonRPCError.invalidparams()

        scripthash = query["params"][0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        _ec, balance = await self.bx.fetch_balance(scripthash)
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.fetch_balance: %s", _ec.name)
            return JsonRPCError.internalerror()

        return {"result": {"confirmed": balance[0], "unconfirmed": balance[1]}}
    385 
    async def scripthash_get_history(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.get_history
        Return the confirmed and unconfirmed history of a script hash.

        Params are ``[scripthash]``. Each history row from
        ``bx.fetch_history4`` contributes one entry for its "received" part
        and one for its "spent" part, when present, in that order.
        """
        if not ("params" in query and len(query["params"]) == 1):
            return JsonRPCError.invalidparams()

        scripthash = query["params"][0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        _ec, rows = await self.bx.fetch_history4(scripthash)
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.fetch_history4: %s", _ec.name)
            return JsonRPCError.internalerror()

        self.log.debug("hist: %s", rows)
        # TODO: mempool
        entries = [
            {
                "height": row[side]["height"],
                "tx_hash": hash_to_hex_str(row[side]["hash"]),
            }
            for row in rows
            for side in ("received", "spent")
            if side in row
        ]
        return {"result": entries}
    417 
    async def scripthash_get_mempool(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.get_mempool
        Return the unconfirmed transactions of a script hash.

        Not implemented yet: every call is answered with an invalid-request
        error.
        """
        # TODO: Implement
        not_implemented = JsonRPCError.invalidrequest()
        return not_implemented
    424 
    async def scripthash_listunspent(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.listunspent
        Return an ordered list of UTXOs sent to a script hash.

        Params are ``[scripthash]``. Each row from ``bx.fetch_utxo`` becomes
        a dict with "tx_pos", "value", "tx_hash" and "height".
        """
        if not ("params" in query and len(query["params"]) == 1):
            return JsonRPCError.invalidparams()

        scripthash = query["params"][0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        _ec, utxo = await self.bx.fetch_utxo(scripthash)
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.fetch_utxo: %s", _ec.name)
            return JsonRPCError.internalerror()

        # A height of 0xFFFFFFFF is reported as 0.
        # NOTE(review): presumably the backend's marker for an unconfirmed
        # output - confirm.
        no_height = 4294967295
        unspent = []
        for row in utxo:
            received = row["received"]
            unspent.append({
                "tx_pos": received["index"],
                "value": row["value"],
                "tx_hash": hash_to_hex_str(received["hash"]),
                "height": 0 if received["height"] == no_height else received["height"],
            })

        return {"result": unspent}
    452 
    async def scripthash_renewer(self, scripthash, queue):
        """Re-issue the backend subscription for ``scripthash`` every 60 s.

        Runs until the task is cancelled. A non-success code from
        ``bx.subscribe_scripthash`` is logged and the loop carries on.
        """
        try:
            while True:
                self.log.debug("scriphash renewer: %s", scripthash)
                _ec = await self.bx.subscribe_scripthash(scripthash, queue)
                if _ec and _ec != ZMQError.success:
                    self.log.error("bx.subscribe_scripthash: %s", _ec.name)
                await asyncio.sleep(60)
        except asyncio.CancelledError:
            self.log.debug("subscription cancelled: %s", scripthash)
    464 
    async def scripthash_notifier(self, writer, scripthash):
        """Push status-change notifications for ``scripthash`` to ``writer``.

        Starts a ``scripthash_renewer`` task feeding a private queue. Every
        queue item is unpacked as ``<HI32s`` (error code, height, txid); the
        txid and height are appended to the status list stored for this
        peer's subscription and the re-encoded status is sent as a
        "blockchain.scripthash.subscribe" notification. Items signalling an
        expired subscription are skipped. The loop ends on task
        cancellation; the renewer task is cancelled on every exit path.
        """
        # TODO: Mempool
        # TODO: This is still flaky and not always notified. Investigate.
        self.log.debug("notifier")
        method = "blockchain.scripthash.subscribe"
        queue = asyncio.Queue()
        renew_task = asyncio.create_task(
            self.scripthash_renewer(scripthash, queue))

        try:
            while True:
                item = await queue.get()
                _ec, height, txid = struct.unpack("<HI32s", item)

                self.log.debug("shnotifier: _ec: %d", _ec)
                self.log.debug("shnotifier: height: %d", height)
                self.log.debug("shnotifier: txid: %s", hash_to_hex_str(txid))

                if (_ec == ZMQError.service_stopped.value and height == 0 and
                        not self.stopped):
                    self.log.debug("subscription expired: %s", scripthash)
                    # Subscription expired
                    continue

                # The status list lives under this subscription's own key.
                # The earlier lookups (["sh"]["status"] and the literal
                # ["sh"]["scripthash"]) raised KeyError on the first
                # notification.
                subscription = self.peers[self._get_peer(writer)]["sh"][scripthash]
                subscription["status"].append((hash_to_hex_str(txid), height))

                params = [
                    scripthash,
                    ElectrumProtocol.__scripthash_status_encode(
                        subscription["status"]),
                ]
                await self._send_notification(writer, method, params)
        except asyncio.CancelledError:
            # Cancellation is the normal way to end this notifier
            pass
        finally:
            # Also reached on unexpected errors, so the renewer never leaks
            renew_task.cancel()
    501 
    async def scripthash_subscribe(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.subscribe
        Subscribe to a script hash.

        Params are ``[scripthash]``. The current history is fetched, its
        status stored for this peer together with a freshly started
        ``scripthash_notifier`` task, and the encoded status returned. The
        result is None when the history is empty.
        """
        if "params" not in query or len(query["params"]) != 1:
            return JsonRPCError.invalidparams()

        scripthash = query["params"][0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        _ec, history = await self.bx.fetch_history4(scripthash)
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.fetch_history4: %s", _ec.name)
            return JsonRPCError.internalerror()

        # TODO: Check how history4 acts for mempool/unconfirmed
        status = ElectrumProtocol.__scripthash_status_from_history(history)

        subscriptions = self.peers[self._get_peer(writer)]["sh"]
        # A repeated subscribe replaces the entry; cancel the notifier that
        # belongs to the old one, or it would keep running unreferenced.
        previous = subscriptions.get(scripthash)
        if previous is not None and "task" in previous:
            previous["task"].cancel()

        subscriptions[scripthash] = {"status": status}
        subscriptions[scripthash]["task"] = asyncio.create_task(
            self.scripthash_notifier(writer, scripthash))

        if not history:
            return {"result": None}
        return {"result": ElectrumProtocol.__scripthash_status_encode(status)}
    530 
    531     @staticmethod
    532     def __scripthash_status_from_history(history):
    533         status = []
    534         for i in history:
    535             if "received" in i:
    536                 status.append((
    537                     hash_to_hex_str(i["received"]["hash"]),
    538                     i["received"]["height"],
    539                 ))
    540             if "spent" in i:
    541                 status.append((
    542                     hash_to_hex_str(i["spent"]["hash"]),
    543                     i["spent"]["height"],
    544                 ))
    545         return status
    546 
    @staticmethod
    def __scripthash_status_encode(status):
        """Encode a status list as a hex digest.

        Every ``(txid, height)`` pair contributes ``"<txid>:<height>:"``;
        the concatenation is ASCII-encoded, hashed with ``sha256`` and
        passed through ``bh2u``.
        """
        pieces = [txid + ":%d:" % height for txid, height in status]
        preimage = "".join(pieces).encode("ascii")
        return bh2u(sha256(preimage))
    553 
    async def scripthash_unsubscribe(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.scripthash.unsubscribe
        Unsubscribe from a script hash, preventing future notifications
        if its status changes.

        Params are ``[scripthash]``. The result is True when this peer had a
        subscription for it (its notifier task is cancelled and the entry
        removed), False otherwise.
        """
        if not ("params" in query and len(query["params"]) == 1):
            return JsonRPCError.invalidparams()

        scripthash = query["params"][0]
        if not is_hash256_str(scripthash):
            return JsonRPCError.invalidparams()

        if scripthash not in self.peers[self._get_peer(writer)]["sh"]:
            return {"result": False}

        subscriptions = self.peers[self._get_peer(writer)]["sh"]
        subscriptions[scripthash]["task"].cancel()
        # No unsubscribe request is sent to the backend here
        del subscriptions[scripthash]
        return {"result": True}
    574 
    async def transaction_broadcast(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.transaction.broadcast
        Broadcast a transaction to the network.

        Params are ``[raw_tx_hex]``. The decoded bytes are handed to the
        backend in reversed order; on success the result is the hex of the
        double-SHA256 of the decoded transaction.
        """
        # Note: Not yet implemented in bs v4
        if not ("params" in query and len(query["params"]) == 1):
            return JsonRPCError.invalidparams()

        hextx = query["params"][0]
        if not is_hex_str(hextx):
            return JsonRPCError.invalidparams()

        rawtx = unhexlify(hextx)
        _ec, _ = await self.bx.broadcast_transaction(rawtx[::-1])
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.broadcast_transaction: %s", _ec.name)
            return JsonRPCError.internalerror()

        return {"result": hash_to_hex_str(double_sha256(rawtx))}
    595 
    async def transaction_get(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.transaction.get
        Return a raw transaction.

        Params are ``[tx_hash]`` or ``[tx_hash, verbose]``. The transaction
        is looked up through ``bx.fetch_mempool_transaction``. A backend
        error other than not-found, or an empty lookup, yields an internal
        error; a truthy ``verbose`` yields an invalid-request error because
        verbose output is not implemented.
        """
        if "params" not in query or len(query["params"]) < 1:
            return JsonRPCError.invalidparams()

        params = query["params"]
        tx_hash = params[0]
        verbose = params[1] if len(params) > 1 else False

        if not is_hex_str(tx_hash):
            return JsonRPCError.invalidparams()

        _ec, rawtx = await self.bx.fetch_mempool_transaction(tx_hash)
        if _ec and _ec != ZMQError.success and _ec != ZMQError.not_found:
            self.log.error("fetch_mempool_transaction: %s", _ec.name)
            return JsonRPCError.internalerror()

        # Behaviour for an unknown transaction is undefined in the spec
        if not rawtx:
            return JsonRPCError.internalerror()

        # TODO: Help needed for verbose output
        if verbose:
            return JsonRPCError.invalidrequest()

        return {"result": bh2u(rawtx)}
    625 
    async def transaction_get_merkle(self, writer, query):  # pylint: disable=W0613
        """Method: blockchain.transaction.get_merkle
        Return the merkle branch to a confirmed transaction given its
        hash and height.

        Params are ``[tx_hash, height]``. The result holds "block_height",
        the transaction's "pos" in the block and the "merkle" branch. A
        transaction that is not part of the block at ``height`` yields an
        invalid-params error; a failed backend fetch an internal error.
        """
        if "params" not in query or len(query["params"]) != 2:
            return JsonRPCError.invalidparams()

        tx_hash = query["params"][0]
        height = query["params"][1]

        if not is_hash256_str(tx_hash):
            return JsonRPCError.invalidparams()
        if not is_non_negative_integer(height):
            return JsonRPCError.invalidparams()

        _ec, hashes = await self.bx.fetch_block_transaction_hashes(height)
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.fetch_block_transaction_hashes: %s", _ec.name)
            return JsonRPCError.internalerror()

        # Decouple from tuples
        hashes = [i[0] for i in hashes]
        try:
            tx_pos = hashes.index(unhexlify(tx_hash)[::-1])
        except ValueError:
            # list.index raises when the client names a tx that is not in
            # this block; answer with an error instead of letting the
            # exception escape the request handler.
            self.log.debug("tx %s not in block at height %s", tx_hash, height)
            return JsonRPCError.invalidparams()
        branch = merkle_branch(hashes, tx_pos)

        res = {
            "block_height": int(height),
            "pos": int(tx_pos),
            "merkle": branch,
        }
        return {"result": res}
    658 
    async def transaction_id_from_pos(self, writer, query):  # pylint: disable=R0911,W0613
        """Method: blockchain.transaction.id_from_pos
        Return a transaction hash and optionally a merkle proof, given a
        block height and a position in the block.

        Params are ``[height, tx_pos]`` or ``[height, tx_pos, merkle]``.
        Without ``merkle`` the result is the hex txid; with it, a dict of
        "tx_hash" and "merkle". A position beyond the block's transaction
        list yields an internal error.
        """
        if "params" not in query or len(query["params"]) < 2:
            return JsonRPCError.invalidparams()

        params = query["params"]
        height = params[0]
        tx_pos = params[1]
        merkle = params[2] if len(params) > 2 else False

        if not is_non_negative_integer(height):
            return JsonRPCError.invalidparams()
        if not is_non_negative_integer(tx_pos):
            return JsonRPCError.invalidparams()
        if not is_boolean(merkle):
            return JsonRPCError.invalidparams()

        _ec, hashes = await self.bx.fetch_block_transaction_hashes(height)
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.fetch_block_transaction_hashes: %s", _ec.name)
            return JsonRPCError.internalerror()

        if tx_pos > len(hashes) - 1:
            return JsonRPCError.internalerror()

        # Each backend row is a tuple; only its first item is the hash
        tx_hashes = [row[0] for row in hashes]
        txid = hash_to_hex_str(tx_hashes[tx_pos])

        if merkle:
            branch = merkle_branch(tx_hashes, tx_pos)
            return {"result": {"tx_hash": txid, "merkle": branch}}
        return {"result": txid}
    695 
    async def get_fee_histogram(self, writer, query):  # pylint: disable=W0613
        """Method: mempool.get_fee_histogram
        Return a histogram of the fee rates paid by transactions in the
        memory pool, weighted by transaction size.

        Not implemented yet: a fixed placeholder histogram holding the
        single pair [0, 0] is always returned.
        """
        # TODO: Help wanted
        placeholder = [[0, 0]]
        return {"result": placeholder}
    703 
    async def add_peer(self, writer, query):  # pylint: disable=W0613
        """Method: server.add_peer
        A newly-started server uses this call to get itself into other
        servers' peers lists. It should not be used by wallet clients.

        Not implemented yet: the result is always False.
        """
        # TODO: Help wanted
        accepted = False
        return {"result": accepted}
    711 
    async def banner(self, writer, query):  # pylint: disable=W0613
        """Method: server.banner
        Return a banner to be shown in the Electrum console.

        The banner combines the BANNER text, the obelisk VERSION and the
        version reported by the backend's server_version() call. If that
        call reports an error, the error is logged and an internal error
        is returned instead of trying to decode a missing version.
        """
        _ec, bsversion = await self.bx.server_version()
        if _ec and _ec != ZMQError.success:
            self.log.error("bx.server_version: %s", _ec.name)
            return JsonRPCError.internalerror()

        banner = "%s\nobelisk version: %s\nlibbitcoin-server version: %s" % (
            BANNER,
            VERSION,
            bsversion.decode(),
        )
        return {"result": banner}
    723 
    async def donation_address(self, writer, query):  # pylint: disable=W0613
        """Method: server.donation_address
        Return the donation address configured for this server
        (the module-level DONATION_ADDR value).
        """
        reply = {"result": DONATION_ADDR}
        return reply
    729 
    async def server_features(self, writer, query):  # pylint: disable=W0613 # pragma: no cover
        """Method: server.features
        Return a list of features and services supported by the server.

        Every hostname listed in the server configuration is advertised
        with the configured TCP port.
        """
        cfg = self.server_cfg
        # One {"tcp_port": ...} entry per configured hostname
        hosts = {
            name: {"tcp_port": cfg["server_port"]}
            for name in cfg["server_hostnames"]
        }

        features = {
            "genesis_hash": self.genesis,
            "hosts": hosts,
            "protocol_max": SERVER_PROTO_MAX,
            "protocol_min": SERVER_PROTO_MIN,
            "pruning": None,
            "server_version": f"obelisk {VERSION}",
            "hash_function": "sha256",
        }
        return {"result": features}
    750 
    async def peers_subscribe(self, writer, query):  # pylint: disable=W0613
        """Method: server.peers.subscribe
        Return a list of peer servers. Despite the name this is not a
        subscription and the server must send no notifications.

        Not implemented yet: the returned peer list is always empty.
        """
        # TODO: Help wanted
        peers = []
        return {"result": peers}
    758 
    async def ping(self, writer, query):  # pylint: disable=W0613
        """Method: server.ping
        Ping the server to ensure it is responding, and to keep the session
        alive. The server may disconnect clients that have sent no requests
        for roughly 10 minutes.

        The reply carries no payload: its result is always None.
        """
        pong = {"result": None}
        return pong
    766 
    async def server_version(self, writer, query):  # pylint: disable=W0613
        """Method: server.version
        Identify the client to the server and negotiate the protocol version.

        Expects query["params"] to be [client_name, protocol_version],
        where protocol_version is either a single dotted version string
        or a [min, max] list of two such strings. Versions are compared
        numerically component by component, so that e.g. "1.10" is newer
        than "1.4". The negotiated version is the lower of the client's
        maximum and the server's maximum; if it falls below either side's
        minimum, a protocol-not-supported error is returned. Malformed
        version values yield an invalid-params error.
        """
        if "params" not in query or len(query["params"]) != 2:
            return JsonRPCError.invalidparams()

        def as_tuple(ver):
            """Turn a dotted version string into a tuple of integers."""
            return tuple(int(part) for part in ver.split("."))

        client_ver = query["params"][1]

        if isinstance(client_ver, list):
            if len(client_ver) != 2:
                return JsonRPCError.invalidparams()
            client_min, client_max = client_ver
        else:
            client_min = client_max = client_ver

        # Reject anything that is not a dotted string of integers up
        # front, rather than failing later inside the comparisons.
        try:
            as_tuple(client_min)
            as_tuple(client_max)
        except (AttributeError, TypeError, ValueError):
            return JsonRPCError.invalidparams()

        # Plain string comparison would order "1.10" before "1.4", so
        # the numeric tuples are used as the comparison key instead.
        version = min(client_max, SERVER_PROTO_MAX, key=as_tuple)
        lowest_allowed = max(client_min, SERVER_PROTO_MIN, key=as_tuple)

        if as_tuple(version) < as_tuple(lowest_allowed):
            return JsonRPCError.protonotsupported()

        return {"result": [f"obelisk {VERSION}", version]}