electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

old_interface.py (47478B)


      1 #!/usr/bin/env python
      2 #
      3 # Electrum - lightweight Bitcoin client
      4 # Copyright (C) 2011 thomasv@gitorious
      5 #
      6 # Permission is hereby granted, free of charge, to any person
      7 # obtaining a copy of this software and associated documentation files
      8 # (the "Software"), to deal in the Software without restriction,
      9 # including without limitation the rights to use, copy, modify, merge,
     10 # publish, distribute, sublicense, and/or sell copies of the Software,
     11 # and to permit persons to whom the Software is furnished to do so,
     12 # subject to the following conditions:
     13 #
     14 # The above copyright notice and this permission notice shall be
     15 # included in all copies or substantial portions of the Software.
     16 #
     17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     24 # SOFTWARE.
     25 import os
     26 import re
     27 import ssl
     28 import sys
     29 import traceback
     30 import asyncio
     31 import socket
     32 from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence
     33 from collections import defaultdict
     34 from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
     35 import itertools
     36 import logging
     37 import hashlib
     38 import functools
     39 
     40 import aiorpcx
     41 from aiorpcx import TaskGroup
     42 from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
     43 from aiorpcx.curio import timeout_after, TaskTimeout
     44 from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
     45 from aiorpcx.rawsocket import RSClient
     46 import certifi
     47 
     48 from .util import (ignore_exceptions, log_exceptions, bfh, SilentTaskGroup, MySocksProxy,
     49                    is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
     50                    is_int_or_float, is_non_negative_int_or_float)
     51 from . import util
     52 from . import x509
     53 from . import pem
     54 from . import version
     55 from . import blockchain
     56 from .blockchain import Blockchain, HEADER_SIZE
     57 from . import bitcoin
     58 from . import constants
     59 from .i18n import _
     60 from .logging import Logger
     61 from .transaction import Transaction
     62 
     63 if TYPE_CHECKING:
     64     from .network import Network
     65     from .simple_config import SimpleConfig
     66 
     67 
     68 ca_path = certifi.where()
     69 
     70 BUCKET_NAME_OF_ONION_SERVERS = 'onion'
     71 
     72 MAX_INCOMING_MSG_SIZE = 1_000_000  # in bytes
     73 
     74 _KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
     75 PREFERRED_NETWORK_PROTOCOL = 's'
     76 assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS
     77 
     78 
     79 class NetworkTimeout:
     80     # seconds
     81     class Generic:
     82         NORMAL = 30
     83         RELAXED = 45
     84         MOST_RELAXED = 600
     85 
     86     class Urgent(Generic):
     87         NORMAL = 10
     88         RELAXED = 20
     89         MOST_RELAXED = 60
     90 
     91 
     92 def assert_non_negative_integer(val: Any) -> None:
     93     if not is_non_negative_integer(val):
     94         raise RequestCorrupted(f'{val!r} should be a non-negative integer')
     95 
     96 
     97 def assert_integer(val: Any) -> None:
     98     if not is_integer(val):
     99         raise RequestCorrupted(f'{val!r} should be an integer')
    100 
    101 
    102 def assert_int_or_float(val: Any) -> None:
    103     if not is_int_or_float(val):
    104         raise RequestCorrupted(f'{val!r} should be int or float')
    105 
    106 
    107 def assert_non_negative_int_or_float(val: Any) -> None:
    108     if not is_non_negative_int_or_float(val):
    109         raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
    110 
    111 
    112 def assert_hash256_str(val: Any) -> None:
    113     if not is_hash256_str(val):
    114         raise RequestCorrupted(f'{val!r} should be a hash256 str')
    115 
    116 
    117 def assert_hex_str(val: Any) -> None:
    118     if not is_hex_str(val):
    119         raise RequestCorrupted(f'{val!r} should be a hex str')
    120 
    121 
    122 def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    123     if not isinstance(d, dict):
    124         raise RequestCorrupted(f'{d!r} should be a dict')
    125     if field_name not in d:
    126         raise RequestCorrupted(f'required field {field_name!r} missing from dict')
    127     return d[field_name]
    128 
    129 
    130 def assert_list_or_tuple(val: Any) -> None:
    131     if not isinstance(val, (list, tuple)):
    132         raise RequestCorrupted(f'{val!r} should be a list or tuple')
    133 
    134 
    135 class NotificationSession(RPCSession):
    136 
    137     def __init__(self, *args, interface: 'Interface', **kwargs):
    138         super(NotificationSession, self).__init__(*args, **kwargs)
    139         self.subscriptions = defaultdict(list)
    140         self.cache = {}
    141         self.default_timeout = NetworkTimeout.Generic.NORMAL
    142         self._msg_counter = itertools.count(start=1)
    143         self.interface = interface
    144         self.cost_hard_limit = 0  # disable aiorpcx resource limits
    145 
    146     async def handle_request(self, request):
    147         self.maybe_log(f"--> {request}")
    148         try:
    149             if isinstance(request, Notification):
    150                 params, result = request.args[:-1], request.args[-1]
    151                 key = self.get_hashable_key_for_rpc_call(request.method, params)
    152                 if key in self.subscriptions:
    153                     self.cache[key] = result
    154                     for queue in self.subscriptions[key]:
    155                         await queue.put(request.args)
    156                 else:
    157                     raise Exception(f'unexpected notification')
    158             else:
    159                 raise Exception(f'unexpected request. not a notification')
    160         except Exception as e:
    161             self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
    162             await self.close()
    163 
    164     async def send_request(self, *args, timeout=None, **kwargs):
    165         # note: semaphores/timeouts/backpressure etc are handled by
    166         # aiorpcx. the timeout arg here in most cases should not be set
    167         msg_id = next(self._msg_counter)
    168         self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
    169         try:
    170             # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
    171             # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
    172             response = await asyncio.wait_for(
    173                 super().send_request(*args, **kwargs),
    174                 timeout)
    175         except (TaskTimeout, asyncio.TimeoutError) as e:
    176             raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
    177         except CodeMessageError as e:
    178             self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
    179             raise
    180         else:
    181             self.maybe_log(f"--> {response} (id: {msg_id})")
    182             return response
    183 
    184     def set_default_timeout(self, timeout):
    185         self.sent_request_timeout = timeout
    186         self.max_send_delay = timeout
    187 
    188     async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
    189         # note: until the cache is written for the first time,
    190         # each 'subscribe' call might make a request on the network.
    191         key = self.get_hashable_key_for_rpc_call(method, params)
    192         self.subscriptions[key].append(queue)
    193         if key in self.cache:
    194             result = self.cache[key]
    195         else:
    196             result = await self.send_request(method, params)
    197             self.cache[key] = result
    198         await queue.put(params + [result])
    199 
    200     def unsubscribe(self, queue):
    201         """Unsubscribe a callback to free object references to enable GC."""
    202         # note: we can't unsubscribe from the server, so we keep receiving
    203         # subsequent notifications
    204         for v in self.subscriptions.values():
    205             if queue in v:
    206                 v.remove(queue)
    207 
    208     @classmethod
    209     def get_hashable_key_for_rpc_call(cls, method, params):
    210         """Hashable index for subscriptions and cache"""
    211         return str(method) + repr(params)
    212 
    213     def maybe_log(self, msg: str) -> None:
    214         if not self.interface: return
    215         if self.interface.debug or self.interface.network.debug:
    216             self.interface.logger.debug(msg)
    217 
    218     def default_framer(self):
    219         # overridden so that max_size can be customized
    220         max_size = int(self.interface.network.config.get('network_max_incoming_msg_size',
    221                                                          MAX_INCOMING_MSG_SIZE))
    222         return NewlineFramer(max_size=max_size)
    223 
    224 
    225 class NetworkException(Exception): pass
    226 
    227 
    228 class GracefulDisconnect(NetworkException):
    229     log_level = logging.INFO
    230 
    231     def __init__(self, *args, log_level=None, **kwargs):
    232         Exception.__init__(self, *args, **kwargs)
    233         if log_level is not None:
    234             self.log_level = log_level
    235 
    236 
    237 class RequestTimedOut(GracefulDisconnect):
    238     def __str__(self):
    239         return _("Network request timed out.")
    240 
    241 
    242 class RequestCorrupted(Exception): pass
    243 
    244 class ErrorParsingSSLCert(Exception): pass
    245 class ErrorGettingSSLCertFromServer(Exception): pass
    246 class ErrorSSLCertFingerprintMismatch(Exception): pass
    247 class InvalidOptionCombination(Exception): pass
    248 class ConnectError(NetworkException): pass
    249 
    250 
    251 class _RSClient(RSClient):
    252     async def create_connection(self):
    253         try:
    254             return await super().create_connection()
    255         except OSError as e:
    256             # note: using "from e" here will set __cause__ of ConnectError
    257             raise ConnectError(e) from e
    258 
    259 
    260 class ServerAddr:
    261 
    262     def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
    263         assert isinstance(host, str), repr(host)
    264         if protocol is None:
    265             protocol = 's'
    266         if not host:
    267             raise ValueError('host must not be empty')
    268         if host[0] == '[' and host[-1] == ']':  # IPv6
    269             host = host[1:-1]
    270         try:
    271             net_addr = NetAddress(host, port)  # this validates host and port
    272         except Exception as e:
    273             raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
    274         if protocol not in _KNOWN_NETWORK_PROTOCOLS:
    275             raise ValueError(f"invalid network protocol: {protocol}")
    276         self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
    277         self.port = int(net_addr.port)
    278         self.protocol = protocol
    279         self._net_addr_str = str(net_addr)
    280 
    281     @classmethod
    282     def from_str(cls, s: str) -> 'ServerAddr':
    283         # host might be IPv6 address, hence do rsplit:
    284         host, port, protocol = str(s).rsplit(':', 2)
    285         return ServerAddr(host=host, port=port, protocol=protocol)
    286 
    287     @classmethod
    288     def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
    289         """Construct ServerAddr from str, guessing missing details.
    290         Ongoing compatibility not guaranteed.
    291         """
    292         if not s:
    293             return None
    294         items = str(s).rsplit(':', 2)
    295         if len(items) < 2:
    296             return None  # although maybe we could guess the port too?
    297         host = items[0]
    298         port = items[1]
    299         if len(items) >= 3:
    300             protocol = items[2]
    301         else:
    302             protocol = PREFERRED_NETWORK_PROTOCOL
    303         return ServerAddr(host=host, port=port, protocol=protocol)
    304 
    305     def to_friendly_name(self) -> str:
    306         # note: this method is closely linked to from_str_with_inference
    307         if self.protocol == 's':  # hide trailing ":s"
    308             return self.net_addr_str()
    309         return str(self)
    310 
    311     def __str__(self):
    312         return '{}:{}'.format(self.net_addr_str(), self.protocol)
    313 
    314     def to_json(self) -> str:
    315         return str(self)
    316 
    317     def __repr__(self):
    318         return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'
    319 
    320     def net_addr_str(self) -> str:
    321         return self._net_addr_str
    322 
    323     def __eq__(self, other):
    324         if not isinstance(other, ServerAddr):
    325             return False
    326         return (self.host == other.host
    327                 and self.port == other.port
    328                 and self.protocol == other.protocol)
    329 
    330     def __ne__(self, other):
    331         return not (self == other)
    332 
    333     def __hash__(self):
    334         return hash((self.host, self.port, self.protocol))
    335 
    336 
    337 def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    338     filename = host
    339     try:
    340         ip = ip_address(host)
    341     except ValueError:
    342         pass
    343     else:
    344         if isinstance(ip, IPv6Address):
    345             filename = f"ipv6_{ip.packed.hex()}"
    346     return os.path.join(config.path, 'certs', filename)
    347 
    348 
    349 class Interface(Logger):
    350 
    351     LOGGING_SHORTCUT = 'i'
    352 
    def __init__(self, *, network: 'Network', server: ServerAddr, proxy: Optional[dict]):
        """Initialise per-connection state and schedule `run()` on the network's event loop.

        network: owning Network; its config, taskgroup and asyncio_loop are used here.
        server: address of the server this interface connects to.
        proxy: proxy settings (or None), handed to `MySocksProxy.from_proxy_dict`.
        """
        # `ready` gets a result in _mark_ready() and is cancelled when run() ends
        self.ready = asyncio.Future()
        # set by the handle_disconnect wrapper once run() has finished
        self.got_disconnected = asyncio.Event()
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        # chunk indices with a request currently in flight (see request_chunk)
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.proxy = MySocksProxy.from_proxy_dict(proxy)
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None
        self.tip = 0

        # filled in by request_fee_estimates(): block target -> fee
        self.fee_estimates_eta = {}

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = SilentTaskGroup()

        async def spawn_task():
            task = await self.network.taskgroup.spawn(self.run())
            # Task.set_name() only exists from Python 3.8 on
            if sys.version_info >= (3, 8):
                task.set_name(f"interface::{str(server)}")
        # hand the coroutine over to the network's loop via the thread-safe API
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
    386 
    @property
    def host(self) -> str:
        """Host of the server this interface is bound to (`self.server.host`)."""
        return self.server.host
    390 
    @property
    def port(self) -> int:
        """Port of the server this interface is bound to (`self.server.port`)."""
        return self.server.port
    394 
    @property
    def protocol(self) -> str:
        """Network protocol of the server (`self.server.protocol`)."""
        return self.server.protocol
    398 
    def diagnostic_name(self) -> str:
        """Return the server's address string (without protocol) to identify this interface."""
        return self.server.net_addr_str()
    401 
    def __str__(self) -> str:
        """Return "<Interface ...>" built around `diagnostic_name()`."""
        return f"<Interface {self.diagnostic_name()}>"
    404 
    async def is_server_ca_signed(self, ca_ssl_context):
        """Probe the server with a CA-enforcing SSL context.

        Returns True if the session could be opened, False if the only problem
        was certificate verification (self-signed cert). Other errors propagate.
        """
        try:
            await self.open_session(ca_ssl_context, exit_early=True)
        except ConnectError as exc:
            underlying = exc.__cause__
            verify_failed = (isinstance(underlying, ssl.SSLError)
                             and underlying.reason == 'CERTIFICATE_VERIFY_FAILED')
            if not verify_failed:
                raise
            # failures due to self-signed certs are normal
            return False
        else:
            return True
    419 
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context):
        """Create the cert file for this server: empty if CA signed, else the pinned cert.

        Raises InvalidOptionCombination if an expected fingerprint is set for a
        CA-signed server.
        """
        if not await self.is_server_ca_signed(ca_ssl_context):
            await self._save_certificate()
            return
        if self._get_expected_fingerprint():
            raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
        # empty file means this is CA signed, not self-signed
        with open(self.cert_path, 'w') as marker:
            marker.write('')
    430 
    def _is_saved_ssl_cert_available(self):
        """Tell whether a usable cert file exists for this server.

        Returns False if there is no file, or if the pinned cert failed its date
        check (the file is then deleted). Raises ErrorParsingSSLCert if the file
        cannot be parsed, and InvalidOptionCombination if a fingerprint is
        expected for a CA-signed server.
        """
        cert_file = self.cert_path
        if not os.path.exists(cert_file):
            return False
        with open(cert_file, 'r') as f:
            pem_text = f.read()
        if not pem_text:  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            der_bytes = pem.dePem(pem_text, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            parsed_cert = x509.X509(der_bytes)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            parsed_cert.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(cert_file)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytearray(der_bytes))
        return True
    459 
    async def _get_ssl_context(self):
        """Build the SSL context for this server, or None for plaintext TCP.

        Fetches and stores the server's cert first if none is saved yet.
        Raises ErrorGettingSSLCertFromServer if that fails.
        """
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_context)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as exc:
                raise ErrorGettingSSLCertFromServer(exc) from exc
        # now we have a file saved in our certificate store
        if os.path.getsize(self.cert_path) == 0:
            # CA signed cert
            return ca_context
        # pinned self-signed cert
        pinned_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
        pinned_context.check_hostname = False
        return pinned_context
    482 
    def handle_disconnect(func):
        """Decorator for `run`: log disconnect reasons and always do disconnect cleanup.

        GracefulDisconnect and RPCError raised by `func` are logged and swallowed;
        any other exception propagates after the cleanup in `finally` has run.
        """
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                # the exception carries the level it wants to be logged at
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                # runs on every exit path, including a normal return
                self.got_disconnected.set()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func
    499 
    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        """Obtain the SSL context and run the session until it ends.

        Certificate and connection failures are logged and end the coroutine
        without raising.
        """
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as exc:
            self.logger.info(f'disconnecting due to: {exc!r}')
            return
        try:
            await self.open_session(ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as exc:
            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
            ssl_error_on_fixed_main_server = (
                isinstance(exc, ConnectError)
                and isinstance(exc.__cause__, ssl.SSLError)
                and self.is_main_server()
                and not self.network.auto_connect
            )
            if ssl_error_on_fixed_main_server:
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {exc!r}')
            else:
                self.logger.info(f'disconnecting due to: {exc!r}')
    520 
    def _mark_ready(self) -> None:
        """Pick the blockchain for this interface and resolve the `ready` future.

        Does nothing if `ready` is already done. Raises GracefulDisconnect if
        `ready` was cancelled in the meantime.
        """
        ready = self.ready
        if ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if ready.done():
            return

        assert self.tip_header
        # fall back to the best known chain if the tip header matches none
        matching_chain = blockchain.check_header(self.tip_header)
        self.blockchain = matching_chain or blockchain.get_best_chain()
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        ready.set_result(1)
    538 
    async def _save_certificate(self) -> None:
        """Fetch the server's certificate and store it as PEM at `self.cert_path`.

        Does nothing if the file already exists. Tries up to 10 times, one
        second apart; raises GracefulDisconnect if no certificate was obtained.
        """
        if os.path.exists(self.cert_path):
            return
        # we may need to retry this a few times, in case the handshake hasn't completed
        for _attempt in range(10):
            dercert = await self._fetch_certificate()
            if not dercert:
                await asyncio.sleep(1)
                continue
            self.logger.info("succeeded in getting cert")
            self._verify_certificate_fingerprint(dercert)
            with open(self.cert_path, 'w') as cert_file:
                pem_cert = ssl.DER_cert_to_PEM_cert(dercert)
                # workaround android bug
                pem_cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",pem_cert)
                cert_file.write(pem_cert)
                # fsync needs an open file, so it cannot be left to close();
                # flush first so the data reaches the OS buffer, then fsync it to disk
                cert_file.flush()
                os.fsync(cert_file.fileno())
            break
        else:
            raise GracefulDisconnect("could not get certificate after 10 tries")
    561 
    async def _fetch_certificate(self) -> Optional[bytes]:
        """Connect to the server without verification and return its cert in DER form.

        The result is whatever `getpeercert(binary_form=True)` returns, which may
        be falsy if no certificate is available; the caller retries in that case.
        """
        # ssl.SSLContext() without a protocol argument is deprecated (Python 3.10+).
        # PROTOCOL_TLS_CLIENT turns verification on by default, so it is switched
        # off explicitly: the cert is only fetched here, not trusted yet.
        # check_hostname must be disabled before verify_mode can be set to CERT_NONE.
        sslc = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(session_factory=RPCSession,
                             host=self.host, port=self.port,
                             ssl=sslc, proxy=self.proxy) as session:
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)
    570 
    def _get_expected_fingerprint(self) -> Optional[str]:
        """Return the configured "serverfingerprint" for the main server, else None."""
        if not self.is_main_server():
            return None
        return self.network.config.get("serverfingerprint")
    574 
    def _verify_certificate_fingerprint(self, certificate):
        """Compare the SHA-256 of `certificate` with the expected fingerprint, if any.

        The comparison ignores case. On mismatch, triggers the 'cert_mismatch'
        callback and raises ErrorSSLCertFingerprintMismatch.
        """
        expected = self._get_expected_fingerprint()
        if not expected:
            return
        actual = hashlib.sha256(certificate).hexdigest()
        if actual.lower() != expected.lower():
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")
    585 
    async def get_block_header(self, height, assert_mode):
        """Request the block header at `height` and return it deserialized.

        `assert_mode` is only used in the log message.
        """
        self.logger.info(f'requesting block header {height} in mode {assert_mode}')
        # use lower timeout as we usually have network.bhi_lock here
        urgent_timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        header_hex = await self.session.send_request(
            'blockchain.block.header', [height], timeout=urgent_timeout)
        raw_header = bytes.fromhex(header_hex)
        return blockchain.deserialize_header(raw_header, height)
    592 
    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
        """Request the 2016-header chunk containing `height` and connect it to the chain.

        If `tip` is given, no headers beyond it are requested. With
        `can_return_early`, returns None when that chunk is already being requested.
        Otherwise returns (conn, count), where count is 0 if connecting failed.
        Raises RequestCorrupted if the response is malformed or inconsistent.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{height!r} is not a block height")
        index = height // 2016
        if can_return_early and index in self._requested_chunks:
            return
        self.logger.info(f"requesting chunk from height {height}")
        first_height = index * 2016
        if tip is None:
            size = 2016
        else:
            # clamp to the headers that exist up to the tip, never below zero
            size = max(min(2016, tip - first_height + 1), 0)
        try:
            self._requested_chunks.add(index)
            res = await self.session.send_request('blockchain.block.headers', [first_height, size])
        finally:
            self._requested_chunks.discard(index)
        for required_field in ('count', 'hex', 'max'):
            assert_dict_contains_field(res, field_name=required_field)
        count, max_count, headers_hex = res['count'], res['max'], res['hex']
        assert_non_negative_integer(count)
        assert_non_negative_integer(max_count)
        assert_hex_str(headers_hex)
        if len(headers_hex) != HEADER_SIZE * 2 * count:
            raise RequestCorrupted('inconsistent chunk hex and count')
        # we never request more than 2016 headers, but we enforce those fit in a single response
        if max_count < 2016:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {max_count} < 2016")
        if count != size:
            raise RequestCorrupted(f"expected {size} headers but only got {count}")
        conn = self.blockchain.connect_chunk(index, headers_hex)
        return conn, (count if conn else 0)
    626 
    def is_main_server(self) -> bool:
        """Tell whether this is the network's main interface.

        If the network has no interface yet, the default server decides.
        """
        net = self.network
        if net.interface == self:
            return True
        return net.interface is None and net.default_server == self.server
    630 
    async def open_session(self, sslc, exit_early=False):
        """Open a session, negotiate the protocol version, then run the interface tasks.

        With `exit_early`, returns right after the version request succeeded.
        Raises GracefulDisconnect on version mismatch, on an unhealthy spread of
        connected servers, and for selected RPC error codes.
        """
        def session_factory(*args, iface=self, **kwargs):
            return NotificationSession(*args, **kwargs, interface=iface)

        async with _RSClient(session_factory=session_factory,
                             host=self.host, port=self.port,
                             ssl=sslc, proxy=self.proxy) as session:
            self.session = session  # type: NotificationSession
            generic_timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic)
            self.session.set_default_timeout(generic_timeout)
            try:
                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            if ver[1] != version.PROTOCOL_VERSION:
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')
            self.logger.info(f"connection established. version: {ver}")

            graceful_codes = (JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                              JSONRPC.SERVER_BUSY,
                              JSONRPC.METHOD_NOT_FOUND)
            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                if e.code not in graceful_codes:
                    raise
                raise GracefulDisconnect(e, log_level=logging.WARNING) from e
    664 
    async def monitor_connection(self):
        """Poll the session once per second and disconnect once it is gone or closing.

        Raises GracefulDisconnect when there is no session or it reports is_closing().
        """
        while True:
            await asyncio.sleep(1)
            session = self.session
            if session and not session.is_closing():
                continue
            raise GracefulDisconnect('session was closed')
    670 
    async def ping(self):
        """Send a 'server.ping' request every 300 seconds, forever."""
        ping_interval = 300  # seconds between pings
        while True:
            await asyncio.sleep(ping_interval)
            await self.session.send_request('server.ping')
    675 
    async def request_fee_estimates(self):
        """Refresh the fee estimates for every ETA target once a minute, forever.

        Each round requests all targets concurrently, stores the non-negative
        results in self.fee_estimates_eta and then notifies the network.
        """
        from .simple_config import FEE_ETA_TARGETS
        while True:
            pending = []
            async with TaskGroup() as group:
                for target in FEE_ETA_TARGETS:
                    task = await group.spawn(self.get_estimatefee(target))
                    pending.append((target, task))
            # all tasks have finished once the group's context has exited
            for target, task in pending:
                estimate = task.result()
                if estimate >= 0:  # negative means the server had no estimate
                    self.fee_estimates_eta[target] = estimate
            self.network.update_fee_estimates()
            await asyncio.sleep(60)
    689 
    async def close(self, *, force_after: int = None):
        """Close the session (if any) and wait for it to be closed.

        Buffered data is flushed to the wire first, so this can take some time.
        force_after is forwarded to the session's close(); it defaults to 1 second.
        """
        # We give up after a while and just abort the connection.
        # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
        #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
        grace_period = 1 if force_after is None else force_after  # seconds
        session = self.session
        if session:
            await session.close(force_after=grace_period)
        # monitor_connection will cancel tasks
    702 
    async def run_fetch_blocks(self):
        """Subscribe to header notifications and process every announced tip, forever.

        For each notification: record the new tip, mark the interface ready,
        process the header, fire the update callbacks and let the network
        reconsider which interface to use.

        Raises GracefulDisconnect if the announced tip is below the max checkpoint.
        """
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            # the first element of the queued item carries the 'height' and 'hex' fields
            raw_header = item[0]
            height = raw_header['height']
            header = blockchain.deserialize_header(bfh(raw_header['hex']), height)
            self.tip_header = header
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect('server tip below max checkpoint')
            self._mark_ready()
            await self._process_header_at_tip()
            # header processing done
            util.trigger_callback('blockchain_updated')
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
    722 
    async def _process_header_at_tip(self):
        """Process the current tip header while holding the network's bhi_lock.

        Does nothing if self.blockchain already reaches the tip height and
        accepts the tip header; otherwise takes one step at the tip and keeps
        syncing if that step did not move past the tip.
        """
        tip_height, tip_header = self.tip, self.tip_header
        async with self.network.bhi_lock:
            already_known = (self.blockchain.height() >= tip_height
                             and self.blockchain.check_header(tip_header))
            if already_known:
                # another interface amended the blockchain
                self.logger.info(f"skipping header {tip_height}")
                return
            _, next_height = await self.step(tip_height, tip_header)
            # in the simple case, next_height == self.tip+1
            if next_height <= self.tip:
                await self.sync_until(next_height)
    734 
    async def sync_until(self, height, next_height=None):
        """Fetch headers starting at `height` until `height` has passed `next_height`.

        next_height defaults to self.tip. While the target is more than 10
        blocks away a chunk is requested; otherwise (or when the chunk could
        not be connected) a single step is taken.

        Returns the (last, height) pair of the final iteration.
        Raises GracefulDisconnect if a chunk at or below the max checkpoint
        could not be connected.
        """
        if next_height is None:
            next_height = self.tip
        last = None
        # `last is None` guarantees at least one iteration
        while last is None or height <= next_height:
            prev_last, prev_height = last, height
            if next_height > height + 10:
                could_connect, num_headers = await self.request_chunk(height, next_height)
                if not could_connect:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    # fall back to a single step for this height
                    last, height = await self.step(height)
                    continue
                util.trigger_callback('network_updated')
                # continue after the received headers, counted from the start of the
                # 2016-aligned window containing height
                # (assumes request_chunk fetches from that boundary -- TODO confirm)
                height = (height // 2016 * 2016) + num_headers
                assert height <= next_height+1, (height, self.tip)
                last = 'catchup'
            else:
                last, height = await self.step(height)
            # an iteration that changed neither value would repeat forever
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height
    756 
    async def step(self, height, header=None):
        """Try to attach the header at `height` to a known chain, resolving a fork if needed.

        If header is None it is requested from the server first. Headers that
        contain a 'mock' key are checked/connected through the callables stored
        under that key instead of the blockchain module.

        Returns a (mode, next_height) tuple: ('catchup', ...) when the header
        checks or connects, otherwise whatever
        _resolve_potential_chain_fork_given_forkpoint returns.
        """
        assert 0 <= height <= self.tip, (height, self.tip)
        if header is None:
            header = await self.get_block_header(height, 'catchup')

        chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
        if chain:
            self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return 'catchup', height+1

        can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
        if not can_connect:
            self.logger.info(f"can't connect {height}")
            # go back until a header is found that checks or connects;
            # bad/bad_header describe the lowest header seen that did neither
            height, header, bad, bad_header = await self._search_headers_backwards(height, header)
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
            assert chain or can_connect
        if can_connect:
            self.logger.info(f"could connect {height}")
            height += 1
            if isinstance(can_connect, Blockchain):  # not when mocking
                self.blockchain = can_connect
                self.blockchain.save_header(header)
            return 'catchup', height

        # only reachable after the backwards search above, so bad and bad_header
        # are bound and (per the assert) chain is truthy
        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
    788 
    async def _search_headers_binary(self, height, bad, bad_header, chain):
        """Binary-search between `height` and `bad` for the boundary between good and bad headers.

        The search starts with good = height and narrows the interval until
        good + 1 == bad. self.blockchain is updated whenever a probed header
        checks against a Blockchain.

        Returns (good, bad, bad_header).
        Raises Exception if the final bad header cannot connect, or if a bad
        header unexpectedly checks against a chain.
        """
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
        good = height
        while True:
            assert good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            header = await self.get_block_header(height, 'binary')
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            if chain:
                # midpoint is known: move the lower bound up
                self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
                good = height
            else:
                # midpoint is unknown: move the upper bound down
                bad = height
                bad_header = header
            if good + 1 == bad:
                break

        # the first bad header must connect (mocked headers decide via their 'connect' callable)
        mock = 'mock' in bad_header and bad_header['mock']['connect'](height)
        real = not mock and self.blockchain.can_connect(bad_header, check_height=False)
        if not real and not mock:
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}")
        return good, bad, bad_header
    818 
    async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header):
        """Decide whether the header at `bad` extends self.blockchain or starts a new fork.

        Returns ('no_fork', good + 1) when self.blockchain ends exactly at
        `good`; otherwise forks the chain at bad_header, makes the fork the
        current blockchain and returns ('fork', bad + 1).
        """
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        local_height = self.blockchain.height()
        assert local_height >= good, (local_height, good)
        if local_height == good:
            next_height = good + 1
            self.logger.info(f"catching up from {next_height}")
            return 'no_fork', next_height

        # this is a new fork we don't yet have
        self.logger.info(f"new fork at bad height {bad}")
        if 'mock' in bad_header:
            make_fork = bad_header['mock']['fork']
        else:
            make_fork = self.blockchain.fork
        forked_chain = make_fork(bad_header)  # type: Blockchain
        self.blockchain = forked_chain
        assert forked_chain.forkpoint == bad
        return 'fork', bad + 1
    841 
    async def _search_headers_backwards(self, height, header):
        """Walk backwards from `height` until a header is found that checks or connects.

        Each round doubles the distance from self.tip. The search never goes
        below the max checkpoint.

        Returns (height, header, bad, bad_header): the height/header where the
        search stopped, and the last height/header that neither checked nor
        connected.
        Raises GracefulDisconnect if even the header at the max checkpoint
        neither checks nor connects.
        """
        async def iterate():
            """Fetch the header at the current height; return True if the search must continue."""
            nonlocal height, header
            checkp = False
            if height <= constants.net.max_checkpoint():
                # clamp to the checkpoint; failing here is fatal (see below)
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, 'backward')
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values())
        # start just above the highest locally known height, but always below the bad header
        local_max = max([0] + [x.height() for x in chains]) if 'mock' not in header else float('inf')
        height = min(local_max + 1, height - 1)
        while await iterate():
            bad, bad_header = height, header
            # double the distance from the tip for the next probe
            delta = self.tip - height
            height = self.tip - 2 * delta

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header
    871 
    @classmethod
    def client_name(cls) -> str:
        """Return the client identifier string, 'electrum/<ELECTRUM_VERSION>'."""
        return 'electrum/{}'.format(version.ELECTRUM_VERSION)
    875 
    def is_tor(self):
        """Return True if this interface's host name ends with '.onion'."""
        onion_suffix = '.onion'
        host = self.host
        return host.endswith(onion_suffix)
    878 
    def ip_addr(self) -> Optional[str]:
        """Return the host of the session's remote address as a string.

        Returns None when there is no session or it has no remote address.
        """
        session = self.session
        peer_addr = session.remote_address() if session else None
        return str(peer_addr.host) if peer_addr else None
    885 
    def bucket_based_on_ipaddress(self) -> str:
        """Return the name of the bucket this server falls into, based on its IP address.

        Onion hosts share one bucket. IPv4 addresses are bucketed by their /16
        network and IPv6 addresses by their /48 network. Loopback addresses and
        unknown/unparsable addresses yield ''. A non-empty result is cached in
        self._ipaddr_bucket.
        """
        def compute_bucket() -> str:
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                # also covers the case where no IP address is known yet
                return ''
            if addr.is_loopback:  # localhost is exempt
                return ''
            if addr.version == 4:
                return str(IPv4Network(addr).supernet(new_prefix=16))
            if addr.version == 6:
                return str(IPv6Network(addr).supernet(new_prefix=48))
            return ''

        if not self._ipaddr_bucket:
            self._ipaddr_bucket = compute_bucket()
        return self._ipaddr_bucket
    909 
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Request the merkle proof of a transaction and sanity-check the response.

        Raises Exception if tx_hash is not a txid or tx_height is not a
        non-negative integer. The response must contain 'block_height',
        'merkle' and 'pos'; it is returned unchanged once validated.
        """
        # validate arguments
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        request_args = [tx_hash, tx_height]
        response = await self.session.send_request('blockchain.transaction.get_merkle', request_args)
        # check response: all fields must be present before their values are inspected
        block_height = assert_dict_contains_field(response, field_name='block_height')
        merkle_branch = assert_dict_contains_field(response, field_name='merkle')
        position = assert_dict_contains_field(response, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(position)
        assert_list_or_tuple(merkle_branch)
        for node_hash in merkle_branch:
            assert_hash256_str(node_hash)
        return response
    928 
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Request a raw transaction by txid and verify that it is what was asked for.

        Raises Exception if tx_hash is not a txid, and RequestCorrupted if the
        response is not hex, cannot be deserialized, or hashes to another txid.
        Returns the raw transaction as received.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        parsed = Transaction(raw)
        try:
            parsed.deserialize()  # see if raises
        except Exception as exc:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from exc
        if parsed.txid() == tx_hash:
            return raw
        raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {parsed.txid()})")
    944 
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        """Request the transaction history of a scripthash and sanity-check it.

        Every item must have an integer 'height' and a txid 'tx_hash'.
        Mempool items (height -1 or 0) must also have a non-negative 'fee'.
        Confirmed items must appear in non-decreasing height order and must
        not follow any mempool item. Txids must be unique.

        Raises Exception if sh is not a scripthash, and RequestCorrupted if
        the ordering or uniqueness checks fail.
        Returns the response unchanged once validated.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # check response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                # +inf makes every later confirmed height compare as "lower",
                # so confirmed txs can't follow mempool txs
                prev_height = float("inf")
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted('heights of confirmed txs must be in increasing order')
                prev_height = height
        hashes = {item['tx_hash'] for item in res}
        if len(hashes) != len(res):
            # Either server is sending garbage... or maybe if server is race-prone
            # a recently mined tx could be included in both last block and mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res
    974 
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        """Request the unspent outputs of a scripthash and sanity-check the response.

        Every item must contain 'tx_pos', 'value', 'tx_hash' and 'height';
        the numeric fields must be non-negative integers and 'tx_hash' a txid.

        Raises Exception if sh is not a scripthash.
        Returns the response unchanged once validated.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        utxos = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # check response
        assert_list_or_tuple(utxos)
        required_fields = ('tx_pos', 'value', 'tx_hash', 'height')
        non_negative_fields = ('tx_pos', 'value', 'height')
        for utxo in utxos:
            # presence of every field is checked before any value is inspected
            for field in required_fields:
                assert_dict_contains_field(utxo, field_name=field)
            for field in non_negative_fields:
                assert_non_negative_integer(utxo[field])
            assert_hash256_str(utxo['tx_hash'])
        return utxos
    992 
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        """Request the balance of a scripthash and sanity-check the response.

        The response must contain 'confirmed' (a non-negative integer) and
        'unconfirmed' (an integer).

        Raises Exception if sh is not a scripthash.
        Returns the response unchanged once validated.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # check response
        assert_dict_contains_field(res, field_name='confirmed')
        assert_dict_contains_field(res, field_name='unconfirmed')
        assert_non_negative_integer(res['confirmed'])
        # the unconfirmed balance is a delta: it is negative when unconfirmed
        # txs spend confirmed coins, so only require it to be an integer
        assert_integer(res['unconfirmed'])
        return res
   1004 
   1005     async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
   1006         if not is_non_negative_integer(tx_height):
   1007             raise Exception(f"{repr(tx_height)} is not a block height")
   1008         if not is_non_negative_integer(tx_pos):
   1009             raise Exception(f"{repr(tx_pos)} should be non-negative integer")
   1010         # do request
   1011         res = await self.session.send_request(
   1012             'blockchain.transaction.id_from_pos',
   1013             [tx_height, tx_pos, merkle],
   1014         )
   1015         # check response
   1016         if merkle:
   1017             assert_dict_contains_field(res, field_name='tx_hash')
   1018             assert_dict_contains_field(res, field_name='merkle')
   1019             assert_hash256_str(res['tx_hash'])
   1020             assert_list_or_tuple(res['merkle'])
   1021             for node_hash in res['merkle']:
   1022                 assert_hash256_str(node_hash)
   1023         else:
   1024             assert_hash256_str(res)
   1025         return res
   1026 
   1027     async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
   1028         # do request
   1029         res = await self.session.send_request('mempool.get_fee_histogram')
   1030         # check response
   1031         assert_list_or_tuple(res)
   1032         prev_fee = float('inf')
   1033         for fee, s in res:
   1034             assert_non_negative_int_or_float(fee)
   1035             assert_non_negative_integer(s)
   1036             if fee >= prev_fee:  # check monotonicity
   1037                 raise RequestCorrupted(f'fees must be in decreasing order')
   1038             prev_fee = fee
   1039         return res
   1040 
   1041     async def get_server_banner(self) -> str:
   1042         # do request
   1043         res = await self.session.send_request('server.banner')
   1044         # check response
   1045         if not isinstance(res, str):
   1046             raise RequestCorrupted(f'{res!r} should be a str')
   1047         return res
   1048 
   1049     async def get_donation_address(self) -> str:
   1050         # do request
   1051         res = await self.session.send_request('server.donation_address')
   1052         # check response
   1053         if not res:  # ignore empty string
   1054             return ''
   1055         if not bitcoin.is_address(res):
   1056             # note: do not hard-fail -- allow server to use future-type
   1057             #       bitcoin address we do not recognize
   1058             self.logger.info(f"invalid donation address from server: {repr(res)}")
   1059             res = ''
   1060         return res
   1061 
   1062     async def get_relay_fee(self) -> int:
   1063         """Returns the min relay feerate in sat/kbyte."""
   1064         # do request
   1065         res = await self.session.send_request('blockchain.relayfee')
   1066         # check response
   1067         assert_non_negative_int_or_float(res)
   1068         relayfee = int(res * bitcoin.COIN)
   1069         relayfee = max(0, relayfee)
   1070         return relayfee
   1071 
   1072     async def get_estimatefee(self, num_blocks: int) -> int:
   1073         """Returns a feerate estimate for getting confirmed within
   1074         num_blocks blocks, in sat/kbyte.
   1075         """
   1076         if not is_non_negative_integer(num_blocks):
   1077             raise Exception(f"{repr(num_blocks)} is not a num_blocks")
   1078         # do request
   1079         res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
   1080         # check response
   1081         if res != -1:
   1082             assert_non_negative_int_or_float(res)
   1083             res = int(res * bitcoin.COIN)
   1084         return res
   1085 
   1086 
   1087 def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
   1088     chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
   1089     if chain_bad:
   1090         raise Exception('bad_header must not check!')
   1091 
   1092 
   1093 def check_cert(host, cert):
   1094     try:
   1095         b = pem.dePem(cert, 'CERTIFICATE')
   1096         x = x509.X509(b)
   1097     except:
   1098         traceback.print_exc(file=sys.stdout)
   1099         return
   1100 
   1101     try:
   1102         x.check_date()
   1103         expired = False
   1104     except:
   1105         expired = True
   1106 
   1107     m = "host: %s\n"%host
   1108     m += "has_expired: %s\n"% expired
   1109     util.print_msg(m)
   1110 
   1111 
   1112 # Used by tests
   1113 def _match_hostname(name, val):
   1114     if val == name:
   1115         return True
   1116 
   1117     return val.startswith('*.') and name.endswith(val[1:])
   1118 
   1119 
   1120 def test_certificates():
   1121     from .simple_config import SimpleConfig
   1122     config = SimpleConfig()
   1123     mydir = os.path.join(config.path, "certs")
   1124     certs = os.listdir(mydir)
   1125     for c in certs:
   1126         p = os.path.join(mydir,c)
   1127         with open(p, encoding='utf-8') as f:
   1128             cert = f.read()
   1129         check_cert(c, cert)
   1130 
# When run as a script, check the certificates stored in the config directory.
if __name__ == "__main__":
    test_certificates()