old_interface.py (47478B)
1 #!/usr/bin/env python 2 # 3 # Electrum - lightweight Bitcoin client 4 # Copyright (C) 2011 thomasv@gitorious 5 # 6 # Permission is hereby granted, free of charge, to any person 7 # obtaining a copy of this software and associated documentation files 8 # (the "Software"), to deal in the Software without restriction, 9 # including without limitation the rights to use, copy, modify, merge, 10 # publish, distribute, sublicense, and/or sell copies of the Software, 11 # and to permit persons to whom the Software is furnished to do so, 12 # subject to the following conditions: 13 # 14 # The above copyright notice and this permission notice shall be 15 # included in all copies or substantial portions of the Software. 16 # 17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 24 # SOFTWARE. 25 import os 26 import re 27 import ssl 28 import sys 29 import traceback 30 import asyncio 31 import socket 32 from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence 33 from collections import defaultdict 34 from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address 35 import itertools 36 import logging 37 import hashlib 38 import functools 39 40 import aiorpcx 41 from aiorpcx import TaskGroup 42 from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer 43 from aiorpcx.curio import timeout_after, TaskTimeout 44 from aiorpcx.jsonrpc import JSONRPC, CodeMessageError 45 from aiorpcx.rawsocket import RSClient 46 import certifi 47 48 from .util import (ignore_exceptions, log_exceptions, bfh, SilentTaskGroup, MySocksProxy, 49 is_integer, is_non_negative_integer, is_hash256_str, is_hex_str, 50 is_int_or_float, is_non_negative_int_or_float) 51 from . import util 52 from . import x509 53 from . import pem 54 from . import version 55 from . import blockchain 56 from .blockchain import Blockchain, HEADER_SIZE 57 from . import bitcoin 58 from . import constants 59 from .i18n import _ 60 from .logging import Logger 61 from .transaction import Transaction 62 63 if TYPE_CHECKING: 64 from .network import Network 65 from .simple_config import SimpleConfig 66 67 68 ca_path = certifi.where() 69 70 BUCKET_NAME_OF_ONION_SERVERS = 'onion' 71 72 MAX_INCOMING_MSG_SIZE = 1_000_000 # in bytes 73 74 _KNOWN_NETWORK_PROTOCOLS = {'t', 's'} 75 PREFERRED_NETWORK_PROTOCOL = 's' 76 assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS 77 78 79 class NetworkTimeout: 80 # seconds 81 class Generic: 82 NORMAL = 30 83 RELAXED = 45 84 MOST_RELAXED = 600 85 86 class Urgent(Generic): 87 NORMAL = 10 88 RELAXED = 20 89 MOST_RELAXED = 60 90 91 92 def assert_non_negative_integer(val: Any) -> None: 93 if not is_non_negative_integer(val): 94 raise RequestCorrupted(f'{val!r} should be a non-negative integer') 95 96 97 def assert_integer(val: Any) -> None: 98 if not is_integer(val): 99 raise RequestCorrupted(f'{val!r} should be an integer') 100 101 102 def assert_int_or_float(val: Any) -> None: 103 if not is_int_or_float(val): 104 raise RequestCorrupted(f'{val!r} should be int or float') 105 106 107 def assert_non_negative_int_or_float(val: Any) -> None: 108 if not is_non_negative_int_or_float(val): 109 raise RequestCorrupted(f'{val!r} should be a non-negative int or float') 110 111 112 def assert_hash256_str(val: Any) -> None: 113 if not is_hash256_str(val): 114 raise RequestCorrupted(f'{val!r} should be a hash256 str') 115 116 117 def assert_hex_str(val: Any) -> None: 118 if not is_hex_str(val): 119 raise RequestCorrupted(f'{val!r} should be a hex str') 120 121 122 def assert_dict_contains_field(d: Any, *, field_name: str) -> Any: 123 if not isinstance(d, dict): 124 raise RequestCorrupted(f'{d!r} should be a dict') 125 if field_name not in d: 126 raise RequestCorrupted(f'required field {field_name!r} missing from dict') 127 return d[field_name] 128 129 130 def assert_list_or_tuple(val: Any) -> None: 131 if not isinstance(val, (list, tuple)): 132 raise RequestCorrupted(f'{val!r} should be a list or tuple') 133 134 135 class NotificationSession(RPCSession): 136 137 def __init__(self, *args, interface: 'Interface', **kwargs): 138 super(NotificationSession, self).__init__(*args, **kwargs) 139 self.subscriptions = defaultdict(list) 140 self.cache = {} 141 self.default_timeout = NetworkTimeout.Generic.NORMAL 142 self._msg_counter = itertools.count(start=1) 143 self.interface = interface 144 self.cost_hard_limit = 0 # disable aiorpcx resource limits 145 146 async def handle_request(self, request): 147 self.maybe_log(f"--> {request}") 148 try: 149 if isinstance(request, Notification): 150 params, result = request.args[:-1], request.args[-1] 151 key = self.get_hashable_key_for_rpc_call(request.method, params) 152 if key in self.subscriptions: 153 self.cache[key] = result 154 for queue in self.subscriptions[key]: 155 await queue.put(request.args) 156 else: 157 raise Exception(f'unexpected notification') 158 else: 159 raise Exception(f'unexpected request. not a notification') 160 except Exception as e: 161 self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}") 162 await self.close() 163 164 async def send_request(self, *args, timeout=None, **kwargs): 165 # note: semaphores/timeouts/backpressure etc are handled by 166 # aiorpcx. the timeout arg here in most cases should not be set 167 msg_id = next(self._msg_counter) 168 self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})") 169 try: 170 # note: RPCSession.send_request raises TaskTimeout in case of a timeout. 171 # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups 172 response = await asyncio.wait_for( 173 super().send_request(*args, **kwargs), 174 timeout) 175 except (TaskTimeout, asyncio.TimeoutError) as e: 176 raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e 177 except CodeMessageError as e: 178 self.maybe_log(f"--> {repr(e)} (id: {msg_id})") 179 raise 180 else: 181 self.maybe_log(f"--> {response} (id: {msg_id})") 182 return response 183 184 def set_default_timeout(self, timeout): 185 self.sent_request_timeout = timeout 186 self.max_send_delay = timeout 187 188 async def subscribe(self, method: str, params: List, queue: asyncio.Queue): 189 # note: until the cache is written for the first time, 190 # each 'subscribe' call might make a request on the network. 191 key = self.get_hashable_key_for_rpc_call(method, params) 192 self.subscriptions[key].append(queue) 193 if key in self.cache: 194 result = self.cache[key] 195 else: 196 result = await self.send_request(method, params) 197 self.cache[key] = result 198 await queue.put(params + [result]) 199 200 def unsubscribe(self, queue): 201 """Unsubscribe a callback to free object references to enable GC.""" 202 # note: we can't unsubscribe from the server, so we keep receiving 203 # subsequent notifications 204 for v in self.subscriptions.values(): 205 if queue in v: 206 v.remove(queue) 207 208 @classmethod 209 def get_hashable_key_for_rpc_call(cls, method, params): 210 """Hashable index for subscriptions and cache""" 211 return str(method) + repr(params) 212 213 def maybe_log(self, msg: str) -> None: 214 if not self.interface: return 215 if self.interface.debug or self.interface.network.debug: 216 self.interface.logger.debug(msg) 217 218 def default_framer(self): 219 # overridden so that max_size can be customized 220 max_size = int(self.interface.network.config.get('network_max_incoming_msg_size', 221 MAX_INCOMING_MSG_SIZE)) 222 return NewlineFramer(max_size=max_size) 223 224 225 class NetworkException(Exception): pass 226 227 228 class GracefulDisconnect(NetworkException): 229 log_level = logging.INFO 230 231 def __init__(self, *args, log_level=None, **kwargs): 232 Exception.__init__(self, *args, **kwargs) 233 if log_level is not None: 234 self.log_level = log_level 235 236 237 class RequestTimedOut(GracefulDisconnect): 238 def __str__(self): 239 return _("Network request timed out.") 240 241 242 class RequestCorrupted(Exception): pass 243 244 class ErrorParsingSSLCert(Exception): pass 245 class ErrorGettingSSLCertFromServer(Exception): pass 246 class ErrorSSLCertFingerprintMismatch(Exception): pass 247 class InvalidOptionCombination(Exception): pass 248 class ConnectError(NetworkException): pass 249 250 251 class _RSClient(RSClient): 252 async def create_connection(self): 253 try: 254 return await super().create_connection() 255 except OSError as e: 256 # note: using "from e" here will set __cause__ of ConnectError 257 raise ConnectError(e) from e 258 259 260 class ServerAddr: 261 262 def __init__(self, host: str, port: Union[int, str], *, protocol: str = None): 263 assert isinstance(host, str), repr(host) 264 if protocol is None: 265 protocol = 's' 266 if not host: 267 raise ValueError('host must not be empty') 268 if host[0] == '[' and host[-1] == ']': # IPv6 269 host = host[1:-1] 270 try: 271 net_addr = NetAddress(host, port) # this validates host and port 272 except Exception as e: 273 raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e 274 if protocol not in _KNOWN_NETWORK_PROTOCOLS: 275 raise ValueError(f"invalid network protocol: {protocol}") 276 self.host = str(net_addr.host) # canonical form (if e.g. IPv6 address) 277 self.port = int(net_addr.port) 278 self.protocol = protocol 279 self._net_addr_str = str(net_addr) 280 281 @classmethod 282 def from_str(cls, s: str) -> 'ServerAddr': 283 # host might be IPv6 address, hence do rsplit: 284 host, port, protocol = str(s).rsplit(':', 2) 285 return ServerAddr(host=host, port=port, protocol=protocol) 286 287 @classmethod 288 def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']: 289 """Construct ServerAddr from str, guessing missing details. 290 Ongoing compatibility not guaranteed. 291 """ 292 if not s: 293 return None 294 items = str(s).rsplit(':', 2) 295 if len(items) < 2: 296 return None # although maybe we could guess the port too? 297 host = items[0] 298 port = items[1] 299 if len(items) >= 3: 300 protocol = items[2] 301 else: 302 protocol = PREFERRED_NETWORK_PROTOCOL 303 return ServerAddr(host=host, port=port, protocol=protocol) 304 305 def to_friendly_name(self) -> str: 306 # note: this method is closely linked to from_str_with_inference 307 if self.protocol == 's': # hide trailing ":s" 308 return self.net_addr_str() 309 return str(self) 310 311 def __str__(self): 312 return '{}:{}'.format(self.net_addr_str(), self.protocol) 313 314 def to_json(self) -> str: 315 return str(self) 316 317 def __repr__(self): 318 return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>' 319 320 def net_addr_str(self) -> str: 321 return self._net_addr_str 322 323 def __eq__(self, other): 324 if not isinstance(other, ServerAddr): 325 return False 326 return (self.host == other.host 327 and self.port == other.port 328 and self.protocol == other.protocol) 329 330 def __ne__(self, other): 331 return not (self == other) 332 333 def __hash__(self): 334 return hash((self.host, self.port, self.protocol)) 335 336 337 def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str: 338 filename = host 339 try: 340 ip = ip_address(host) 341 except ValueError: 342 pass 343 else: 344 if isinstance(ip, IPv6Address): 345 filename = f"ipv6_{ip.packed.hex()}" 346 return os.path.join(config.path, 'certs', filename) 347 348 349 class Interface(Logger): 350 351 LOGGING_SHORTCUT = 'i' 352 353 def __init__(self, *, network: 'Network', server: ServerAddr, proxy: Optional[dict]): 354 self.ready = asyncio.Future() 355 self.got_disconnected = asyncio.Event() 356 self.server = server 357 Logger.__init__(self) 358 assert network.config.path 359 self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host) 360 self.blockchain = None # type: Optional[Blockchain] 361 self._requested_chunks = set() # type: Set[int] 362 self.network = network 363 self.proxy = MySocksProxy.from_proxy_dict(proxy) 364 self.session = None # type: Optional[NotificationSession] 365 self._ipaddr_bucket = None 366 367 # Latest block header and corresponding height, as claimed by the server. 368 # Note that these values are updated before they are verified. 369 # Especially during initial header sync, verification can take a long time. 370 # Failing verification will get the interface closed. 371 self.tip_header = None 372 self.tip = 0 373 374 self.fee_estimates_eta = {} 375 376 # Dump network messages (only for this interface). Set at runtime from the console. 377 self.debug = False 378 379 self.taskgroup = SilentTaskGroup() 380 381 async def spawn_task(): 382 task = await self.network.taskgroup.spawn(self.run()) 383 if sys.version_info >= (3, 8): 384 task.set_name(f"interface::{str(server)}") 385 asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop) 386 387 @property 388 def host(self): 389 return self.server.host 390 391 @property 392 def port(self): 393 return self.server.port 394 395 @property 396 def protocol(self): 397 return self.server.protocol 398 399 def diagnostic_name(self): 400 return self.server.net_addr_str() 401 402 def __str__(self): 403 return f"<Interface {self.diagnostic_name()}>" 404 405 async def is_server_ca_signed(self, ca_ssl_context): 406 """Given a CA enforcing SSL context, returns True if the connection 407 can be established. Returns False if the server has a self-signed 408 certificate but otherwise is okay. Any other failures raise. 409 """ 410 try: 411 await self.open_session(ca_ssl_context, exit_early=True) 412 except ConnectError as e: 413 cause = e.__cause__ 414 if isinstance(cause, ssl.SSLError) and cause.reason == 'CERTIFICATE_VERIFY_FAILED': 415 # failures due to self-signed certs are normal 416 return False 417 raise 418 return True 419 420 async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context): 421 ca_signed = await self.is_server_ca_signed(ca_ssl_context) 422 if ca_signed: 423 if self._get_expected_fingerprint(): 424 raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers") 425 with open(self.cert_path, 'w') as f: 426 # empty file means this is CA signed, not self-signed 427 f.write('') 428 else: 429 await self._save_certificate() 430 431 def _is_saved_ssl_cert_available(self): 432 if not os.path.exists(self.cert_path): 433 return False 434 with open(self.cert_path, 'r') as f: 435 contents = f.read() 436 if contents == '': # CA signed 437 if self._get_expected_fingerprint(): 438 raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers") 439 return True 440 # pinned self-signed cert 441 try: 442 b = pem.dePem(contents, 'CERTIFICATE') 443 except SyntaxError as e: 444 self.logger.info(f"error parsing already saved cert: {e}") 445 raise ErrorParsingSSLCert(e) from e 446 try: 447 x = x509.X509(b) 448 except Exception as e: 449 self.logger.info(f"error parsing already saved cert: {e}") 450 raise ErrorParsingSSLCert(e) from e 451 try: 452 x.check_date() 453 except x509.CertificateError as e: 454 self.logger.info(f"certificate has expired: {e}") 455 os.unlink(self.cert_path) # delete pinned cert only in this case 456 return False 457 self._verify_certificate_fingerprint(bytearray(b)) 458 return True 459 460 async def _get_ssl_context(self): 461 if self.protocol != 's': 462 # using plaintext TCP 463 return None 464 465 # see if we already have cert for this server; or get it for the first time 466 ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path) 467 if not self._is_saved_ssl_cert_available(): 468 try: 469 await self._try_saving_ssl_cert_for_first_time(ca_sslc) 470 except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e: 471 raise ErrorGettingSSLCertFromServer(e) from e 472 # now we have a file saved in our certificate store 473 siz = os.stat(self.cert_path).st_size 474 if siz == 0: 475 # CA signed cert 476 sslc = ca_sslc 477 else: 478 # pinned self-signed cert 479 sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path) 480 sslc.check_hostname = 0 481 return sslc 482 483 def handle_disconnect(func): 484 @functools.wraps(func) 485 async def wrapper_func(self: 'Interface', *args, **kwargs): 486 try: 487 return await func(self, *args, **kwargs) 488 except GracefulDisconnect as e: 489 self.logger.log(e.log_level, f"disconnecting due to {repr(e)}") 490 except aiorpcx.jsonrpc.RPCError as e: 491 self.logger.warning(f"disconnecting due to {repr(e)}") 492 self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True) 493 finally: 494 self.got_disconnected.set() 495 await self.network.connection_down(self) 496 # if was not 'ready' yet, schedule waiting coroutines: 497 self.ready.cancel() 498 return wrapper_func 499 500 @ignore_exceptions # do not kill network.taskgroup 501 @log_exceptions 502 @handle_disconnect 503 async def run(self): 504 try: 505 ssl_context = await self._get_ssl_context() 506 except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e: 507 self.logger.info(f'disconnecting due to: {repr(e)}') 508 return 509 try: 510 await self.open_session(ssl_context) 511 except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e: 512 # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues) 513 if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError) 514 and self.is_main_server() and not self.network.auto_connect): 515 self.logger.warning(f'Cannot connect to main server due to SSL error ' 516 f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}') 517 else: 518 self.logger.info(f'disconnecting due to: {repr(e)}') 519 return 520 521 def _mark_ready(self) -> None: 522 if self.ready.cancelled(): 523 raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled') 524 if self.ready.done(): 525 return 526 527 assert self.tip_header 528 chain = blockchain.check_header(self.tip_header) 529 if not chain: 530 self.blockchain = blockchain.get_best_chain() 531 else: 532 self.blockchain = chain 533 assert self.blockchain is not None 534 535 self.logger.info(f"set blockchain with height {self.blockchain.height()}") 536 537 self.ready.set_result(1) 538 539 async def _save_certificate(self) -> None: 540 if not os.path.exists(self.cert_path): 541 # we may need to retry this a few times, in case the handshake hasn't completed 542 for _ in range(10): 543 dercert = await self._fetch_certificate() 544 if dercert: 545 self.logger.info("succeeded in getting cert") 546 self._verify_certificate_fingerprint(dercert) 547 with open(self.cert_path, 'w') as f: 548 cert = ssl.DER_cert_to_PEM_cert(dercert) 549 # workaround android bug 550 cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert) 551 f.write(cert) 552 # even though close flushes we can't fsync when closed. 553 # and we must flush before fsyncing, cause flush flushes to OS buffer 554 # fsync writes to OS buffer to disk 555 f.flush() 556 os.fsync(f.fileno()) 557 break 558 await asyncio.sleep(1) 559 else: 560 raise GracefulDisconnect("could not get certificate after 10 tries") 561 562 async def _fetch_certificate(self) -> bytes: 563 sslc = ssl.SSLContext() 564 async with _RSClient(session_factory=RPCSession, 565 host=self.host, port=self.port, 566 ssl=sslc, proxy=self.proxy) as session: 567 asyncio_transport = session.transport._asyncio_transport # type: asyncio.BaseTransport 568 ssl_object = asyncio_transport.get_extra_info("ssl_object") # type: ssl.SSLObject 569 return ssl_object.getpeercert(binary_form=True) 570 571 def _get_expected_fingerprint(self) -> Optional[str]: 572 if self.is_main_server(): 573 return self.network.config.get("serverfingerprint") 574 575 def _verify_certificate_fingerprint(self, certificate): 576 expected_fingerprint = self._get_expected_fingerprint() 577 if not expected_fingerprint: 578 return 579 fingerprint = hashlib.sha256(certificate).hexdigest() 580 fingerprints_match = fingerprint.lower() == expected_fingerprint.lower() 581 if not fingerprints_match: 582 util.trigger_callback('cert_mismatch') 583 raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch') 584 self.logger.info("cert fingerprint verification passed") 585 586 async def get_block_header(self, height, assert_mode): 587 self.logger.info(f'requesting block header {height} in mode {assert_mode}') 588 # use lower timeout as we usually have network.bhi_lock here 589 timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent) 590 res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout) 591 return blockchain.deserialize_header(bytes.fromhex(res), height) 592 593 async def request_chunk(self, height: int, tip=None, *, can_return_early=False): 594 if not is_non_negative_integer(height): 595 raise Exception(f"{repr(height)} is not a block height") 596 index = height // 2016 597 if can_return_early and index in self._requested_chunks: 598 return 599 self.logger.info(f"requesting chunk from height {height}") 600 size = 2016 601 if tip is not None: 602 size = min(size, tip - index * 2016 + 1) 603 size = max(size, 0) 604 try: 605 self._requested_chunks.add(index) 606 res = await self.session.send_request('blockchain.block.headers', [index * 2016, size]) 607 finally: 608 self._requested_chunks.discard(index) 609 assert_dict_contains_field(res, field_name='count') 610 assert_dict_contains_field(res, field_name='hex') 611 assert_dict_contains_field(res, field_name='max') 612 assert_non_negative_integer(res['count']) 613 assert_non_negative_integer(res['max']) 614 assert_hex_str(res['hex']) 615 if len(res['hex']) != HEADER_SIZE * 2 * res['count']: 616 raise RequestCorrupted('inconsistent chunk hex and count') 617 # we never request more than 2016 headers, but we enforce those fit in a single response 618 if res['max'] < 2016: 619 raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < 2016") 620 if res['count'] != size: 621 raise RequestCorrupted(f"expected {size} headers but only got {res['count']}") 622 conn = self.blockchain.connect_chunk(index, res['hex']) 623 if not conn: 624 return conn, 0 625 return conn, res['count'] 626 627 def is_main_server(self) -> bool: 628 return (self.network.interface == self or 629 self.network.interface is None and self.network.default_server == self.server) 630 631 async def open_session(self, sslc, exit_early=False): 632 session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface) 633 async with _RSClient(session_factory=session_factory, 634 host=self.host, port=self.port, 635 ssl=sslc, proxy=self.proxy) as session: 636 self.session = session # type: NotificationSession 637 self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic)) 638 try: 639 ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION]) 640 except aiorpcx.jsonrpc.RPCError as e: 641 raise GracefulDisconnect(e) # probably 'unsupported protocol version' 642 if exit_early: 643 return 644 if ver[1] != version.PROTOCOL_VERSION: 645 raise GracefulDisconnect(f'server violated protocol-version-negotiation. ' 646 f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}') 647 if not self.network.check_interface_against_healthy_spread_of_connected_servers(self): 648 raise GracefulDisconnect(f'too many connected servers already ' 649 f'in bucket {self.bucket_based_on_ipaddress()}') 650 self.logger.info(f"connection established. version: {ver}") 651 652 try: 653 async with self.taskgroup as group: 654 await group.spawn(self.ping) 655 await group.spawn(self.request_fee_estimates) 656 await group.spawn(self.run_fetch_blocks) 657 await group.spawn(self.monitor_connection) 658 except aiorpcx.jsonrpc.RPCError as e: 659 if e.code in (JSONRPC.EXCESSIVE_RESOURCE_USAGE, 660 JSONRPC.SERVER_BUSY, 661 JSONRPC.METHOD_NOT_FOUND): 662 raise GracefulDisconnect(e, log_level=logging.WARNING) from e 663 raise 664 665 async def monitor_connection(self): 666 while True: 667 await asyncio.sleep(1) 668 if not self.session or self.session.is_closing(): 669 raise GracefulDisconnect('session was closed') 670 671 async def ping(self): 672 while True: 673 await asyncio.sleep(300) 674 await self.session.send_request('server.ping') 675 676 async def request_fee_estimates(self): 677 from .simple_config import FEE_ETA_TARGETS 678 while True: 679 async with TaskGroup() as group: 680 fee_tasks = [] 681 for i in FEE_ETA_TARGETS: 682 fee_tasks.append((i, await group.spawn(self.get_estimatefee(i)))) 683 for nblock_target, task in fee_tasks: 684 fee = task.result() 685 if fee < 0: continue 686 self.fee_estimates_eta[nblock_target] = fee 687 self.network.update_fee_estimates() 688 await asyncio.sleep(60) 689 690 async def close(self, *, force_after: int = None): 691 """Closes the connection and waits for it to be closed. 692 We try to flush buffered data to the wire, so this can take some time. 693 """ 694 if force_after is None: 695 # We give up after a while and just abort the connection. 696 # Note: specifically if the server is running Fulcrum, waiting seems hopeless, 697 # the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76) 698 force_after = 1 # seconds 699 if self.session: 700 await self.session.close(force_after=force_after) 701 # monitor_connection will cancel tasks 702 703 async def run_fetch_blocks(self): 704 header_queue = asyncio.Queue() 705 await self.session.subscribe('blockchain.headers.subscribe', [], header_queue) 706 while True: 707 item = await header_queue.get() 708 raw_header = item[0] 709 height = raw_header['height'] 710 header = blockchain.deserialize_header(bfh(raw_header['hex']), height) 711 self.tip_header = header 712 self.tip = height 713 if self.tip < constants.net.max_checkpoint(): 714 raise GracefulDisconnect('server tip below max checkpoint') 715 self._mark_ready() 716 await self._process_header_at_tip() 717 # header processing done 718 util.trigger_callback('blockchain_updated') 719 util.trigger_callback('network_updated') 720 await self.network.switch_unwanted_fork_interface() 721 await self.network.switch_lagging_interface() 722 723 async def _process_header_at_tip(self): 724 height, header = self.tip, self.tip_header 725 async with self.network.bhi_lock: 726 if self.blockchain.height() >= height and self.blockchain.check_header(header): 727 # another interface amended the blockchain 728 self.logger.info(f"skipping header {height}") 729 return 730 _, height = await self.step(height, header) 731 # in the simple case, height == self.tip+1 732 if height <= self.tip: 733 await self.sync_until(height) 734 735 async def sync_until(self, height, next_height=None): 736 if next_height is None: 737 next_height = self.tip 738 last = None 739 while last is None or height <= next_height: 740 prev_last, prev_height = last, height 741 if next_height > height + 10: 742 could_connect, num_headers = await self.request_chunk(height, next_height) 743 if not could_connect: 744 if height <= constants.net.max_checkpoint(): 745 raise GracefulDisconnect('server chain conflicts with checkpoints or genesis') 746 last, height = await self.step(height) 747 continue 748 util.trigger_callback('network_updated') 749 height = (height // 2016 * 2016) + num_headers 750 assert height <= next_height+1, (height, self.tip) 751 last = 'catchup' 752 else: 753 last, height = await self.step(height) 754 assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until' 755 return last, height 756 757 async def step(self, height, header=None): 758 assert 0 <= height <= self.tip, (height, self.tip) 759 if header is None: 760 header = await self.get_block_header(height, 'catchup') 761 762 chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) 763 if chain: 764 self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain 765 # note: there is an edge case here that is not handled. 766 # we might know the blockhash (enough for check_header) but 767 # not have the header itself. e.g. regtest chain with only genesis. 768 # this situation resolves itself on the next block 769 return 'catchup', height+1 770 771 can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) 772 if not can_connect: 773 self.logger.info(f"can't connect {height}") 774 height, header, bad, bad_header = await self._search_headers_backwards(height, header) 775 chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) 776 can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) 777 assert chain or can_connect 778 if can_connect: 779 self.logger.info(f"could connect {height}") 780 height += 1 781 if isinstance(can_connect, Blockchain): # not when mocking 782 self.blockchain = can_connect 783 self.blockchain.save_header(header) 784 return 'catchup', height 785 786 good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain) 787 return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header) 788 789 async def _search_headers_binary(self, height, bad, bad_header, chain): 790 assert bad == bad_header['block_height'] 791 _assert_header_does_not_check_against_any_chain(bad_header) 792 793 self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain 794 good = height 795 while True: 796 assert good < bad, (good, bad) 797 height = (good + bad) // 2 798 self.logger.info(f"binary step. good {good}, bad {bad}, height {height}") 799 header = await self.get_block_header(height, 'binary') 800 chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) 801 if chain: 802 self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain 803 good = height 804 else: 805 bad = height 806 bad_header = header 807 if good + 1 == bad: 808 break 809 810 mock = 'mock' in bad_header and bad_header['mock']['connect'](height) 811 real = not mock and self.blockchain.can_connect(bad_header, check_height=False) 812 if not real and not mock: 813 raise Exception('unexpected bad header during binary: {}'.format(bad_header)) 814 _assert_header_does_not_check_against_any_chain(bad_header) 815 816 self.logger.info(f"binary search exited. good {good}, bad {bad}") 817 return good, bad, bad_header 818 819 async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header): 820 assert good + 1 == bad 821 assert bad == bad_header['block_height'] 822 _assert_header_does_not_check_against_any_chain(bad_header) 823 # 'good' is the height of a block 'good_header', somewhere in self.blockchain. 824 # bad_header connects to good_header; bad_header itself is NOT in self.blockchain. 825 826 bh = self.blockchain.height() 827 assert bh >= good, (bh, good) 828 if bh == good: 829 height = good + 1 830 self.logger.info(f"catching up from {height}") 831 return 'no_fork', height 832 833 # this is a new fork we don't yet have 834 height = bad + 1 835 self.logger.info(f"new fork at bad height {bad}") 836 forkfun = self.blockchain.fork if 'mock' not in bad_header else bad_header['mock']['fork'] 837 b = forkfun(bad_header) # type: Blockchain 838 self.blockchain = b 839 assert b.forkpoint == bad 840 return 'fork', height 841 842 async def _search_headers_backwards(self, height, header): 843 async def iterate(): 844 nonlocal height, header 845 checkp = False 846 if height <= constants.net.max_checkpoint(): 847 height = constants.net.max_checkpoint() 848 checkp = True 849 header = await self.get_block_header(height, 'backward') 850 chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) 851 can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) 852 if chain or can_connect: 853 return False 854 if checkp: 855 raise GracefulDisconnect("server chain conflicts with checkpoints") 856 return True 857 858 bad, bad_header = height, header 859 _assert_header_does_not_check_against_any_chain(bad_header) 860 with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values()) 861 local_max = max([0] + [x.height() for x in chains]) if 'mock' not in header else float('inf') 862 height = min(local_max + 1, height - 1) 863 while await iterate(): 864 bad, bad_header = height, header 865 delta = self.tip - height 866 height = self.tip - 2 * delta 867 868 _assert_header_does_not_check_against_any_chain(bad_header) 869 self.logger.info(f"exiting backward mode at {height}") 870 return height, header, bad, bad_header 871 872 @classmethod 873 def client_name(cls) -> str: 874 return f'electrum/{version.ELECTRUM_VERSION}' 875 876 def is_tor(self): 877 return self.host.endswith('.onion') 878 879 def ip_addr(self) -> Optional[str]: 880 session = self.session 881 if not session: return None 882 peer_addr = session.remote_address() 883 if not peer_addr: return None 884 return str(peer_addr.host) 885 886 def bucket_based_on_ipaddress(self) -> str: 887 def do_bucket(): 888 if self.is_tor(): 889 return BUCKET_NAME_OF_ONION_SERVERS 890 try: 891 ip_addr = ip_address(self.ip_addr()) # type: Union[IPv4Address, IPv6Address] 892 except ValueError: 893 return '' 894 if not ip_addr: 895 return '' 896 if ip_addr.is_loopback: # localhost is exempt 897 return '' 898 if ip_addr.version == 4: 899 slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16) 900 return str(slash16) 901 elif ip_addr.version == 6: 902 slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48) 903 return str(slash48) 904 return '' 905 906 if not self._ipaddr_bucket: 907 self._ipaddr_bucket = do_bucket() 908 return self._ipaddr_bucket 909 910 async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict: 911 if not is_hash256_str(tx_hash): 912 raise Exception(f"{repr(tx_hash)} is not a txid") 913 if not is_non_negative_integer(tx_height): 914 raise Exception(f"{repr(tx_height)} is not a block height") 915 # do request 916 res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height]) 917 # check response 918 block_height = assert_dict_contains_field(res, field_name='block_height') 919 merkle = assert_dict_contains_field(res, field_name='merkle') 920 pos = assert_dict_contains_field(res, field_name='pos') 921 # note: tx_height was just a hint to the server, don't enforce the response to match it 922 assert_non_negative_integer(block_height) 923 assert_non_negative_integer(pos) 924 assert_list_or_tuple(merkle) 925 for item in merkle: 926 assert_hash256_str(item) 927 return res 928 929 async def get_transaction(self, tx_hash: str, *, timeout=None) -> str: 930 if not is_hash256_str(tx_hash): 931 raise Exception(f"{repr(tx_hash)} is not a txid") 932 raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout) 933 # validate response 934 if not is_hex_str(raw): 935 raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}") 936 tx = Transaction(raw) 937 try: 938 tx.deserialize() # see if raises 939 except Exception as e: 940 raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e 941 if tx.txid() != tx_hash: 942 raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})") 943 return raw 944 945 async def get_history_for_scripthash(self, sh: str) -> List[dict]: 946 if not is_hash256_str(sh): 947 raise Exception(f"{repr(sh)} is not a scripthash") 948 # do request 949 res = await self.session.send_request('blockchain.scripthash.get_history', [sh]) 950 # check response 951 assert_list_or_tuple(res) 952 prev_height = 1 953 for tx_item in res: 954 height = assert_dict_contains_field(tx_item, field_name='height') 955 assert_dict_contains_field(tx_item, field_name='tx_hash') 956 assert_integer(height) 957 assert_hash256_str(tx_item['tx_hash']) 958 if height in (-1, 0): 959 assert_dict_contains_field(tx_item, field_name='fee') 960 assert_non_negative_integer(tx_item['fee']) 961 prev_height = - float("inf") # this ensures confirmed txs can't follow mempool txs 962 else: 963 # check monotonicity of heights 964 if height < prev_height: 965 raise RequestCorrupted(f'heights of confirmed txs must be in increasing order') 966 prev_height = height 967 hashes = set(map(lambda item: item['tx_hash'], res)) 968 if len(hashes) != len(res): 969 # Either server is sending garbage... or maybe if server is race-prone 970 # a recently mined tx could be included in both last block and mempool? 971 # Still, it's simplest to just disregard the response. 972 raise RequestCorrupted(f"server history has non-unique txids for sh={sh}") 973 return res 974 975 async def listunspent_for_scripthash(self, sh: str) -> List[dict]: 976 if not is_hash256_str(sh): 977 raise Exception(f"{repr(sh)} is not a scripthash") 978 # do request 979 res = await self.session.send_request('blockchain.scripthash.listunspent', [sh]) 980 # check response 981 assert_list_or_tuple(res) 982 for utxo_item in res: 983 assert_dict_contains_field(utxo_item, field_name='tx_pos') 984 assert_dict_contains_field(utxo_item, field_name='value') 985 assert_dict_contains_field(utxo_item, field_name='tx_hash') 986 assert_dict_contains_field(utxo_item, field_name='height') 987 assert_non_negative_integer(utxo_item['tx_pos']) 988 assert_non_negative_integer(utxo_item['value']) 989 assert_non_negative_integer(utxo_item['height']) 990 assert_hash256_str(utxo_item['tx_hash']) 991 return res 992 993 async def get_balance_for_scripthash(self, sh: str) -> dict: 994 if not is_hash256_str(sh): 995 raise Exception(f"{repr(sh)} is not a scripthash") 996 # do request 997 res = await self.session.send_request('blockchain.scripthash.get_balance', [sh]) 998 # check response 999 assert_dict_contains_field(res, field_name='confirmed') 1000 assert_dict_contains_field(res, field_name='unconfirmed') 1001 assert_non_negative_integer(res['confirmed']) 1002 assert_non_negative_integer(res['unconfirmed']) 1003 return res 1004 1005 async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool): 1006 if not is_non_negative_integer(tx_height): 1007 raise Exception(f"{repr(tx_height)} is not a block height") 1008 if not is_non_negative_integer(tx_pos): 1009 raise Exception(f"{repr(tx_pos)} should be non-negative integer") 1010 # do request 1011 res = await self.session.send_request( 1012 'blockchain.transaction.id_from_pos', 1013 [tx_height, tx_pos, merkle], 1014 ) 1015 # check response 1016 if merkle: 1017 assert_dict_contains_field(res, field_name='tx_hash') 1018 assert_dict_contains_field(res, field_name='merkle') 1019 assert_hash256_str(res['tx_hash']) 1020 assert_list_or_tuple(res['merkle']) 1021 for node_hash in res['merkle']: 1022 assert_hash256_str(node_hash) 1023 else: 1024 assert_hash256_str(res) 1025 return res 1026 1027 async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]: 1028 # do request 1029 res = await self.session.send_request('mempool.get_fee_histogram') 1030 # check response 1031 assert_list_or_tuple(res) 1032 prev_fee = float('inf') 1033 for fee, s in res: 1034 assert_non_negative_int_or_float(fee) 1035 assert_non_negative_integer(s) 1036 if fee >= prev_fee: # check monotonicity 1037 raise RequestCorrupted(f'fees must be in decreasing order') 1038 prev_fee = fee 1039 return res 1040 1041 async def get_server_banner(self) -> str: 1042 # do request 1043 res = await self.session.send_request('server.banner') 1044 # check response 1045 if not isinstance(res, str): 1046 raise RequestCorrupted(f'{res!r} should be a str') 1047 return res 1048 1049 async def get_donation_address(self) -> str: 1050 # do request 1051 res = await self.session.send_request('server.donation_address') 1052 # check response 1053 if not res: # ignore empty string 1054 return '' 1055 if not bitcoin.is_address(res): 1056 # note: do not hard-fail -- allow server to use future-type 1057 # bitcoin address we do not recognize 1058 self.logger.info(f"invalid donation address from server: {repr(res)}") 1059 res = '' 1060 return res 1061 1062 async def get_relay_fee(self) -> int: 1063 """Returns the min relay feerate in sat/kbyte.""" 1064 # do request 1065 res = await self.session.send_request('blockchain.relayfee') 1066 # check response 1067 assert_non_negative_int_or_float(res) 1068 relayfee = int(res * bitcoin.COIN) 1069 relayfee = max(0, relayfee) 1070 return relayfee 1071 1072 async def get_estimatefee(self, num_blocks: int) -> int: 1073 """Returns a feerate estimate for getting confirmed within 1074 num_blocks blocks, in sat/kbyte. 1075 """ 1076 if not is_non_negative_integer(num_blocks): 1077 raise Exception(f"{repr(num_blocks)} is not a num_blocks") 1078 # do request 1079 res = await self.session.send_request('blockchain.estimatefee', [num_blocks]) 1080 # check response 1081 if res != -1: 1082 assert_non_negative_int_or_float(res) 1083 res = int(res * bitcoin.COIN) 1084 return res 1085 1086 1087 def _assert_header_does_not_check_against_any_chain(header: dict) -> None: 1088 chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) 1089 if chain_bad: 1090 raise Exception('bad_header must not check!') 1091 1092 1093 def check_cert(host, cert): 1094 try: 1095 b = pem.dePem(cert, 'CERTIFICATE') 1096 x = x509.X509(b) 1097 except: 1098 traceback.print_exc(file=sys.stdout) 1099 return 1100 1101 try: 1102 x.check_date() 1103 expired = False 1104 except: 1105 expired = True 1106 1107 m = "host: %s\n"%host 1108 m += "has_expired: %s\n"% expired 1109 util.print_msg(m) 1110 1111 1112 # Used by tests 1113 def _match_hostname(name, val): 1114 if val == name: 1115 return True 1116 1117 return val.startswith('*.') and name.endswith(val[1:]) 1118 1119 1120 def test_certificates(): 1121 from .simple_config import SimpleConfig 1122 config = SimpleConfig() 1123 mydir = os.path.join(config.path, "certs") 1124 certs = os.listdir(mydir) 1125 for c in certs: 1126 p = os.path.join(mydir,c) 1127 with open(p, encoding='utf-8') as f: 1128 cert = f.read() 1129 check_cert(c, cert) 1130 1131 if __name__ == "__main__": 1132 test_certificates()