commit 2934fd17be660887e6d0154d10dfb31310e18532
parent cf23127ac1990c57d75b9438b9a3cad0d7158e9a
Author: ThomasV <thomasv@electrum.org>
Date: Wed, 25 Nov 2015 10:32:46 +0100
fix websockets
Diffstat:
3 files changed, 20 insertions(+), 54 deletions(-)
diff --git a/electrum b/electrum
@@ -269,8 +269,6 @@ class ClientThread(util.DaemonThread):
util.DaemonThread.__init__(self)
self.server = server
self.client_pipe = util.SocketPipe(s)
- self.response_queue = Queue.Queue()
- self.server.add_client(self)
self.subscriptions = defaultdict(list)
self.network = self.server.network
@@ -316,7 +314,6 @@ class ClientThread(util.DaemonThread):
response = {'error':err}
# send response and exit
self.client_pipe.send(response)
- self.server.remove_client(self)
@@ -330,24 +327,9 @@ class NetworkServer(util.DaemonThread):
self.pipe = util.QueuePipe()
self.network = network
self.lock = threading.RLock()
- # each GUI is a client of the daemon
- self.clients = []
# gui is None is we run as daemon
self.gui = None
- def add_client(self, client):
- for key in ['fee', 'status', 'banner', 'updated', 'servers', 'interfaces']:
- value = self.network.get_status_value(key)
- client.response_queue.put({'method':'network.status', 'params':[key, value]})
- with self.lock:
- self.clients.append(client)
- print_error("new client:", len(self.clients))
-
- def remove_client(self, client):
- with self.lock:
- self.clients.remove(client)
- print_error("client quit:", len(self.clients))
-
def run(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 0))
diff --git a/lib/network.py b/lib/network.py
@@ -537,7 +537,7 @@ class Network(util.DaemonThread):
self.process_response(interface, response, callback)
def send(self, messages, callback):
- '''Messages is a list of (method, value) tuples'''
+ '''Messages is a list of (method, params) tuples'''
with self.lock:
self.pending_sends.append((messages, callback))
diff --git a/lib/websockets.py b/lib/websockets.py
@@ -46,15 +46,12 @@ class ElectrumWebSocket(WebSocket):
class WsClientThread(util.DaemonThread):
- def __init__(self, config, server):
+ def __init__(self, config, network):
util.DaemonThread.__init__(self)
- self.server = server
+ self.network = network
self.config = config
self.response_queue = Queue.Queue()
- self.server.add_client(self)
self.subscriptions = defaultdict(list)
- self.sub_ws = defaultdict(list)
- self.counter = 0
def make_request(self, request_id):
# read json file
@@ -77,13 +74,11 @@ class WsClientThread(util.DaemonThread):
addr, amount = self.make_request(request_id)
except:
continue
- method = 'blockchain.address.subscribe'
- params = [addr]
- request = {'method':method, 'params':params, 'id':self.counter}
- self.subscriptions[method].append(params)
- self.sub_ws[self.counter] = ws, amount, request
- self.counter += 1
- self.server.send_request(self, request)
+ l = self.subscriptions.get(addr, [])
+ l.append((ws, amount))
+ self.subscriptions[addr] = l
+ self.network.send([('blockchain.address.subscribe', [addr])], self.response_queue.put)
+
def run(self):
threading.Thread(target=self.reading_thread).start()
@@ -92,32 +87,21 @@ class WsClientThread(util.DaemonThread):
r = self.response_queue.get(timeout=0.1)
except Queue.Empty:
continue
- id = r.get('id')
- if id is None:
- method = r.get('method')
- params = r.get('params')
- else:
- ws, amount, rr = self.sub_ws[id]
- method = rr.get('method')
- params = rr.get('params')
-
+ util.print_error('response', r)
+ method = r.get('method')
+ params = r.get('params')
result = r.get('result')
-
if method == 'blockchain.address.subscribe':
- util.print_error('response', r)
if result is not None:
- request = {'method':'blockchain.address.get_balance', 'params':params, 'id':self.counter}
- self.server.send_request(self, request)
- self.sub_ws[self.counter] = ws, amount, request
- self.counter += 1
-
- if r.get('method') == 'blockchain.address.get_balance':
- util.print_error('response', r)
- if not ws.closed:
- if sum(result.values()) >=amount:
- ws.sendMessage(unicode('paid'))
-
- self.server.remove_client(self)
+ self.network.send([('blockchain.address.get_balance', params)], self.response_queue.put)
+ elif method == 'blockchain.address.get_balance':
+ addr = params[0]
+ l = self.subscriptions.get(addr, [])
+ for ws, amount in :l:
+ if not ws.closed:
+ if sum(result.values()) >=amount:
+ ws.sendMessage(unicode('paid'))
+
class WebSocketServer(threading.Thread):