commit 37747d74695daf000ffb0db9626191193a6b0555
parent 796853106537e7909729172a1315935028e6765c
Author: SomberNight <somber.night@protonmail.com>
Date: Thu, 9 Jan 2020 17:50:05 +0100
split network main_taskgroup: create daemon.taskgroup
network.main_taskgroup restarts every time the proxy settings are changed,
many long-running tasks (some introduced with lightning) are not prepared for and do not want this.
Diffstat:
3 files changed, 58 insertions(+), 23 deletions(-)
diff --git a/electrum/daemon.py b/electrum/daemon.py
@@ -29,7 +29,7 @@ import time
import traceback
import sys
import threading
-from typing import Dict, Optional, Tuple
+from typing import Dict, Optional, Tuple, Iterable
from base64 import b64decode
from collections import defaultdict
@@ -39,6 +39,7 @@ import jsonrpcclient
import jsonrpcserver
from jsonrpcserver import response
from jsonrpcclient.clients.aiohttp_client import AiohttpClient
+from aiorpcx import TaskGroup
from .network import Network
from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
@@ -280,28 +281,44 @@ class Daemon(Logger):
if fd is None:
raise Exception('failed to lock daemon; already running?')
self.asyncio_loop = asyncio.get_event_loop()
- if config.get('offline'):
- self.network = None
- else:
- self.network = Network(config)
+ self.network = None
+ if not config.get('offline'):
+ self.network = Network(config, daemon=self)
self.fx = FxThread(config, self.network)
self.gui_object = None
# path -> wallet; make sure path is standardized.
self._wallets = {} # type: Dict[str, Abstract_Wallet]
- jobs = [self.fx.run]
+ daemon_jobs = []
# Setup JSONRPC server
if listen_jsonrpc:
- jobs.append(self.start_jsonrpc(config, fd))
+ daemon_jobs.append(self.start_jsonrpc(config, fd))
# request server
- if self.config.get('run_payserver'):
+ self.pay_server = None
+ if not config.get('offline') and self.config.get('run_payserver'):
self.pay_server = PayServer(self)
- jobs.append(self.pay_server.run())
+ daemon_jobs.append(self.pay_server.run())
# server-side watchtower
- if self.config.get('run_watchtower'):
+ self.watchtower = None
+ if not config.get('offline') and self.config.get('run_watchtower'):
self.watchtower = WatchTowerServer(self.network)
- jobs.append(self.watchtower.run)
+ daemon_jobs.append(self.watchtower.run)
if self.network:
- self.network.start(jobs)
+ self.network.start(jobs=[self.fx.run])
+
+ self.taskgroup = TaskGroup()
+ asyncio.run_coroutine_threadsafe(self._run(jobs=daemon_jobs), self.asyncio_loop)
+
+ @log_exceptions
+ async def _run(self, jobs: Iterable = None):
+ if jobs is None:
+ jobs = []
+ try:
+ async with self.taskgroup as group:
+ [await group.spawn(job) for job in jobs]
+ except BaseException as e:
+ self.logger.exception('daemon.taskgroup died.')
+ finally:
+ self.logger.info("stopping daemon.taskgroup")
async def authenticate(self, headers):
if self.rpc_password == '':
@@ -462,7 +479,7 @@ class Daemon(Logger):
def is_running(self):
with self.running_lock:
- return self.running
+ return self.running and not self.taskgroup.closed()
def stop(self):
with self.running_lock:
@@ -477,8 +494,15 @@ class Daemon(Logger):
if self.network:
self.logger.info("shutting down network")
self.network.stop()
- self.logger.info("stopping, removing lockfile")
+ self.logger.info("stopping taskgroup")
+ fut = asyncio.run_coroutine_threadsafe(self.taskgroup.cancel_remaining(), self.asyncio_loop)
+ try:
+ fut.result(timeout=2)
+ except (asyncio.TimeoutError, asyncio.CancelledError):
+ pass
+ self.logger.info("removing lockfile")
remove_lockfile(get_lockfile(self.config))
+ self.logger.info("stopped")
def run_gui(self, config, plugins):
threading.current_thread().setName('GUI')
diff --git a/electrum/lnworker.py b/electrum/lnworker.py
@@ -188,10 +188,11 @@ class LNWorker(Logger):
def start_network(self, network: 'Network'):
self.network = network
self.config = network.config
+ daemon = network.daemon
self.channel_db = self.network.channel_db
self._last_tried_peer = {} # type: Dict[LNPeerAddr, float] # LNPeerAddr -> unix timestamp
self._add_peers_from_config()
- asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.main_loop()), self.network.asyncio_loop)
+ asyncio.run_coroutine_threadsafe(daemon.taskgroup.spawn(self.main_loop()), self.network.asyncio_loop)
def _add_peers_from_config(self):
peer_list = self.config.get('lightning_peers', [])
@@ -306,7 +307,7 @@ class LNGossip(LNWorker):
def start_network(self, network: 'Network'):
super().start_network(network)
- asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
+ asyncio.run_coroutine_threadsafe(network.daemon.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
async def maintain_db(self):
await self.channel_db.load_data()
@@ -409,6 +410,7 @@ class LNWallet(LNWorker):
self.lnwatcher = LNWatcher(network)
self.lnwatcher.start_network(network)
self.network = network
+ daemon = network.daemon
self.network.register_callback(self.on_update_open_channel, ['update_open_channel'])
self.network.register_callback(self.on_update_closed_channel, ['update_closed_channel'])
for chan_id, chan in self.channels.items():
@@ -422,8 +424,8 @@ class LNWallet(LNWorker):
self.sync_with_local_watchtower(),
self.sync_with_remote_watchtower(),
]:
- # FIXME: exceptions in those coroutines will cancel network.main_taskgroup
- asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(coro), self.network.asyncio_loop)
+ # FIXME: exceptions in those coroutines will cancel daemon.taskgroup
+ asyncio.run_coroutine_threadsafe(daemon.taskgroup.spawn(coro), self.network.asyncio_loop)
def peer_closed(self, peer):
for chan in self.channels_for_peer(peer.pubkey).values():
diff --git a/electrum/network.py b/electrum/network.py
@@ -33,7 +33,7 @@ import json
import sys
import ipaddress
import asyncio
-from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING
+from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable
import traceback
import concurrent
from concurrent import futures
@@ -67,6 +67,7 @@ if TYPE_CHECKING:
from .lnworker import LNGossip
from .lnwatcher import WatchTower
from .transaction import Transaction
+ from .daemon import Daemon
_logger = get_logger(__name__)
@@ -237,7 +238,7 @@ class Network(Logger):
LOGGING_SHORTCUT = 'n'
- def __init__(self, config: SimpleConfig):
+ def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
global _INSTANCE
assert _INSTANCE is None, "Network is a singleton!"
_INSTANCE = self
@@ -250,6 +251,9 @@ class Network(Logger):
assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
self.config = config
+
+ self.daemon = daemon
+
blockchain.read_blockchains(self.config)
self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None) # type: Optional[Dict]
@@ -747,7 +751,7 @@ class Network(Logger):
self.trigger_callback('network_updated')
if blockchain_updated: self.trigger_callback('blockchain_updated')
- async def _close_interface(self, interface):
+ async def _close_interface(self, interface: Interface):
if interface:
with self.interfaces_lock:
if self.interfaces.get(interface.server) == interface:
@@ -1185,7 +1189,12 @@ class Network(Logger):
self.trigger_callback('network_updated')
- def start(self, jobs: List=None):
+ def start(self, jobs: Iterable = None):
+ """Schedule starting the network, along with the given job co-routines.
+
+ Note: the jobs will *restart* every time the network restarts, e.g. on proxy
+ setting changes.
+ """
self._jobs = jobs or []
asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop)
@@ -1264,7 +1273,7 @@ class Network(Logger):
except asyncio.CancelledError:
# suppress spurious cancellations
group = self.main_taskgroup
- if not group or group._closed:
+ if not group or group.closed():
raise
await asyncio.sleep(0.1)