electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

zeromq.py (18114B)


      1 #!/usr/bin/env python
      2 #
      3 # Electrum - lightweight Bitcoin client
      4 # Copyright (C) 2011 thomasv@gitorious
      5 # Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
      6 # Copyright (C) 2018 Harm Aarts <harmaarts@gmail.com>
      7 #
      8 # Permission is hereby granted, free of charge, to any person
      9 # obtaining a copy of this software and associated documentation files
     10 # (the "Software"), to deal in the Software without restriction,
     11 # including without limitation the rights to use, copy, modify, merge,
     12 # publish, distribute, sublicense, and/or sell copies of the Software,
     13 # and to permit persons to whom the Software is furnished to do so,
     14 # subject to the following conditions:
     15 #
     16 # The above copyright notice and this permission notice shall be
     17 # included in all copies or substantial portions of the Software.
     18 #
     19 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     20 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     21 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     22 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     23 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     24 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     25 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     26 # SOFTWARE.
     27 import asyncio
     28 import logging
     29 import functools
     30 import hashlib
     31 import struct
     32 from random import randint
     33 from binascii import hexlify, unhexlify
     34 
     35 import zmq
     36 import zmq.asyncio
     37 
     38 from .logging import Logger
     39 from .libbitcoin_errors import make_error_code, ErrorCode
     40 from .util import bh2u
     41 
     42 
     43 from datetime import datetime
     44 def __(msg):
     45     print("***********************")
     46     print("*** DEBUG %s ***: %s" % (datetime.now().strftime("%H:%M:%S"), msg))
     47 
     48 
     49 def create_random_id():
     50     """Generate a random request ID"""
     51     max_uint32 = 4294967295
     52     return randint(0, max_uint32)
     53 
     54 
     55 def checksum(hash_, index):
     56     """This method takes a transaction hash and an index and returns
     57     a checksum.
     58     This checksum is based on 49 bits starting from the 12th byte of the
     59     reversed hash. Combined with the last 15 bits of the 4 byte index.
     60     """
     61     mask = 0xffffffffffff8000
     62     magic_start_position = 12
     63 
     64     hash_bytes = bytes.fromhex(hash_)[::-1]
     65     last_20_bytes = hash_bytes[magic_start_position:]
     66 
     67     assert len(hash_bytes) == 32
     68     assert index < 2**32
     69 
     70     hash_upper_49_bits = to_int(last_20_bytes) & mask
     71     index_lower_15_bits = index & ~mask
     72 
     73     return hash_upper_49_bits | index_lower_15_bits
     74 
     75 
     76 def to_int(some_bytes):
     77     return int.from_bytes(some_bytes, byteorder='little')
     78 
     79 
     80 def pack_block_index(index):
     81     if isinstance(index, str):
     82         index = unhexlify(index)
     83         assert len(index) == 32
     84         return index
     85     elif isinstance(index, int):
     86         return struct.pack('<I', index)
     87     else:
     88         raise ValueError(f"Unknown index type {type(index)} v:{index}, should be int or bytearray")
     89 
     90 
     91 def unpack_table(row_fmt, data):
     92     # get the number of rows
     93     row_size = struct.calcsize(row_fmt)
     94     nrows = len(data) // row_size
     95 
     96     # unpack
     97     rows = []
     98     for idx in range(nrows):
     99         offset = idx * row_size
    100         row = struct.unpack_from(row_fmt, data, offset)
    101         rows.append(row)
    102     return rows
    103 
    104 
    105 class ClientSettings:
    106     """Class implementing client settings"""
    107     def __init__(self, timeout=10, context=None, loop=None):
    108         __("Zeromq ClientSettings: __init__")
    109         self._timeout = timeout
    110         self._context = context
    111         self._loop = loop
    112 
    113     @property
    114     def context(self):
    115         """zmq context property"""
    116         if not self._context:
    117             ctx = zmq.asyncio.Context()
    118             ctx.linger = 500  # in milliseconds
    119             self._context = ctx
    120         return self._context
    121 
    122     @context.setter
    123     def context(self, context):
    124         self._context = context
    125 
    126     @property
    127     def timeout(self):
    128         """Set to None for no timeout"""
    129         return self._timeout
    130 
    131     @timeout.setter
    132     def timeout(self, timeout):
    133         self._timeout = timeout
    134 
    135 
    136 class Request:
    137     """Class implementing a _send_ request.
    138     This is either a simple request/response affair or a subscription.
    139     """
    140     def __init__(self, socket, command, data):
    141         __("Zeromq Request: __init__")
    142         self.id_ = create_random_id()
    143         self.socket = socket
    144         self.command = command
    145         self.data = data
    146         self.future = asyncio.Future()
    147         self.queue = None
    148 
    149     async def send(self):
    150         """Send the zmq request"""
    151         __(f"Zeromq Request: send: {self.command}, {self.data}")
    152         request = [self.command, struct.pack('<I', self.id_), self.data]
    153         await self.socket.send_multipart(request)
    154 
    155     def is_subscription(self):
    156         """If the request is a subscription, then the response to this
    157         request is a notification.
    158         """
    159         return self.queue is not None
    160 
    161     def __str__(self):
    162         return 'Request(command, ID) {}, {:d}'.format(self.command,
    163                                                       self.id_)
    164 
    165 
    166 class InvalidServerResponseException(Exception): pass
    167 
    168 
    169 class Response:
    170     """Class implementing a request response"""
    171     def __init__(self, frame):
    172         __("Zeromq Response: __init__")
    173         if len(frame) != 3:
    174             raise InvalidServerResponseException(
    175                 'Length of the frame was not 3: %d' % len(frame))
    176 
    177         self.command = frame[0]
    178         self.request_id = struct.unpack('<I', frame[1])[0]
    179         error_code = struct.unpack('<I', frame[2][:4])[0]
    180         self.error_code = make_error_code(error_code)
    181         self.data = frame[2][4:]
    182 
    183     def is_bound_for_queue(self):
    184         return len(self.data) > 0
    185 
    186     def __str__(self):
    187         return 'Response(command, request ID, error code, data):' \
    188             + ' %s, %d, %s, %s' \
    189             % (self.command, self.request_id, self.error_code, self.data)
    190 
    191 
    192 class RequestCollection:
    193     """RequestCollection carries a list of Requests and matches incoming
    194     responses to them.
    195     """
    196     def __init__(self, socket, loop):
    197         __("Zeromq RequestCollection: __init__")
    198         self._socket = socket
    199         self._requests = {}
    200         self._task = asyncio.ensure_future(self._run(), loop=loop)
    201 
    202     async def _run(self):
    203         while True:
    204             __("Zeromq RequestCollection: _run loop iteration")
    205             await self._receive()
    206 
    207     async def stop(self):
    208         """Stops listening for incoming responses (or subscription) messages).
    209         Returns the number of _responses_ expected but which are now dropped
    210         on the floor.
    211         """
    212         __("Zeromq RequestCollection: stop")
    213         self._task.cancel()
    214         try:
    215             await self._task
    216         except asyncio.CancelledError:
    217             return len(self._requests)
    218 
    219     async def _receive(self):
    220         __("Zeromq RequestCollection: receive")
    221         frame = await self._socket.recv_multipart()
    222         response = Response(frame)
    223 
    224         if response.request_id in self._requests:
    225             self._handle_response(response)
    226         else:
    227             __("Zeromq RequestCollection: receive: unhandled response %s:%s" % (response.command, response.request_id))
    228 
    229     def _handle_response(self, response):
    230         __("Zeromq RequestCollection: _handle_response")
    231         request = self._requests[response.request_id]
    232 
    233         if request.is_subscription():
    234             if response.is_bound_for_queue():
    235                 # TODO: decode the data into something usable
    236                 request.queue.put_nowait(response.data)
    237             else:
    238                 request.future.set_result(response)
    239         else:
    240             self.delete_request(request)
    241             request.future.set_result(response)
    242 
    243     def add_request(self, request):
    244         __("Zeromq RequestCollection: add_request")
    245         # TODO: we should maybe check if the request.id_ is unique
    246         self._requests[request.id_] = request
    247 
    248     def delete_request(self, request):
    249         __("Zeromq RequestCollection: delete_request")
    250         del self._requests[request.id_]
    251 
    252 
    253 class Client:
    254     """This class represents a connection to a libbitcoin server.
    255     hostname -- the server DNS name to connect to.
    256     ports -- a dictionary containing four keys; query/heartbeat/block/tx
    257     """
    258     # def __init__(self, hostname, ports, settings=ClientSettings()):
    259     def __init__(self, hostname, ports, loop):
    260         __("Zeromq Client: __init__")
    261         self._hostname = hostname
    262         self._ports = ports
    263         # self._settings = settings
    264         self._settings = ClientSettings(loop=loop)
    265         self._query_socket = self._create_query_socket()
    266         self._block_socket = self._create_block_socket()
    267         self._request_collection = RequestCollection(
    268             self._query_socket, self._settings._loop)
    269 
    270     async def stop(self):
    271         __("Zeromq Client: stop")
    272         self._query_socket.close()
    273         self._block_socket.close()
    274         return await self._request_collection.stop()
    275 
    276     def _create_block_socket(self):
    277         __("Zeromq Client: _create_block_socket")
    278         socket = self._settings.context.socket(
    279             zmq.SUB, io_loop=self._settings._loop)  # pylint: disable=E1101
    280         socket.connect(self.__server_url(self._hostname,
    281                                          self._ports['block']))
    282         socket.setsockopt_string(zmq.SUBSCRIBE, '')  # pylint: disable=E1101
    283         return socket
    284 
    285     def _create_query_socket(self):
    286         __("Zeromq Client: _create_query_socket")
    287         socket = self._settings.context.socket(
    288             zmq.DEALER, io_loop=self._settings._loop)  # pylint: disable=E1101
    289         socket.connect(self.__server_url(self._hostname,
    290                                          self._ports['query']))
    291         return socket
    292 
    293     async def _subscription_request(self, command, data):
    294         __("Zeromq Client: _subscription_request")
    295         request = await self._request(command, data)
    296         request.queue = asyncio.Queue(loop=self._settings._loop)
    297         error_code, _ = await self._wait_for_response(request)
    298         return error_code, request.queue
    299 
    300     async def _simple_request(self, command, data):
    301         __("Zeromq Client: _simple_request")
    302         return await self._wait_for_response(
    303             await self._request(command, data))
    304 
    305     async def _request(self, command, data):
    306         """Make a generic request. Both options are byte objects
    307         specified like b'blockchain.fetch_block_header' as an example.
    308         """
    309         __("Zeromq Client: _request")
    310         request = Request(self._query_socket, command, data)
    311         await request.send()
    312         self._request_collection.add_request(request)
    313         return request
    314 
    315     async def _wait_for_response(self, request):
    316         __("Zeromq Client: _wait_for_response")
    317         try:
    318             response = await asyncio.wait_for(request.future,
    319                                               self._settings.timeout)
    320         except asyncio.TimeoutError:
    321             self._request_collection.delete_request(request)
    322             return ErrorCode.channel_timeout, None
    323 
    324         assert response.command == request.command
    325         assert response.request_id == request.id_
    326         return response.error_code, response.data
    327 
    328     @staticmethod
    329     def __server_url(hostname, port):
    330         return 'tcp://' + hostname + ':' + str(port)
    331 
    332     async def last_height(self):
    333         __("Zeromq Client: last_height")
    334         command = b'blockchain.fetch_last_height'
    335         error_code, data = await self._simple_request(command, b'')
    336         if error_code:
    337             return error_code, None
    338         height = struct.unpack('<I', data)[0]
    339         return error_code, height
    340 
    341     async def subscribe_to_blocks(self, queue):
    342         __("Zeromq Client: subscribe_to_blocks")
    343         asyncio.ensure_future(self._listen_for_blocks(queue))
    344         return queue
    345 
    346     async def _listen_for_blocks(self, queue):
    347         __("Zeromq Client: _listen_for_blocks")
    348         _ec, tip = await self.last_height()
    349         _, header = await self.block_header(tip)
    350         queue.put_nowait((0, tip, header))
    351         while True:
    352             __("Zeromq Client: _listen_for_blocks loop iteration")
    353             frame = await self._block_socket.recv_multipart()
    354             seq = struct.unpack('<H', frame[0])[0]
    355             height = struct.unpack('<I', frame[1])[0]
    356             block_data = frame[2]
    357             block_header = block_data[:80]
    358             # block_header = raw[:80]
    359             # version = block_header[:4]
    360             # prev_merkle_root = block_header[4:36]
    361             # merkle_root = block_header[36:68]
    362             # timestamp = block_header[68:72]
    363             # bits = block_header[72:76]
    364             # nonce = blockheader[76:80]
    365             queue.put_nowait((seq, height, block_header))
    366 
    367     async def _subscribe_to_scripthash(self, sh, queue):
    368         __("Zeromq Client: _subscribe_to_scripthash (stub)")
    369         # TODO: libbitcoin here get history and make status (also review this entire func)
    370         # https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-subscribe
    371         # https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status
    372         # https://parazyd.org/git/electrum-obelisk/file/electrumobelisk/protocol.py.html#l57
    373         # queue.put_nowait((something,))
    374         # while True:
    375         #     recv and put in queue
    376 
    377 
    378     async def block_header(self, index):
    379         """Fetches the block header by height or integer index"""
    380         __("Zeromq Client: block_header")
    381         command = b'blockchain.fetch_block_header'
    382         data = pack_block_index(index)
    383         error_code, data = await self._simple_request(command, data)
    384         if error_code:
    385             return error_code, None
    386         return error_code, data
    387 
    388     async def block_transaction_hashes(self, index):
    389         __("Zeromq Client: block_transaction_hashes")
    390         command = b'blockchain.fetch_block_transaction_hashes'
    391         data = pack_block_index(index)
    392         error_code, data = await self._simple_request(command, data)
    393         if error_code:
    394             return error_code, None
    395         data = unpack_table('32s', data)
    396         return error_code, data
    397 
    398     async def transaction(self, hash_):
    399         __("Zeromq Client: transaction")
    400         command = b'blockchain.fetch_transaction2'
    401         error_code, data = await self._simple_request(
    402             command, bytes.fromhex(hash_)[::-1])
    403         if error_code:
    404             return error_code, None
    405         return None, data
    406 
    407     async def mempool_transaction(self, hash_):
    408         __("Zeromq Client: mempool_transaction")
    409         command = b'transaction_pool.fetch_transaction2'
    410         error_code, data = await self._simple_request(
    411             command, bytes.fromhex(hash_)[::-1])
    412         if error_code:
    413             return error_code, None
    414         return None, bh2u(data)
    415 
    416     async def broadcast_transaction(self, rawtx):
    417         __("Zeromq Client: broadcast_transaction")
    418         __(rawtx)
    419         command = b'transaction_pool.broadcast'
    420         return await self._simple_request(command, unhexlify(rawtx))
    421 
    422     async def history4(self, scripthash, height=0):
    423         __("Zeromq Client: history4")
    424         command = b'blockchain.fetch_history4'
    425         decoded_address = unhexlify(scripthash)[::-1]  # TODO: check byte order
    426         error_code, raw_points = await self._simple_request(
    427             command, decoded_address + struct.pack('<I', height))
    428         if error_code:
    429             return error_code, None
    430 
    431         def make_tuple(row):
    432             kind, tx_hash, index, height, value = row
    433             return (
    434                 kind,
    435                 #COutPoint(tx_hash, index),  # TODO: libbitcoin XXX:
    436                 (tx_hash, index),
    437                 height,
    438                 value,
    439                 checksum(tx_hash[::-1].hex(), index),
    440             )
    441 
    442         rows = unpack_table('<B32sIIQ', raw_points)
    443         points = [make_tuple(row) for row in rows]
    444         correlated_points = Client.__correlate(points)
    445         return None, correlated_points
    446 
    447     async def balance(self, scripthash):
    448         __("Zeromq Client: balance")
    449         error, hist = await self.history4(scripthash)
    450         if error:
    451             return error, None
    452         utxo = Client.__receives_without_spends(hist)
    453         return None, functools.reduce(
    454             lambda accumulator, point: accumulator + point['value'], utxo, 0)
    455 
    456     async def unspent(self, scripthash):
    457         __("Zeromq Client: unspent")
    458         error, hist = await self.history4(scripthash)
    459         if error:
    460             return error, None
    461         return None, Client.__receives_without_spends(hist)
    462 
    463     @staticmethod
    464     def __receives_without_spends(hist):
    465         return (point for point in hist if 'spent' not in point)
    466 
    467     @staticmethod
    468     def __correlate(points):
    469         transfers, checksum_to_index = Client.__find_receives(points)
    470         transfers = Client.__correlate_spends_to_receives(
    471             points, transfers, checksum_to_index)
    472         return transfers
    473 
    474     @staticmethod
    475     def __correlate_spends_to_receives(points, transfers, checksum_to_index):
    476         for point in points:
    477             if point[0] == 0: # receive
    478                 continue
    479 
    480             spent = {
    481                 'hash': point[1].hash,
    482                 'height': point[2],
    483                 'index': point[1].n,
    484             }
    485             if point[3] not in checksum_to_index:
    486                 transfers.append({'spent': spent})
    487             else:
    488                 transfers[checksum_to_index[point[3]]]['spent'] = spent
    489 
    490         return transfers
    491 
    492     @staticmethod
    493     def __find_receives(points):
    494         transfers = []
    495         checksum_to_index = {}
    496 
    497         for point in points:
    498             if point[0] == 1:  # spent
    499                 continue
    500 
    501             transfers.append({
    502                 'received': {
    503                     'hash': point[1].hash,
    504                     'height': point[2],
    505                     'index': point[1].n,
    506                 },
    507                 'value': point[3],
    508             })
    509 
    510             checksum_to_index[point[4]] = len(transfers) - 1
    511 
    512         return transfers, checksum_to_index