obelisk

Electrum server using libbitcoin as its backend
git clone https://git.parazyd.org/obelisk
Log | Files | Refs | README | LICENSE

commit 2cdb3c8bf5102a3348ef317e2f24925e12d25508
parent fafe8ca534194414392c99d80d54ac1dd687861f
Author: parazyd <parazyd@dyne.org>
Date:   Mon, 19 Apr 2021 22:10:21 +0200

Better ZMQ error messages.

Diffstat:
Mobelisk/errors_libbitcoin.py | 4++--
Mobelisk/protocol.py | 75++++++++++++++++++++++++++++++++++++++-------------------------------------
Mobelisk/zeromq.py | 31++++++++++++++++---------------
3 files changed, 56 insertions(+), 54 deletions(-)

diff --git a/obelisk/errors_libbitcoin.py b/obelisk/errors_libbitcoin.py @@ -22,10 +22,10 @@ def make_error_code(ec): """Return ErrorCode from ec""" if not ec: return None - return ErrorCode(ec) # pragma: no cover + return ZMQError(ec) # pragma: no cover -class ErrorCode(Enum): +class ZMQError(Enum): """libbitcoin error codes""" # general codes diff --git a/obelisk/protocol.py b/obelisk/protocol.py @@ -21,9 +21,10 @@ import asyncio import json import struct from binascii import unhexlify +from traceback import print_exc from obelisk.errors_jsonrpc import JsonRPCError -from obelisk.errors_libbitcoin import ErrorCode +from obelisk.errors_libbitcoin import ZMQError from obelisk.merkle import merkle_branch, merkle_branch_and_root from obelisk.util import ( bh2u, @@ -147,7 +148,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 line = line.decode("utf-8") query = json.loads(line) except (UnicodeDecodeError, json.JSONDecodeError) as err: - self.log.debug("Got error: %s", repr(err)) + self.log.debug("%s", print_exc) + self.log.debug("Decode error: %s", repr(err)) break self.log.debug("=> %s", line) await self.handle_query(writer, query) @@ -203,8 +205,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 for i in range(0, height + 1): _ec, data = await self.bx.fetch_block_header(i) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_block_header: %s", _ec.name) return JsonRPCError.internalerror() cp_headers.append(data) @@ -235,8 +237,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 if cp_height == 0: _ec, header = await self.bx.fetch_block_header(index) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_block_header: %s", _ec.name) return JsonRPCError.internalerror() return {"result": safe_hexlify(header)} @@ -268,8 +270,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 headers = bytearray() for i in range(count): _ec, data = await self.bx.fetch_block_header(start_height + i) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_block_header: %s", _ec.name) return JsonRPCError.internalerror() headers.extend(data) @@ -316,12 +318,12 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 """ # Tip height and header are returned upon request _ec, height = await self.bx.fetch_last_height() - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_last_height: %s", _ec.name) return JsonRPCError.internalerror() _ec, tip_header = await self.bx.fetch_block_header(height) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_block_header: %s", _ec.name) return JsonRPCError.internalerror() self.peers[self._get_peer(writer)]["tasks"].append( @@ -347,8 +349,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 return JsonRPCError.invalidparams() _ec, data = await self.bx.fetch_balance(query["params"][0]) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_balance: %s", _ec.name) return JsonRPCError.internalerror() ret = {"confirmed": data[0], "unconfirmed": data[1]} @@ -365,8 +367,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 return JsonRPCError.invalidparams() _ec, data = await self.bx.fetch_history4(query["params"][0]) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_history4: %s", _ec.name) return JsonRPCError.internalerror() self.log.debug("hist: %s", data) @@ -405,8 +407,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 return JsonRPCError.invalidparams() _ec, utxo = await self.bx.fetch_utxo(scripthash) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_utxo: %s", _ec.name) return JsonRPCError.internalerror() ret = [] @@ -426,12 +428,11 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 try: self.log.debug("scriphash renewer: %s", scripthash) _ec = await self.bx.subscribe_scripthash(scripthash, queue) - if _ec and _ec != 0: - self.log.error("bx.subscribe_scripthash failed: %s", - repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.subscribe_scripthash: %s", _ec.name) await asyncio.sleep(60) except asyncio.CancelledError: - self.log.debug("%s renewer cancelled", scripthash) + self.log.debug("subscription cancelled: %s", scripthash) break async def scripthash_notifier(self, writer, scripthash): @@ -448,7 +449,11 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 item = await queue.get() _ec, height, txid = struct.unpack("<HI32s", item) - if (_ec == ErrorCode.service_stopped.value and height == 0 and + self.log.debug("shnotifier: _ec: %d", _ec) + self.log.debug("shnotifier: height: %d", height) + self.log.debug("shnotifier: txid: %s", hash_to_hex_str(txid)) + + if (_ec == ZMQError.service_stopped.value and height == 0 and not self.stopped): self.log.debug("subscription expired: %s", scripthash) # Subscription expired @@ -457,11 +462,6 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 self.peers[self._get_peer(writer)]["sh"]["status"].append( (hash_to_hex_str(txid), height)) - self.log.debug("shnotifier: Got _ec: %d", _ec) - self.log.debug("shnotifier: Got height: %d", height) - self.log.debug("shnotifier: Got txid: %s", - hash_to_hex_str(txid)) - params = [ scripthash, ElectrumProtocol.__scripthash_status_encode(self.peers[ @@ -484,7 +484,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 return JsonRPCError.invalidparams() _ec, history = await self.bx.fetch_history4(scripthash) - if _ec and _ec != 0: + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_history4: %s", _ec.name) return JsonRPCError.internalerror() # TODO: Check how history4 acts for mempool/unconfirmed @@ -557,8 +558,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 return JsonRPCError.invalidparams() _ec, _ = await self.bx.broadcast_transaction(unhexlify(hextx)[::-1]) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.broadcast_transaction: %s", _ec.name) return JsonRPCError.internalerror() rawtx = unhexlify(hextx) @@ -580,8 +581,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 # _ec, rawtx = await self.bx.fetch_blockchain_transaction(tx_hash) _ec, rawtx = await self.bx.fetch_mempool_transaction(tx_hash) - if _ec and _ec != 0 and _ec != ErrorCode.not_found.value: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success and _ec != ZMQError.not_found: + self.log.error("fetch_mempool_transaction: %s", _ec.name) return JsonRPCError.internalerror() # Behaviour is undefined in spec @@ -612,8 +613,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 return JsonRPCError.invalidparams() _ec, hashes = await self.bx.fetch_block_transaction_hashes(height) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_block_transaction_hashes: %s", _ec.name) return JsonRPCError.internalerror() # Decouple from tuples @@ -648,8 +649,8 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 return JsonRPCError.invalidparams() _ec, hashes = await self.bx.fetch_block_transaction_hashes(height) - if _ec and _ec != 0: - self.log.debug("Got error: %s", repr(_ec)) + if _ec and _ec != ZMQError.success: + self.log.error("bx.fetch_block_transaction_hashes: %s", _ec.name) return JsonRPCError.internalerror() if len(hashes) - 1 < tx_pos: diff --git a/obelisk/zeromq.py b/obelisk/zeromq.py @@ -24,7 +24,7 @@ from random import randint import zmq import zmq.asyncio -from obelisk.errors_libbitcoin import make_error_code, ErrorCode +from obelisk.errors_libbitcoin import make_error_code, ZMQError from obelisk.util import hash_to_hex_str @@ -74,6 +74,20 @@ def checksum(xhash, index): return hash_upper_49_bits | index_lower_15_bits +def make_tuple(row): + kind, height, tx_hash, index, value = row + return ( + kind, + { + "hash": tx_hash, + "index": index + }, + height, + value, + checksum(hash_to_hex_str(tx_hash), index), + ) + + def unpack_table(row_fmt, data): """Function to unpack table received from libbitcoin""" # Get the number of rows @@ -290,7 +304,7 @@ class Client: self._settings.timeout) except asyncio.TimeoutError: self._request_collection.delete_request(request) - return ErrorCode.channel_timeout, None + return ZMQError.channel_timeout, None assert response.command == request.command assert response.request_id == request.id_ @@ -368,19 +382,6 @@ class Client: if error_code: return error_code, None - def make_tuple(row): - kind, height, tx_hash, index, value = row - return ( - kind, - { - "hash": tx_hash, - "index": index - }, - height, - value, - checksum(hash_to_hex_str(tx_hash), index), - ) - rows = unpack_table("<BI32sIQ", raw_points) points = [make_tuple(row) for row in rows] correlated_points = Client.__correlate(points)