commit 139fc78e196ec76ebc22ec7b50a8e6b84522062b
parent 24e4aa3ab91cd1d7ace8c1996ce86249ed4c7492
Author: parazyd <>
Date: Fri, 12 Mar 2021 18:02:43 +0100
Copy to
1 file changed, 1132 insertions(+), 0 deletions(-)
diff --git a/electrum/ b/electrum/
@@ -0,0 +1,1132 @@
+#!/usr/bin/env python
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2011 thomasv@gitorious
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+import os
+import re
+import ssl
+import sys
+import traceback
+import asyncio
+import socket
+from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence
+from collections import defaultdict
+from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
+import itertools
+import logging
+import hashlib
+import functools
+import aiorpcx
+from aiorpcx import TaskGroup
+from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
+from aiorpcx.curio import timeout_after, TaskTimeout
+from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
+from aiorpcx.rawsocket import RSClient
+import certifi
+from .util import (ignore_exceptions, log_exceptions, bfh, SilentTaskGroup, MySocksProxy,
+ is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
+ is_int_or_float, is_non_negative_int_or_float)
+from . import util
+from . import x509
+from . import pem
+from . import version
+from . import blockchain
+from .blockchain import Blockchain, HEADER_SIZE
+from . import bitcoin
+from . import constants
+from .i18n import _
+from .logging import Logger
+from .transaction import Transaction
+ from .network import Network
+ from .simple_config import SimpleConfig
+ca_path = certifi.where()
+MAX_INCOMING_MSG_SIZE = 1_000_000 # in bytes
+class NetworkTimeout:
+ # seconds
+ class Generic:
+ NORMAL = 30
+ RELAXED = 45
+ class Urgent(Generic):
+ NORMAL = 10
+ RELAXED = 20
+def assert_non_negative_integer(val: Any) -> None:
+ if not is_non_negative_integer(val):
+ raise RequestCorrupted(f'{val!r} should be a non-negative integer')
+def assert_integer(val: Any) -> None:
+ if not is_integer(val):
+ raise RequestCorrupted(f'{val!r} should be an integer')
+def assert_int_or_float(val: Any) -> None:
+ if not is_int_or_float(val):
+ raise RequestCorrupted(f'{val!r} should be int or float')
+def assert_non_negative_int_or_float(val: Any) -> None:
+ if not is_non_negative_int_or_float(val):
+ raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
+def assert_hash256_str(val: Any) -> None:
+ if not is_hash256_str(val):
+ raise RequestCorrupted(f'{val!r} should be a hash256 str')
+def assert_hex_str(val: Any) -> None:
+ if not is_hex_str(val):
+ raise RequestCorrupted(f'{val!r} should be a hex str')
+def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
+ if not isinstance(d, dict):
+ raise RequestCorrupted(f'{d!r} should be a dict')
+ if field_name not in d:
+ raise RequestCorrupted(f'required field {field_name!r} missing from dict')
+ return d[field_name]
+def assert_list_or_tuple(val: Any) -> None:
+ if not isinstance(val, (list, tuple)):
+ raise RequestCorrupted(f'{val!r} should be a list or tuple')
+class NotificationSession(RPCSession):
+ def __init__(self, *args, interface: 'Interface', **kwargs):
+ super(NotificationSession, self).__init__(*args, **kwargs)
+ self.subscriptions = defaultdict(list)
+ self.cache = {}
+ self.default_timeout = NetworkTimeout.Generic.NORMAL
+ self._msg_counter = itertools.count(start=1)
+ self.interface = interface
+ self.cost_hard_limit = 0 # disable aiorpcx resource limits
+ async def handle_request(self, request):
+ self.maybe_log(f"--> {request}")
+ try:
+ if isinstance(request, Notification):
+ params, result = request.args[:-1], request.args[-1]
+ key = self.get_hashable_key_for_rpc_call(request.method, params)
+ if key in self.subscriptions:
+ self.cache[key] = result
+ for queue in self.subscriptions[key]:
+ await queue.put(request.args)
+ else:
+ raise Exception(f'unexpected notification')
+ else:
+ raise Exception(f'unexpected request. not a notification')
+ except Exception as e:
+"error handling request {request}. exc: {repr(e)}")
+ await self.close()
+ async def send_request(self, *args, timeout=None, **kwargs):
+ # note: semaphores/timeouts/backpressure etc are handled by
+ # aiorpcx. the timeout arg here in most cases should not be set
+ msg_id = next(self._msg_counter)
+ self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
+ try:
+ # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
+ # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
+ response = await asyncio.wait_for(
+ super().send_request(*args, **kwargs),
+ timeout)
+ except (TaskTimeout, asyncio.TimeoutError) as e:
+ raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
+ except CodeMessageError as e:
+ self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
+ raise
+ else:
+ self.maybe_log(f"--> {response} (id: {msg_id})")
+ return response
+ def set_default_timeout(self, timeout):
+ self.sent_request_timeout = timeout
+ self.max_send_delay = timeout
+ async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
+ # note: until the cache is written for the first time,
+ # each 'subscribe' call might make a request on the network.
+ key = self.get_hashable_key_for_rpc_call(method, params)
+ self.subscriptions[key].append(queue)
+ if key in self.cache:
+ result = self.cache[key]
+ else:
+ result = await self.send_request(method, params)
+ self.cache[key] = result
+ await queue.put(params + [result])
+ def unsubscribe(self, queue):
+ """Unsubscribe a callback to free object references to enable GC."""
+ # note: we can't unsubscribe from the server, so we keep receiving
+ # subsequent notifications
+ for v in self.subscriptions.values():
+ if queue in v:
+ v.remove(queue)
+ @classmethod
+ def get_hashable_key_for_rpc_call(cls, method, params):
+ """Hashable index for subscriptions and cache"""
+ return str(method) + repr(params)
+ def maybe_log(self, msg: str) -> None:
+ if not self.interface: return
+ if self.interface.debug or
+ self.interface.logger.debug(msg)
+ def default_framer(self):
+ # overridden so that max_size can be customized
+ max_size = int('network_max_incoming_msg_size',
+ return NewlineFramer(max_size=max_size)
+class NetworkException(Exception): pass
+class GracefulDisconnect(NetworkException):
+ log_level = logging.INFO
+ def __init__(self, *args, log_level=None, **kwargs):
+ Exception.__init__(self, *args, **kwargs)
+ if log_level is not None:
+ self.log_level = log_level
+class RequestTimedOut(GracefulDisconnect):
+ def __str__(self):
+ return _("Network request timed out.")
+class RequestCorrupted(Exception): pass
+class ErrorParsingSSLCert(Exception): pass
+class ErrorGettingSSLCertFromServer(Exception): pass
+class ErrorSSLCertFingerprintMismatch(Exception): pass
+class InvalidOptionCombination(Exception): pass
+class ConnectError(NetworkException): pass
+class _RSClient(RSClient):
+ async def create_connection(self):
+ try:
+ return await super().create_connection()
+ except OSError as e:
+ # note: using "from e" here will set __cause__ of ConnectError
+ raise ConnectError(e) from e
+class ServerAddr:
+ def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
+ assert isinstance(host, str), repr(host)
+ if protocol is None:
+ protocol = 's'
+ if not host:
+ raise ValueError('host must not be empty')
+ if host[0] == '[' and host[-1] == ']': # IPv6
+ host = host[1:-1]
+ try:
+ net_addr = NetAddress(host, port) # this validates host and port
+ except Exception as e:
+ raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
+ if protocol not in _KNOWN_NETWORK_PROTOCOLS:
+ raise ValueError(f"invalid network protocol: {protocol}")
+ = str( # canonical form (if e.g. IPv6 address)
+ self.port = int(net_addr.port)
+ self.protocol = protocol
+ self._net_addr_str = str(net_addr)
+ @classmethod
+ def from_str(cls, s: str) -> 'ServerAddr':
+ # host might be IPv6 address, hence do rsplit:
+ host, port, protocol = str(s).rsplit(':', 2)
+ return ServerAddr(host=host, port=port, protocol=protocol)
+ @classmethod
+ def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
+ """Construct ServerAddr from str, guessing missing details.
+ Ongoing compatibility not guaranteed.
+ """
+ if not s:
+ return None
+ items = str(s).rsplit(':', 2)
+ if len(items) < 2:
+ return None # although maybe we could guess the port too?
+ host = items[0]
+ port = items[1]
+ if len(items) >= 3:
+ protocol = items[2]
+ else:
+ return ServerAddr(host=host, port=port, protocol=protocol)
+ def to_friendly_name(self) -> str:
+ # note: this method is closely linked to from_str_with_inference
+ if self.protocol == 's': # hide trailing ":s"
+ return self.net_addr_str()
+ return str(self)
+ def __str__(self):
+ return '{}:{}'.format(self.net_addr_str(), self.protocol)
+ def to_json(self) -> str:
+ return str(self)
+ def __repr__(self):
+ return f'<ServerAddr host={} port={self.port} protocol={self.protocol}>'
+ def net_addr_str(self) -> str:
+ return self._net_addr_str
+ def __eq__(self, other):
+ if not isinstance(other, ServerAddr):
+ return False
+ return ( ==
+ and self.port == other.port
+ and self.protocol == other.protocol)
+ def __ne__(self, other):
+ return not (self == other)
+ def __hash__(self):
+ return hash((, self.port, self.protocol))
+def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
+ filename = host
+ try:
+ ip = ip_address(host)
+ except ValueError:
+ pass
+ else:
+ if isinstance(ip, IPv6Address):
+ filename = f"ipv6_{ip.packed.hex()}"
+ return os.path.join(config.path, 'certs', filename)
+class Interface(Logger):
+ def __init__(self, *, network: 'Network', server: ServerAddr, proxy: Optional[dict]):
+ self.ready = asyncio.Future()
+ self.got_disconnected = asyncio.Event()
+ self.server = server
+ Logger.__init__(self)
+ assert network.config.path
+ self.cert_path = _get_cert_path_for_host(config=network.config,
+ self.blockchain = None # type: Optional[Blockchain]
+ self._requested_chunks = set() # type: Set[int]
+ = network
+ self.proxy = MySocksProxy.from_proxy_dict(proxy)
+ self.session = None # type: Optional[NotificationSession]
+ self._ipaddr_bucket = None
+ # Latest block header and corresponding height, as claimed by the server.
+ # Note that these values are updated before they are verified.
+ # Especially during initial header sync, verification can take a long time.
+ # Failing verification will get the interface closed.
+ self.tip_header = None
+ self.tip = 0
+ self.fee_estimates_eta = {}
+ # Dump network messages (only for this interface). Set at runtime from the console.
+ self.debug = False
+ self.taskgroup = SilentTaskGroup()
+ async def spawn_task():
+ task = await
+ if sys.version_info >= (3, 8):
+ task.set_name(f"interface::{str(server)}")
+ asyncio.run_coroutine_threadsafe(spawn_task(),
+ @property
+ def host(self):
+ return
+ @property
+ def port(self):
+ return self.server.port
+ @property
+ def protocol(self):
+ return self.server.protocol
+ def diagnostic_name(self):
+ return self.server.net_addr_str()
+ def __str__(self):
+ return f"<Interface {self.diagnostic_name()}>"
+ async def is_server_ca_signed(self, ca_ssl_context):
+ """Given a CA enforcing SSL context, returns True if the connection
+ can be established. Returns False if the server has a self-signed
+ certificate but otherwise is okay. Any other failures raise.
+ """
+ try:
+ await self.open_session(ca_ssl_context, exit_early=True)
+ except ConnectError as e:
+ cause = e.__cause__
+ if isinstance(cause, ssl.SSLError) and cause.reason == 'CERTIFICATE_VERIFY_FAILED':
+ # failures due to self-signed certs are normal
+ return False
+ raise
+ return True
+ async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context):
+ ca_signed = await self.is_server_ca_signed(ca_ssl_context)
+ if ca_signed:
+ if self._get_expected_fingerprint():
+ raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
+ with open(self.cert_path, 'w') as f:
+ # empty file means this is CA signed, not self-signed
+ f.write('')
+ else:
+ await self._save_certificate()
+ def _is_saved_ssl_cert_available(self):
+ if not os.path.exists(self.cert_path):
+ return False
+ with open(self.cert_path, 'r') as f:
+ contents =
+ if contents == '': # CA signed
+ if self._get_expected_fingerprint():
+ raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
+ return True
+ # pinned self-signed cert
+ try:
+ b = pem.dePem(contents, 'CERTIFICATE')
+ except SyntaxError as e:
+"error parsing already saved cert: {e}")
+ raise ErrorParsingSSLCert(e) from e
+ try:
+ x = x509.X509(b)
+ except Exception as e:
+"error parsing already saved cert: {e}")
+ raise ErrorParsingSSLCert(e) from e
+ try:
+ x.check_date()
+ except x509.CertificateError as e:
+"certificate has expired: {e}")
+ os.unlink(self.cert_path) # delete pinned cert only in this case
+ return False
+ self._verify_certificate_fingerprint(bytearray(b))
+ return True
+ async def _get_ssl_context(self):
+ if self.protocol != 's':
+ # using plaintext TCP
+ return None
+ # see if we already have cert for this server; or get it for the first time
+ ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
+ if not self._is_saved_ssl_cert_available():
+ try:
+ await self._try_saving_ssl_cert_for_first_time(ca_sslc)
+ except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
+ raise ErrorGettingSSLCertFromServer(e) from e
+ # now we have a file saved in our certificate store
+ siz = os.stat(self.cert_path).st_size
+ if siz == 0:
+ # CA signed cert
+ sslc = ca_sslc
+ else:
+ # pinned self-signed cert
+ sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
+ sslc.check_hostname = 0
+ return sslc
+ def handle_disconnect(func):
+ @functools.wraps(func)
+ async def wrapper_func(self: 'Interface', *args, **kwargs):
+ try:
+ return await func(self, *args, **kwargs)
+ except GracefulDisconnect as e:
+ self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
+ except aiorpcx.jsonrpc.RPCError as e:
+ self.logger.warning(f"disconnecting due to {repr(e)}")
+ self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
+ finally:
+ self.got_disconnected.set()
+ await
+ # if was not 'ready' yet, schedule waiting coroutines:
+ self.ready.cancel()
+ return wrapper_func
+ @ignore_exceptions # do not kill network.taskgroup
+ @log_exceptions
+ @handle_disconnect
+ async def run(self):
+ try:
+ ssl_context = await self._get_ssl_context()
+ except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
+'disconnecting due to: {repr(e)}')
+ return
+ try:
+ await self.open_session(ssl_context)
+ except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
+ # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
+ if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
+ and self.is_main_server() and not
+ self.logger.warning(f'Cannot connect to main server due to SSL error '
+ f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
+ else:
+'disconnecting due to: {repr(e)}')
+ return
+ def _mark_ready(self) -> None:
+ if self.ready.cancelled():
+ raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
+ if self.ready.done():
+ return
+ assert self.tip_header
+ chain = blockchain.check_header(self.tip_header)
+ if not chain:
+ self.blockchain = blockchain.get_best_chain()
+ else:
+ self.blockchain = chain
+ assert self.blockchain is not None
+"set blockchain with height {self.blockchain.height()}")
+ self.ready.set_result(1)
+ async def _save_certificate(self) -> None:
+ if not os.path.exists(self.cert_path):
+ # we may need to retry this a few times, in case the handshake hasn't completed
+ for _ in range(10):
+ dercert = await self._fetch_certificate()
+ if dercert:
+"succeeded in getting cert")
+ self._verify_certificate_fingerprint(dercert)
+ with open(self.cert_path, 'w') as f:
+ cert = ssl.DER_cert_to_PEM_cert(dercert)
+ # workaround android bug
+ cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
+ f.write(cert)
+ # even though close flushes we can't fsync when closed.
+ # and we must flush before fsyncing, cause flush flushes to OS buffer
+ # fsync writes to OS buffer to disk
+ f.flush()
+ os.fsync(f.fileno())
+ break
+ await asyncio.sleep(1)
+ else:
+ raise GracefulDisconnect("could not get certificate after 10 tries")
+ async def _fetch_certificate(self) -> bytes:
+ sslc = ssl.SSLContext()
+ async with _RSClient(session_factory=RPCSession,
+, port=self.port,
+ ssl=sslc, proxy=self.proxy) as session:
+ asyncio_transport = session.transport._asyncio_transport # type: asyncio.BaseTransport
+ ssl_object = asyncio_transport.get_extra_info("ssl_object") # type: ssl.SSLObject
+ return ssl_object.getpeercert(binary_form=True)
+ def _get_expected_fingerprint(self) -> Optional[str]:
+ if self.is_main_server():
+ return"serverfingerprint")
+ def _verify_certificate_fingerprint(self, certificate):
+ expected_fingerprint = self._get_expected_fingerprint()
+ if not expected_fingerprint:
+ return
+ fingerprint = hashlib.sha256(certificate).hexdigest()
+ fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
+ if not fingerprints_match:
+ util.trigger_callback('cert_mismatch')
+ raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
+"cert fingerprint verification passed")
+ async def get_block_header(self, height, assert_mode):
+'requesting block header {height} in mode {assert_mode}')
+ # use lower timeout as we usually have network.bhi_lock here
+ timeout =
+ res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
+ return blockchain.deserialize_header(bytes.fromhex(res), height)
+ async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
+ if not is_non_negative_integer(height):
+ raise Exception(f"{repr(height)} is not a block height")
+ index = height // 2016
+ if can_return_early and index in self._requested_chunks:
+ return
+"requesting chunk from height {height}")
+ size = 2016
+ if tip is not None:
+ size = min(size, tip - index * 2016 + 1)
+ size = max(size, 0)
+ try:
+ self._requested_chunks.add(index)
+ res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
+ finally:
+ self._requested_chunks.discard(index)
+ assert_dict_contains_field(res, field_name='count')
+ assert_dict_contains_field(res, field_name='hex')
+ assert_dict_contains_field(res, field_name='max')
+ assert_non_negative_integer(res['count'])
+ assert_non_negative_integer(res['max'])
+ assert_hex_str(res['hex'])
+ if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
+ raise RequestCorrupted('inconsistent chunk hex and count')
+ # we never request more than 2016 headers, but we enforce those fit in a single response
+ if res['max'] < 2016:
+ raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < 2016")
+ if res['count'] != size:
+ raise RequestCorrupted(f"expected {size} headers but only got {res['count']}")
+ conn = self.blockchain.connect_chunk(index, res['hex'])
+ if not conn:
+ return conn, 0
+ return conn, res['count']
+ def is_main_server(self) -> bool:
+ return ( == self or
+ is None and == self.server)
+ async def open_session(self, sslc, exit_early=False):
+ session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
+ async with _RSClient(session_factory=session_factory,
+, port=self.port,
+ ssl=sslc, proxy=self.proxy) as session:
+ self.session = session # type: NotificationSession
+ self.session.set_default_timeout(
+ try:
+ ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
+ except aiorpcx.jsonrpc.RPCError as e:
+ raise GracefulDisconnect(e) # probably 'unsupported protocol version'
+ if exit_early:
+ return
+ if ver[1] != version.PROTOCOL_VERSION:
+ raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
+ f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
+ if not
+ raise GracefulDisconnect(f'too many connected servers already '
+ f'in bucket {self.bucket_based_on_ipaddress()}')
+"connection established. version: {ver}")
+ try:
+ async with self.taskgroup as group:
+ await group.spawn(
+ await group.spawn(self.request_fee_estimates)
+ await group.spawn(self.run_fetch_blocks)
+ await group.spawn(self.monitor_connection)
+ except aiorpcx.jsonrpc.RPCError as e:
+ raise GracefulDisconnect(e, log_level=logging.WARNING) from e
+ raise
+ async def monitor_connection(self):
+ while True:
+ await asyncio.sleep(1)
+ if not self.session or self.session.is_closing():
+ raise GracefulDisconnect('session was closed')
+ async def ping(self):
+ while True:
+ await asyncio.sleep(300)
+ await self.session.send_request('')
+ async def request_fee_estimates(self):
+ from .simple_config import FEE_ETA_TARGETS
+ while True:
+ async with TaskGroup() as group:
+ fee_tasks = []
+ for i in FEE_ETA_TARGETS:
+ fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
+ for nblock_target, task in fee_tasks:
+ fee = task.result()
+ if fee < 0: continue
+ self.fee_estimates_eta[nblock_target] = fee
+ await asyncio.sleep(60)
+ async def close(self, *, force_after: int = None):
+ """Closes the connection and waits for it to be closed.
+ We try to flush buffered data to the wire, so this can take some time.
+ """
+ if force_after is None:
+ # We give up after a while and just abort the connection.
+ # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
+ # the connection must be aborted (see
+ force_after = 1 # seconds
+ if self.session:
+ await self.session.close(force_after=force_after)
+ # monitor_connection will cancel tasks
+ async def run_fetch_blocks(self):
+ header_queue = asyncio.Queue()
+ await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
+ while True:
+ item = await header_queue.get()
+ raw_header = item[0]
+ height = raw_header['height']
+ header = blockchain.deserialize_header(bfh(raw_header['hex']), height)
+ self.tip_header = header
+ self.tip = height
+ if self.tip <
+ raise GracefulDisconnect('server tip below max checkpoint')
+ self._mark_ready()
+ await self._process_header_at_tip()
+ # header processing done
+ util.trigger_callback('blockchain_updated')
+ util.trigger_callback('network_updated')
+ await
+ await
+ async def _process_header_at_tip(self):
+ height, header = self.tip, self.tip_header
+ async with
+ if self.blockchain.height() >= height and self.blockchain.check_header(header):
+ # another interface amended the blockchain
+"skipping header {height}")
+ return
+ _, height = await self.step(height, header)
+ # in the simple case, height == self.tip+1
+ if height <= self.tip:
+ await self.sync_until(height)
+ async def sync_until(self, height, next_height=None):
+ if next_height is None:
+ next_height = self.tip
+ last = None
+ while last is None or height <= next_height:
+ prev_last, prev_height = last, height
+ if next_height > height + 10:
+ could_connect, num_headers = await self.request_chunk(height, next_height)
+ if not could_connect:
+ if height <=
+ raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
+ last, height = await self.step(height)
+ continue
+ util.trigger_callback('network_updated')
+ height = (height // 2016 * 2016) + num_headers
+ assert height <= next_height+1, (height, self.tip)
+ last = 'catchup'
+ else:
+ last, height = await self.step(height)
+ assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
+ return last, height
+ async def step(self, height, header=None):
+ assert 0 <= height <= self.tip, (height, self.tip)
+ if header is None:
+ header = await self.get_block_header(height, 'catchup')
+ chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
+ if chain:
+ self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
+ # note: there is an edge case here that is not handled.
+ # we might know the blockhash (enough for check_header) but
+ # not have the header itself. e.g. regtest chain with only genesis.
+ # this situation resolves itself on the next block
+ return 'catchup', height+1
+ can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
+ if not can_connect:
+"can't connect {height}")
+ height, header, bad, bad_header = await self._search_headers_backwards(height, header)
+ chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
+ can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
+ assert chain or can_connect
+ if can_connect:
+"could connect {height}")
+ height += 1
+ if isinstance(can_connect, Blockchain): # not when mocking
+ self.blockchain = can_connect
+ self.blockchain.save_header(header)
+ return 'catchup', height
+ good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
+ return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
+ async def _search_headers_binary(self, height, bad, bad_header, chain):
+ assert bad == bad_header['block_height']
+ _assert_header_does_not_check_against_any_chain(bad_header)
+ self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
+ good = height
+ while True:
+ assert good < bad, (good, bad)
+ height = (good + bad) // 2
+"binary step. good {good}, bad {bad}, height {height}")
+ header = await self.get_block_header(height, 'binary')
+ chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
+ if chain:
+ self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
+ good = height
+ else:
+ bad = height
+ bad_header = header
+ if good + 1 == bad:
+ break
+ mock = 'mock' in bad_header and bad_header['mock']['connect'](height)
+ real = not mock and self.blockchain.can_connect(bad_header, check_height=False)
+ if not real and not mock:
+ raise Exception('unexpected bad header during binary: {}'.format(bad_header))
+ _assert_header_does_not_check_against_any_chain(bad_header)
+"binary search exited. good {good}, bad {bad}")
+ return good, bad, bad_header
+ async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header):
+ assert good + 1 == bad
+ assert bad == bad_header['block_height']
+ _assert_header_does_not_check_against_any_chain(bad_header)
+ # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
+ # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.
+ bh = self.blockchain.height()
+ assert bh >= good, (bh, good)
+ if bh == good:
+ height = good + 1
+"catching up from {height}")
+ return 'no_fork', height
+ # this is a new fork we don't yet have
+ height = bad + 1
+"new fork at bad height {bad}")
+ forkfun = self.blockchain.fork if 'mock' not in bad_header else bad_header['mock']['fork']
+ b = forkfun(bad_header) # type: Blockchain
+ self.blockchain = b
+ assert b.forkpoint == bad
+ return 'fork', height
+ async def _search_headers_backwards(self, height, header):
+ async def iterate():
+ nonlocal height, header
+ checkp = False
+ if height <=
+ height =
+ checkp = True
+ header = await self.get_block_header(height, 'backward')
+ chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
+ can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
+ if chain or can_connect:
+ return False
+ if checkp:
+ raise GracefulDisconnect("server chain conflicts with checkpoints")
+ return True
+ bad, bad_header = height, header
+ _assert_header_does_not_check_against_any_chain(bad_header)
+ with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values())
+ local_max = max([0] + [x.height() for x in chains]) if 'mock' not in header else float('inf')
+ height = min(local_max + 1, height - 1)
+ while await iterate():
+ bad, bad_header = height, header
+ delta = self.tip - height
+ height = self.tip - 2 * delta
+ _assert_header_does_not_check_against_any_chain(bad_header)
+"exiting backward mode at {height}")
+ return height, header, bad, bad_header
+ @classmethod
+ def client_name(cls) -> str:
+ return f'electrum/{version.ELECTRUM_VERSION}'
+ def is_tor(self):
+ return'.onion')
+ def ip_addr(self) -> Optional[str]:
+ session = self.session
+ if not session: return None
+ peer_addr = session.remote_address()
+ if not peer_addr: return None
+ return str(
+ def bucket_based_on_ipaddress(self) -> str:
+ def do_bucket():
+ if self.is_tor():
+ try:
+ ip_addr = ip_address(self.ip_addr()) # type: Union[IPv4Address, IPv6Address]
+ except ValueError:
+ return ''
+ if not ip_addr:
+ return ''
+ if ip_addr.is_loopback: # localhost is exempt
+ return ''
+ if ip_addr.version == 4:
+ slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
+ return str(slash16)
+ elif ip_addr.version == 6:
+ slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
+ return str(slash48)
+ return ''
+ if not self._ipaddr_bucket:
+ self._ipaddr_bucket = do_bucket()
+ return self._ipaddr_bucket
+ async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
+ if not is_hash256_str(tx_hash):
+ raise Exception(f"{repr(tx_hash)} is not a txid")
+ if not is_non_negative_integer(tx_height):
+ raise Exception(f"{repr(tx_height)} is not a block height")
+ # do request
+ res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
+ # check response
+ block_height = assert_dict_contains_field(res, field_name='block_height')
+ merkle = assert_dict_contains_field(res, field_name='merkle')
+ pos = assert_dict_contains_field(res, field_name='pos')
+ # note: tx_height was just a hint to the server, don't enforce the response to match it
+ assert_non_negative_integer(block_height)
+ assert_non_negative_integer(pos)
+ assert_list_or_tuple(merkle)
+ for item in merkle:
+ assert_hash256_str(item)
+ return res
+ async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
+ if not is_hash256_str(tx_hash):
+ raise Exception(f"{repr(tx_hash)} is not a txid")
+ raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
+ # validate response
+ if not is_hex_str(raw):
+ raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
+ tx = Transaction(raw)
+ try:
+ tx.deserialize() # see if raises
+ except Exception as e:
+ raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
+ if tx.txid() != tx_hash:
+ raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
+ return raw
+ async def get_history_for_scripthash(self, sh: str) -> List[dict]:
+ if not is_hash256_str(sh):
+ raise Exception(f"{repr(sh)} is not a scripthash")
+ # do request
+ res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
+ # check response
+ assert_list_or_tuple(res)
+ prev_height = 1
+ for tx_item in res:
+ height = assert_dict_contains_field(tx_item, field_name='height')
+ assert_dict_contains_field(tx_item, field_name='tx_hash')
+ assert_integer(height)
+ assert_hash256_str(tx_item['tx_hash'])
+ if height in (-1, 0):
+ assert_dict_contains_field(tx_item, field_name='fee')
+ assert_non_negative_integer(tx_item['fee'])
+ prev_height = - float("inf") # this ensures confirmed txs can't follow mempool txs
+ else:
+ # check monotonicity of heights
+ if height < prev_height:
+ raise RequestCorrupted(f'heights of confirmed txs must be in increasing order')
+ prev_height = height
+ hashes = set(map(lambda item: item['tx_hash'], res))
+ if len(hashes) != len(res):
+ # Either server is sending garbage... or maybe if server is race-prone
+ # a recently mined tx could be included in both last block and mempool?
+ # Still, it's simplest to just disregard the response.
+ raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
+ return res
+ async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
+ if not is_hash256_str(sh):
+ raise Exception(f"{repr(sh)} is not a scripthash")
+ # do request
+ res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
+ # check response
+ assert_list_or_tuple(res)
+ for utxo_item in res:
+ assert_dict_contains_field(utxo_item, field_name='tx_pos')
+ assert_dict_contains_field(utxo_item, field_name='value')
+ assert_dict_contains_field(utxo_item, field_name='tx_hash')
+ assert_dict_contains_field(utxo_item, field_name='height')
+ assert_non_negative_integer(utxo_item['tx_pos'])
+ assert_non_negative_integer(utxo_item['value'])
+ assert_non_negative_integer(utxo_item['height'])
+ assert_hash256_str(utxo_item['tx_hash'])
+ return res
+ async def get_balance_for_scripthash(self, sh: str) -> dict:
+ if not is_hash256_str(sh):
+ raise Exception(f"{repr(sh)} is not a scripthash")
+ # do request
+ res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
+ # check response
+ assert_dict_contains_field(res, field_name='confirmed')
+ assert_dict_contains_field(res, field_name='unconfirmed')
+ assert_non_negative_integer(res['confirmed'])
+ assert_non_negative_integer(res['unconfirmed'])
+ return res
+ async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
+ if not is_non_negative_integer(tx_height):
+ raise Exception(f"{repr(tx_height)} is not a block height")
+ if not is_non_negative_integer(tx_pos):
+ raise Exception(f"{repr(tx_pos)} should be non-negative integer")
+ # do request
+ res = await self.session.send_request(
+ 'blockchain.transaction.id_from_pos',
+ [tx_height, tx_pos, merkle],
+ )
+ # check response
+ if merkle:
+ assert_dict_contains_field(res, field_name='tx_hash')
+ assert_dict_contains_field(res, field_name='merkle')
+ assert_hash256_str(res['tx_hash'])
+ assert_list_or_tuple(res['merkle'])
+ for node_hash in res['merkle']:
+ assert_hash256_str(node_hash)
+ else:
+ assert_hash256_str(res)
+ return res
+ async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
+ # do request
+ res = await self.session.send_request('mempool.get_fee_histogram')
+ # check response
+ assert_list_or_tuple(res)
+ prev_fee = float('inf')
+ for fee, s in res:
+ assert_non_negative_int_or_float(fee)
+ assert_non_negative_integer(s)
+ if fee >= prev_fee: # check monotonicity
+ raise RequestCorrupted(f'fees must be in decreasing order')
+ prev_fee = fee
+ return res
+ async def get_server_banner(self) -> str:
+ # do request
+ res = await self.session.send_request('server.banner')
+ # check response
+ if not isinstance(res, str):
+ raise RequestCorrupted(f'{res!r} should be a str')
+ return res
+ async def get_donation_address(self) -> str:
+ # do request
+ res = await self.session.send_request('server.donation_address')
+ # check response
+ if not res: # ignore empty string
+ return ''
+ if not bitcoin.is_address(res):
+ # note: do not hard-fail -- allow server to use future-type
+ # bitcoin address we do not recognize
+"invalid donation address from server: {repr(res)}")
+ res = ''
+ return res
+ async def get_relay_fee(self) -> int:
+ """Returns the min relay feerate in sat/kbyte."""
+ # do request
+ res = await self.session.send_request('blockchain.relayfee')
+ # check response
+ assert_non_negative_int_or_float(res)
+ relayfee = int(res * bitcoin.COIN)
+ relayfee = max(0, relayfee)
+ return relayfee
+ async def get_estimatefee(self, num_blocks: int) -> int:
+ """Returns a feerate estimate for getting confirmed within
+ num_blocks blocks, in sat/kbyte.
+ """
+ if not is_non_negative_integer(num_blocks):
+ raise Exception(f"{repr(num_blocks)} is not a num_blocks")
+ # do request
+ res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
+ # check response
+ if res != -1:
+ assert_non_negative_int_or_float(res)
+ res = int(res * bitcoin.COIN)
+ return res
+def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
+ chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
+ if chain_bad:
+ raise Exception('bad_header must not check!')
+def check_cert(host, cert):
+ try:
+ b = pem.dePem(cert, 'CERTIFICATE')
+ x = x509.X509(b)
+ except:
+ traceback.print_exc(file=sys.stdout)
+ return
+ try:
+ x.check_date()
+ expired = False
+ except:
+ expired = True
+ m = "host: %s\n"%host
+ m += "has_expired: %s\n"% expired
+ util.print_msg(m)
+# Used by tests
+def _match_hostname(name, val):
+ if val == name:
+ return True
+ return val.startswith('*.') and name.endswith(val[1:])
+def test_certificates():
+ from .simple_config import SimpleConfig
+ config = SimpleConfig()
+ mydir = os.path.join(config.path, "certs")
+ certs = os.listdir(mydir)
+ for c in certs:
+ p = os.path.join(mydir,c)
+ with open(p, encoding='utf-8') as f:
+ cert =
+ check_cert(c, cert)
+if __name__ == "__main__":
+ test_certificates()