zeromq.py (18114B)
#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2011 thomasv@gitorious
# Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
# Copyright (C) 2018 Harm Aarts <harmaarts@gmail.com>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import logging
import functools
import hashlib
import struct
from random import randint
from binascii import hexlify, unhexlify
from datetime import datetime

import zmq
import zmq.asyncio

from .logging import Logger
from .libbitcoin_errors import make_error_code, ErrorCode
from .util import bh2u


def __(msg):
    """Print a timestamped debug message to stdout."""
    print("***********************")
    print("*** DEBUG %s ***: %s" % (datetime.now().strftime("%H:%M:%S"), msg))


def create_random_id():
    """Generate a random request ID in the unsigned 32-bit range."""
    max_uint32 = 4294967295
    return randint(0, max_uint32)


def checksum(hash_, index):
    """Return the checksum of a transaction hash and an output index.

    The checksum combines the upper 49 bits of the 64-bit little-endian
    integer found at byte 12 of the reversed hash with the lower 15 bits
    of the index.

    hash_ -- the transaction hash as a hex string (32 bytes once decoded).
    index -- the integer index, which must fit in 4 bytes.
    """
    mask = 0xffffffffffff8000
    magic_start_position = 12

    hash_bytes = bytes.fromhex(hash_)[::-1]

    assert len(hash_bytes) == 32
    assert index < 2**32

    # The mask is 64 bits wide, so only the first 8 bytes of this tail
    # contribute to the result.
    hash_tail = hash_bytes[magic_start_position:]
    hash_upper_49_bits = to_int(hash_tail) & mask
    index_lower_15_bits = index & ~mask

    return hash_upper_49_bits | index_lower_15_bits


def to_int(some_bytes):
    """Interpret some_bytes as a little-endian unsigned integer."""
    return int.from_bytes(some_bytes, byteorder='little')


def pack_block_index(index):
    """Serialize a block index for a request.

    index -- either a hex string that decodes to a 32 byte block hash,
             or an integer block height.

    Returns the decoded 32 bytes for a string, or the height packed as a
    little-endian unsigned 32-bit integer.  Raises ValueError for any
    other type.
    """
    if isinstance(index, str):
        index = unhexlify(index)
        assert len(index) == 32
        return index
    if isinstance(index, int):
        return struct.pack('<I', index)
    raise ValueError(
        f"Unknown index type {type(index)} v:{index}, "
        "should be int or hex str")


def unpack_table(row_fmt, data):
    """Unpack data into a list of tuples, one per row_fmt sized row.

    Trailing bytes that do not make up a complete row are ignored.
    """
    row = struct.Struct(row_fmt)
    nrows = len(data) // row.size
    return [row.unpack_from(data, idx * row.size) for idx in range(nrows)]


class ClientSettings:
    """Class implementing client settings"""
    def __init__(self, timeout=10, context=None, loop=None):
        __("Zeromq ClientSettings: __init__")
        self._timeout = timeout
        self._context = context
        self._loop = loop

    @property
    def context(self):
        """zmq context property; created on first access if none was set"""
        if not self._context:
            ctx = zmq.asyncio.Context()
            ctx.linger = 500  # in milliseconds
            self._context = ctx
        return self._context

    @context.setter
    def context(self, context):
        self._context = context

    @property
    def timeout(self):
        """Set to None for no timeout"""
        return self._timeout

    @timeout.setter
    def timeout(self, timeout):
        self._timeout = timeout


class Request:
    """Class implementing a _send_ request.
    This is either a simple request/response affair or a subscription.
    """
    def __init__(self, socket, command, data):
        __("Zeromq Request: __init__")
        self.id_ = create_random_id()
        self.socket = socket
        self.command = command
        self.data = data
        # Resolved with the Response that answers this request.
        self.future = asyncio.Future()
        # Only set for subscriptions; receives the notification payloads.
        self.queue = None

    async def send(self):
        """Send the zmq request as [command, little-endian ID, data]"""
        __(f"Zeromq Request: send: {self.command}, {self.data}")
        request = [self.command, struct.pack('<I', self.id_), self.data]
        await self.socket.send_multipart(request)

    def is_subscription(self):
        """If the request is a subscription, then the response to this
        request is a notification.
        """
        return self.queue is not None

    def __str__(self):
        return 'Request(command, ID) {}, {:d}'.format(self.command,
                                                      self.id_)


