commit 2e61359d50cf4fbee6e0880e33a59e9b66634676
parent 23f56ffa8ab64e9d22f2c1427c5114114c2ff879
Author: SomberNight <somber.night@protonmail.com>
Date: Thu, 13 Sep 2018 21:20:55 +0200
network: stop pending connections when stopping network
Diffstat:
1 file changed, 26 insertions(+), 9 deletions(-)
diff --git a/electrum/network.py b/electrum/network.py
@@ -224,10 +224,11 @@ class Network(PrintError):
self.auto_connect = self.config.get('auto_connect', True)
self.connecting = set()
self.requested_chunks = set()
- self.socket_queue = queue.Queue()
+ self.server_queue = None
+ self.server_queue_group = None
+ self.asyncio_loop = asyncio.get_event_loop()
self.start_network(deserialize_server(self.default_server)[2],
deserialize_proxy(self.config.get('proxy')))
- self.asyncio_loop = asyncio.get_event_loop()
@staticmethod
def get_instance():
@@ -417,12 +418,12 @@ class Network(PrintError):
@with_interface_lock
def start_interface(self, server):
- if (not server in self.interfaces and not server in self.connecting):
+ if server not in self.interfaces and server not in self.connecting:
if server == self.default_server:
self.print_error("connecting to %s as new interface" % server)
self.set_status('connecting')
self.connecting.add(server)
- self.socket_queue.put(server)
+ self.server_queue.put(server)
def start_random_interface(self):
with self.interface_lock:
@@ -482,13 +483,24 @@ class Network(PrintError):
@with_interface_lock
def start_network(self, protocol: str, proxy: Optional[dict]):
assert not self.interface and not self.interfaces
- assert not self.connecting and self.socket_queue.empty()
+ assert not self.connecting and not self.server_queue
+ assert not self.server_queue_group
self.print_error('starting network')
self.disconnected_servers = set([]) # note: needs self.interface_lock
self.protocol = protocol
+ self._init_server_queue()
self.set_proxy(proxy)
self.start_interface(self.default_server)
+ def _init_server_queue(self):
+ self.server_queue = queue.Queue()
+ self.server_queue_group = server_queue_group = TaskGroup()
+ async def job():
+ forever = asyncio.Event()
+ async with server_queue_group as group:
+ await group.spawn(forever.wait())
+ asyncio.run_coroutine_threadsafe(job(), self.asyncio_loop)
+
@with_interface_lock
def stop_network(self):
self.print_error("stopping network")
@@ -499,8 +511,13 @@ class Network(PrintError):
assert self.interface is None
assert not self.interfaces
self.connecting.clear()
+ self._stop_server_queue()
+
+ def _stop_server_queue(self):
# Get a new queue - no old pending connections thanks!
- self.socket_queue = queue.Queue()
+ self.server_queue = None
+ asyncio.run_coroutine_threadsafe(self.server_queue_group.cancel_remaining(), self.asyncio_loop)
+ self.server_queue_group = None
def set_parameters(self, net_params: NetworkParameters):
proxy = net_params.proxy
@@ -768,9 +785,9 @@ class Network(PrintError):
async def maintain_sessions(self):
while True:
- while self.socket_queue.qsize() > 0:
- server = self.socket_queue.get()
- asyncio.get_event_loop().create_task(self.new_interface(server))
+ while self.server_queue.qsize() > 0:
+ server = self.server_queue.get()
+ await self.server_queue_group.spawn(self.new_interface(server))
remove = []
for k, i in self.interfaces.items():
if i.fut.done() and not i.exception: