commit 28ec2b6be35f4d2e8dfdfbc33200ffa31478ecae
parent 139fc78e196ec76ebc22ec7b50a8e6b84522062b
Author: parazyd <parazyd@dyne.org>
Date: Fri, 12 Mar 2021 18:20:26 +0100
Initial ZeroMQ Interface implementation.
We can now fully sync the headers, and get notified about new headers.
Testing with:
./run_electrum --testnet -V i --oneserver
The libbitcoin v4 testnet server is hardcoded in the Interface class for now.
grep for "TODO: libbitcoin" for further work.
Diffstat:
7 files changed, 829 insertions(+), 415 deletions(-)
diff --git a/electrum/interface.py b/electrum/interface.py
@@ -2,6 +2,7 @@
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2011 thomasv@gitorious
+# Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
@@ -23,42 +24,34 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
-import re
-import ssl
import sys
-import traceback
import asyncio
-import socket
-from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence
+from typing import (Tuple, Union, List, TYPE_CHECKING, Optional, Set,
+ NamedTuple, Any, Sequence)
from collections import defaultdict
-from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
-import itertools
+from ipaddress import (IPv4Network, IPv6Network, ip_address, IPv6Address,
+ IPv4Address)
+from binascii import hexlify, unhexlify
import logging
-import hashlib
-import functools
-
-import aiorpcx
-from aiorpcx import TaskGroup
-from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
-from aiorpcx.curio import timeout_after, TaskTimeout
-from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
-from aiorpcx.rawsocket import RSClient
+
+from aiorpcx import NetAddress
import certifi
-from .util import (ignore_exceptions, log_exceptions, bfh, SilentTaskGroup, MySocksProxy,
- is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
- is_int_or_float, is_non_negative_int_or_float)
+from .util import (ignore_exceptions, log_exceptions, bfh, SilentTaskGroup,
+ MySocksProxy, is_integer, is_non_negative_integer,
+ is_hash256_str, is_hex_str, is_int_or_float,
+ is_non_negative_int_or_float)
from . import util
-from . import x509
-from . import pem
from . import version
from . import blockchain
from .blockchain import Blockchain, HEADER_SIZE
from . import bitcoin
from . import constants
+from . import zeromq
from .i18n import _
from .logging import Logger
from .transaction import Transaction
+from .merkle import merkle_branch
if TYPE_CHECKING:
from .network import Network
@@ -126,102 +119,11 @@ def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
raise RequestCorrupted(f'required field {field_name!r} missing from dict')
return d[field_name]
-
def assert_list_or_tuple(val: Any) -> None:
if not isinstance(val, (list, tuple)):
raise RequestCorrupted(f'{val!r} should be a list or tuple')
-class NotificationSession(RPCSession):
-
- def __init__(self, *args, interface: 'Interface', **kwargs):
- super(NotificationSession, self).__init__(*args, **kwargs)
- self.subscriptions = defaultdict(list)
- self.cache = {}
- self.default_timeout = NetworkTimeout.Generic.NORMAL
- self._msg_counter = itertools.count(start=1)
- self.interface = interface
- self.cost_hard_limit = 0 # disable aiorpcx resource limits
-
- async def handle_request(self, request):
- self.maybe_log(f"--> {request}")
- try:
- if isinstance(request, Notification):
- params, result = request.args[:-1], request.args[-1]
- key = self.get_hashable_key_for_rpc_call(request.method, params)
- if key in self.subscriptions:
- self.cache[key] = result
- for queue in self.subscriptions[key]:
- await queue.put(request.args)
- else:
- raise Exception(f'unexpected notification')
- else:
- raise Exception(f'unexpected request. not a notification')
- except Exception as e:
- self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
- await self.close()
-
- async def send_request(self, *args, timeout=None, **kwargs):
- # note: semaphores/timeouts/backpressure etc are handled by
- # aiorpcx. the timeout arg here in most cases should not be set
- msg_id = next(self._msg_counter)
- self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
- try:
- # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
- # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
- response = await asyncio.wait_for(
- super().send_request(*args, **kwargs),
- timeout)
- except (TaskTimeout, asyncio.TimeoutError) as e:
- raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
- except CodeMessageError as e:
- self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
- raise
- else:
- self.maybe_log(f"--> {response} (id: {msg_id})")
- return response
-
- def set_default_timeout(self, timeout):
- self.sent_request_timeout = timeout
- self.max_send_delay = timeout
-
- async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
- # note: until the cache is written for the first time,
- # each 'subscribe' call might make a request on the network.
- key = self.get_hashable_key_for_rpc_call(method, params)
- self.subscriptions[key].append(queue)
- if key in self.cache:
- result = self.cache[key]
- else:
- result = await self.send_request(method, params)
- self.cache[key] = result
- await queue.put(params + [result])
-
- def unsubscribe(self, queue):
- """Unsubscribe a callback to free object references to enable GC."""
- # note: we can't unsubscribe from the server, so we keep receiving
- # subsequent notifications
- for v in self.subscriptions.values():
- if queue in v:
- v.remove(queue)
-
- @classmethod
- def get_hashable_key_for_rpc_call(cls, method, params):
- """Hashable index for subscriptions and cache"""
- return str(method) + repr(params)
-
- def maybe_log(self, msg: str) -> None:
- if not self.interface: return
- if self.interface.debug or self.interface.network.debug:
- self.interface.logger.debug(msg)
-
- def default_framer(self):
- # overridden so that max_size can be customized
- max_size = int(self.interface.network.config.get('network_max_incoming_msg_size',
- MAX_INCOMING_MSG_SIZE))
- return NewlineFramer(max_size=max_size)
-
-
class NetworkException(Exception): pass
@@ -248,15 +150,6 @@ class InvalidOptionCombination(Exception): pass
class ConnectError(NetworkException): pass
-class _RSClient(RSClient):
- async def create_connection(self):
- try:
- return await super().create_connection()
- except OSError as e:
- # note: using "from e" here will set __cause__ of ConnectError
- raise ConnectError(e) from e
-
-
class ServerAddr:
def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
@@ -284,6 +177,7 @@ class ServerAddr:
host, port, protocol = str(s).rsplit(':', 2)
return ServerAddr(host=host, port=port, protocol=protocol)
+
@classmethod
def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
"""Construct ServerAddr from str, guessing missing details.
@@ -328,7 +222,7 @@ class ServerAddr:
and self.protocol == other.protocol)
def __ne__(self, other):
- return not (self == other)
+ return not self == other
def __hash__(self):
return hash((self.host, self.port, self.protocol))
@@ -346,11 +240,18 @@ def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
return os.path.join(config.path, 'certs', filename)
+from datetime import datetime
+def __(msg):
+ print("***********************")
+ print("*** DEBUG %s ***: %s" % (datetime.now().strftime("%H:%M:%S"), msg))
+
+
class Interface(Logger):
LOGGING_SHORTCUT = 'i'
def __init__(self, *, network: 'Network', server: ServerAddr, proxy: Optional[dict]):
+ __("Interface: __init__")
self.ready = asyncio.Future()
self.got_disconnected = asyncio.Event()
self.server = server
@@ -364,6 +265,15 @@ class Interface(Logger):
self.session = None # type: Optional[NotificationSession]
self._ipaddr_bucket = None
+ # TODO: libbitcoin (these are for testnet2.libbitcoin.net)
+ # This should be incorporated with ServerAddr somehow.
+ self.client = None
+ self.bs = 'testnet2.libbitcoin.net'
+ self.bsports = {'query': 29091,
+ 'heartbeat': 29092,
+ 'block': 29093,
+ 'tx': 29094}
+
# Latest block header and corresponding height, as claimed by the server.
# Note that these values are updated before they are verified.
# Especially during initial header sync, verification can take a long time.
@@ -379,6 +289,7 @@ class Interface(Logger):
self.taskgroup = SilentTaskGroup()
async def spawn_task():
+ __("Interface: spawn_task")
task = await self.network.taskgroup.spawn(self.run())
if sys.version_info >= (3, 8):
task.set_name(f"interface::{str(server)}")
@@ -402,125 +313,23 @@ class Interface(Logger):
def __str__(self):
return f"<Interface {self.diagnostic_name()}>"
- async def is_server_ca_signed(self, ca_ssl_context):
- """Given a CA enforcing SSL context, returns True if the connection
- can be established. Returns False if the server has a self-signed
- certificate but otherwise is okay. Any other failures raise.
- """
- try:
- await self.open_session(ca_ssl_context, exit_early=True)
- except ConnectError as e:
- cause = e.__cause__
- if isinstance(cause, ssl.SSLError) and cause.reason == 'CERTIFICATE_VERIFY_FAILED':
- # failures due to self-signed certs are normal
- return False
- raise
- return True
-
- async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context):
- ca_signed = await self.is_server_ca_signed(ca_ssl_context)
- if ca_signed:
- if self._get_expected_fingerprint():
- raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
- with open(self.cert_path, 'w') as f:
- # empty file means this is CA signed, not self-signed
- f.write('')
- else:
- await self._save_certificate()
-
- def _is_saved_ssl_cert_available(self):
- if not os.path.exists(self.cert_path):
- return False
- with open(self.cert_path, 'r') as f:
- contents = f.read()
- if contents == '': # CA signed
- if self._get_expected_fingerprint():
- raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
- return True
- # pinned self-signed cert
- try:
- b = pem.dePem(contents, 'CERTIFICATE')
- except SyntaxError as e:
- self.logger.info(f"error parsing already saved cert: {e}")
- raise ErrorParsingSSLCert(e) from e
- try:
- x = x509.X509(b)
- except Exception as e:
- self.logger.info(f"error parsing already saved cert: {e}")
- raise ErrorParsingSSLCert(e) from e
- try:
- x.check_date()
- except x509.CertificateError as e:
- self.logger.info(f"certificate has expired: {e}")
- os.unlink(self.cert_path) # delete pinned cert only in this case
- return False
- self._verify_certificate_fingerprint(bytearray(b))
- return True
-
- async def _get_ssl_context(self):
- if self.protocol != 's':
- # using plaintext TCP
- return None
-
- # see if we already have cert for this server; or get it for the first time
- ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
- if not self._is_saved_ssl_cert_available():
- try:
- await self._try_saving_ssl_cert_for_first_time(ca_sslc)
- except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
- raise ErrorGettingSSLCertFromServer(e) from e
- # now we have a file saved in our certificate store
- siz = os.stat(self.cert_path).st_size
- if siz == 0:
- # CA signed cert
- sslc = ca_sslc
- else:
- # pinned self-signed cert
- sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
- sslc.check_hostname = 0
- return sslc
-
- def handle_disconnect(func):
- @functools.wraps(func)
- async def wrapper_func(self: 'Interface', *args, **kwargs):
- try:
- return await func(self, *args, **kwargs)
- except GracefulDisconnect as e:
- self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
- except aiorpcx.jsonrpc.RPCError as e:
- self.logger.warning(f"disconnecting due to {repr(e)}")
- self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
- finally:
- self.got_disconnected.set()
- await self.network.connection_down(self)
- # if was not 'ready' yet, schedule waiting coroutines:
- self.ready.cancel()
- return wrapper_func
-
- @ignore_exceptions # do not kill network.taskgroup
+ # @ignore_exceptions # do not kill network.taskgroup
@log_exceptions
- @handle_disconnect
+ # @handle_disconnect
async def run(self):
- try:
- ssl_context = await self._get_ssl_context()
- except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
- self.logger.info(f'disconnecting due to: {repr(e)}')
- return
- try:
- await self.open_session(ssl_context)
- except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
- # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
- if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
- and self.is_main_server() and not self.network.auto_connect):
- self.logger.warning(f'Cannot connect to main server due to SSL error '
- f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
- else:
- self.logger.info(f'disconnecting due to: {repr(e)}')
- return
+ __("Interface: run")
+ self.client = zeromq.Client(self.bs, self.bsports,
+ loop=self.network.asyncio_loop)
+ async with self.taskgroup as group:
+ await group.spawn(self.ping)
+ await group.spawn(self.request_fee_estimates)
+ await group.spawn(self.run_fetch_blocks)
+ await group.spawn(self.monitor_connection)
def _mark_ready(self) -> None:
+ __("Interface: _mark_ready")
if self.ready.cancelled():
- raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
+ raise GracefulDisconnect('conn establishment was too slow; %s' % '*ready* future was cancelled')
if self.ready.done():
return
@@ -536,61 +345,20 @@ class Interface(Logger):
self.ready.set_result(1)
- async def _save_certificate(self) -> None:
- if not os.path.exists(self.cert_path):
- # we may need to retry this a few times, in case the handshake hasn't completed
- for _ in range(10):
- dercert = await self._fetch_certificate()
- if dercert:
- self.logger.info("succeeded in getting cert")
- self._verify_certificate_fingerprint(dercert)
- with open(self.cert_path, 'w') as f:
- cert = ssl.DER_cert_to_PEM_cert(dercert)
- # workaround android bug
- cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
- f.write(cert)
- # even though close flushes we can't fsync when closed.
- # and we must flush before fsyncing, cause flush flushes to OS buffer
- # fsync writes to OS buffer to disk
- f.flush()
- os.fsync(f.fileno())
- break
- await asyncio.sleep(1)
- else:
- raise GracefulDisconnect("could not get certificate after 10 tries")
-
- async def _fetch_certificate(self) -> bytes:
- sslc = ssl.SSLContext()
- async with _RSClient(session_factory=RPCSession,
- host=self.host, port=self.port,
- ssl=sslc, proxy=self.proxy) as session:
- asyncio_transport = session.transport._asyncio_transport # type: asyncio.BaseTransport
- ssl_object = asyncio_transport.get_extra_info("ssl_object") # type: ssl.SSLObject
- return ssl_object.getpeercert(binary_form=True)
-
- def _get_expected_fingerprint(self) -> Optional[str]:
- if self.is_main_server():
- return self.network.config.get("serverfingerprint")
-
- def _verify_certificate_fingerprint(self, certificate):
- expected_fingerprint = self._get_expected_fingerprint()
- if not expected_fingerprint:
- return
- fingerprint = hashlib.sha256(certificate).hexdigest()
- fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
- if not fingerprints_match:
- util.trigger_callback('cert_mismatch')
- raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
- self.logger.info("cert fingerprint verification passed")
-
async def get_block_header(self, height, assert_mode):
+ __(f"Interface: get_block_header: {height}")
self.logger.info(f'requesting block header {height} in mode {assert_mode}')
# use lower timeout as we usually have network.bhi_lock here
timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
- res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
- return blockchain.deserialize_header(bytes.fromhex(res), height)
+ # ORIG: res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
+ _ec, res = await self.client.block_header(height)
+ if _ec is not None and _ec != 0:
+ raise RequestCorrupted(f'got error {_ec}')
+ #return blockchain.deserialize_header(bytes.fromhex(res), height)
+ return blockchain.deserialize_header(res, height)
async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
+ __("Interface: request_chunk")
if not is_non_negative_integer(height):
raise Exception(f"{repr(height)} is not a block height")
index = height // 2016
@@ -603,9 +371,23 @@ class Interface(Logger):
size = max(size, 0)
try:
self._requested_chunks.add(index)
- res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
+ #ORIG: res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
+ concat = bytearray()
+ for i in range(size):
+ _ec, data = await self.client.block_header(index*2016+i)
+ if _ec is not None and _ec != 0:
+ # TODO: Don't imply error means we reached tip
+ break
+ concat.extend(data)
finally:
self._requested_chunks.discard(index)
+ # TODO: max in case of libbitcoin is unnecessary
+ res = {
+ 'hex': str(hexlify(concat), 'utf-8'),
+ 'count': len(concat)//80,
+ 'max': 2016,
+ }
+ # TODO: cleanup
assert_dict_contains_field(res, field_name='count')
assert_dict_contains_field(res, field_name='hex')
assert_dict_contains_field(res, field_name='max')
@@ -625,58 +407,30 @@ class Interface(Logger):
return conn, res['count']
def is_main_server(self) -> bool:
+ # __("Interface: is_main_server")
return (self.network.interface == self or
self.network.interface is None and self.network.default_server == self.server)
- async def open_session(self, sslc, exit_early=False):
- session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
- async with _RSClient(session_factory=session_factory,
- host=self.host, port=self.port,
- ssl=sslc, proxy=self.proxy) as session:
- self.session = session # type: NotificationSession
- self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
- try:
- ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
- except aiorpcx.jsonrpc.RPCError as e:
- raise GracefulDisconnect(e) # probably 'unsupported protocol version'
- if exit_early:
- return
- if ver[1] != version.PROTOCOL_VERSION:
- raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
- f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
- if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
- raise GracefulDisconnect(f'too many connected servers already '
- f'in bucket {self.bucket_based_on_ipaddress()}')
- self.logger.info(f"connection established. version: {ver}")
-
- try:
- async with self.taskgroup as group:
- await group.spawn(self.ping)
- await group.spawn(self.request_fee_estimates)
- await group.spawn(self.run_fetch_blocks)
- await group.spawn(self.monitor_connection)
- except aiorpcx.jsonrpc.RPCError as e:
- if e.code in (JSONRPC.EXCESSIVE_RESOURCE_USAGE,
- JSONRPC.SERVER_BUSY,
- JSONRPC.METHOD_NOT_FOUND):
- raise GracefulDisconnect(e, log_level=logging.WARNING) from e
- raise
-
async def monitor_connection(self):
+ __("Interface: monitor_connection")
while True:
await asyncio.sleep(1)
- if not self.session or self.session.is_closing():
+ if not self.client:
+ # TODO: libbitcoin ^ Implement is_closing() in zeromq.Client and check ^
raise GracefulDisconnect('session was closed')
async def ping(self):
+ __("Interface: ping")
while True:
await asyncio.sleep(300)
- await self.session.send_request('server.ping')
+ __("Interface: ping loop iteration")
+ # TODO: libbitcoin bs heartbeat service here?
async def request_fee_estimates(self):
+ __("Interface: request_fee_estimates")
from .simple_config import FEE_ETA_TARGETS
while True:
- async with TaskGroup() as group:
+ async with SilentTaskGroup() as group:
fee_tasks = []
for i in FEE_ETA_TARGETS:
fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
@@ -688,26 +442,24 @@ class Interface(Logger):
await asyncio.sleep(60)
async def close(self, *, force_after: int = None):
- """Closes the connection and waits for it to be closed.
- We try to flush buffered data to the wire, so this can take some time.
- """
- if force_after is None:
- # We give up after a while and just abort the connection.
- # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
- # the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
- force_after = 1 # seconds
+ __("Interface: close")
+ # TODO: libbitcoin
if self.session:
- await self.session.close(force_after=force_after)
- # monitor_connection will cancel tasks
+ await self.session.stop()
+ if self.client:
+ await self.client.stop()
async def run_fetch_blocks(self):
+ __("Interface: run_fetch_blocks")
header_queue = asyncio.Queue()
- await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
+ # ORIG: await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
+ await self.client.subscribe_to_blocks(header_queue)
while True:
item = await header_queue.get()
- raw_header = item[0]
- height = raw_header['height']
- header = blockchain.deserialize_header(bfh(raw_header['hex']), height)
+ # TODO: block to header
+ header = item[2]
+ height = item[1]
+ header = blockchain.deserialize_header(header, height)
self.tip_header = header
self.tip = height
if self.tip < constants.net.max_checkpoint():
@@ -721,6 +473,7 @@ class Interface(Logger):
await self.network.switch_lagging_interface()
async def _process_header_at_tip(self):
+ __("Interface: _process_header_at_tip")
height, header = self.tip, self.tip_header
async with self.network.bhi_lock:
if self.blockchain.height() >= height and self.blockchain.check_header(header):
@@ -733,6 +486,7 @@ class Interface(Logger):
await self.sync_until(height)
async def sync_until(self, height, next_height=None):
+ __("Interface: sync_until")
if next_height is None:
next_height = self.tip
last = None
@@ -755,6 +509,7 @@ class Interface(Logger):
return last, height
async def step(self, height, header=None):
+ __("Interface: step")
assert 0 <= height <= self.tip, (height, self.tip)
if header is None:
header = await self.get_block_header(height, 'catchup')
@@ -787,6 +542,7 @@ class Interface(Logger):
return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
async def _search_headers_binary(self, height, bad, bad_header, chain):
+ __("Interface: _search_headers_binary")
assert bad == bad_header['block_height']
_assert_header_does_not_check_against_any_chain(bad_header)
@@ -817,6 +573,7 @@ class Interface(Logger):
return good, bad, bad_header
async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header):
+ __("Interface: _resolve_potential_chain_fork_given_forkpoint")
assert good + 1 == bad
assert bad == bad_header['block_height']
_assert_header_does_not_check_against_any_chain(bad_header)
@@ -840,6 +597,7 @@ class Interface(Logger):
return 'fork', height
async def _search_headers_backwards(self, height, header):
+ __("Interface: _search_headers_backwards")
async def iterate():
nonlocal height, header
checkp = False
@@ -871,24 +629,31 @@ class Interface(Logger):
@classmethod
def client_name(cls) -> str:
+ __("Interface: client_name")
return f'electrum/{version.ELECTRUM_VERSION}'
def is_tor(self):
+ __("Interface: is_tor")
return self.host.endswith('.onion')
def ip_addr(self) -> Optional[str]:
- session = self.session
- if not session: return None
- peer_addr = session.remote_address()
- if not peer_addr: return None
- return str(peer_addr.host)
+ __("Interface: ip_addr")
+ return None
+ # TODO: libbitcoin
+ # This seems always None upstream since remote_address does not exist?
+ # session = self.session
+ # if not session: return None
+ # peer_addr = session.remote_address()
+ # if not peer_addr: return None
+ # return str(peer_addr.host)
def bucket_based_on_ipaddress(self) -> str:
+ __("Interface: bucket_based_on_ipaddress")
def do_bucket():
if self.is_tor():
return BUCKET_NAME_OF_ONION_SERVERS
try:
- ip_addr = ip_address(self.ip_addr()) # type: Union[IPv4Address, IPv6Address]
+ ip_addr = ip_address(self.ip_addr()) # type: Union[IPv5Address, IPv6Address]
except ValueError:
return ''
if not ip_addr:
@@ -908,13 +673,20 @@ class Interface(Logger):
return self._ipaddr_bucket
async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
+ __("Interface: get_merkle_for_transaction")
if not is_hash256_str(tx_hash):
raise Exception(f"{repr(tx_hash)} is not a txid")
if not is_non_negative_integer(tx_height):
raise Exception(f"{repr(tx_height)} is not a block height")
# do request
- res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
- # check response
+ # ORIG: res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
+ # TODO: Rework to use txid rather than height with libbitcoin?
+ _ec, hashes = await self.client.block_transaction_hashes(tx_height)
+ if _ec is not None and _ec != 0:
+ raise RequestCorrupted(f'got error {_ec}')
+ tx_pos = hashes.index(unhexlify(tx_hash)[::-1])
+ branch = merkle_branch(hashes, tx_pos)
+ res = {'block_height': tx_height, 'merkle': branch, 'pos': tx_pos}
block_height = assert_dict_contains_field(res, field_name='block_height')
merkle = assert_dict_contains_field(res, field_name='merkle')
pos = assert_dict_contains_field(res, field_name='pos')
@@ -927,9 +699,12 @@ class Interface(Logger):
return res
async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
+ __("Interface: get_transaction")
if not is_hash256_str(tx_hash):
raise Exception(f"{repr(tx_hash)} is not a txid")
- raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
+ # ORIG: raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
+ raw = self.client.mempool_transaction(tx_hash)
+ # raw = self.client.transaction(tx_hash)
# validate response
if not is_hex_str(raw):
raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
@@ -942,11 +717,18 @@ class Interface(Logger):
raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
return raw
+
async def get_history_for_scripthash(self, sh: str) -> List[dict]:
+ __(f"Interface: get_history_for_scripthash {sh}")
if not is_hash256_str(sh):
raise Exception(f"{repr(sh)} is not a scripthash")
# do request
- res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
+ # ORIG: res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
+ _ec, history = await self.client.history4(sh)
+ if _ec is not None and _ec != 0:
+ raise RequestCorrupted('got error %d' % _ec)
+ __("Interface: get_history_for_scripthash: got history: %s" % (history))
+ res = {}
# check response
assert_list_or_tuple(res)
prev_height = 1
@@ -970,13 +752,20 @@ class Interface(Logger):
# a recently mined tx could be included in both last block and mempool?
# Still, it's simplest to just disregard the response.
raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
+
return res
async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
+ __(f"Interface: listunspent_for_scripthash {sh}")
if not is_hash256_str(sh):
raise Exception(f"{repr(sh)} is not a scripthash")
# do request
- res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
+ # ORIG: res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
+ _ec, unspent = await self.client.unspent(sh)
+ if _ec is not None and _ec != 0:
+ raise RequestCorrupted('got error %d' % _ec)
+ __("Interface: listunspent_for_scripthash: got unspent: %s" % unspent)
+ res = {}
# check response
assert_list_or_tuple(res)
for utxo_item in res:
@@ -991,10 +780,17 @@ class Interface(Logger):
return res
async def get_balance_for_scripthash(self, sh: str) -> dict:
+ __(f"Interface: get_balance_for_scripthash {sh}")
if not is_hash256_str(sh):
raise Exception(f"{repr(sh)} is not a scripthash")
# do request
- res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
+ # ORIG: res = await self.sessions.send_request('blockchains.scripthash.get_balance', [sh])
+ _ec, balance = await self.client.balance(sh)
+ if _ec is not None and _ec != 0:
+ raise RequestCorrupted('got error %d' % _ec)
+ __("Interface: get_balance_for_scripthash: got balance: %s" % balance)
+ # TODO: libbitcoin
+ res = {}
# check response
assert_dict_contains_field(res, field_name='confirmed')
assert_dict_contains_field(res, field_name='unconfirmed')
@@ -1003,30 +799,40 @@ class Interface(Logger):
return res
async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
+ __("Interface: get_txid_from_txpos")
if not is_non_negative_integer(tx_height):
raise Exception(f"{repr(tx_height)} is not a block height")
if not is_non_negative_integer(tx_pos):
raise Exception(f"{repr(tx_pos)} should be non-negative integer")
# do request
- res = await self.session.send_request(
- 'blockchain.transaction.id_from_pos',
- [tx_height, tx_pos, merkle],
- )
+ # ORIG: res = await self.session.send_request(
+ # 'blockchain.transaction.id_from_pos',
+ # [tx_height, tx_pos, merkle],
+ # )
+ _ec, hashes = await self.client.block_transaction_hashes(tx_height)
+ if _ec is not None and _ec != 0:
+ raise RequestCorrupted('got error %d' % _ec)
+ txid = hexlify(hashes[tx_pos][::-1])
# check response
- if merkle:
- assert_dict_contains_field(res, field_name='tx_hash')
- assert_dict_contains_field(res, field_name='merkle')
- assert_hash256_str(res['tx_hash'])
- assert_list_or_tuple(res['merkle'])
- for node_hash in res['merkle']:
- assert_hash256_str(node_hash)
- else:
- assert_hash256_str(res)
+ if not merkle:
+ assert_hash256_str(txid)
+ return txid
+ branch = merkle_branch(hashes, tx_pos)
+ res = {'tx_hash': txid, 'merkle': branch}
+ assert_dict_contains_field(res, field_name='tx_hash')
+ assert_dict_contains_field(res, field_name='merkle')
+ assert_hash256_str(res['tx_hash'])
+ assert_list_or_tuple(res['merkle'])
+ for node_hash in res['merkle']:
+ assert_hash256_str(node_hash)
return res
async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
+ __("Interface: get_fee_histogram")
# do request
- res = await self.session.send_request('mempool.get_fee_histogram')
+ # ORIG: res = await self.session.send_request('mempool.get_fee_histogram')
+ # TODO: libbitcoin
+ res = [[0, 0]]
# check response
assert_list_or_tuple(res)
prev_fee = float('inf')
@@ -1039,16 +845,22 @@ class Interface(Logger):
return res
async def get_server_banner(self) -> str:
+ __("Interface: get_server_banner")
# do request
- res = await self.session.send_request('server.banner')
+ # ORIG: res = await self.session.send_request('server.banner')
+ # TODO: libbitcoin
+ res = 'libbitcoin'
# check response
if not isinstance(res, str):
raise RequestCorrupted(f'{res!r} should be a str')
return res
async def get_donation_address(self) -> str:
+ __("Interface: get_donation_address")
# do request
- res = await self.session.send_request('server.donation_address')
+ # ORIG: res = await self.session.send_request('server.donation_address')
+ # TODO: libbitcoin
+ res = None
# check response
if not res: # ignore empty string
return ''
@@ -1061,8 +873,11 @@ class Interface(Logger):
async def get_relay_fee(self) -> int:
"""Returns the min relay feerate in sat/kbyte."""
+ __("Interface: get_relay_fee")
# do request
- res = await self.session.send_request('blockchain.relayfee')
+ # ORIG: res = await self.session.send_request('blockchain.relayfee')
+ # TODO: libbitcoin
+ res = 0.00001
# check response
assert_non_negative_int_or_float(res)
relayfee = int(res * bitcoin.COIN)
@@ -1070,13 +885,16 @@ class Interface(Logger):
return relayfee
async def get_estimatefee(self, num_blocks: int) -> int:
- """Returns a feerate estimate for getting confirmed within
+ """Returns a feerate estimtte for getting confirmed within
num_blocks blocks, in sat/kbyte.
"""
+ __("Interface: get_estimatefee")
if not is_non_negative_integer(num_blocks):
raise Exception(f"{repr(num_blocks)} is not a num_blocks")
# do request
- res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
+ # ORIG: res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
+ # TODO: libbitcoin
+ res = -1
# check response
if res != -1:
assert_non_negative_int_or_float(res)
@@ -1085,48 +903,7 @@ class Interface(Logger):
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
+ __("Interface: _assert_header_does_not_check_against_any_chain")
chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
if chain_bad:
raise Exception('bad_header must not check!')
-
-
-def check_cert(host, cert):
- try:
- b = pem.dePem(cert, 'CERTIFICATE')
- x = x509.X509(b)
- except:
- traceback.print_exc(file=sys.stdout)
- return
-
- try:
- x.check_date()
- expired = False
- except:
- expired = True
-
- m = "host: %s\n"%host
- m += "has_expired: %s\n"% expired
- util.print_msg(m)
-
-
-# Used by tests
-def _match_hostname(name, val):
- if val == name:
- return True
-
- return val.startswith('*.') and name.endswith(val[1:])
-
-
-def test_certificates():
- from .simple_config import SimpleConfig
- config = SimpleConfig()
- mydir = os.path.join(config.path, "certs")
- certs = os.listdir(mydir)
- for c in certs:
- p = os.path.join(mydir,c)
- with open(p, encoding='utf-8') as f:
- cert = f.read()
- check_cert(c, cert)
-
-if __name__ == "__main__":
- test_certificates()
diff --git a/electrum/libbitcoin_errors.py b/electrum/libbitcoin_errors.py
@@ -0,0 +1,72 @@
+import enum
+
+
+def make_error_code(ec):
+ if not ec:
+ return None
+ return ErrorCode(ec)
+
+
+class ErrorCode(enum.Enum):
+
+ nothing = 0
+
+ service_stopped = 1
+ operation_failed = 2
+
+ # blockchain errors
+ not_found = 3
+ duplicate = 4
+ unspent_output = 5
+ unsupported_payment_type = 6
+
+ # network errors
+ resolve_failed = 7
+ network_unreachable = 8
+ address_in_use = 9
+ listen_failed = 10
+ accept_failed = 11
+ bad_stream = 12
+ channel_timeout = 13
+
+ # transaction pool
+ blockchain_reorganized = 14
+ pool_filled = 15
+
+ # validate tx
+ coinbase_transaction = 16
+ is_not_standard = 17
+ double_spend = 18
+ input_not_found = 19
+
+ # check_transaction()
+ empty_transaction = 20
+ output_value_overflow = 21
+ invalid_coinbase_script_size = 22
+ previous_output_null = 23
+
+ # validate block
+ previous_block_invalid = 24
+
+ # check_block()
+ size_limits = 25
+ proof_of_work = 26
+ futuristic_timestamp = 27
+ first_not_coinbase = 28
+ extra_coinbases = 29
+ too_many_sigs = 30
+ merkle_mismatch = 31
+
+ # accept_block()
+ incorrect_proof_of_work = 32
+ timestamp_too_early = 33
+ non_final_transaction = 34
+ checkpoints_failed = 35
+ old_version_block = 36
+ coinbase_height_mismatch = 37
+
+ # connect_block()
+ duplicate_or_spent = 38
+ validate_inputs_failed = 39
+ fees_out_of_range = 40
+ coinbase_too_large = 41
diff --git a/electrum/merkle.py b/electrum/merkle.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+#
+# Electrum -lightweight Bitcoin client
+# Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+"""Module for calculating merkle branches"""
+from math import ceil, log
+
+from .crypto import sha256d
+
+
+def branch_length(hash_count):
+ """Return the length of a merkle branch given the number of hashes"""
+ return ceil(log(hash_count, 2))
+
+
+def merkle_branch_and_root(hashes, index):
+ """Return a (merkle branch, merkle_root) pair given hashes, and the
+ index of one of those hashes.
+ """
+ hashes = list(hashes)
+ if not isinstance(index, int):
+ raise TypeError('index must be an integer')
+ # This also asserts hashes is not empty
+ if not 0 <= index < len(hashes):
+ raise ValueError('index out of range')
+ length = branch_length(len(hashes))
+
+ branch = []
+ for _ in range(length):
+ if len(hashes) & 1:
+ hashes.append(hashes[-1])
+ branch.append(hashes[index ^ 1])
+ index >>= 1
+ hashes = [sha256d(hashes[n] + hashes[n+1])
+ for n in range(0, len(hashes), 2)]
+ return branch, hashes[0]
+
+def merkle_branch(tx_hashes, tx_pos):
+ """Return a merkle branch given hashes and the tx position"""
+ branch, _root = merkle_branch_and_root(tx_hashes, tx_pos)
+ branch = [bytes(reversed(h)).hex() for h in branch]
+ return branch
diff --git a/electrum/network.py b/electrum/network.py
@@ -449,7 +449,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
async def _request_server_info(self, interface: 'Interface'):
await interface.ready
- session = interface.session
+ # TODO: libbitcoin: session = interface.session
async def get_banner():
self.banner = await interface.get_server_banner()
@@ -457,7 +457,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
async def get_donation_address():
self.donation_address = await interface.get_donation_address()
async def get_server_peers():
- server_peers = await session.send_request('server.peers.subscribe')
+ # ORIG: server_peers = await session.send_request('server.peers.subscribe')
+ # TODO: libbitcoin
+ server_peers = []
random.shuffle(server_peers)
max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
server_peers = server_peers[:max_accepted_peers]
@@ -880,6 +882,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
if timeout is None:
timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
try:
+ # TODO: libbitcoin
out = await self.interface.session.send_request('blockchain.transaction.broadcast', [tx.serialize()], timeout=timeout)
# note: both 'out' and exception messages are untrusted input from the server
except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
@@ -1329,6 +1332,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
while not self.is_connected():
await asyncio.sleep(1)
session = self.interface.session
+ # TODO: libbitcoin
return parse_servers(await session.send_request('server.peers.subscribe'))
async def send_multiple_requests(self, servers: Sequence[ServerAddr], method: str, params: Sequence):
@@ -1342,6 +1346,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
await interface.close()
return
try:
+ # TODO: libbitcoin
res = await interface.session.send_request(method, params, timeout=10)
except Exception as e:
res = e
diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py
@@ -83,7 +83,8 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
await group.spawn(self.main())
finally:
# we are being cancelled now
- self.session.unsubscribe(self.status_queue)
+ # TODO: libbitcoin
+ print("self.session.unsubscribe(self.status_queue)")
def _reset_request_counters(self):
self._requests_sent = 0
@@ -110,7 +111,8 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
self._requests_sent += 1
try:
async with self._network_request_semaphore:
- await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
+ # TODO: libbitcoin
+ print("await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)")
except RPCError as e:
if e.message == 'history too large': # no unique error code
raise GracefulDisconnect(e, log_level=logging.ERROR) from e
diff --git a/electrum/util.py b/electrum/util.py
@@ -1225,7 +1225,9 @@ class NetworkJobOnDefaultServer(Logger, ABC):
@property
def session(self):
- s = self.interface.session
+ # ORIG: s = self.interface.session
+ # TODO: libbitcoin
+ s = self.interface.client
assert s is not None
return s
@@ -1525,6 +1527,7 @@ class JsonRPCClient:
self._id = 0
async def request(self, endpoint, *args):
+ # TODO: libbitcoin
self._id += 1
data = ('{"jsonrpc": "2.0", "id":"%d", "method": "%s", "params": %s }'
% (self._id, endpoint, json.dumps(args)))
diff --git a/electrum/zeromq.py b/electrum/zeromq.py
@@ -0,0 +1,493 @@
+#!/usr/bin/env python
+#
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2011 thomasv@gitorious
+# Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
+# Copyright (C) 2018 Harm Aarts <harmaarts@gmail.com>
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+import asyncio
+import logging
+import functools
+import hashlib
+import struct
+from random import randint
+from binascii import hexlify, unhexlify
+
+import zmq
+import zmq.asyncio
+
+from .logging import Logger
+from .libbitcoin_errors import make_error_code, ErrorCode
+
+from datetime import datetime
+def __(msg):
+ print("***********************")
+ print("*** DEBUG %s ***: %s" % (datetime.now().strftime("%H:%M:%S"), msg))
+
+
+def create_random_id():
+ """Generate a random request ID"""
+ max_uint32 = 4294967295
+ return randint(0, max_uint32)
+
+
+def checksum(hash_, index):
+ """This method takes a transaction hash and an index and returns
+ a checksum.
+ This checksum is based on 49 bits starting from the 12th byte of the
+ reversed hash. Combined with the last 15 bits of the 4 byte index.
+ """
+ mask = 0xffffffffffff8000
+ magic_start_position = 12
+
+ hash_bytes = bytes.fromhex(hash_)[::-1]
+ last_20_bytes = hash_bytes[magic_start_position:]
+
+ assert len(hash_bytes) == 32
+ assert index < 2**32
+
+ hash_upper_49_bits = to_int(last_20_bytes) & mask
+ index_lower_15_bits = index & ~mask
+
+ return hash_upper_49_bits | index_lower_15_bits
+
+
+def to_int(some_bytes):
+ return int.from_bytes(some_bytes, byteorder='little')
+
+
+def pack_block_index(index):
+ if isinstance(index, str):
+ index = unhexlify(index)
+ assert len(index) == 32
+ return index
+ elif isinstance(index, int):
+ return struct.pack('<I', index)
+ else:
+ raise ValueError(f"Unknown index type {type(index)} v:{index}, should be int or bytearray")
+
+
+def unpack_table(row_fmt, data):
+ # get the number of rows
+ row_size = struct.calcsize(row_fmt)
+ nrows = len(data) // row_size
+
+ # unpack
+ rows = []
+ for idx in range(nrows):
+ offset = idx * row_size
+ row = struct.unpack_from(row_fmt, data, offset)
+ rows.append(row)
+ return rows
+
+
+class ClientSettings:
+ """Class implementing client settings"""
+ def __init__(self, timeout=10, context=None, loop=None):
+ __("Zeromq ClientSettings: __init__")
+ self._timeout = timeout
+ self._context = context
+ self._loop = loop
+
+ @property
+ def context(self):
+ """zmq context property"""
+ if not self._context:
+ ctx = zmq.asyncio.Context()
+ ctx.linger = 500 # in milliseconds
+ self._context = ctx
+ return self._context
+
+ @context.setter
+ def context(self, context):
+ self._context = context
+
+ @property
+ def timeout(self):
+ """Set to None for no timeout"""
+ return self._timeout
+
+ @timeout.setter
+ def timeout(self, timeout):
+ self._timeout = timeout
+
+
+class Request:
+ """Class implementing a _send_ request.
+ This is either a simple request/response affair or a subscription.
+ """
+ def __init__(self, socket, command, data):
+ __("Zeromq Request: __init__")
+ self.id_ = create_random_id()
+ self.socket = socket
+ self.command = command
+ self.data = data
+ self.future = asyncio.Future()
+ self.queue = None
+
+ async def send(self):
+ """Send the zmq request"""
+ __(f"Zeromq Request: send: {self.command}, {self.data}")
+ request = [self.command, struct.pack('<I', self.id_), self.data]
+ await self.socket.send_multipart(request)
+
+ def is_subscription(self):
+ """If the request is a subscription, then the response to this
+ request is a notification.
+ """
+ return self.queue is not None
+
+ def __str__(self):
+ return 'Request(command, ID) {}, {:d}'.format(self.command,
+ self.id_)
+
+
+class InvalidServerResponseException(Exception): pass
+
+
+class Response:
+ """Class implementing a request response"""
+ def __init__(self, frame):
+ __("Zeromq Response: __init__")
+ if len(frame) != 3:
+ raise InvalidServerResponseException(
+ 'Length of the frame was not 3: %d' % len(frame))
+
+ self.command = frame[0]
+ self.request_id = struct.unpack('<I', frame[1])[0]
+ error_code = struct.unpack('<I', frame[2][:4])[0]
+ self.error_code = make_error_code(error_code)
+ self.data = frame[2][4:]
+
+ def is_bound_for_queue(self):
+ return len(self.data) > 0
+
+ def __str__(self):
+ return 'Response(command, request ID, error code, data):' \
+ + ' %s, %d, %s, %s' \
+ % (self.command, self.request_id, self.error_code, self.data)
+
+
+class RequestCollection:
+ """RequestCollection carries a list of Requests and matches incoming
+ responses to them.
+ """
+ def __init__(self, socket, loop):
+ __("Zeromq RequestCollection: __init__")
+ self._socket = socket
+ self._requests = {}
+ self._task = asyncio.ensure_future(self._run(), loop=loop)
+
+ async def _run(self):
+ while True:
+ __("Zeromq RequestCollection: _run loop iteration")
+ await self._receive()
+
+ async def stop(self):
+ """Stops listening for incoming responses (or subscription) messages).
+ Returns the number of _responses_ expected but which are now dropped
+ on the floor.
+ """
+ __("Zeromq RequestCollection: stop")
+ self._task.cancel()
+ try:
+ await self._task
+ except asyncio.CancelledError:
+ return len(self._requests)
+
+ async def _receive(self):
+ __("Zeromq RequestCollection: receive")
+ frame = await self._socket.recv_multipart()
+ response = Response(frame)
+
+ if response.request_id in self._requests:
+ self._handle_response(response)
+ else:
+ __("Zeromq RequestCollection: receive: unhandled response %s:%s" % (response.command, response.request_id))
+
+ def _handle_response(self, response):
+ __("Zeromq RequestCollection: _handle_response")
+ request = self._requests[response.request_id]
+
+ if request.is_subscription():
+ if response.is_bound_for_queue():
+ # TODO: decode the data into something usable
+ request.queue.put_nowait(response.data)
+ else:
+ request.future.set_result(response)
+ else:
+ self.delete_request(request)
+ request.future.set_result(response)
+
+ def add_request(self, request):
+ __("Zeromq RequestCollection: add_request")
+ # TODO: we should maybe check if the request.id_ is unique
+ self._requests[request.id_] = request
+
+ def delete_request(self, request):
+ __("Zeromq RequestCollection: delete_request")
+ del self._requests[request.id_]
+
+
+class Client:
+ """This class represents a connection to a libbitcoin server.
+ hostname -- the server DNS name to connect to.
+ ports -- a dictionary containing four keys; query/heartbeat/block/tx
+ """
+ # def __init__(self, hostname, ports, settings=ClientSettings()):
+ def __init__(self, hostname, ports, loop):
+ __("Zeromq Client: __init__")
+ self._hostname = hostname
+ self._ports = ports
+ # self._settings = settings
+ self._settings = ClientSettings(loop=loop)
+ self._query_socket = self._create_query_socket()
+ self._block_socket = self._create_block_socket()
+ self._request_collection = RequestCollection(
+ self._query_socket, self._settings._loop)
+
+ async def stop(self):
+ __("Zeromq Client: stop")
+ self._query_socket.close()
+ self._block_socket.close()
+ return await self._request_collection.stop()
+
+ def _create_block_socket(self):
+ __("Zeromq Client: _create_block_socket")
+ socket = self._settings.context.socket(
+ zmq.SUB, io_loop=self._settings._loop) # pylint: disable=E1101
+ socket.connect(self.__server_url(self._hostname,
+ self._ports['block']))
+ socket.setsockopt_string(zmq.SUBSCRIBE, '') # pylint: disable=E1101
+ return socket
+
+ def _create_query_socket(self):
+ __("Zeromq Client: _create_query_socket")
+ socket = self._settings.context.socket(
+ zmq.DEALER, io_loop=self._settings._loop) # pylint: disable=E1101
+ socket.connect(self.__server_url(self._hostname,
+ self._ports['query']))
+ return socket
+
+ async def _subscription_request(self, command, data):
+ __("Zeromq Client: _subscription_request")
+ request = await self._request(command, data)
+ request.queue = asyncio.Queue(loop=self._settings._loop)
+ error_code, _ = await self._wait_for_response(request)
+ return error_code, request.queue
+
+ async def _simple_request(self, command, data):
+ __("Zeromq Client: _simple_request")
+ return await self._wait_for_response(
+ await self._request(command, data))
+
+ async def _request(self, command, data):
+ """Make a generic request. Both options are byte objects
+ specified like b'blockchain.fetch_block_header' as an example.
+ """
+ __("Zeromq Client: _request")
+ request = Request(self._query_socket, command, data)
+ await request.send()
+ self._request_collection.add_request(request)
+ return request
+
+ async def _wait_for_response(self, request):
+ __("Zeromq Client: _wait_for_response")
+ try:
+ response = await asyncio.wait_for(request.future,
+ self._settings.timeout)
+ except asyncio.TimeoutError:
+ self._request_collection.delete_request(request)
+ return ErrorCode.channel_timeout, None
+
+ assert response.command == request.command
+ assert response.request_id == request.id_
+ return response.error_code, response.data
+
+ @staticmethod
+ def __server_url(hostname, port):
+ return 'tcp://' + hostname + ':' + str(port)
+
+ async def last_height(self):
+ __("Zeromq Client: last_height")
+ command = b'blockchain.fetch_last_height'
+ error_code, data = await self._simple_request(command, b'')
+ if error_code:
+ return error_code, None
+ height = struct.unpack('<I', data)[0]
+ return error_code, height
+
+ async def subscribe_to_blocks(self, queue):
+ __("Zeromq Client: subscribe_to_blocks")
+ asyncio.ensure_future(self._listen_for_blocks(queue))
+ return queue
+
+ async def _listen_for_blocks(self, queue):
+ __("Zeromq Client: _listen_for_blocks")
+ _ec, tip = await self.last_height()
+ _, header = await self.block_header(tip)
+ queue.put_nowait((0, tip, header))
+ while True:
+ __("Zeromq Client: _listen_for_blocks loop iteration")
+ frame = await self._block_socket.recv_multipart()
+ seq = struct.unpack('<H', frame[0])[0]
+ height = struct.unpack('<I', frame[1])[0]
+ block_data = frame[2]
+ block_header = block_data[:80]
+ # block_header = raw[:80]
+ # version = block_header[:4]
+ # prev_merkle_root = block_header[4:36]
+ # merkle_root = block_header[36:68]
+ # timestamp = block_header[68:72]
+ # bits = block_header[72:76]
+ # nonce = blockheader[76:80]
+ queue.put_nowait((seq, height, block_header))
+
+ async def block_header(self, index):
+ """Fetches the block header by height or integer index"""
+ __("Zeromq Client: block_header")
+ command = b'blockchain.fetch_block_header'
+ data = pack_block_index(index)
+ error_code, data = await self._simple_request(command, data)
+ if error_code:
+ return error_code, None
+ return error_code, data
+
+ async def block_transaction_hashes(self, index):
+ __("Zeromq Client: block_transaction_hashes")
+ command = b'blockchain.fetch_block_transaction_hashes'
+ data = pack_block_index(index)
+ error_code, data = await self._simple_request(command, data)
+ if error_code:
+ return error_code, None
+ data = unpack_table('32s', data)
+ return error_code, data
+
+ async def transaction(self, hash_):
+ __("Zeromq Client: transaction")
+ command = b'blockchain.fetch_transaction2'
+ error_code, data = await self._simple_request(
+ command, bytes.fromhex(hash_)[::-1])
+ if error_code:
+ return error_code, None
+ return None, data
+
+ async def mempool_transaction(self, hash_):
+ __("Zeromq Client: mempool_transaction")
+ command = b'transaction_pool.fetch_transaction2'
+ error_code, data = await self._simple_request(
+ command, bytes.fromhex(hash_)[::-1])
+ if error_code:
+ return error_code, None
+ return None, data
+
+ async def history4(self, scripthash, height=0):
+ __("Zeromq Client: history4")
+ command = b'blockchain.fetch_history4'
+ decoded_address = unhexlify(scripthash)[::-1] # TODO: check byte order
+ error_code, raw_points = await self._simple_request(
+ command, decoded_address + struct.pack('<I', height))
+ if error_code:
+ return error_code, None
+
+ def make_tuple(row):
+ kind, tx_hash, index, height, value = row
+ return (
+ kind,
+ #COutPoint(tx_hash, index),
+ (tx_hash, index),
+ height,
+ value,
+ checksum(tx_hash[::-1].hex(), index),
+ )
+
+ rows = unpack_table('<B32sIIQ', raw_points)
+ points = [make_tuple(row) for row in rows]
+ correlated_points = Client.__correlate(points)
+ return None, correlated_points
+
+ async def balance(self, scripthash):
+ __("Zeromq Client: balance")
+ error, hist = await self.history4(scripthash)
+ if error:
+ return error, None
+ utxo = Client.__receives_without_spends(hist)
+ return None, functools.reduce(
+ lambda accumulator, point: accumulator + point['value'], utxo, 0)
+
+ async def unspent(self, scripthash):
+ __("Zeromq Client: unspent")
+ error, hist = await self.history4(scripthash)
+ if error:
+ return error, None
+ return None, Client.__receives_without_spends(hist)
+
+ @staticmethod
+ def __receives_without_spends(hist):
+ return (point for point in hist if 'spent' not in point)
+
+ @staticmethod
+ def __correlate(points):
+ transfers, checksum_to_index = Client.__find_receives(points)
+ transfers = Client.__correlate_spends_to_receives(
+ points, transfers, checksum_to_index)
+ return transfers
+
+ @staticmethod
+ def __correlate_spends_to_receives(points, transfers, checksum_to_index):
+ for point in points:
+ if point[0] == 0: # receive
+ continue
+
+ spent = {
+ 'hash': point[1].hash,
+ 'height': point[2],
+ 'index': point[1].n,
+ }
+ if point[3] not in checksum_to_index:
+ transfers.append({'spent': spent})
+ else:
+ transfers[checksum_to_index[point[3]]]['spent'] = spent
+
+ return transfers
+
+ @staticmethod
+ def __find_receives(points):
+ transfers = []
+ checksum_to_index = {}
+
+ for point in points:
+ if point[0] == 1: # spent
+ continue
+
+ transfers.append({
+ 'received': {
+ 'hash': point[1].hash,
+ 'height': point[2],
+ 'index': point[1].n,
+ },
+ 'value': point[3],
+ })
+
+ checksum_to_index[point[4]] = len(transfers) - 1
+
+ return transfers, checksum_to_index