network.py (58124B)
1 # Electrum - Lightweight Bitcoin Client 2 # Copyright (c) 2011-2016 Thomas Voegtlin 3 # 4 # Permission is hereby granted, free of charge, to any person 5 # obtaining a copy of this software and associated documentation files 6 # (the "Software"), to deal in the Software without restriction, 7 # including without limitation the rights to use, copy, modify, merge, 8 # publish, distribute, sublicense, and/or sell copies of the Software, 9 # and to permit persons to whom the Software is furnished to do so, 10 # subject to the following conditions: 11 # 12 # The above copyright notice and this permission notice shall be 13 # included in all copies or substantial portions of the Software. 14 # 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 22 # SOFTWARE. 23 import asyncio 24 import time 25 import queue 26 import os 27 import random 28 import re 29 from collections import defaultdict 30 import threading 31 import socket 32 import json 33 import sys 34 import asyncio 35 from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any 36 import traceback 37 import concurrent 38 from concurrent import futures 39 import copy 40 import functools 41 42 import aiorpcx 43 from aiorpcx import TaskGroup, ignore_after 44 from aiohttp import ClientResponse 45 46 from . import util 47 from .util import (log_exceptions, ignore_exceptions, 48 bfh, SilentTaskGroup, make_aiohttp_session, send_exception_to_crash_reporter, 49 is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager, 50 nullcontext) 51 from .bitcoin import COIN 52 from . import constants 53 from . import blockchain 54 from . import bitcoin 55 from . import dns_hacks 56 from .transaction import Transaction 57 from .blockchain import Blockchain, HEADER_SIZE 58 from .interface import (Interface, PREFERRED_NETWORK_PROTOCOL, 59 RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS, 60 NetworkException, RequestCorrupted, ServerAddr) 61 from .version import PROTOCOL_VERSION 62 from .simple_config import SimpleConfig 63 from .i18n import _ 64 from .logging import get_logger, Logger 65 from .lnutil import ChannelBlackList 66 67 if TYPE_CHECKING: 68 from .channel_db import ChannelDB 69 from .lnworker import LNGossip 70 from .lnwatcher import WatchTower 71 from .daemon import Daemon 72 73 74 _logger = get_logger(__name__) 75 76 77 NUM_TARGET_CONNECTED_SERVERS = 10 78 NUM_STICKY_SERVERS = 4 79 NUM_RECENT_SERVERS = 20 80 81 82 def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]: 83 """Convert servers list (from protocol method "server.peers.subscribe") into dict format. 84 Also validate values, such as IP addresses and ports. 85 """ 86 servers = {} 87 for item in result: 88 host = item[1] 89 out = {} 90 version = None 91 pruning_level = '-' 92 if len(item) > 2: 93 for v in item[2]: 94 if re.match(r"[st]\d*", v): 95 protocol, port = v[0], v[1:] 96 if port == '': port = constants.net.DEFAULT_PORTS[protocol] 97 ServerAddr(host, port, protocol=protocol) # check if raises 98 out[protocol] = port 99 elif re.match("v(.?)+", v): 100 version = v[1:] 101 elif re.match(r"p\d*", v): 102 pruning_level = v[1:] 103 if pruning_level == '': pruning_level = '0' 104 if out: 105 out['pruning'] = pruning_level 106 out['version'] = version 107 servers[host] = out 108 return servers 109 110 111 def filter_version(servers): 112 def is_recent(version): 113 try: 114 return util.versiontuple(version) >= util.versiontuple(PROTOCOL_VERSION) 115 except Exception as e: 116 return False 117 return {k: v for k, v in servers.items() if is_recent(v.get('version'))} 118 119 120 def filter_noonion(servers): 121 return {k: v for k, v in servers.items() if not k.endswith('.onion')} 122 123 124 def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence[ServerAddr]: 125 """Filters the hostmap for those implementing protocol.""" 126 if allowed_protocols is None: 127 allowed_protocols = {PREFERRED_NETWORK_PROTOCOL} 128 eligible = [] 129 for host, portmap in hostmap.items(): 130 for protocol in allowed_protocols: 131 port = portmap.get(protocol) 132 if port: 133 eligible.append(ServerAddr(host, port, protocol=protocol)) 134 return eligible 135 136 137 def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str], 138 exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]: 139 if hostmap is None: 140 hostmap = constants.net.DEFAULT_SERVERS 141 if exclude_set is None: 142 exclude_set = set() 143 servers = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols)) 144 eligible = list(servers - exclude_set) 145 return random.choice(eligible) if eligible else None 146 147 148 class NetworkParameters(NamedTuple): 149 server: ServerAddr 150 proxy: Optional[dict] 151 auto_connect: bool 152 oneserver: bool = False 153 154 155 proxy_modes = ['socks4', 'socks5'] 156 157 158 def serialize_proxy(p): 159 if not isinstance(p, dict): 160 return None 161 return ':'.join([p.get('mode'), p.get('host'), p.get('port'), 162 p.get('user', ''), p.get('password', '')]) 163 164 165 def deserialize_proxy(s: str) -> Optional[dict]: 166 if not isinstance(s, str): 167 return None 168 if s.lower() == 'none': 169 return None 170 proxy = { "mode":"socks5", "host":"localhost" } 171 # FIXME raw IPv6 address fails here 172 args = s.split(':') 173 n = 0 174 if proxy_modes.count(args[n]) == 1: 175 proxy["mode"] = args[n] 176 n += 1 177 if len(args) > n: 178 proxy["host"] = args[n] 179 n += 1 180 if len(args) > n: 181 proxy["port"] = args[n] 182 n += 1 183 else: 184 proxy["port"] = "8080" if proxy["mode"] == "http" else "1080" 185 if len(args) > n: 186 proxy["user"] = args[n] 187 n += 1 188 if len(args) > n: 189 proxy["password"] = args[n] 190 return proxy 191 192 193 class BestEffortRequestFailed(NetworkException): pass 194 195 196 class TxBroadcastError(NetworkException): 197 def get_message_for_gui(self): 198 raise NotImplementedError() 199 200 201 class TxBroadcastHashMismatch(TxBroadcastError): 202 def get_message_for_gui(self): 203 return "{}\n{}\n\n{}" \ 204 .format(_("The server returned an unexpected transaction ID when broadcasting the transaction."), 205 _("Consider trying to connect to a different server, or updating Electrum."), 206 str(self)) 207 208 209 class TxBroadcastServerReturnedError(TxBroadcastError): 210 def get_message_for_gui(self): 211 return "{}\n{}\n\n{}" \ 212 .format(_("The server returned an error when broadcasting the transaction."), 213 _("Consider trying to connect to a different server, or updating Electrum."), 214 str(self)) 215 216 217 class TxBroadcastUnknownError(TxBroadcastError): 218 def get_message_for_gui(self): 219 return "{}\n{}" \ 220 .format(_("Unknown error when broadcasting the transaction."), 221 _("Consider trying to connect to a different server, or updating Electrum.")) 222 223 224 class UntrustedServerReturnedError(NetworkException): 225 def __init__(self, *, original_exception): 226 self.original_exception = original_exception 227 228 def get_message_for_gui(self) -> str: 229 return str(self) 230 231 def __str__(self): 232 return _("The server returned an error.") 233 234 def __repr__(self): 235 return (f"<UntrustedServerReturnedError " 236 f"[DO NOT TRUST THIS MESSAGE] original_exception: {repr(self.original_exception)}>") 237 238 239 _INSTANCE = None 240 241 242 class Network(Logger, NetworkRetryManager[ServerAddr]): 243 """The Network class manages a set of connections to remote electrum 244 servers, each connected socket is handled by an Interface() object. 245 """ 246 247 LOGGING_SHORTCUT = 'n' 248 249 taskgroup: Optional[TaskGroup] 250 interface: Optional[Interface] 251 interfaces: Dict[ServerAddr, Interface] 252 _connecting_ifaces: Set[ServerAddr] 253 _closing_ifaces: Set[ServerAddr] 254 default_server: ServerAddr 255 _recent_servers: List[ServerAddr] 256 257 channel_blacklist: 'ChannelBlackList' 258 channel_db: Optional['ChannelDB'] = None 259 lngossip: Optional['LNGossip'] = None 260 local_watchtower: Optional['WatchTower'] = None 261 262 def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None): 263 global _INSTANCE 264 assert _INSTANCE is None, "Network is a singleton!" 265 _INSTANCE = self 266 267 Logger.__init__(self) 268 NetworkRetryManager.__init__( 269 self, 270 max_retry_delay_normal=600, 271 init_retry_delay_normal=15, 272 max_retry_delay_urgent=10, 273 init_retry_delay_urgent=1, 274 ) 275 276 self.asyncio_loop = asyncio.get_event_loop() 277 assert self.asyncio_loop.is_running(), "event loop not running" 278 try: 279 self._loop_thread = self.asyncio_loop._mythread # type: threading.Thread # only used for sanity checks 280 except AttributeError as e: 281 self.logger.warning(f"asyncio loop does not have _mythread set: {e!r}") 282 self._loop_thread = None 283 284 assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}" 285 self.config = config 286 287 self.daemon = daemon 288 289 blockchain.read_blockchains(self.config) 290 blockchain.init_headers_file_for_best_chain() 291 self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}") 292 self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None) # type: Dict[str, Any] 293 if self._blockchain_preferred_block is None: 294 self._set_preferred_chain(None) 295 self._blockchain = blockchain.get_best_chain() 296 297 self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL} 298 299 # Server for addresses and transactions 300 self.default_server = self.config.get('server', None) 301 # Sanitize default server 302 if self.default_server: 303 try: 304 self.default_server = ServerAddr.from_str(self.default_server) 305 except: 306 self.logger.warning('failed to parse server-string; falling back to localhost:1:s.') 307 self.default_server = ServerAddr.from_str("localhost:1:s") 308 else: 309 self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols) 310 assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}" 311 312 self.taskgroup = None 313 314 # locks 315 self.restart_lock = asyncio.Lock() 316 self.bhi_lock = asyncio.Lock() 317 self.recent_servers_lock = threading.RLock() # <- re-entrant 318 self.interfaces_lock = threading.Lock() # for mutating/iterating self.interfaces 319 320 self.server_peers = {} # returned by interface (servers that the main interface knows about) 321 self._recent_servers = self._read_recent_servers() # note: needs self.recent_servers_lock 322 323 self.banner = '' 324 self.donation_address = '' 325 self.relay_fee = None # type: Optional[int] 326 327 dir_path = os.path.join(self.config.path, 'certs') 328 util.make_dir(dir_path) 329 330 # the main server we are currently communicating with 331 self.interface = None 332 self.default_server_changed_event = asyncio.Event() 333 # Set of servers we have an ongoing connection with. 334 # For any ServerAddr, at most one corresponding Interface object 335 # can exist at any given time. Depending on the state of that Interface, 336 # the ServerAddr can be found in one of the following sets. 337 # Note: during a transition, the ServerAddr can appear in two sets momentarily. 338 self._connecting_ifaces = set() 339 self.interfaces = {} # these are the ifaces in "initialised and usable" state 340 self._closing_ifaces = set() 341 342 self.auto_connect = self.config.get('auto_connect', True) 343 self.proxy = None 344 self._maybe_set_oneserver() 345 346 # Dump network messages (all interfaces). Set at runtime from the console. 347 self.debug = False 348 349 self._set_status('disconnected') 350 self._has_ever_managed_to_connect_to_server = False 351 352 # lightning network 353 self.channel_blacklist = ChannelBlackList() 354 if self.config.get('run_local_watchtower', False): 355 from . import lnwatcher 356 self.local_watchtower = lnwatcher.WatchTower(self) 357 self.local_watchtower.start_network(self) 358 asyncio.ensure_future(self.local_watchtower.start_watching()) 359 360 def has_internet_connection(self) -> bool: 361 """Our guess whether the device has Internet-connectivity.""" 362 return self._has_ever_managed_to_connect_to_server 363 364 def has_channel_db(self): 365 return self.channel_db is not None 366 367 def start_gossip(self): 368 from . import lnrouter 369 from . import channel_db 370 from . import lnworker 371 if not self.config.get('use_gossip'): 372 return 373 if self.lngossip is None: 374 self.channel_db = channel_db.ChannelDB(self) 375 self.path_finder = lnrouter.LNPathFinder(self.channel_db) 376 self.channel_db.load_data() 377 self.lngossip = lnworker.LNGossip() 378 self.lngossip.start_network(self) 379 380 async def stop_gossip(self, *, full_shutdown: bool = False): 381 if self.lngossip: 382 await self.lngossip.stop() 383 self.lngossip = None 384 self.channel_db.stop() 385 if full_shutdown: 386 await self.channel_db.stopped_event.wait() 387 self.channel_db = None 388 389 def run_from_another_thread(self, coro, *, timeout=None): 390 assert self._loop_thread != threading.current_thread(), 'must not be called from network thread' 391 fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop) 392 return fut.result(timeout) 393 394 @staticmethod 395 def get_instance() -> Optional["Network"]: 396 return _INSTANCE 397 398 def with_recent_servers_lock(func): 399 def func_wrapper(self, *args, **kwargs): 400 with self.recent_servers_lock: 401 return func(self, *args, **kwargs) 402 return func_wrapper 403 404 def _read_recent_servers(self) -> List[ServerAddr]: 405 if not self.config.path: 406 return [] 407 path = os.path.join(self.config.path, "recent_servers") 408 try: 409 with open(path, "r", encoding='utf-8') as f: 410 data = f.read() 411 servers_list = json.loads(data) 412 return [ServerAddr.from_str(s) for s in servers_list] 413 except: 414 return [] 415 416 @with_recent_servers_lock 417 def _save_recent_servers(self): 418 if not self.config.path: 419 return 420 path = os.path.join(self.config.path, "recent_servers") 421 s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder) 422 try: 423 with open(path, "w", encoding='utf-8') as f: 424 f.write(s) 425 except: 426 pass 427 428 async def _server_is_lagging(self) -> bool: 429 sh = self.get_server_height() 430 if not sh: 431 self.logger.info('no height for main interface') 432 return True 433 lh = self.get_local_height() 434 result = (lh - sh) > 1 435 if result: 436 self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})') 437 return result 438 439 def _set_status(self, status): 440 self.connection_status = status 441 self.notify('status') 442 443 def is_connected(self): 444 interface = self.interface 445 return interface is not None and interface.ready.done() 446 447 def is_connecting(self): 448 return self.connection_status == 'connecting' 449 450 async def _request_server_info(self, interface: 'Interface'): 451 await interface.ready 452 # TODO: libbitcoin: session = interface.session 453 454 async def get_banner(): 455 self.banner = await interface.get_server_banner() 456 self.notify('banner') 457 async def get_donation_address(): 458 self.donation_address = await interface.get_donation_address() 459 async def get_server_peers(): 460 # ORIG: server_peers = await session.send_request('server.peers.subscribe') 461 # TODO: libbitcoin 462 server_peers = [] 463 random.shuffle(server_peers) 464 max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS 465 server_peers = server_peers[:max_accepted_peers] 466 # note that 'parse_servers' also validates the data (which is untrusted input!) 467 self.server_peers = parse_servers(server_peers) 468 self.notify('servers') 469 async def get_relay_fee(): 470 self.relay_fee = await interface.get_relay_fee() 471 472 async with TaskGroup() as group: 473 await group.spawn(get_banner) 474 await group.spawn(get_donation_address) 475 await group.spawn(get_server_peers) 476 await group.spawn(get_relay_fee) 477 await group.spawn(self._request_fee_estimates(interface)) 478 479 async def _request_fee_estimates(self, interface): 480 self.config.requested_fee_estimates() 481 histogram = await interface.get_fee_histogram() 482 self.config.mempool_fees = histogram 483 self.logger.info(f'fee_histogram {histogram}') 484 self.notify('fee_histogram') 485 486 def get_status_value(self, key): 487 if key == 'status': 488 value = self.connection_status 489 elif key == 'banner': 490 value = self.banner 491 elif key == 'fee': 492 value = self.config.fee_estimates 493 elif key == 'fee_histogram': 494 value = self.config.mempool_fees 495 elif key == 'servers': 496 value = self.get_servers() 497 else: 498 raise Exception('unexpected trigger key {}'.format(key)) 499 return value 500 501 def notify(self, key): 502 if key in ['status', 'updated']: 503 util.trigger_callback(key) 504 else: 505 util.trigger_callback(key, self.get_status_value(key)) 506 507 def get_parameters(self) -> NetworkParameters: 508 return NetworkParameters(server=self.default_server, 509 proxy=self.proxy, 510 auto_connect=self.auto_connect, 511 oneserver=self.oneserver) 512 513 def get_donation_address(self): 514 if self.is_connected(): 515 return self.donation_address 516 517 def get_interfaces(self) -> List[ServerAddr]: 518 """The list of servers for the connected interfaces.""" 519 with self.interfaces_lock: 520 return list(self.interfaces) 521 522 def get_fee_estimates(self): 523 from statistics import median 524 from .simple_config import FEE_ETA_TARGETS 525 if self.auto_connect: 526 with self.interfaces_lock: 527 out = {} 528 for n in FEE_ETA_TARGETS: 529 try: 530 out[n] = int(median(filter(None, [i.fee_estimates_eta.get(n) for i in self.interfaces.values()]))) 531 except: 532 continue 533 return out 534 else: 535 if not self.interface: 536 return {} 537 return self.interface.fee_estimates_eta 538 539 def update_fee_estimates(self): 540 e = self.get_fee_estimates() 541 for nblock_target, fee in e.items(): 542 self.config.update_fee_estimates(nblock_target, fee) 543 if not hasattr(self, "_prev_fee_est") or self._prev_fee_est != e: 544 self._prev_fee_est = copy.copy(e) 545 self.logger.info(f'fee_estimates {e}') 546 self.notify('fee') 547 548 @with_recent_servers_lock 549 def get_servers(self): 550 # note: order of sources when adding servers here is crucial! 551 # don't let "server_peers" overwrite anything, 552 # otherwise main server can eclipse the client 553 out = dict() 554 # add servers received from main interface 555 server_peers = self.server_peers 556 if server_peers: 557 out.update(filter_version(server_peers.copy())) 558 # hardcoded servers 559 out.update(constants.net.DEFAULT_SERVERS) 560 # add recent servers 561 for server in self._recent_servers: 562 port = str(server.port) 563 if server.host in out: 564 out[server.host].update({server.protocol: port}) 565 else: 566 out[server.host] = {server.protocol: port} 567 # potentially filter out some 568 if self.config.get('noonion'): 569 out = filter_noonion(out) 570 return out 571 572 def _get_next_server_to_try(self) -> Optional[ServerAddr]: 573 now = time.time() 574 with self.interfaces_lock: 575 connected_servers = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces 576 # First try from recent servers. (which are persisted) 577 # As these are servers we successfully connected to recently, they are 578 # most likely to work. This also makes servers "sticky". 579 # Note: with sticky servers, it is more difficult for an attacker to eclipse the client, 580 # however if they succeed, the eclipsing would persist. To try to balance this, 581 # we only give priority to recent_servers up to NUM_STICKY_SERVERS. 582 with self.recent_servers_lock: 583 recent_servers = list(self._recent_servers) 584 recent_servers = [s for s in recent_servers if s.protocol in self._allowed_protocols] 585 if len(connected_servers & set(recent_servers)) < NUM_STICKY_SERVERS: 586 for server in recent_servers: 587 if server in connected_servers: 588 continue 589 if not self._can_retry_addr(server, now=now): 590 continue 591 return server 592 # try all servers we know about, pick one at random 593 hostmap = self.get_servers() 594 servers = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - connected_servers) 595 random.shuffle(servers) 596 for server in servers: 597 if not self._can_retry_addr(server, now=now): 598 continue 599 return server 600 return None 601 602 def _set_proxy(self, proxy: Optional[dict]): 603 self.proxy = proxy 604 dns_hacks.configure_dns_depending_on_proxy(bool(proxy)) 605 self.logger.info(f'setting proxy {proxy}') 606 util.trigger_callback('proxy_set', self.proxy) 607 608 @log_exceptions 609 async def set_parameters(self, net_params: NetworkParameters): 610 proxy = net_params.proxy 611 proxy_str = serialize_proxy(proxy) 612 server = net_params.server 613 # sanitize parameters 614 try: 615 if proxy: 616 proxy_modes.index(proxy['mode']) + 1 617 int(proxy['port']) 618 except: 619 return 620 self.config.set_key('auto_connect', net_params.auto_connect, False) 621 self.config.set_key('oneserver', net_params.oneserver, False) 622 self.config.set_key('proxy', proxy_str, False) 623 self.config.set_key('server', str(server), True) 624 # abort if changes were not allowed by config 625 if self.config.get('server') != str(server) \ 626 or self.config.get('proxy') != proxy_str \ 627 or self.config.get('oneserver') != net_params.oneserver: 628 return 629 630 async with self.restart_lock: 631 self.auto_connect = net_params.auto_connect 632 if self.proxy != proxy or self.oneserver != net_params.oneserver: 633 # Restart the network defaulting to the given server 634 await self.stop(full_shutdown=False) 635 self.default_server = server 636 await self._start() 637 elif self.default_server != server: 638 await self.switch_to_interface(server) 639 else: 640 await self.switch_lagging_interface() 641 642 def _maybe_set_oneserver(self) -> None: 643 oneserver = bool(self.config.get('oneserver', False)) 644 self.oneserver = oneserver 645 self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0 646 647 async def _switch_to_random_interface(self): 648 '''Switch to a random connected server other than the current one''' 649 servers = self.get_interfaces() # Those in connected state 650 if self.default_server in servers: 651 servers.remove(self.default_server) 652 if servers: 653 await self.switch_to_interface(random.choice(servers)) 654 655 async def switch_lagging_interface(self): 656 """If auto_connect and lagging, switch interface (only within fork).""" 657 if self.auto_connect and await self._server_is_lagging(): 658 # switch to one that has the correct header (not height) 659 best_header = self.blockchain().header_at_tip() 660 with self.interfaces_lock: interfaces = list(self.interfaces.values()) 661 filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces)) 662 if filtered: 663 chosen_iface = random.choice(filtered) 664 await self.switch_to_interface(chosen_iface.server) 665 666 async def switch_unwanted_fork_interface(self) -> None: 667 """If auto_connect, maybe switch to another fork/chain.""" 668 if not self.auto_connect or not self.interface: 669 return 670 with self.interfaces_lock: interfaces = list(self.interfaces.values()) 671 pref_height = self._blockchain_preferred_block['height'] 672 pref_hash = self._blockchain_preferred_block['hash'] 673 # shortcut for common case 674 if pref_height == 0: 675 return 676 # maybe try switching chains; starting with most desirable first 677 matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash) 678 chains_to_try = list(matching_chains) + [blockchain.get_best_chain()] 679 for rank, chain in enumerate(chains_to_try): 680 # check if main interface is already on this fork 681 if self.interface.blockchain == chain: 682 return 683 # switch to another random interface that is on this fork, if any 684 filtered = [iface for iface in interfaces 685 if iface.blockchain == chain] 686 if filtered: 687 self.logger.info(f"switching to (more) preferred fork (rank {rank})") 688 chosen_iface = random.choice(filtered) 689 await self.switch_to_interface(chosen_iface.server) 690 return 691 self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any") 692 693 async def switch_to_interface(self, server: ServerAddr): 694 """Switch to server as our main interface. If no connection exists, 695 queue interface to be started. The actual switch will 696 happen when the interface becomes ready. 697 """ 698 self.default_server = server 699 old_interface = self.interface 700 old_server = old_interface.server if old_interface else None 701 702 # Stop any current interface in order to terminate subscriptions, 703 # and to cancel tasks in interface.taskgroup. 704 if old_server and old_server != server: 705 # don't wait for old_interface to close as that might be slow: 706 await self.taskgroup.spawn(self._close_interface(old_interface)) 707 708 if server not in self.interfaces: 709 self.interface = None 710 await self.taskgroup.spawn(self._run_new_interface(server)) 711 return 712 713 i = self.interfaces[server] 714 if old_interface != i: 715 self.logger.info(f"switching to {server}") 716 assert i.ready.done(), "interface we are switching to is not ready yet" 717 blockchain_updated = i.blockchain != self.blockchain() 718 self.interface = i 719 await i.taskgroup.spawn(self._request_server_info(i)) 720 util.trigger_callback('default_server_changed') 721 self.default_server_changed_event.set() 722 self.default_server_changed_event.clear() 723 self._set_status('connected') 724 util.trigger_callback('network_updated') 725 if blockchain_updated: 726 util.trigger_callback('blockchain_updated') 727 728 async def _close_interface(self, interface: Optional[Interface]): 729 if not interface: 730 return 731 if interface.server in self._closing_ifaces: 732 return 733 self._closing_ifaces.add(interface.server) 734 with self.interfaces_lock: 735 if self.interfaces.get(interface.server) == interface: 736 self.interfaces.pop(interface.server) 737 if interface == self.interface: 738 self.interface = None 739 try: 740 # this can take some time if server/connection is slow: 741 await interface.close() 742 await interface.got_disconnected.wait() 743 finally: 744 self._closing_ifaces.discard(interface.server) 745 746 @with_recent_servers_lock 747 def _add_recent_server(self, server: ServerAddr) -> None: 748 self._on_connection_successfully_established(server) 749 # list is ordered 750 if server in self._recent_servers: 751 self._recent_servers.remove(server) 752 self._recent_servers.insert(0, server) 753 self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS] 754 self._save_recent_servers() 755 756 async def connection_down(self, interface: Interface): 757 '''A connection to server either went down, or was never made. 758 We distinguish by whether it is in self.interfaces.''' 759 if not interface: return 760 if interface.server == self.default_server: 761 self._set_status('disconnected') 762 await self._close_interface(interface) 763 util.trigger_callback('network_updated') 764 765 def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int: 766 if self.oneserver and not self.auto_connect: 767 return request_type.MOST_RELAXED 768 if self.proxy: 769 return request_type.RELAXED 770 return request_type.NORMAL 771 772 @ignore_exceptions # do not kill outer taskgroup 773 @log_exceptions 774 async def _run_new_interface(self, server: ServerAddr): 775 if (server in self.interfaces 776 or server in self._connecting_ifaces 777 or server in self._closing_ifaces): 778 return 779 self._connecting_ifaces.add(server) 780 if server == self.default_server: 781 self.logger.info(f"connecting to {server} as new interface") 782 self._set_status('connecting') 783 self._trying_addr_now(server) 784 785 interface = Interface(network=self, server=server, proxy=self.proxy) 786 # note: using longer timeouts here as DNS can sometimes be slow! 787 timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic) 788 try: 789 await asyncio.wait_for(interface.ready, timeout) 790 except BaseException as e: 791 self.logger.info(f"couldn't launch iface {server} -- {repr(e)}") 792 await interface.close() 793 return 794 else: 795 with self.interfaces_lock: 796 assert server not in self.interfaces 797 self.interfaces[server] = interface 798 finally: 799 self._connecting_ifaces.discard(server) 800 801 if server == self.default_server: 802 await self.switch_to_interface(server) 803 804 self._has_ever_managed_to_connect_to_server = True 805 self._add_recent_server(server) 806 util.trigger_callback('network_updated') 807 808 def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool: 809 # main interface is exempt. this makes switching servers easier 810 if iface_to_check.is_main_server(): 811 return True 812 if not iface_to_check.bucket_based_on_ipaddress(): 813 return True 814 # bucket connected interfaces 815 with self.interfaces_lock: 816 interfaces = list(self.interfaces.values()) 817 if iface_to_check in interfaces: 818 interfaces.remove(iface_to_check) 819 buckets = defaultdict(list) 820 for iface in interfaces: 821 buckets[iface.bucket_based_on_ipaddress()].append(iface) 822 # check proposed server against buckets 823 onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS] 824 if iface_to_check.is_tor(): 825 # keep number of onion servers below half of all connected servers 826 if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2: 827 return False 828 else: 829 bucket = iface_to_check.bucket_based_on_ipaddress() 830 if len(buckets[bucket]) > 0: 831 return False 832 return True 833 834 def best_effort_reliable(func): 835 @functools.wraps(func) 836 async def make_reliable_wrapper(self: 'Network', *args, **kwargs): 837 for i in range(10): 838 iface = self.interface 839 # retry until there is a main interface 840 if not iface: 841 async with ignore_after(1): 842 await self.default_server_changed_event.wait() 843 continue # try again 844 assert iface.ready.done(), "interface not ready yet" 845 # try actual request 846 try: 847 async with TaskGroup(wait=any) as group: 848 task = await group.spawn(func(self, *args, **kwargs)) 849 await group.spawn(iface.got_disconnected.wait()) 850 except RequestTimedOut: 851 await iface.close() 852 await iface.got_disconnected.wait() 853 continue # try again 854 except RequestCorrupted as e: 855 # TODO ban server? 856 iface.logger.exception(f"RequestCorrupted: {e}") 857 await iface.close() 858 await iface.got_disconnected.wait() 859 continue # try again 860 if task.done() and not task.cancelled(): 861 return task.result() 862 # otherwise; try again 863 raise BestEffortRequestFailed('no interface to do request on... gave up.') 864 return make_reliable_wrapper 865 866 def catch_server_exceptions(func): 867 @functools.wraps(func) 868 async def wrapper(self, *args, **kwargs): 869 try: 870 return await func(self, *args, **kwargs) 871 except aiorpcx.jsonrpc.CodeMessageError as e: 872 raise UntrustedServerReturnedError(original_exception=e) from e 873 return wrapper 874 875 @best_effort_reliable 876 @catch_server_exceptions 877 async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict: 878 return await self.interface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height) 879 880 @best_effort_reliable 881 async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None: 882 if timeout is None: 883 timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent) 884 # TODO: libbitcoin 885 _ec = await self.interface.broadcast_transaction(tx.serialize(), timeout=timeout) 886 if _ec != 0: 887 raise TxBroadcastServerReturnedError(f"not validated, error: {_ec!r}") 888 889 async def try_broadcasting(self, tx, name): 890 try: 891 await self.broadcast_transaction(tx) 892 except Exception as e: 893 self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}') 894 else: 895 self.logger.info(f'success: broadcasting {name} {tx.txid()}') 896 897 @staticmethod 898 def sanitize_tx_broadcast_response(server_msg) -> str: 899 # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code. 900 # So, we use substring matching to grok the error message. 901 # server_msg is untrusted input so it should not be shown to the user. see #4968 902 server_msg = str(server_msg) 903 server_msg = server_msg.replace("\n", r"\n") 904 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp 905 # grep "reason =" 906 policy_error_messages = { 907 r"version": _("Transaction uses non-standard version."), 908 r"tx-size": _("The transaction was rejected because it is too large (in bytes)."), 909 r"scriptsig-size": None, 910 r"scriptsig-not-pushonly": None, 911 r"scriptpubkey": 912 ("scriptpubkey\n" + 913 _("Some of the outputs pay to a non-standard script.")), 914 r"bare-multisig": None, 915 r"dust": 916 (_("Transaction could not be broadcast due to dust outputs.\n" 917 "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n" 918 "Check the units, make sure you haven't confused e.g. mBTC and BTC.")), 919 r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."), 920 } 921 for substring in policy_error_messages: 922 if substring in server_msg: 923 msg = policy_error_messages[substring] 924 return msg if msg else substring 925 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp 926 script_error_messages = { 927 r"Script evaluated without error but finished with a false/empty top stack element", 928 r"Script failed an OP_VERIFY operation", 929 r"Script failed an OP_EQUALVERIFY operation", 930 r"Script failed an OP_CHECKMULTISIGVERIFY operation", 931 r"Script failed an OP_CHECKSIGVERIFY operation", 932 r"Script failed an OP_NUMEQUALVERIFY operation", 933 r"Script is too big", 934 r"Push value size limit exceeded", 935 r"Operation limit exceeded", 936 r"Stack size limit exceeded", 937 r"Signature count negative or greater than pubkey count", 938 r"Pubkey count negative or limit exceeded", 939 r"Opcode missing or not understood", 940 r"Attempted to use a disabled opcode", 941 r"Operation not valid with the current stack size", 942 r"Operation not valid with the current altstack size", 943 r"OP_RETURN was encountered", 944 r"Invalid OP_IF construction", 945 r"Negative locktime", 946 r"Locktime requirement not satisfied", 947 r"Signature hash type missing or not understood", 948 r"Non-canonical DER signature", 949 r"Data push larger than necessary", 950 r"Only push operators allowed in signatures", 951 r"Non-canonical signature: S value is unnecessarily high", 952 r"Dummy CHECKMULTISIG argument must be zero", 953 r"OP_IF/NOTIF argument must be minimal", 954 r"Signature must be zero for failed CHECK(MULTI)SIG operation", 955 r"NOPx reserved for soft-fork upgrades", 956 r"Witness version reserved for soft-fork upgrades", 957 r"Taproot version reserved for soft-fork upgrades", 958 r"OP_SUCCESSx reserved for soft-fork upgrades", 959 r"Public key version reserved for soft-fork upgrades", 960 r"Public key is neither compressed or uncompressed", 961 r"Stack size must be exactly one after execution", 962 r"Extra items left on stack after execution", 963 r"Witness program has incorrect length", 964 r"Witness program was passed an empty witness", 965 r"Witness program hash mismatch", 966 r"Witness requires empty scriptSig", 967 r"Witness requires only-redeemscript scriptSig", 968 r"Witness provided for non-witness script", 969 r"Using non-compressed keys in segwit", 970 r"Invalid Schnorr signature size", 971 r"Invalid Schnorr signature hash type", 972 r"Invalid Schnorr signature", 973 r"Invalid Taproot control block size", 974 r"Too much signature validation relative to witness weight", 975 r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript", 976 r"OP_IF/NOTIF argument must be minimal in tapscript", 977 r"Using OP_CODESEPARATOR in non-witness script", 978 r"Signature is found in scriptCode", 979 } 980 for substring in script_error_messages: 981 if substring in server_msg: 982 return substring 983 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp 984 # grep "REJECT_" 985 # grep "TxValidationResult" 986 # should come after script_error.cpp (due to e.g. non-mandatory-script-verify-flag) 987 validation_error_messages = { 988 r"coinbase": None, 989 r"tx-size-small": None, 990 r"non-final": None, 991 r"txn-already-in-mempool": None, 992 r"txn-mempool-conflict": None, 993 r"txn-already-known": None, 994 r"non-BIP68-final": None, 995 r"bad-txns-nonstandard-inputs": None, 996 r"bad-witness-nonstandard": None, 997 r"bad-txns-too-many-sigops": None, 998 r"mempool min fee not met": 999 ("mempool min fee not met\n" + 1000 _("Your transaction is paying a fee that is so low that the bitcoin node cannot " 1001 "fit it into its mempool. The mempool is already full of hundreds of megabytes " 1002 "of transactions that all pay higher fees. Try to increase the fee.")), 1003 r"min relay fee not met": None, 1004 r"absurdly-high-fee": None, 1005 r"max-fee-exceeded": None, 1006 r"too-long-mempool-chain": None, 1007 r"bad-txns-spends-conflicting-tx": None, 1008 r"insufficient fee": ("insufficient fee\n" + 1009 _("Your transaction is trying to replace another one in the mempool but it " 1010 "does not meet the rules to do so. Try to increase the fee.")), 1011 r"too many potential replacements": None, 1012 r"replacement-adds-unconfirmed": None, 1013 r"mempool full": None, 1014 r"non-mandatory-script-verify-flag": None, 1015 r"mandatory-script-verify-flag-failed": None, 1016 r"Transaction check failed": None, 1017 } 1018 for substring in validation_error_messages: 1019 if substring in server_msg: 1020 msg = validation_error_messages[substring] 1021 return msg if msg else substring 1022 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp 1023 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp 1024 # grep "RPC_TRANSACTION" 1025 # grep "RPC_DESERIALIZATION_ERROR" 1026 rawtransaction_error_messages = { 1027 r"Missing inputs": None, 1028 r"Inputs missing or spent": None, 1029 r"transaction already in block chain": None, 1030 r"Transaction already in block chain": None, 1031 r"TX decode failed": None, 1032 r"Peer-to-peer functionality missing or disabled": None, 1033 r"Transaction rejected by AcceptToMemoryPool": None, 1034 r"AcceptToMemoryPool failed": None, 1035 r"Fee exceeds maximum configured by user": None, 1036 } 1037 for substring in rawtransaction_error_messages: 1038 if substring in server_msg: 1039 msg = rawtransaction_error_messages[substring] 1040 return msg if msg else substring 1041 # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp 1042 # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp 1043 # grep "REJECT_" 1044 # grep "TxValidationResult" 1045 tx_verify_error_messages = { 1046 r"bad-txns-vin-empty": None, 1047 r"bad-txns-vout-empty": None, 1048 r"bad-txns-oversize": None, 1049 r"bad-txns-vout-negative": None, 1050 r"bad-txns-vout-toolarge": None, 1051 r"bad-txns-txouttotal-toolarge": None, 1052 r"bad-txns-inputs-duplicate": None, 1053 r"bad-cb-length": None, 1054 r"bad-txns-prevout-null": None, 1055 r"bad-txns-inputs-missingorspent": 1056 ("bad-txns-inputs-missingorspent\n" + 1057 _("You might have a local transaction in your wallet that this transaction " 1058 "builds on top. You need to either broadcast or remove the local tx.")), 1059 r"bad-txns-premature-spend-of-coinbase": None, 1060 r"bad-txns-inputvalues-outofrange": None, 1061 r"bad-txns-in-belowout": None, 1062 r"bad-txns-fee-outofrange": None, 1063 } 1064 for substring in tx_verify_error_messages: 1065 if substring in server_msg: 1066 msg = tx_verify_error_messages[substring] 1067 return msg if msg else substring 1068 # otherwise: 1069 return _("Unknown error") 1070 1071 @best_effort_reliable 1072 @catch_server_exceptions 1073 async def request_chunk(self, height: int, tip=None, *, can_return_early=False): 1074 return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early) 1075 1076 @best_effort_reliable 1077 @catch_server_exceptions 1078 async def get_transaction(self, tx_hash: str, *, timeout=None) -> str: 1079 return await self.interface.get_transaction(tx_hash=tx_hash, timeout=timeout) 1080 1081 @best_effort_reliable 1082 @catch_server_exceptions 1083 async def get_history_for_scripthash(self, sh: str) -> List[dict]: 1084 return await self.interface.get_history_for_scripthash(sh) 1085 1086 @best_effort_reliable 1087 @catch_server_exceptions 1088 async def listunspent_for_scripthash(self, sh: str) -> List[dict]: 1089 return await self.interface.listunspent_for_scripthash(sh) 1090 1091 @best_effort_reliable 1092 @catch_server_exceptions 1093 async def get_balance_for_scripthash(self, sh: str) -> dict: 1094 return await self.interface.get_balance_for_scripthash(sh) 1095 1096 @best_effort_reliable 1097 @catch_server_exceptions 1098 async def get_txid_from_txpos(self, tx_height, tx_pos, merkle): 1099 return await self.interface.get_txid_from_txpos(tx_height, tx_pos, merkle) 1100 1101 def blockchain(self) -> Blockchain: 1102 interface = self.interface 1103 if interface and interface.blockchain is not None: 1104 self._blockchain = interface.blockchain 1105 return self._blockchain 1106 1107 def get_blockchains(self): 1108 out = {} # blockchain_id -> list(interfaces) 1109 with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items()) 1110 with self.interfaces_lock: interfaces_values = list(self.interfaces.values()) 1111 for chain_id, bc in blockchain_items: 1112 r = list(filter(lambda i: i.blockchain==bc, interfaces_values)) 1113 if r: 1114 out[chain_id] = r 1115 return out 1116 1117 def _set_preferred_chain(self, chain: Optional[Blockchain]): 1118 if chain: 1119 height = chain.get_max_forkpoint() 1120 header_hash = chain.get_hash(height) 1121 else: 1122 height = 0 1123 header_hash = constants.net.GENESIS 1124 self._blockchain_preferred_block = { 1125 'height': height, 1126 'hash': header_hash, 1127 } 1128 self.config.set_key('blockchain_preferred_block', self._blockchain_preferred_block) 1129 1130 async def follow_chain_given_id(self, chain_id: str) -> None: 1131 bc = blockchain.blockchains.get(chain_id) 1132 if not bc: 1133 raise Exception('blockchain {} not found'.format(chain_id)) 1134 self._set_preferred_chain(bc) 1135 # select server on this chain 1136 with self.interfaces_lock: interfaces = list(self.interfaces.values()) 1137 interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces)) 1138 if len(interfaces_on_selected_chain) == 0: return 1139 chosen_iface = random.choice(interfaces_on_selected_chain) # type: Interface 1140 # switch to server (and save to config) 1141 net_params = self.get_parameters() 1142 net_params = net_params._replace(server=chosen_iface.server) 1143 await self.set_parameters(net_params) 1144 1145 async def follow_chain_given_server(self, server: ServerAddr) -> None: 1146 # note that server_str should correspond to a connected interface 1147 iface = self.interfaces.get(server) 1148 if iface is None: 1149 return 1150 self._set_preferred_chain(iface.blockchain) 1151 # switch to server (and save to config) 1152 net_params = self.get_parameters() 1153 net_params = net_params._replace(server=server) 1154 await self.set_parameters(net_params) 1155 1156 def get_server_height(self) -> int: 1157 """Length of header chain, as claimed by main interface.""" 1158 interface = self.interface 1159 return interface.tip if interface else 0 1160 1161 def get_local_height(self): 1162 """Length of header chain, POW-verified. 1163 In case of a chain split, this is for the branch the main interface is on, 1164 but it is the tip of that branch (even if main interface is behind). 1165 """ 1166 return self.blockchain().height() 1167 1168 def export_checkpoints(self, path): 1169 """Run manually to generate blockchain checkpoints. 1170 Kept for console use only. 1171 """ 1172 cp = self.blockchain().get_checkpoints() 1173 with open(path, 'w', encoding='utf-8') as f: 1174 f.write(json.dumps(cp, indent=4)) 1175 1176 async def _start(self): 1177 assert not self.taskgroup 1178 self.taskgroup = taskgroup = SilentTaskGroup() 1179 assert not self.interface and not self.interfaces 1180 assert not self._connecting_ifaces 1181 assert not self._closing_ifaces 1182 self.logger.info('starting network') 1183 self._clear_addr_retry_times() 1184 self._set_proxy(deserialize_proxy(self.config.get('proxy'))) 1185 self._maybe_set_oneserver() 1186 await self.taskgroup.spawn(self._run_new_interface(self.default_server)) 1187 1188 async def main(): 1189 self.logger.info("starting taskgroup.") 1190 try: 1191 # note: if a task finishes with CancelledError, that 1192 # will NOT raise, and the group will keep the other tasks running 1193 async with taskgroup as group: 1194 await group.spawn(self._maintain_sessions()) 1195 [await group.spawn(job) for job in self._jobs] 1196 except asyncio.CancelledError: 1197 raise 1198 except Exception as e: 1199 self.logger.exception("taskgroup died.") 1200 finally: 1201 self.logger.info("taskgroup stopped.") 1202 asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop) 1203 1204 util.trigger_callback('network_updated') 1205 1206 def start(self, jobs: Iterable = None): 1207 """Schedule starting the network, along with the given job co-routines. 1208 1209 Note: the jobs will *restart* every time the network restarts, e.g. on proxy 1210 setting changes. 1211 """ 1212 self._jobs = jobs or [] 1213 asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop) 1214 1215 @log_exceptions 1216 async def stop(self, *, full_shutdown: bool = True): 1217 self.logger.info("stopping network") 1218 # timeout: if full_shutdown, it is up to the caller to time us out, 1219 # otherwise if e.g. restarting due to proxy changes, we time out fast 1220 async with (nullcontext() if full_shutdown else ignore_after(1)): 1221 async with TaskGroup() as group: 1222 await group.spawn(self.taskgroup.cancel_remaining()) 1223 if full_shutdown: 1224 await group.spawn(self.stop_gossip(full_shutdown=full_shutdown)) 1225 self.taskgroup = None 1226 self.interface = None 1227 self.interfaces = {} 1228 self._connecting_ifaces.clear() 1229 self._closing_ifaces.clear() 1230 if not full_shutdown: 1231 util.trigger_callback('network_updated') 1232 1233 async def _ensure_there_is_a_main_interface(self): 1234 if self.is_connected(): 1235 return 1236 # if auto_connect is set, try a different server 1237 if self.auto_connect and not self.is_connecting(): 1238 await self._switch_to_random_interface() 1239 # if auto_connect is not set, or still no main interface, retry current 1240 if not self.is_connected() and not self.is_connecting(): 1241 if self._can_retry_addr(self.default_server, urgent=True): 1242 await self.switch_to_interface(self.default_server) 1243 1244 async def _maintain_sessions(self): 1245 async def maybe_start_new_interfaces(): 1246 num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces) 1247 for i in range(self.num_server - num_existing_ifaces): 1248 # FIXME this should try to honour "healthy spread of connected servers" 1249 server = self._get_next_server_to_try() 1250 if server: 1251 await self.taskgroup.spawn(self._run_new_interface(server)) 1252 async def maintain_healthy_spread_of_connected_servers(): 1253 with self.interfaces_lock: interfaces = list(self.interfaces.values()) 1254 random.shuffle(interfaces) 1255 for iface in interfaces: 1256 if not self.check_interface_against_healthy_spread_of_connected_servers(iface): 1257 self.logger.info(f"disconnecting from {iface.server}. too many connected " 1258 f"servers already in bucket {iface.bucket_based_on_ipaddress()}") 1259 await self._close_interface(iface) 1260 async def maintain_main_interface(): 1261 await self._ensure_there_is_a_main_interface() 1262 if self.is_connected(): 1263 if self.config.is_fee_estimates_update_required(): 1264 await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface) 1265 1266 while True: 1267 try: 1268 await maybe_start_new_interfaces() 1269 await maintain_healthy_spread_of_connected_servers() 1270 await maintain_main_interface() 1271 except asyncio.CancelledError: 1272 # suppress spurious cancellations 1273 group = self.taskgroup 1274 if not group or group.closed(): 1275 raise 1276 await asyncio.sleep(0.1) 1277 1278 @classmethod 1279 async def _send_http_on_proxy(cls, method: str, url: str, params: str = None, 1280 body: bytes = None, json: dict = None, headers=None, 1281 on_finish=None, timeout=None): 1282 async def default_on_finish(resp: ClientResponse): 1283 resp.raise_for_status() 1284 return await resp.text() 1285 if headers is None: 1286 headers = {} 1287 if on_finish is None: 1288 on_finish = default_on_finish 1289 network = cls.get_instance() 1290 proxy = network.proxy if network else None 1291 async with make_aiohttp_session(proxy, timeout=timeout) as session: 1292 if method == 'get': 1293 async with session.get(url, params=params, headers=headers) as resp: 1294 return await on_finish(resp) 1295 elif method == 'post': 1296 assert body is not None or json is not None, 'body or json must be supplied if method is post' 1297 if body is not None: 1298 async with session.post(url, data=body, headers=headers) as resp: 1299 return await on_finish(resp) 1300 elif json is not None: 1301 async with session.post(url, json=json, headers=headers) as resp: 1302 return await on_finish(resp) 1303 else: 1304 assert False 1305 1306 @classmethod 1307 def send_http_on_proxy(cls, method, url, **kwargs): 1308 network = cls.get_instance() 1309 if network: 1310 assert network._loop_thread is not threading.currentThread() 1311 loop = network.asyncio_loop 1312 else: 1313 loop = asyncio.get_event_loop() 1314 coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop) 1315 # note: _send_http_on_proxy has its own timeout, so no timeout here: 1316 return coro.result() 1317 1318 # methods used in scripts 1319 async def get_peers(self): 1320 while not self.is_connected(): 1321 await asyncio.sleep(1) 1322 session = self.interface.session 1323 # TODO: libbitcoin 1324 return parse_servers(await session.send_request('server.peers.subscribe')) 1325 1326 async def send_multiple_requests(self, servers: Sequence[ServerAddr], method: str, params: Sequence): 1327 responses = dict() 1328 async def get_response(server: ServerAddr): 1329 interface = Interface(network=self, server=server, proxy=self.proxy) 1330 timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent) 1331 try: 1332 await asyncio.wait_for(interface.ready, timeout) 1333 except BaseException as e: 1334 await interface.close() 1335 return 1336 try: 1337 # TODO: libbitcoin XXX: 1338 res = await interface.session.send_request(method, params, timeout=10) 1339 except Exception as e: 1340 res = e 1341 responses[interface.server] = res 1342 async with TaskGroup() as group: 1343 for server in servers: 1344 await group.spawn(get_response(server)) 1345 return responses