electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

network.py (58124B)


      1 # Electrum - Lightweight Bitcoin Client
      2 # Copyright (c) 2011-2016 Thomas Voegtlin
      3 #
      4 # Permission is hereby granted, free of charge, to any person
      5 # obtaining a copy of this software and associated documentation files
      6 # (the "Software"), to deal in the Software without restriction,
      7 # including without limitation the rights to use, copy, modify, merge,
      8 # publish, distribute, sublicense, and/or sell copies of the Software,
      9 # and to permit persons to whom the Software is furnished to do so,
     10 # subject to the following conditions:
     11 #
     12 # The above copyright notice and this permission notice shall be
     13 # included in all copies or substantial portions of the Software.
     14 #
     15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     22 # SOFTWARE.
     23 import asyncio
     24 import time
     25 import queue
     26 import os
     27 import random
     28 import re
     29 from collections import defaultdict
     30 import threading
     31 import socket
     32 import json
     33 import sys
     34 import asyncio
     35 from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any
     36 import traceback
     37 import concurrent
     38 from concurrent import futures
     39 import copy
     40 import functools
     41 
     42 import aiorpcx
     43 from aiorpcx import TaskGroup, ignore_after
     44 from aiohttp import ClientResponse
     45 
     46 from . import util
     47 from .util import (log_exceptions, ignore_exceptions,
     48                    bfh, SilentTaskGroup, make_aiohttp_session, send_exception_to_crash_reporter,
     49                    is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
     50                    nullcontext)
     51 from .bitcoin import COIN
     52 from . import constants
     53 from . import blockchain
     54 from . import bitcoin
     55 from . import dns_hacks
     56 from .transaction import Transaction
     57 from .blockchain import Blockchain, HEADER_SIZE
     58 from .interface import (Interface, PREFERRED_NETWORK_PROTOCOL,
     59                         RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
     60                         NetworkException, RequestCorrupted, ServerAddr)
     61 from .version import PROTOCOL_VERSION
     62 from .simple_config import SimpleConfig
     63 from .i18n import _
     64 from .logging import get_logger, Logger
     65 from .lnutil import ChannelBlackList
     66 
     67 if TYPE_CHECKING:
     68     from .channel_db import ChannelDB
     69     from .lnworker import LNGossip
     70     from .lnwatcher import WatchTower
     71     from .daemon import Daemon
     72 
     73 
     74 _logger = get_logger(__name__)
     75 
     76 
     77 NUM_TARGET_CONNECTED_SERVERS = 10
     78 NUM_STICKY_SERVERS = 4
     79 NUM_RECENT_SERVERS = 20
     80 
     81 
     82 def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
     83     """Convert servers list (from protocol method "server.peers.subscribe") into dict format.
     84     Also validate values, such as IP addresses and ports.
     85     """
     86     servers = {}
     87     for item in result:
     88         host = item[1]
     89         out = {}
     90         version = None
     91         pruning_level = '-'
     92         if len(item) > 2:
     93             for v in item[2]:
     94                 if re.match(r"[st]\d*", v):
     95                     protocol, port = v[0], v[1:]
     96                     if port == '': port = constants.net.DEFAULT_PORTS[protocol]
     97                     ServerAddr(host, port, protocol=protocol)  # check if raises
     98                     out[protocol] = port
     99                 elif re.match("v(.?)+", v):
    100                     version = v[1:]
    101                 elif re.match(r"p\d*", v):
    102                     pruning_level = v[1:]
    103                 if pruning_level == '': pruning_level = '0'
    104         if out:
    105             out['pruning'] = pruning_level
    106             out['version'] = version
    107             servers[host] = out
    108     return servers
    109 
    110 
    111 def filter_version(servers):
    112     def is_recent(version):
    113         try:
    114             return util.versiontuple(version) >= util.versiontuple(PROTOCOL_VERSION)
    115         except Exception as e:
    116             return False
    117     return {k: v for k, v in servers.items() if is_recent(v.get('version'))}
    118 
    119 
    120 def filter_noonion(servers):
    121     return {k: v for k, v in servers.items() if not k.endswith('.onion')}
    122 
    123 
    124 def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence[ServerAddr]:
    125     """Filters the hostmap for those implementing protocol."""
    126     if allowed_protocols is None:
    127         allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
    128     eligible = []
    129     for host, portmap in hostmap.items():
    130         for protocol in allowed_protocols:
    131             port = portmap.get(protocol)
    132             if port:
    133                 eligible.append(ServerAddr(host, port, protocol=protocol))
    134     return eligible
    135 
    136 
    137 def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
    138                        exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]:
    139     if hostmap is None:
    140         hostmap = constants.net.DEFAULT_SERVERS
    141     if exclude_set is None:
    142         exclude_set = set()
    143     servers = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols))
    144     eligible = list(servers - exclude_set)
    145     return random.choice(eligible) if eligible else None
    146 
    147 
    148 class NetworkParameters(NamedTuple):
    149     server: ServerAddr
    150     proxy: Optional[dict]
    151     auto_connect: bool
    152     oneserver: bool = False
    153 
    154 
    155 proxy_modes = ['socks4', 'socks5']
    156 
    157 
    158 def serialize_proxy(p):
    159     if not isinstance(p, dict):
    160         return None
    161     return ':'.join([p.get('mode'), p.get('host'), p.get('port'),
    162                      p.get('user', ''), p.get('password', '')])
    163 
    164 
    165 def deserialize_proxy(s: str) -> Optional[dict]:
    166     if not isinstance(s, str):
    167         return None
    168     if s.lower() == 'none':
    169         return None
    170     proxy = { "mode":"socks5", "host":"localhost" }
    171     # FIXME raw IPv6 address fails here
    172     args = s.split(':')
    173     n = 0
    174     if proxy_modes.count(args[n]) == 1:
    175         proxy["mode"] = args[n]
    176         n += 1
    177     if len(args) > n:
    178         proxy["host"] = args[n]
    179         n += 1
    180     if len(args) > n:
    181         proxy["port"] = args[n]
    182         n += 1
    183     else:
    184         proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
    185     if len(args) > n:
    186         proxy["user"] = args[n]
    187         n += 1
    188     if len(args) > n:
    189         proxy["password"] = args[n]
    190     return proxy
    191 
    192 
    193 class BestEffortRequestFailed(NetworkException): pass
    194 
    195 
    196 class TxBroadcastError(NetworkException):
    197     def get_message_for_gui(self):
    198         raise NotImplementedError()
    199 
    200 
    201 class TxBroadcastHashMismatch(TxBroadcastError):
    202     def get_message_for_gui(self):
    203         return "{}\n{}\n\n{}" \
    204             .format(_("The server returned an unexpected transaction ID when broadcasting the transaction."),
    205                     _("Consider trying to connect to a different server, or updating Electrum."),
    206                     str(self))
    207 
    208 
    209 class TxBroadcastServerReturnedError(TxBroadcastError):
    210     def get_message_for_gui(self):
    211         return "{}\n{}\n\n{}" \
    212             .format(_("The server returned an error when broadcasting the transaction."),
    213                     _("Consider trying to connect to a different server, or updating Electrum."),
    214                     str(self))
    215 
    216 
    217 class TxBroadcastUnknownError(TxBroadcastError):
    218     def get_message_for_gui(self):
    219         return "{}\n{}" \
    220             .format(_("Unknown error when broadcasting the transaction."),
    221                     _("Consider trying to connect to a different server, or updating Electrum."))
    222 
    223 
    224 class UntrustedServerReturnedError(NetworkException):
    225     def __init__(self, *, original_exception):
    226         self.original_exception = original_exception
    227 
    228     def get_message_for_gui(self) -> str:
    229         return str(self)
    230 
    231     def __str__(self):
    232         return _("The server returned an error.")
    233 
    234     def __repr__(self):
    235         return (f"<UntrustedServerReturnedError "
    236                 f"[DO NOT TRUST THIS MESSAGE] original_exception: {repr(self.original_exception)}>")
    237 
    238 
    239 _INSTANCE = None
    240 
    241 
    242 class Network(Logger, NetworkRetryManager[ServerAddr]):
    243     """The Network class manages a set of connections to remote electrum
    244     servers, each connected socket is handled by an Interface() object.
    245     """
    246 
    247     LOGGING_SHORTCUT = 'n'
    248 
    249     taskgroup: Optional[TaskGroup]
    250     interface: Optional[Interface]
    251     interfaces: Dict[ServerAddr, Interface]
    252     _connecting_ifaces: Set[ServerAddr]
    253     _closing_ifaces: Set[ServerAddr]
    254     default_server: ServerAddr
    255     _recent_servers: List[ServerAddr]
    256 
    257     channel_blacklist: 'ChannelBlackList'
    258     channel_db: Optional['ChannelDB'] = None
    259     lngossip: Optional['LNGossip'] = None
    260     local_watchtower: Optional['WatchTower'] = None
    261 
    262     def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
    263         global _INSTANCE
    264         assert _INSTANCE is None, "Network is a singleton!"
    265         _INSTANCE = self
    266 
    267         Logger.__init__(self)
    268         NetworkRetryManager.__init__(
    269             self,
    270             max_retry_delay_normal=600,
    271             init_retry_delay_normal=15,
    272             max_retry_delay_urgent=10,
    273             init_retry_delay_urgent=1,
    274         )
    275 
    276         self.asyncio_loop = asyncio.get_event_loop()
    277         assert self.asyncio_loop.is_running(), "event loop not running"
    278         try:
    279             self._loop_thread = self.asyncio_loop._mythread  # type: threading.Thread  # only used for sanity checks
    280         except AttributeError as e:
    281             self.logger.warning(f"asyncio loop does not have _mythread set: {e!r}")
    282             self._loop_thread = None
    283 
    284         assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
    285         self.config = config
    286 
    287         self.daemon = daemon
    288 
    289         blockchain.read_blockchains(self.config)
    290         blockchain.init_headers_file_for_best_chain()
    291         self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
    292         self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None)  # type: Dict[str, Any]
    293         if self._blockchain_preferred_block is None:
    294             self._set_preferred_chain(None)
    295         self._blockchain = blockchain.get_best_chain()
    296 
    297         self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
    298 
    299         # Server for addresses and transactions
    300         self.default_server = self.config.get('server', None)
    301         # Sanitize default server
    302         if self.default_server:
    303             try:
    304                 self.default_server = ServerAddr.from_str(self.default_server)
    305             except:
    306                 self.logger.warning('failed to parse server-string; falling back to localhost:1:s.')
    307                 self.default_server = ServerAddr.from_str("localhost:1:s")
    308         else:
    309             self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
    310         assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"
    311 
    312         self.taskgroup = None
    313 
    314         # locks
    315         self.restart_lock = asyncio.Lock()
    316         self.bhi_lock = asyncio.Lock()
    317         self.recent_servers_lock = threading.RLock()       # <- re-entrant
    318         self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces
    319 
    320         self.server_peers = {}  # returned by interface (servers that the main interface knows about)
    321         self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock
    322 
    323         self.banner = ''
    324         self.donation_address = ''
    325         self.relay_fee = None  # type: Optional[int]
    326 
    327         dir_path = os.path.join(self.config.path, 'certs')
    328         util.make_dir(dir_path)
    329 
    330         # the main server we are currently communicating with
    331         self.interface = None
    332         self.default_server_changed_event = asyncio.Event()
    333         # Set of servers we have an ongoing connection with.
    334         # For any ServerAddr, at most one corresponding Interface object
    335         # can exist at any given time. Depending on the state of that Interface,
    336         # the ServerAddr can be found in one of the following sets.
    337         # Note: during a transition, the ServerAddr can appear in two sets momentarily.
    338         self._connecting_ifaces = set()
    339         self.interfaces = {}  # these are the ifaces in "initialised and usable" state
    340         self._closing_ifaces = set()
    341 
    342         self.auto_connect = self.config.get('auto_connect', True)
    343         self.proxy = None
    344         self._maybe_set_oneserver()
    345 
    346         # Dump network messages (all interfaces).  Set at runtime from the console.
    347         self.debug = False
    348 
    349         self._set_status('disconnected')
    350         self._has_ever_managed_to_connect_to_server = False
    351 
    352         # lightning network
    353         self.channel_blacklist = ChannelBlackList()
    354         if self.config.get('run_local_watchtower', False):
    355             from . import lnwatcher
    356             self.local_watchtower = lnwatcher.WatchTower(self)
    357             self.local_watchtower.start_network(self)
    358             asyncio.ensure_future(self.local_watchtower.start_watching())
    359 
    360     def has_internet_connection(self) -> bool:
    361         """Our guess whether the device has Internet-connectivity."""
    362         return self._has_ever_managed_to_connect_to_server
    363 
    364     def has_channel_db(self):
    365         return self.channel_db is not None
    366 
    367     def start_gossip(self):
    368         from . import lnrouter
    369         from . import channel_db
    370         from . import lnworker
    371         if not self.config.get('use_gossip'):
    372             return
    373         if self.lngossip is None:
    374             self.channel_db = channel_db.ChannelDB(self)
    375             self.path_finder = lnrouter.LNPathFinder(self.channel_db)
    376             self.channel_db.load_data()
    377             self.lngossip = lnworker.LNGossip()
    378             self.lngossip.start_network(self)
    379 
    380     async def stop_gossip(self, *, full_shutdown: bool = False):
    381         if self.lngossip:
    382             await self.lngossip.stop()
    383             self.lngossip = None
    384             self.channel_db.stop()
    385             if full_shutdown:
    386                 await self.channel_db.stopped_event.wait()
    387             self.channel_db = None
    388 
    389     def run_from_another_thread(self, coro, *, timeout=None):
    390         assert self._loop_thread != threading.current_thread(), 'must not be called from network thread'
    391         fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
    392         return fut.result(timeout)
    393 
    394     @staticmethod
    395     def get_instance() -> Optional["Network"]:
    396         return _INSTANCE
    397 
    398     def with_recent_servers_lock(func):
    399         def func_wrapper(self, *args, **kwargs):
    400             with self.recent_servers_lock:
    401                 return func(self, *args, **kwargs)
    402         return func_wrapper
    403 
    404     def _read_recent_servers(self) -> List[ServerAddr]:
    405         if not self.config.path:
    406             return []
    407         path = os.path.join(self.config.path, "recent_servers")
    408         try:
    409             with open(path, "r", encoding='utf-8') as f:
    410                 data = f.read()
    411                 servers_list = json.loads(data)
    412             return [ServerAddr.from_str(s) for s in servers_list]
    413         except:
    414             return []
    415 
    416     @with_recent_servers_lock
    417     def _save_recent_servers(self):
    418         if not self.config.path:
    419             return
    420         path = os.path.join(self.config.path, "recent_servers")
    421         s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
    422         try:
    423             with open(path, "w", encoding='utf-8') as f:
    424                 f.write(s)
    425         except:
    426             pass
    427 
    428     async def _server_is_lagging(self) -> bool:
    429         sh = self.get_server_height()
    430         if not sh:
    431             self.logger.info('no height for main interface')
    432             return True
    433         lh = self.get_local_height()
    434         result = (lh - sh) > 1
    435         if result:
    436             self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
    437         return result
    438 
    439     def _set_status(self, status):
    440         self.connection_status = status
    441         self.notify('status')
    442 
    443     def is_connected(self):
    444         interface = self.interface
    445         return interface is not None and interface.ready.done()
    446 
    447     def is_connecting(self):
    448         return self.connection_status == 'connecting'
    449 
    450     async def _request_server_info(self, interface: 'Interface'):
    451         await interface.ready
    452         # TODO: libbitcoin: session = interface.session
    453 
    454         async def get_banner():
    455             self.banner = await interface.get_server_banner()
    456             self.notify('banner')
    457         async def get_donation_address():
    458             self.donation_address = await interface.get_donation_address()
    459         async def get_server_peers():
    460             # ORIG: server_peers = await session.send_request('server.peers.subscribe')
    461             # TODO: libbitcoin
    462             server_peers = []
    463             random.shuffle(server_peers)
    464             max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
    465             server_peers = server_peers[:max_accepted_peers]
    466             # note that 'parse_servers' also validates the data (which is untrusted input!)
    467             self.server_peers = parse_servers(server_peers)
    468             self.notify('servers')
    469         async def get_relay_fee():
    470             self.relay_fee = await interface.get_relay_fee()
    471 
    472         async with TaskGroup() as group:
    473             await group.spawn(get_banner)
    474             await group.spawn(get_donation_address)
    475             await group.spawn(get_server_peers)
    476             await group.spawn(get_relay_fee)
    477             await group.spawn(self._request_fee_estimates(interface))
    478 
    479     async def _request_fee_estimates(self, interface):
    480         self.config.requested_fee_estimates()
    481         histogram = await interface.get_fee_histogram()
    482         self.config.mempool_fees = histogram
    483         self.logger.info(f'fee_histogram {histogram}')
    484         self.notify('fee_histogram')
    485 
    486     def get_status_value(self, key):
    487         if key == 'status':
    488             value = self.connection_status
    489         elif key == 'banner':
    490             value = self.banner
    491         elif key == 'fee':
    492             value = self.config.fee_estimates
    493         elif key == 'fee_histogram':
    494             value = self.config.mempool_fees
    495         elif key == 'servers':
    496             value = self.get_servers()
    497         else:
    498             raise Exception('unexpected trigger key {}'.format(key))
    499         return value
    500 
    501     def notify(self, key):
    502         if key in ['status', 'updated']:
    503             util.trigger_callback(key)
    504         else:
    505             util.trigger_callback(key, self.get_status_value(key))
    506 
    507     def get_parameters(self) -> NetworkParameters:
    508         return NetworkParameters(server=self.default_server,
    509                                  proxy=self.proxy,
    510                                  auto_connect=self.auto_connect,
    511                                  oneserver=self.oneserver)
    512 
    513     def get_donation_address(self):
    514         if self.is_connected():
    515             return self.donation_address
    516 
    517     def get_interfaces(self) -> List[ServerAddr]:
    518         """The list of servers for the connected interfaces."""
    519         with self.interfaces_lock:
    520             return list(self.interfaces)
    521 
    522     def get_fee_estimates(self):
    523         from statistics import median
    524         from .simple_config import FEE_ETA_TARGETS
    525         if self.auto_connect:
    526             with self.interfaces_lock:
    527                 out = {}
    528                 for n in FEE_ETA_TARGETS:
    529                     try:
    530                         out[n] = int(median(filter(None, [i.fee_estimates_eta.get(n) for i in self.interfaces.values()])))
    531                     except:
    532                         continue
    533                 return out
    534         else:
    535             if not self.interface:
    536                 return {}
    537             return self.interface.fee_estimates_eta
    538 
    539     def update_fee_estimates(self):
    540         e = self.get_fee_estimates()
    541         for nblock_target, fee in e.items():
    542             self.config.update_fee_estimates(nblock_target, fee)
    543         if not hasattr(self, "_prev_fee_est") or self._prev_fee_est != e:
    544             self._prev_fee_est = copy.copy(e)
    545             self.logger.info(f'fee_estimates {e}')
    546         self.notify('fee')
    547 
    548     @with_recent_servers_lock
    549     def get_servers(self):
    550         # note: order of sources when adding servers here is crucial!
    551         # don't let "server_peers" overwrite anything,
    552         # otherwise main server can eclipse the client
    553         out = dict()
    554         # add servers received from main interface
    555         server_peers = self.server_peers
    556         if server_peers:
    557             out.update(filter_version(server_peers.copy()))
    558         # hardcoded servers
    559         out.update(constants.net.DEFAULT_SERVERS)
    560         # add recent servers
    561         for server in self._recent_servers:
    562             port = str(server.port)
    563             if server.host in out:
    564                 out[server.host].update({server.protocol: port})
    565             else:
    566                 out[server.host] = {server.protocol: port}
    567         # potentially filter out some
    568         if self.config.get('noonion'):
    569             out = filter_noonion(out)
    570         return out
    571 
    572     def _get_next_server_to_try(self) -> Optional[ServerAddr]:
    573         now = time.time()
    574         with self.interfaces_lock:
    575             connected_servers = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
    576         # First try from recent servers. (which are persisted)
    577         # As these are servers we successfully connected to recently, they are
    578         # most likely to work. This also makes servers "sticky".
    579         # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
    580         #       however if they succeed, the eclipsing would persist. To try to balance this,
    581         #       we only give priority to recent_servers up to NUM_STICKY_SERVERS.
    582         with self.recent_servers_lock:
    583             recent_servers = list(self._recent_servers)
    584         recent_servers = [s for s in recent_servers if s.protocol in self._allowed_protocols]
    585         if len(connected_servers & set(recent_servers)) < NUM_STICKY_SERVERS:
    586             for server in recent_servers:
    587                 if server in connected_servers:
    588                     continue
    589                 if not self._can_retry_addr(server, now=now):
    590                     continue
    591                 return server
    592         # try all servers we know about, pick one at random
    593         hostmap = self.get_servers()
    594         servers = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - connected_servers)
    595         random.shuffle(servers)
    596         for server in servers:
    597             if not self._can_retry_addr(server, now=now):
    598                 continue
    599             return server
    600         return None
    601 
    602     def _set_proxy(self, proxy: Optional[dict]):
    603         self.proxy = proxy
    604         dns_hacks.configure_dns_depending_on_proxy(bool(proxy))
    605         self.logger.info(f'setting proxy {proxy}')
    606         util.trigger_callback('proxy_set', self.proxy)
    607 
    608     @log_exceptions
    609     async def set_parameters(self, net_params: NetworkParameters):
    610         proxy = net_params.proxy
    611         proxy_str = serialize_proxy(proxy)
    612         server = net_params.server
    613         # sanitize parameters
    614         try:
    615             if proxy:
    616                 proxy_modes.index(proxy['mode']) + 1
    617                 int(proxy['port'])
    618         except:
    619             return
    620         self.config.set_key('auto_connect', net_params.auto_connect, False)
    621         self.config.set_key('oneserver', net_params.oneserver, False)
    622         self.config.set_key('proxy', proxy_str, False)
    623         self.config.set_key('server', str(server), True)
    624         # abort if changes were not allowed by config
    625         if self.config.get('server') != str(server) \
    626                 or self.config.get('proxy') != proxy_str \
    627                 or self.config.get('oneserver') != net_params.oneserver:
    628             return
    629 
    630         async with self.restart_lock:
    631             self.auto_connect = net_params.auto_connect
    632             if self.proxy != proxy or self.oneserver != net_params.oneserver:
    633                 # Restart the network defaulting to the given server
    634                 await self.stop(full_shutdown=False)
    635                 self.default_server = server
    636                 await self._start()
    637             elif self.default_server != server:
    638                 await self.switch_to_interface(server)
    639             else:
    640                 await self.switch_lagging_interface()
    641 
    642     def _maybe_set_oneserver(self) -> None:
    643         oneserver = bool(self.config.get('oneserver', False))
    644         self.oneserver = oneserver
    645         self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0
    646 
    647     async def _switch_to_random_interface(self):
    648         '''Switch to a random connected server other than the current one'''
    649         servers = self.get_interfaces()    # Those in connected state
    650         if self.default_server in servers:
    651             servers.remove(self.default_server)
    652         if servers:
    653             await self.switch_to_interface(random.choice(servers))
    654 
    655     async def switch_lagging_interface(self):
    656         """If auto_connect and lagging, switch interface (only within fork)."""
    657         if self.auto_connect and await self._server_is_lagging():
    658             # switch to one that has the correct header (not height)
    659             best_header = self.blockchain().header_at_tip()
    660             with self.interfaces_lock: interfaces = list(self.interfaces.values())
    661             filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces))
    662             if filtered:
    663                 chosen_iface = random.choice(filtered)
    664                 await self.switch_to_interface(chosen_iface.server)
    665 
    666     async def switch_unwanted_fork_interface(self) -> None:
    667         """If auto_connect, maybe switch to another fork/chain."""
    668         if not self.auto_connect or not self.interface:
    669             return
    670         with self.interfaces_lock: interfaces = list(self.interfaces.values())
    671         pref_height = self._blockchain_preferred_block['height']
    672         pref_hash   = self._blockchain_preferred_block['hash']
    673         # shortcut for common case
    674         if pref_height == 0:
    675             return
    676         # maybe try switching chains; starting with most desirable first
    677         matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash)
    678         chains_to_try = list(matching_chains) + [blockchain.get_best_chain()]
    679         for rank, chain in enumerate(chains_to_try):
    680             # check if main interface is already on this fork
    681             if self.interface.blockchain == chain:
    682                 return
    683             # switch to another random interface that is on this fork, if any
    684             filtered = [iface for iface in interfaces
    685                         if iface.blockchain == chain]
    686             if filtered:
    687                 self.logger.info(f"switching to (more) preferred fork (rank {rank})")
    688                 chosen_iface = random.choice(filtered)
    689                 await self.switch_to_interface(chosen_iface.server)
    690                 return
    691         self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")
    692 
    693     async def switch_to_interface(self, server: ServerAddr):
    694         """Switch to server as our main interface. If no connection exists,
    695         queue interface to be started. The actual switch will
    696         happen when the interface becomes ready.
    697         """
    698         self.default_server = server
    699         old_interface = self.interface
    700         old_server = old_interface.server if old_interface else None
    701 
    702         # Stop any current interface in order to terminate subscriptions,
    703         # and to cancel tasks in interface.taskgroup.
    704         if old_server and old_server != server:
    705             # don't wait for old_interface to close as that might be slow:
    706             await self.taskgroup.spawn(self._close_interface(old_interface))
    707 
    708         if server not in self.interfaces:
    709             self.interface = None
    710             await self.taskgroup.spawn(self._run_new_interface(server))
    711             return
    712 
    713         i = self.interfaces[server]
    714         if old_interface != i:
    715             self.logger.info(f"switching to {server}")
    716             assert i.ready.done(), "interface we are switching to is not ready yet"
    717             blockchain_updated = i.blockchain != self.blockchain()
    718             self.interface = i
    719             await i.taskgroup.spawn(self._request_server_info(i))
    720             util.trigger_callback('default_server_changed')
    721             self.default_server_changed_event.set()
    722             self.default_server_changed_event.clear()
    723             self._set_status('connected')
    724             util.trigger_callback('network_updated')
    725             if blockchain_updated:
    726                 util.trigger_callback('blockchain_updated')
    727 
    728     async def _close_interface(self, interface: Optional[Interface]):
    729         if not interface:
    730             return
    731         if interface.server in self._closing_ifaces:
    732             return
    733         self._closing_ifaces.add(interface.server)
    734         with self.interfaces_lock:
    735             if self.interfaces.get(interface.server) == interface:
    736                 self.interfaces.pop(interface.server)
    737         if interface == self.interface:
    738             self.interface = None
    739         try:
    740             # this can take some time if server/connection is slow:
    741             await interface.close()
    742             await interface.got_disconnected.wait()
    743         finally:
    744             self._closing_ifaces.discard(interface.server)
    745 
    746     @with_recent_servers_lock
    747     def _add_recent_server(self, server: ServerAddr) -> None:
    748         self._on_connection_successfully_established(server)
    749         # list is ordered
    750         if server in self._recent_servers:
    751             self._recent_servers.remove(server)
    752         self._recent_servers.insert(0, server)
    753         self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
    754         self._save_recent_servers()
    755 
    756     async def connection_down(self, interface: Interface):
    757         '''A connection to server either went down, or was never made.
    758         We distinguish by whether it is in self.interfaces.'''
    759         if not interface: return
    760         if interface.server == self.default_server:
    761             self._set_status('disconnected')
    762         await self._close_interface(interface)
    763         util.trigger_callback('network_updated')
    764 
    765     def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
    766         if self.oneserver and not self.auto_connect:
    767             return request_type.MOST_RELAXED
    768         if self.proxy:
    769             return request_type.RELAXED
    770         return request_type.NORMAL
    771 
    772     @ignore_exceptions  # do not kill outer taskgroup
    773     @log_exceptions
    774     async def _run_new_interface(self, server: ServerAddr):
    775         if (server in self.interfaces
    776                 or server in self._connecting_ifaces
    777                 or server in self._closing_ifaces):
    778             return
    779         self._connecting_ifaces.add(server)
    780         if server == self.default_server:
    781             self.logger.info(f"connecting to {server} as new interface")
    782             self._set_status('connecting')
    783         self._trying_addr_now(server)
    784 
    785         interface = Interface(network=self, server=server, proxy=self.proxy)
    786         # note: using longer timeouts here as DNS can sometimes be slow!
    787         timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
    788         try:
    789             await asyncio.wait_for(interface.ready, timeout)
    790         except BaseException as e:
    791             self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
    792             await interface.close()
    793             return
    794         else:
    795             with self.interfaces_lock:
    796                 assert server not in self.interfaces
    797                 self.interfaces[server] = interface
    798         finally:
    799             self._connecting_ifaces.discard(server)
    800 
    801         if server == self.default_server:
    802             await self.switch_to_interface(server)
    803 
    804         self._has_ever_managed_to_connect_to_server = True
    805         self._add_recent_server(server)
    806         util.trigger_callback('network_updated')
    807 
    808     def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
    809         # main interface is exempt. this makes switching servers easier
    810         if iface_to_check.is_main_server():
    811             return True
    812         if not iface_to_check.bucket_based_on_ipaddress():
    813             return True
    814         # bucket connected interfaces
    815         with self.interfaces_lock:
    816             interfaces = list(self.interfaces.values())
    817         if iface_to_check in interfaces:
    818             interfaces.remove(iface_to_check)
    819         buckets = defaultdict(list)
    820         for iface in interfaces:
    821             buckets[iface.bucket_based_on_ipaddress()].append(iface)
    822         # check proposed server against buckets
    823         onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
    824         if iface_to_check.is_tor():
    825             # keep number of onion servers below half of all connected servers
    826             if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2:
    827                 return False
    828         else:
    829             bucket = iface_to_check.bucket_based_on_ipaddress()
    830             if len(buckets[bucket]) > 0:
    831                 return False
    832         return True
    833 
    834     def best_effort_reliable(func):
    835         @functools.wraps(func)
    836         async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
    837             for i in range(10):
    838                 iface = self.interface
    839                 # retry until there is a main interface
    840                 if not iface:
    841                     async with ignore_after(1):
    842                         await self.default_server_changed_event.wait()
    843                     continue  # try again
    844                 assert iface.ready.done(), "interface not ready yet"
    845                 # try actual request
    846                 try:
    847                     async with TaskGroup(wait=any) as group:
    848                         task = await group.spawn(func(self, *args, **kwargs))
    849                         await group.spawn(iface.got_disconnected.wait())
    850                 except RequestTimedOut:
    851                     await iface.close()
    852                     await iface.got_disconnected.wait()
    853                     continue  # try again
    854                 except RequestCorrupted as e:
    855                     # TODO ban server?
    856                     iface.logger.exception(f"RequestCorrupted: {e}")
    857                     await iface.close()
    858                     await iface.got_disconnected.wait()
    859                     continue  # try again
    860                 if task.done() and not task.cancelled():
    861                     return task.result()
    862                 # otherwise; try again
    863             raise BestEffortRequestFailed('no interface to do request on... gave up.')
    864         return make_reliable_wrapper
    865 
    866     def catch_server_exceptions(func):
    867         @functools.wraps(func)
    868         async def wrapper(self, *args, **kwargs):
    869             try:
    870                 return await func(self, *args, **kwargs)
    871             except aiorpcx.jsonrpc.CodeMessageError as e:
    872                 raise UntrustedServerReturnedError(original_exception=e) from e
    873         return wrapper
    874 
    875     @best_effort_reliable
    876     @catch_server_exceptions
    877     async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
    878         return await self.interface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)
    879 
    880     @best_effort_reliable
    881     async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
    882         if timeout is None:
    883             timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
    884         # TODO: libbitcoin
    885         _ec = await self.interface.broadcast_transaction(tx.serialize(), timeout=timeout)
    886         if _ec != 0:
    887             raise TxBroadcastServerReturnedError(f"not validated, error: {_ec!r}")
    888 
    889     async def try_broadcasting(self, tx, name):
    890         try:
    891             await self.broadcast_transaction(tx)
    892         except Exception as e:
    893             self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}')
    894         else:
    895             self.logger.info(f'success: broadcasting {name} {tx.txid()}')
    896 
    897     @staticmethod
    898     def sanitize_tx_broadcast_response(server_msg) -> str:
    899         # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
    900         # So, we use substring matching to grok the error message.
    901         # server_msg is untrusted input so it should not be shown to the user. see #4968
    902         server_msg = str(server_msg)
    903         server_msg = server_msg.replace("\n", r"\n")
    904         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
    905         # grep "reason ="
    906         policy_error_messages = {
    907             r"version": _("Transaction uses non-standard version."),
    908             r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
    909             r"scriptsig-size": None,
    910             r"scriptsig-not-pushonly": None,
    911             r"scriptpubkey":
    912                 ("scriptpubkey\n" +
    913                  _("Some of the outputs pay to a non-standard script.")),
    914             r"bare-multisig": None,
    915             r"dust":
    916                 (_("Transaction could not be broadcast due to dust outputs.\n"
    917                    "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
    918                    "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
    919             r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
    920         }
    921         for substring in policy_error_messages:
    922             if substring in server_msg:
    923                 msg = policy_error_messages[substring]
    924                 return msg if msg else substring
    925         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
    926         script_error_messages = {
    927             r"Script evaluated without error but finished with a false/empty top stack element",
    928             r"Script failed an OP_VERIFY operation",
    929             r"Script failed an OP_EQUALVERIFY operation",
    930             r"Script failed an OP_CHECKMULTISIGVERIFY operation",
    931             r"Script failed an OP_CHECKSIGVERIFY operation",
    932             r"Script failed an OP_NUMEQUALVERIFY operation",
    933             r"Script is too big",
    934             r"Push value size limit exceeded",
    935             r"Operation limit exceeded",
    936             r"Stack size limit exceeded",
    937             r"Signature count negative or greater than pubkey count",
    938             r"Pubkey count negative or limit exceeded",
    939             r"Opcode missing or not understood",
    940             r"Attempted to use a disabled opcode",
    941             r"Operation not valid with the current stack size",
    942             r"Operation not valid with the current altstack size",
    943             r"OP_RETURN was encountered",
    944             r"Invalid OP_IF construction",
    945             r"Negative locktime",
    946             r"Locktime requirement not satisfied",
    947             r"Signature hash type missing or not understood",
    948             r"Non-canonical DER signature",
    949             r"Data push larger than necessary",
    950             r"Only push operators allowed in signatures",
    951             r"Non-canonical signature: S value is unnecessarily high",
    952             r"Dummy CHECKMULTISIG argument must be zero",
    953             r"OP_IF/NOTIF argument must be minimal",
    954             r"Signature must be zero for failed CHECK(MULTI)SIG operation",
    955             r"NOPx reserved for soft-fork upgrades",
    956             r"Witness version reserved for soft-fork upgrades",
    957             r"Taproot version reserved for soft-fork upgrades",
    958             r"OP_SUCCESSx reserved for soft-fork upgrades",
    959             r"Public key version reserved for soft-fork upgrades",
    960             r"Public key is neither compressed or uncompressed",
    961             r"Stack size must be exactly one after execution",
    962             r"Extra items left on stack after execution",
    963             r"Witness program has incorrect length",
    964             r"Witness program was passed an empty witness",
    965             r"Witness program hash mismatch",
    966             r"Witness requires empty scriptSig",
    967             r"Witness requires only-redeemscript scriptSig",
    968             r"Witness provided for non-witness script",
    969             r"Using non-compressed keys in segwit",
    970             r"Invalid Schnorr signature size",
    971             r"Invalid Schnorr signature hash type",
    972             r"Invalid Schnorr signature",
    973             r"Invalid Taproot control block size",
    974             r"Too much signature validation relative to witness weight",
    975             r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
    976             r"OP_IF/NOTIF argument must be minimal in tapscript",
    977             r"Using OP_CODESEPARATOR in non-witness script",
    978             r"Signature is found in scriptCode",
    979         }
    980         for substring in script_error_messages:
    981             if substring in server_msg:
    982                 return substring
    983         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
    984         # grep "REJECT_"
    985         # grep "TxValidationResult"
    986         # should come after script_error.cpp (due to e.g. non-mandatory-script-verify-flag)
    987         validation_error_messages = {
    988             r"coinbase": None,
    989             r"tx-size-small": None,
    990             r"non-final": None,
    991             r"txn-already-in-mempool": None,
    992             r"txn-mempool-conflict": None,
    993             r"txn-already-known": None,
    994             r"non-BIP68-final": None,
    995             r"bad-txns-nonstandard-inputs": None,
    996             r"bad-witness-nonstandard": None,
    997             r"bad-txns-too-many-sigops": None,
    998             r"mempool min fee not met":
    999                 ("mempool min fee not met\n" +
   1000                  _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
   1001                    "fit it into its mempool. The mempool is already full of hundreds of megabytes "
   1002                    "of transactions that all pay higher fees. Try to increase the fee.")),
   1003             r"min relay fee not met": None,
   1004             r"absurdly-high-fee": None,
   1005             r"max-fee-exceeded": None,
   1006             r"too-long-mempool-chain": None,
   1007             r"bad-txns-spends-conflicting-tx": None,
   1008             r"insufficient fee": ("insufficient fee\n" +
   1009                  _("Your transaction is trying to replace another one in the mempool but it "
   1010                    "does not meet the rules to do so. Try to increase the fee.")),
   1011             r"too many potential replacements": None,
   1012             r"replacement-adds-unconfirmed": None,
   1013             r"mempool full": None,
   1014             r"non-mandatory-script-verify-flag": None,
   1015             r"mandatory-script-verify-flag-failed": None,
   1016             r"Transaction check failed": None,
   1017         }
   1018         for substring in validation_error_messages:
   1019             if substring in server_msg:
   1020                 msg = validation_error_messages[substring]
   1021                 return msg if msg else substring
   1022         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
   1023         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
   1024         # grep "RPC_TRANSACTION"
   1025         # grep "RPC_DESERIALIZATION_ERROR"
   1026         rawtransaction_error_messages = {
   1027             r"Missing inputs": None,
   1028             r"Inputs missing or spent": None,
   1029             r"transaction already in block chain": None,
   1030             r"Transaction already in block chain": None,
   1031             r"TX decode failed": None,
   1032             r"Peer-to-peer functionality missing or disabled": None,
   1033             r"Transaction rejected by AcceptToMemoryPool": None,
   1034             r"AcceptToMemoryPool failed": None,
   1035             r"Fee exceeds maximum configured by user": None,
   1036         }
   1037         for substring in rawtransaction_error_messages:
   1038             if substring in server_msg:
   1039                 msg = rawtransaction_error_messages[substring]
   1040                 return msg if msg else substring
   1041         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
   1042         # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
   1043         # grep "REJECT_"
   1044         # grep "TxValidationResult"
   1045         tx_verify_error_messages = {
   1046             r"bad-txns-vin-empty": None,
   1047             r"bad-txns-vout-empty": None,
   1048             r"bad-txns-oversize": None,
   1049             r"bad-txns-vout-negative": None,
   1050             r"bad-txns-vout-toolarge": None,
   1051             r"bad-txns-txouttotal-toolarge": None,
   1052             r"bad-txns-inputs-duplicate": None,
   1053             r"bad-cb-length": None,
   1054             r"bad-txns-prevout-null": None,
   1055             r"bad-txns-inputs-missingorspent":
   1056                 ("bad-txns-inputs-missingorspent\n" +
   1057                  _("You might have a local transaction in your wallet that this transaction "
   1058                    "builds on top. You need to either broadcast or remove the local tx.")),
   1059             r"bad-txns-premature-spend-of-coinbase": None,
   1060             r"bad-txns-inputvalues-outofrange": None,
   1061             r"bad-txns-in-belowout": None,
   1062             r"bad-txns-fee-outofrange": None,
   1063         }
   1064         for substring in tx_verify_error_messages:
   1065             if substring in server_msg:
   1066                 msg = tx_verify_error_messages[substring]
   1067                 return msg if msg else substring
   1068         # otherwise:
   1069         return _("Unknown error")
   1070 
   1071     @best_effort_reliable
   1072     @catch_server_exceptions
   1073     async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
   1074         return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
   1075 
   1076     @best_effort_reliable
   1077     @catch_server_exceptions
   1078     async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
   1079         return await self.interface.get_transaction(tx_hash=tx_hash, timeout=timeout)
   1080 
   1081     @best_effort_reliable
   1082     @catch_server_exceptions
   1083     async def get_history_for_scripthash(self, sh: str) -> List[dict]:
   1084         return await self.interface.get_history_for_scripthash(sh)
   1085 
   1086     @best_effort_reliable
   1087     @catch_server_exceptions
   1088     async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
   1089         return await self.interface.listunspent_for_scripthash(sh)
   1090 
   1091     @best_effort_reliable
   1092     @catch_server_exceptions
   1093     async def get_balance_for_scripthash(self, sh: str) -> dict:
   1094         return await self.interface.get_balance_for_scripthash(sh)
   1095 
   1096     @best_effort_reliable
   1097     @catch_server_exceptions
   1098     async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
   1099         return await self.interface.get_txid_from_txpos(tx_height, tx_pos, merkle)
   1100 
   1101     def blockchain(self) -> Blockchain:
   1102         interface = self.interface
   1103         if interface and interface.blockchain is not None:
   1104             self._blockchain = interface.blockchain
   1105         return self._blockchain
   1106 
   1107     def get_blockchains(self):
   1108         out = {}  # blockchain_id -> list(interfaces)
   1109         with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items())
   1110         with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
   1111         for chain_id, bc in blockchain_items:
   1112             r = list(filter(lambda i: i.blockchain==bc, interfaces_values))
   1113             if r:
   1114                 out[chain_id] = r
   1115         return out
   1116 
   1117     def _set_preferred_chain(self, chain: Optional[Blockchain]):
   1118         if chain:
   1119             height = chain.get_max_forkpoint()
   1120             header_hash = chain.get_hash(height)
   1121         else:
   1122             height = 0
   1123             header_hash = constants.net.GENESIS
   1124         self._blockchain_preferred_block = {
   1125             'height': height,
   1126             'hash': header_hash,
   1127         }
   1128         self.config.set_key('blockchain_preferred_block', self._blockchain_preferred_block)
   1129 
   1130     async def follow_chain_given_id(self, chain_id: str) -> None:
   1131         bc = blockchain.blockchains.get(chain_id)
   1132         if not bc:
   1133             raise Exception('blockchain {} not found'.format(chain_id))
   1134         self._set_preferred_chain(bc)
   1135         # select server on this chain
   1136         with self.interfaces_lock: interfaces = list(self.interfaces.values())
   1137         interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces))
   1138         if len(interfaces_on_selected_chain) == 0: return
   1139         chosen_iface = random.choice(interfaces_on_selected_chain)  # type: Interface
   1140         # switch to server (and save to config)
   1141         net_params = self.get_parameters()
   1142         net_params = net_params._replace(server=chosen_iface.server)
   1143         await self.set_parameters(net_params)
   1144 
   1145     async def follow_chain_given_server(self, server: ServerAddr) -> None:
   1146         # note that server_str should correspond to a connected interface
   1147         iface = self.interfaces.get(server)
   1148         if iface is None:
   1149             return
   1150         self._set_preferred_chain(iface.blockchain)
   1151         # switch to server (and save to config)
   1152         net_params = self.get_parameters()
   1153         net_params = net_params._replace(server=server)
   1154         await self.set_parameters(net_params)
   1155 
   1156     def get_server_height(self) -> int:
   1157         """Length of header chain, as claimed by main interface."""
   1158         interface = self.interface
   1159         return interface.tip if interface else 0
   1160 
   1161     def get_local_height(self):
   1162         """Length of header chain, POW-verified.
   1163         In case of a chain split, this is for the branch the main interface is on,
   1164         but it is the tip of that branch (even if main interface is behind).
   1165         """
   1166         return self.blockchain().height()
   1167 
   1168     def export_checkpoints(self, path):
   1169         """Run manually to generate blockchain checkpoints.
   1170         Kept for console use only.
   1171         """
   1172         cp = self.blockchain().get_checkpoints()
   1173         with open(path, 'w', encoding='utf-8') as f:
   1174             f.write(json.dumps(cp, indent=4))
   1175 
   1176     async def _start(self):
   1177         assert not self.taskgroup
   1178         self.taskgroup = taskgroup = SilentTaskGroup()
   1179         assert not self.interface and not self.interfaces
   1180         assert not self._connecting_ifaces
   1181         assert not self._closing_ifaces
   1182         self.logger.info('starting network')
   1183         self._clear_addr_retry_times()
   1184         self._set_proxy(deserialize_proxy(self.config.get('proxy')))
   1185         self._maybe_set_oneserver()
   1186         await self.taskgroup.spawn(self._run_new_interface(self.default_server))
   1187 
   1188         async def main():
   1189             self.logger.info("starting taskgroup.")
   1190             try:
   1191                 # note: if a task finishes with CancelledError, that
   1192                 # will NOT raise, and the group will keep the other tasks running
   1193                 async with taskgroup as group:
   1194                     await group.spawn(self._maintain_sessions())
   1195                     [await group.spawn(job) for job in self._jobs]
   1196             except asyncio.CancelledError:
   1197                 raise
   1198             except Exception as e:
   1199                 self.logger.exception("taskgroup died.")
   1200             finally:
   1201                 self.logger.info("taskgroup stopped.")
   1202         asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)
   1203 
   1204         util.trigger_callback('network_updated')
   1205 
   1206     def start(self, jobs: Iterable = None):
   1207         """Schedule starting the network, along with the given job co-routines.
   1208 
   1209         Note: the jobs will *restart* every time the network restarts, e.g. on proxy
   1210         setting changes.
   1211         """
   1212         self._jobs = jobs or []
   1213         asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop)
   1214 
   1215     @log_exceptions
   1216     async def stop(self, *, full_shutdown: bool = True):
   1217         self.logger.info("stopping network")
   1218         # timeout: if full_shutdown, it is up to the caller to time us out,
   1219         #          otherwise if e.g. restarting due to proxy changes, we time out fast
   1220         async with (nullcontext() if full_shutdown else ignore_after(1)):
   1221             async with TaskGroup() as group:
   1222                 await group.spawn(self.taskgroup.cancel_remaining())
   1223                 if full_shutdown:
   1224                     await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
   1225         self.taskgroup = None
   1226         self.interface = None
   1227         self.interfaces = {}
   1228         self._connecting_ifaces.clear()
   1229         self._closing_ifaces.clear()
   1230         if not full_shutdown:
   1231             util.trigger_callback('network_updated')
   1232 
   1233     async def _ensure_there_is_a_main_interface(self):
   1234         if self.is_connected():
   1235             return
   1236         # if auto_connect is set, try a different server
   1237         if self.auto_connect and not self.is_connecting():
   1238             await self._switch_to_random_interface()
   1239         # if auto_connect is not set, or still no main interface, retry current
   1240         if not self.is_connected() and not self.is_connecting():
   1241             if self._can_retry_addr(self.default_server, urgent=True):
   1242                 await self.switch_to_interface(self.default_server)
   1243 
   1244     async def _maintain_sessions(self):
   1245         async def maybe_start_new_interfaces():
   1246             num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces)
   1247             for i in range(self.num_server - num_existing_ifaces):
   1248                 # FIXME this should try to honour "healthy spread of connected servers"
   1249                 server = self._get_next_server_to_try()
   1250                 if server:
   1251                     await self.taskgroup.spawn(self._run_new_interface(server))
   1252         async def maintain_healthy_spread_of_connected_servers():
   1253             with self.interfaces_lock: interfaces = list(self.interfaces.values())
   1254             random.shuffle(interfaces)
   1255             for iface in interfaces:
   1256                 if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
   1257                     self.logger.info(f"disconnecting from {iface.server}. too many connected "
   1258                                      f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
   1259                     await self._close_interface(iface)
   1260         async def maintain_main_interface():
   1261             await self._ensure_there_is_a_main_interface()
   1262             if self.is_connected():
   1263                 if self.config.is_fee_estimates_update_required():
   1264                     await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)
   1265 
   1266         while True:
   1267             try:
   1268                 await maybe_start_new_interfaces()
   1269                 await maintain_healthy_spread_of_connected_servers()
   1270                 await maintain_main_interface()
   1271             except asyncio.CancelledError:
   1272                 # suppress spurious cancellations
   1273                 group = self.taskgroup
   1274                 if not group or group.closed():
   1275                     raise
   1276             await asyncio.sleep(0.1)
   1277 
   1278     @classmethod
   1279     async def _send_http_on_proxy(cls, method: str, url: str, params: str = None,
   1280                                   body: bytes = None, json: dict = None, headers=None,
   1281                                   on_finish=None, timeout=None):
   1282         async def default_on_finish(resp: ClientResponse):
   1283             resp.raise_for_status()
   1284             return await resp.text()
   1285         if headers is None:
   1286             headers = {}
   1287         if on_finish is None:
   1288             on_finish = default_on_finish
   1289         network = cls.get_instance()
   1290         proxy = network.proxy if network else None
   1291         async with make_aiohttp_session(proxy, timeout=timeout) as session:
   1292             if method == 'get':
   1293                 async with session.get(url, params=params, headers=headers) as resp:
   1294                     return await on_finish(resp)
   1295             elif method == 'post':
   1296                 assert body is not None or json is not None, 'body or json must be supplied if method is post'
   1297                 if body is not None:
   1298                     async with session.post(url, data=body, headers=headers) as resp:
   1299                         return await on_finish(resp)
   1300                 elif json is not None:
   1301                     async with session.post(url, json=json, headers=headers) as resp:
   1302                         return await on_finish(resp)
   1303             else:
   1304                 assert False
   1305 
   1306     @classmethod
   1307     def send_http_on_proxy(cls, method, url, **kwargs):
   1308         network = cls.get_instance()
   1309         if network:
   1310             assert network._loop_thread is not threading.currentThread()
   1311             loop = network.asyncio_loop
   1312         else:
   1313             loop = asyncio.get_event_loop()
   1314         coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop)
   1315         # note: _send_http_on_proxy has its own timeout, so no timeout here:
   1316         return coro.result()
   1317 
   1318     # methods used in scripts
   1319     async def get_peers(self):
   1320         while not self.is_connected():
   1321             await asyncio.sleep(1)
   1322         session = self.interface.session
   1323         # TODO: libbitcoin
   1324         return parse_servers(await session.send_request('server.peers.subscribe'))
   1325 
   1326     async def send_multiple_requests(self, servers: Sequence[ServerAddr], method: str, params: Sequence):
   1327         responses = dict()
   1328         async def get_response(server: ServerAddr):
   1329             interface = Interface(network=self, server=server, proxy=self.proxy)
   1330             timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
   1331             try:
   1332                 await asyncio.wait_for(interface.ready, timeout)
   1333             except BaseException as e:
   1334                 await interface.close()
   1335                 return
   1336             try:
   1337                 # TODO: libbitcoin XXX:
   1338                 res = await interface.session.send_request(method, params, timeout=10)
   1339             except Exception as e:
   1340                 res = e
   1341             responses[interface.server] = res
   1342         async with TaskGroup() as group:
   1343             for server in servers:
   1344                 await group.spawn(get_response(server))
   1345         return responses