commit a63eaa3c4eae7f5ad1a0e284b5aa8269567ee5f7
parent 13a01b11d290eb56a8292982ddd78c4b8470f174
Author: ThomasV <thomasv@gitorious>
Date: Mon, 18 Aug 2014 15:13:16 +0200
daemon: multiplexing
Diffstat:
1 file changed, 32 insertions(+), 10 deletions(-)
diff --git a/lib/daemon.py b/lib/daemon.py
@@ -69,7 +69,7 @@ class ClientThread(threading.Thread):
self.server = server
self.daemon = True
self.client_pipe = util.SocketPipe(s)
- self.daemon_pipe = util.QueuePipe(send_queue = self.server.network.requests_queue)
+ self.response_queue = Queue.Queue()
self.server.add_client(self)
def reading_thread(self):
@@ -81,19 +81,17 @@ class ClientThread(threading.Thread):
if request is None:
self.running = False
break
-
if request.get('method') == 'daemon.stop':
self.server.stop()
continue
-
- self.daemon_pipe.send(request)
+ self.server.send_request(self, request)
def run(self):
self.running = True
threading.Thread(target=self.reading_thread).start()
while self.running:
try:
- response = self.daemon_pipe.get()
+ response = self.response_queue.get()
except util.timeout:
continue
try:
@@ -112,6 +110,7 @@ class NetworkServer(threading.Thread):
def __init__(self, config):
threading.Thread.__init__(self)
self.daemon = True
+ self.debug = False
self.config = config
self.network = Network(config)
# network sends responses on that queue
@@ -122,7 +121,8 @@ class NetworkServer(threading.Thread):
# each GUI is a client of the daemon
self.clients = []
- # todo: the daemon needs to know which client subscribed to which address
+ self.request_id = 0
+ self.requests = {}
def is_running(self):
with self.lock:
@@ -139,7 +139,7 @@ class NetworkServer(threading.Thread):
def add_client(self, client):
for key in ['status','banner','updated','servers','interfaces']:
value = self.network.get_status_value(key)
- client.daemon_pipe.get_queue.put({'method':'network.status', 'params':[key, value]})
+ client.response_queue.put({'method':'network.status', 'params':[key, value]})
with self.lock:
self.clients.append(client)
@@ -148,6 +148,18 @@ class NetworkServer(threading.Thread):
self.clients.remove(client)
print_error("client quit:", len(self.clients))
+
+ def send_request(self, client, request):
+ with self.lock:
+ self.request_id += 1
+ self.requests[self.request_id] = (request['id'], client)
+ request['id'] = self.request_id
+
+ if self.debug:
+ print_error("-->", request)
+ self.network.requests_queue.put(request)
+
+
def run(self):
self.network.start(self.network_queue)
while self.is_running():
@@ -155,8 +167,18 @@ class NetworkServer(threading.Thread):
response = self.network_queue.get(timeout=0.1)
except Queue.Empty:
continue
- for client in self.clients:
- client.daemon_pipe.get_queue.put(response)
+ if self.debug:
+ print_error("<--", response)
+ response_id = response.get('id')
+ if response_id:
+ with self.lock:
+ client_id, client = self.requests.pop(response_id)
+ response['id'] = client_id
+ client.response_queue.put(response)
+ else:
+ # notification
+ for client in self.clients:
+ client.response_queue.put(response)
self.network.stop()
print_error("server exiting")
@@ -201,6 +223,6 @@ if __name__ == '__main__':
try:
daemon_loop(server)
except KeyboardInterrupt:
- print "Ctrl C - Stopping server"
+ print "Ctrl C - Stopping daemon"
server.stop()
sys.exit(1)