class InvalidServerResponseException(Exception):
    """Raised when a received frame cannot be parsed as a response."""


class Response:
    """Class implementing a request response"""
    def __init__(self, frame):
        """Parse a three part frame: command, request ID, payload.

        The request ID is a little-endian unsigned 32-bit integer.  The
        payload starts with a little-endian unsigned 32-bit error code;
        whatever follows is stored as the response data.

        Raises InvalidServerResponseException if the frame does not have
        exactly three parts or the ID / error code cannot be unpacked.
        """
        __("Zeromq Response: __init__")
        if len(frame) != 3:
            raise InvalidServerResponseException(
                'Length of the frame was not 3: %d' % len(frame))

        self.command = frame[0]
        try:
            self.request_id = struct.unpack('<I', frame[1])[0]
            error_code = struct.unpack('<I', frame[2][:4])[0]
        except struct.error as err:
            # Report short parts the same way as a wrong part count
            # instead of leaking a raw struct.error to the receive loop.
            raise InvalidServerResponseException(
                'Malformed request ID or error code: %s' % err) from err
        self.error_code = make_error_code(error_code)
        self.data = frame[2][4:]

    def is_bound_for_queue(self):
        """True when the response carries data after the error code"""
        return len(self.data) > 0

    def __str__(self):
        return 'Response(command, request ID, error code, data):' \
            + ' %s, %d, %s, %s' \
            % (self.command, self.request_id, self.error_code, self.data)


class RequestCollection:
    """RequestCollection carries a list of Requests and matches incoming
    responses to them.
    """
    def __init__(self, socket, loop):
        __("Zeromq RequestCollection: __init__")
        self._socket = socket
        self._requests = {}
        self._task = asyncio.ensure_future(self._run(), loop=loop)

    async def _run(self):
        """Receive responses until the task is cancelled"""
        while True:
            __("Zeromq RequestCollection: _run loop iteration")
            try:
                await self._receive()
            except InvalidServerResponseException as err:
                # One malformed frame must not end the receive loop,
                # otherwise every later request would time out.
                __("Zeromq RequestCollection: dropping malformed frame: %s"
                   % err)

    async def stop(self):
        """Stops listening for incoming responses (or subscription
        messages).  Returns the number of _responses_ expected but which
        are now dropped on the floor.
        """
        __("Zeromq RequestCollection: stop")
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        return len(self._requests)

    async def _receive(self):
        """Receive one frame and hand it to the matching request"""
        __("Zeromq RequestCollection: receive")
        frame = await self._socket.recv_multipart()
        response = Response(frame)

        if response.request_id in self._requests:
            self._handle_response(response)
        else:
            __("Zeromq RequestCollection: receive: unhandled response %s:%s" % (response.command, response.request_id))

    def _handle_response(self, response):
        """Deliver response to the queue or future of its request"""
        __("Zeromq RequestCollection: _handle_response")
        request = self._requests[response.request_id]

        if request.is_subscription() and response.is_bound_for_queue():
            # TODO: decode the data into something usable
            request.queue.put_nowait(response.data)
            return

        if not request.is_subscription():
            self.delete_request(request)
        # The future may already be resolved, or cancelled by a timeout;
        # setting a result on it again would raise InvalidStateError.
        if not request.future.done():
            request.future.set_result(response)

    def add_request(self, request):
        """Register request so its response can be matched by ID"""
        __("Zeromq RequestCollection: add_request")
        # TODO: we should maybe check if the request.id_ is unique
        self._requests[request.id_] = request

    def delete_request(self, request):
        """Forget request; a no-op if it is not registered (anymore)"""
        __("Zeromq RequestCollection: delete_request")
        self._requests.pop(request.id_, None)


class Client:
    """This class represents a connection to a libbitcoin server.
    hostname -- the server DNS name to connect to.
    ports -- a dictionary containing four keys; query/heartbeat/block/tx
             (only 'query' and 'block' are read by this class)
    loop -- the event loop handed to the sockets and the receive task
    """
    def __init__(self, hostname, ports, loop):
        __("Zeromq Client: __init__")
        self._hostname = hostname
        self._ports = ports
        # Settings are built here with defaults; only the loop is
        # supplied by the caller.
        self._settings = ClientSettings(loop=loop)
        self._query_socket = self._create_query_socket()
        self._block_socket = self._create_block_socket()
        self._request_collection = RequestCollection(
            self._query_socket, self._settings._loop)
        # Strong references to the block listener tasks, so they are not
        # garbage collected while running and can be cancelled on stop.
        self._block_tasks = set()

    async def stop(self):
        """Cancel the listeners, close the sockets and stop receiving.

        Returns the number of requests that were still pending.
        """
        __("Zeromq Client: stop")
        for task in list(self._block_tasks):
            task.cancel()
        self._query_socket.close()
        self._block_socket.close()
        return await self._request_collection.stop()

    def _create_block_socket(self):
        """Create the SUB socket, subscribed to everything on 'block'"""
        __("Zeromq Client: _create_block_socket")
        socket = self._settings.context.socket(
            zmq.SUB, io_loop=self._settings._loop)  # pylint: disable=E1101
        socket.connect(self.__server_url(self._hostname,
                                         self._ports['block']))
        socket.setsockopt_string(zmq.SUBSCRIBE, '')  # pylint: disable=E1101
        return socket

    def _create_query_socket(self):
        """Create the DEALER socket connected to the 'query' port"""
        __("Zeromq Client: _create_query_socket")
        socket = self._settings.context.socket(
            zmq.DEALER, io_loop=self._settings._loop)  # pylint: disable=E1101
        socket.connect(self.__server_url(self._hostname,
                                         self._ports['query']))
        return socket

    async def _subscription_request(self, command, data):
        """Make a subscription request.

        Returns (error_code, queue); notification payloads for this
        request are put on the queue as they arrive.
        """
        __("Zeromq Client: _subscription_request")
        # asyncio.Queue no longer accepts a loop argument (removed in
        # Python 3.10); the queue is used from the running loop.
        queue = asyncio.Queue()
        request = await self._request(command, data, queue=queue)
        error_code, _ = await self._wait_for_response(request)
        return error_code, request.queue

    async def _simple_request(self, command, data):
        """Make a request and return (error_code, data) of its response"""
        __("Zeromq Client: _simple_request")
        return await self._wait_for_response(
            await self._request(command, data))

    async def _request(self, command, data, queue=None):
        """Make a generic request. Both command and data are byte objects
        specified like b'blockchain.fetch_block_header' as an example.

        queue -- when given, the request is treated as a subscription
                 and notifications are delivered to this queue.
        """
        __("Zeromq Client: _request")
        request = Request(self._query_socket, command, data)
        request.queue = queue
        # Register before sending so a response can never arrive for a
        # request the collection does not know about yet.
        self._request_collection.add_request(request)
        try:
            await request.send()
        except BaseException:
            # Nothing will answer a request that was never sent.
            self._request_collection.delete_request(request)
            raise
        return request

    async def _wait_for_response(self, request):
        """Wait for the response to request.

        Returns (error_code, data), or (ErrorCode.channel_timeout, None)
        when no response arrived within the configured timeout.
        """
        __("Zeromq Client: _wait_for_response")
        try:
            response = await asyncio.wait_for(request.future,
                                              self._settings.timeout)
        except asyncio.TimeoutError:
            self._request_collection.delete_request(request)
            return ErrorCode.channel_timeout, None

        assert response.command == request.command
        assert response.request_id == request.id_
        return response.error_code, response.data

    @staticmethod
    def __server_url(hostname, port):
        return 'tcp://' + hostname + ':' + str(port)

    async def last_height(self):
        """Fetch the last height; returns (error_code, height)"""
        __("Zeromq Client: last_height")
        command = b'blockchain.fetch_last_height'
        error_code, data = await self._simple_request(command, b'')
        if error_code:
            return error_code, None
        height = struct.unpack('<I', data)[0]
        return error_code, height

    async def subscribe_to_blocks(self, queue):
        """Start feeding (sequence, height, header) tuples into queue.

        Returns the queue that was passed in.
        """
        __("Zeromq Client: subscribe_to_blocks")
        task = asyncio.ensure_future(self._listen_for_blocks(queue))
        # Keep a reference: the event loop only holds weak references to
        # tasks, and stop() needs to be able to cancel the listener.
        self._block_tasks.add(task)
        task.add_done_callback(self._block_tasks.discard)
        return queue

    async def _listen_for_blocks(self, queue):
        """Put the current tip on queue, then every block received.

        The tip is queued with sequence number 0.  If it cannot be
        fetched it is skipped and listening continues.
        """
        __("Zeromq Client: _listen_for_blocks")
        error_code, tip = await self.last_height()
        header = None
        if not error_code:
            error_code, header = await self.block_header(tip)
        if error_code:
            __("Zeromq Client: _listen_for_blocks: could not fetch tip: %s"
               % error_code)
        else:
            queue.put_nowait((0, tip, header))

        while True:
            __("Zeromq Client: _listen_for_blocks loop iteration")
            frame = await self._block_socket.recv_multipart()
            seq = struct.unpack('<H', frame[0])[0]
            height = struct.unpack('<I', frame[1])[0]
            block_data = frame[2]
            # The first 80 bytes are taken as the header: version [0:4],
            # previous block hash [4:36], merkle root [36:68],
            # timestamp [68:72], bits [72:76], nonce [76:80].
            block_header = block_data[:80]
            queue.put_nowait((seq, height, block_header))

    async def _subscribe_to_scripthash(self, sh, queue):
        """Stub: scripthash subscriptions are not implemented yet"""
        __("Zeromq Client: _subscribe_to_scripthash (stub)")
        # TODO: libbitcoin here get history and make status (also review this entire func)
        # https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-subscribe
        # https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status
        # https://parazyd.org/git/electrum-obelisk/file/electrumobelisk/protocol.py.html#l57
        # Intended shape: put the initial status on the queue, then keep
        # receiving notifications and putting them on the queue.

    async def block_header(self, index):
        """Fetches the block header by hash (hex string) or height (int).

        Returns (error_code, header); header is None on error.
        """
        __("Zeromq Client: block_header")
        command = b'blockchain.fetch_block_header'
        data = pack_block_index(index)
        error_code, data = await self._simple_request(command, data)
        if error_code:
            return error_code, None
        return error_code, data

    async def block_transaction_hashes(self, index):
        """Fetches the transaction hashes of a block by hash or height.

        Returns (error_code, rows) where every row is a 1-tuple holding
        a 32 byte hash; rows is None on error.
        """
        __("Zeromq Client: block_transaction_hashes")
        command = b'blockchain.fetch_block_transaction_hashes'
        data = pack_block_index(index)
        error_code, data = await self._simple_request(command, data)
        if error_code:
            return error_code, None
        data = unpack_table('32s', data)
        return error_code, data

    async def transaction(self, hash_):
        """Fetches a transaction by its hex hash.

        Returns (None, raw bytes) on success, (error_code, None) on error.
        """
        __("Zeromq Client: transaction")
        command = b'blockchain.fetch_transaction2'
        error_code, data = await self._simple_request(
            command, bytes.fromhex(hash_)[::-1])
        if error_code:
            return error_code, None
        return None, data

    async def mempool_transaction(self, hash_):
        """Fetches a transaction from the transaction pool by hex hash.

        Returns (None, bh2u(data)) on success, (error_code, None) on
        error.
        """
        __("Zeromq Client: mempool_transaction")
        command = b'transaction_pool.fetch_transaction2'
        error_code, data = await self._simple_request(
            command, bytes.fromhex(hash_)[::-1])
        if error_code:
            return error_code, None
        return None, bh2u(data)

    async def broadcast_transaction(self, rawtx):
        """Broadcasts the hex encoded rawtx; returns (error_code, data)"""
        __("Zeromq Client: broadcast_transaction")
        __(rawtx)
        command = b'transaction_pool.broadcast'
        return await self._simple_request(command, unhexlify(rawtx))

    async def history4(self, scripthash, height=0):
        """Fetches the history of a hex scripthash starting at height.

        Returns (None, transfers) on success, where transfers is a list
        of dicts with a 'received' and/or 'spent' entry (see
        __find_receives and __correlate_spends_to_receives), or
        (error_code, None) on error.
        """
        __("Zeromq Client: history4")
        command = b'blockchain.fetch_history4'
        decoded_address = unhexlify(scripthash)[::-1]  # TODO: check byte order
        error_code, raw_points = await self._simple_request(
            command, decoded_address + struct.pack('<I', height))
        if error_code:
            return error_code, None

        def make_tuple(row):
            kind, tx_hash, index, height, value = row
            return (
                kind,
                # A plain (hash, index) tuple stands in for an outpoint
                # object.  TODO: libbitcoin XXX: use COutPoint
                (tx_hash, index),
                height,
                value,
                checksum(tx_hash[::-1].hex(), index),
            )

        rows = unpack_table('<B32sIIQ', raw_points)
        points = [make_tuple(row) for row in rows]
        correlated_points = Client.__correlate(points)
        return None, correlated_points

    async def balance(self, scripthash):
        """Returns (None, sum of the unspent values) or (error, None)"""
        __("Zeromq Client: balance")
        error, hist = await self.history4(scripthash)
        if error:
            return error, None
        utxo = Client.__receives_without_spends(hist)
        return None, sum(point['value'] for point in utxo)

    async def unspent(self, scripthash):
        """Returns (None, iterable of unspent transfers) or (error, None)"""
        __("Zeromq Client: unspent")
        error, hist = await self.history4(scripthash)
        if error:
            return error, None
        return None, Client.__receives_without_spends(hist)

    @staticmethod
    def __receives_without_spends(hist):
        """Lazily yield the transfers that have no 'spent' entry"""
        return (point for point in hist if 'spent' not in point)

    @staticmethod
    def __correlate(points):
        """Turn history points into transfers with spends matched up"""
        transfers, checksum_to_index = Client.__find_receives(points)
        transfers = Client.__correlate_spends_to_receives(
            points, transfers, checksum_to_index)
        return transfers

    @staticmethod
    def __correlate_spends_to_receives(points, transfers, checksum_to_index):
        """Attach every spend point to the receive it spends.

        A spend is looked up through its value field (point[3]) in
        checksum_to_index; spends without a known receive are appended
        as a transfer of their own.
        """
        for point in points:
            if point[0] == 0:  # receive
                continue

            # point[1] is the (hash, index) tuple built by history4.
            tx_hash, tx_index = point[1]
            spent = {
                'hash': tx_hash,
                'height': point[2],
                'index': tx_index,
            }
            if point[3] not in checksum_to_index:
                transfers.append({'spent': spent})
            else:
                transfers[checksum_to_index[point[3]]]['spent'] = spent

        return transfers

    @staticmethod
    def __find_receives(points):
        """Collect the receive points.

        Returns (transfers, checksum_to_index) where checksum_to_index
        maps the checksum of a receive to its position in transfers.
        """
        transfers = []
        checksum_to_index = {}

        for point in points:
            if point[0] == 1:  # spent
                continue

            # point[1] is the (hash, index) tuple built by history4.
            tx_hash, tx_index = point[1]
            transfers.append({
                'received': {
                    'hash': tx_hash,
                    'height': point[2],
                    'index': tx_index,
                },
                'value': point[3],
            })

            checksum_to_index[point[4]] = len(transfers) - 1

        return transfers, checksum_to_index