commit 035ecbc7cd41036bf96dd8a9b6b57d09b7c4eabc
parent bd3bfb5e535b59f2ad2f0450f4e5a2ce600f2a7b
Author: ThomasV <thomasv@gitorious>
Date: Sun, 27 Jul 2014 11:33:02 +0200
redo inter-thread communication using pipes
Diffstat:
8 files changed, 298 insertions(+), 215 deletions(-)
diff --git a/electrum b/electrum
@@ -43,6 +43,7 @@ if is_local:
sys.path.append('packages')
+from electrum import util
from electrum import SimpleConfig, Network, Wallet, WalletStorage, NetworkProxy, Commands, known_commands, pick_random_server
from electrum.util import print_msg, print_stderr, print_json, set_verbosity
@@ -150,7 +151,7 @@ def do_start_daemon():
import subprocess
logfile = open(os.path.join(config.path, 'daemon.log'),'w')
p = subprocess.Popen([__file__,"daemon"], stderr=logfile, stdout=logfile, close_fds=True)
- print "starting daemon (PID %d)"%p.pid
+ print_stderr("starting daemon (PID %d)"%p.pid)
def daemon_socket(start_daemon=True):
@@ -222,12 +223,8 @@ if __name__ == '__main__':
# network interface
if not options.offline:
s = daemon_socket(start_daemon=options.daemon)
- if s:
- network = NetworkProxy(s, config)
- network.start()
- else:
- network = Network(config)
- network.start()
+ network = NetworkProxy(s, config)
+ network.start()
else:
network = None
diff --git a/lib/daemon.py b/lib/daemon.py
@@ -24,140 +24,72 @@ import threading
import traceback
import json
import Queue
+
+import util
from network import Network
from util import print_error, print_stderr, parse_json
from simple_config import SimpleConfig
-"""
-The Network object is not aware of clients/subscribers
-It only does subscribe/unsubscribe to addresses
-Which client has wich address is managed by the daemon
-Network also reports status changes
-"""
DAEMON_PORT=8001
-
-
class ClientThread(threading.Thread):
- # read messages from client (socket), and sends them to Network
- # responses are sent back on the same socket
- def __init__(self, server, network, s):
+ def __init__(self, server, s):
threading.Thread.__init__(self)
self.server = server
self.daemon = True
- self.s = s
- self.s.settimeout(0.1)
- self.network = network
- self.queue = Queue.Queue()
- self.unanswered_requests = {}
- self.debug = False
+ self.client_pipe = util.SocketPipe(s)
+ self.daemon_pipe = util.QueuePipe(send_queue = self.server.network.requests_queue)
self.server.add_client(self)
-
- def run(self):
-
- message = ''
- while True:
- try:
- self.send_responses()
- except socket.error:
- break
-
+ def reading_thread(self):
+ while self.running:
try:
- data = self.s.recv(1024)
- except socket.timeout:
+ request = self.client_pipe.get()
+ except util.timeout:
continue
- except:
- data = ''
- if not data:
+ if request is None:
+ self.running = False
break
- message += data
- while True:
- cmd, message = parse_json(message)
- if not cmd:
- break
- self.process(cmd)
-
- self.server.remove_client(self)
+ if request.get('method') == 'daemon.stop':
+ self.server.stop()
+ continue
+ self.daemon_pipe.send(request)
-
- def process(self, request):
- if self.debug:
- print_error("<--", request)
- method = request['method']
- params = request['params']
- _id = request['id']
-
- if method == ('daemon.stop'):
- self.server.stop()
- return
-
- if method.startswith('network.'):
- out = {'id':_id}
+ def run(self):
+ self.running = True
+ threading.Thread(target=self.reading_thread).start()
+ while self.running:
try:
- f = getattr(self.network, method[8:])
- except AttributeError:
- out['error'] = "unknown method"
+ response = self.daemon_pipe.get()
+ except util.timeout:
+ continue
try:
- out['result'] = f(*params)
- except BaseException as e:
- out['error'] = str(e)
- print_error("network error", str(e))
-
- self.queue.put(out)
- return
-
- def cb(i,r):
- _id = r.get('id')
- if _id is not None:
- my_id = self.unanswered_requests.pop(_id)
- r['id'] = my_id
- self.queue.put(r)
-
- try:
- new_id = self.network.interface.send([(method, params)], cb) [0]
- except Exception as e:
- self.queue.put({'id':_id, 'error':str(e)})
- print_error("network interface error", str(e))
- return
-
- self.unanswered_requests[new_id] = _id
+ self.client_pipe.send(response)
+ except socket.error:
+ self.running = False
+ break
+ self.server.remove_client(self)
- def send_responses(self):
- while True:
- try:
- r = self.queue.get_nowait()
- except Queue.Empty:
- break
- out = json.dumps(r) + '\n'
- while out:
- n = self.s.send(out)
- out = out[n:]
- if self.debug:
- print_error("-->", r)
class NetworkServer:
def __init__(self, config):
+ self.config = config
self.network = Network(config)
- self.network.trigger_callback = self.trigger_callback
- self.network.start()
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.daemon_port = config.get('daemon_port', DAEMON_PORT)
- self.socket.bind(('', self.daemon_port))
- self.socket.listen(5)
- self.socket.settimeout(1)
+ # network sends responses on that queue
+ self.network_queue = Queue.Queue()
+ self.network.start(self.network_queue)
+
self.running = False
# daemon terminates after period of inactivity
self.timeout = config.get('daemon_timeout', 5*60)
@@ -165,16 +97,21 @@ class NetworkServer:
# each GUI is a client of the daemon
self.clients = []
- # daemon needs to know which client subscribed to which address
+ # todo: the daemon needs to know which client subscribed to which address
+
+ def is_running(self):
+ with self.lock:
+ return self.running
def stop(self):
+ self.network.stop()
with self.lock:
self.running = False
def add_client(self, client):
for key in ['status','banner','updated','servers','interfaces']:
- value = self.get_status_value(key)
- client.queue.put({'method':'network.status', 'params':[key, value]})
+ value = self.network.get_status_value(key)
+ client.daemon_pipe.get_queue.put({'method':'network.status', 'params':[key, value]})
with self.lock:
self.clients.append(client)
@@ -184,27 +121,28 @@ class NetworkServer:
self.clients.remove(client)
print_error("client quit:", len(self.clients))
- def get_status_value(self, key):
- if key == 'status':
- value = self.network.connection_status
- elif key == 'banner':
- value = self.network.banner
- elif key == 'updated':
- value = (self.network.get_local_height(), self.network.get_server_height())
- elif key == 'servers':
- value = self.network.get_servers()
- elif key == 'interfaces':
- value = self.network.get_interfaces()
- return value
-
- def trigger_callback(self, key):
- value = self.get_status_value(key)
- print_error("daemon trigger callback", key, len(self.clients))
- for client in self.clients:
- client.queue.put({'method':'network.status', 'params':[key, value]})
+
def main_loop(self):
self.running = True
+ threading.Thread(target=self.listen_thread).start()
+ while self.is_running():
+ try:
+ response = self.network_queue.get(timeout=0.1)
+ except Queue.Empty:
+ continue
+ for client in self.clients:
+ client.daemon_pipe.get_queue.put(response)
+
+ print_error("Daemon exiting")
+
+ def listen_thread(self):
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.daemon_port = self.config.get('daemon_port', DAEMON_PORT)
+ self.socket.bind(('', self.daemon_port))
+ self.socket.listen(5)
+ self.socket.settimeout(1)
t = time.time()
while self.running:
try:
@@ -218,11 +156,10 @@ class NetworkServer:
t = time.time()
continue
t = time.time()
- client = ClientThread(self, self.network, connection)
+ client = ClientThread(self, connection)
client.start()
- print_error("Daemon exiting")
-
-
+ self.stop()
+ print_error("listen thread exiting")
if __name__ == '__main__':
diff --git a/lib/network.py b/lib/network.py
@@ -85,7 +85,6 @@ class Network(threading.Thread):
self.blockchain = Blockchain(self.config, self)
self.interfaces = {}
self.queue = Queue.Queue()
- self.callbacks = {}
self.protocol = self.config.get('protocol','s')
self.running = False
@@ -118,6 +117,9 @@ class Network(threading.Thread):
self.connection_status = 'connecting'
+ self.requests_queue = Queue.Queue()
+ self.unanswered_requests = {}
+
def get_server_height(self):
return self.heights.get(self.default_server,0)
@@ -162,21 +164,22 @@ class Network(threading.Thread):
else:
return False
-
- def register_callback(self, event, callback):
- with self.lock:
- if not self.callbacks.get(event):
- self.callbacks[event] = []
- self.callbacks[event].append(callback)
-
-
- def trigger_callback(self, event):
- # note: this method is overwritten by daemon
- with self.lock:
- callbacks = self.callbacks.get(event,[])[:]
- if callbacks:
- [callback() for callback in callbacks]
-
+ def get_status_value(self, key):
+ if key == 'status':
+ value = self.connection_status
+ elif key == 'banner':
+ value = self.banner
+ elif key == 'updated':
+ value = (self.get_local_height(), self.get_server_height())
+ elif key == 'servers':
+ value = self.get_servers()
+ elif key == 'interfaces':
+ value = self.get_interfaces()
+ return value
+
+ def trigger_callback(self, key):
+ value = self.get_status_value(key)
+ self.response_queue.put({'method':'network.status', 'params':[key, value]})
def random_server(self):
choice_list = []
@@ -234,9 +237,13 @@ class Network(threading.Thread):
for i in range(self.num_server):
self.start_random_interface()
- def start(self):
+ def start(self, response_queue):
+ self.running = True
+ self.response_queue = response_queue
self.start_interfaces()
threading.Thread.start(self)
+ threading.Thread(target=self.process_thread).start()
+ self.blockchain.start()
def set_parameters(self, host, port, protocol, proxy, auto_connect):
self.config.set_key('auto_cycle', auto_connect, True)
@@ -321,16 +328,55 @@ class Network(threading.Thread):
print_error( "Server is lagging", blockchain_height, self.get_server_height())
if self.config.get('auto_cycle'):
self.set_server(i.server)
-
self.trigger_callback('updated')
- def run(self):
- self.blockchain.start()
+ def process_thread(self):
+ while self.is_running():
+ try:
+ request = self.requests_queue.get(timeout=0.1)
+ except Queue.Empty:
+ continue
+ self.process(request)
- with self.lock:
- self.running = True
+ def process(self, request):
+ method = request['method']
+ params = request['params']
+ _id = request['id']
+
+ if method.startswith('network.'):
+ out = {'id':_id}
+ try:
+ f = getattr(self, method[8:])
+ except AttributeError:
+ out['error'] = "unknown method"
+ try:
+ out['result'] = f(*params)
+ except BaseException as e:
+ out['error'] = str(e)
+ print_error("network error", str(e))
+
+ self.response_queue.put(out)
+ return
+
+ def cb(i,r):
+ _id = r.get('id')
+ if _id is not None:
+ my_id = self.unanswered_requests.pop(_id)
+ r['id'] = my_id
+ self.response_queue.put(r)
+
+ try:
+ new_id = self.interface.send([(method, params)], cb) [0]
+ except Exception as e:
+ self.response_queue.put({'id':_id, 'error':str(e)})
+ print_error("network interface error", str(e))
+ return
+ self.unanswered_requests[new_id] = _id
+
+
+ def run(self):
while self.is_running():
try:
i = self.queue.get(timeout = 30 if self.interfaces else 3)
@@ -382,10 +428,8 @@ class Network(threading.Thread):
if self.server_is_lagging() and self.config.get('auto_cycle'):
print_error( "Server lagging, stopping interface")
self.stop_interface()
-
self.trigger_callback('updated')
-
def on_peers(self, i, r):
if not r: return
self.irc_servers = parse_servers(r.get('result'))
@@ -396,10 +440,12 @@ class Network(threading.Thread):
self.trigger_callback('banner')
def stop(self):
- with self.lock: self.running = False
+ with self.lock:
+ self.running = False
def is_running(self):
- with self.lock: return self.running
+ with self.lock:
+ return self.running
def synchronous_get(self, requests, timeout=100000000):
diff --git a/lib/network_proxy.py b/lib/network_proxy.py
@@ -24,25 +24,23 @@ import threading
import traceback
import json
import Queue
+
+import util
from network import Network
from util import print_error, print_stderr, parse_json
from simple_config import SimpleConfig
-
from daemon import NetworkServer, DAEMON_PORT
-
-
class NetworkProxy(threading.Thread):
def __init__(self, socket, config=None):
+
if config is None:
config = {} # Do not use mutables as default arguments!
threading.Thread.__init__(self)
self.config = SimpleConfig(config) if type(config) == type({}) else config
- self.socket = socket
- self.socket.settimeout(0.1)
self.message_id = 0
self.unanswered_requests = {}
self.subscriptions = {}
@@ -53,6 +51,14 @@ class NetworkProxy(threading.Thread):
self.running = True
self.daemon = True
+ if socket:
+ self.pipe = util.SocketPipe(socket)
+ self.network = None
+ else:
+ self.network = Network(config)
+ self.pipe = util.QueuePipe(send_queue=self.network.requests_queue)
+ self.network.start(self.pipe.get_queue)
+
# status variables
self.status = 'connecting'
self.servers = {}
@@ -65,35 +71,23 @@ class NetworkProxy(threading.Thread):
return self.running
def run(self):
- # read responses and trigger callbacks
- message = ''
while self.is_running():
try:
- data = self.socket.recv(1024)
- except socket.timeout:
+ response = self.pipe.get()
+ except util.timeout:
continue
- except:
- data = ''
- if not data:
+ if response is None:
break
- message += data
- while True:
- response, message = parse_json(message)
- if response is not None:
- self.process(response)
- else:
- break
- # fixme: server does not detect if we don't call shutdown
- self.socket.shutdown(2)
- self.socket.close()
+ self.process(response)
+
print_error("NetworkProxy thread terminating")
+ self.stop()
def process(self, response):
if self.debug:
print_error("<--", response)
if response.get('method') == 'network.status':
- #print_error("<--", response)
key, value = response.get('params')
if key == 'status':
self.status = value
@@ -109,48 +103,63 @@ class NetworkProxy(threading.Thread):
return
msg_id = response.get('id')
- with self.lock:
- method, params, callback = self.unanswered_requests.pop(msg_id)
-
result = response.get('result')
- callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id})
-
+ if msg_id is not None:
+ with self.lock:
+ method, params, callback = self.unanswered_requests.pop(msg_id)
+ else:
+ method = response.get('method')
+ params = response.get('params')
+ with self.lock:
+ for k,v in self.subscriptions.items():
+ if (method, params) in v:
+ callback = k
+ break
+ else:
+ print_error( "received unexpected notification", method, params)
+ return
- def subscribe(self, messages, callback):
- # detect if it is a subscription
- with self.lock:
- if self.subscriptions.get(callback) is None:
- self.subscriptions[callback] = []
- for message in messages:
- if message not in self.subscriptions[callback]:
- self.subscriptions[callback].append(message)
+ callback({'method':method, 'params':params, 'result':result, 'id':msg_id})
- self.send( messages, callback )
def send(self, messages, callback):
"""return the ids of the requests that we sent"""
+
+ # detect subscriptions
+ sub = []
+ for message in messages:
+ m, v = message
+ if m[-10:] == '.subscribe':
+ sub.append(message)
+ if sub:
+ with self.lock:
+ if self.subscriptions.get(callback) is None:
+ self.subscriptions[callback] = []
+ for message in sub:
+ if message not in self.subscriptions[callback]:
+ self.subscriptions[callback].append(message)
+
with self.lock:
- out = ''
+ requests = []
ids = []
for m in messages:
method, params = m
request = { 'id':self.message_id, 'method':method, 'params':params }
self.unanswered_requests[self.message_id] = method, params, callback
ids.append(self.message_id)
+ requests.append(request)
if self.debug:
print_error("-->", request)
self.message_id += 1
- out += json.dumps(request) + '\n'
- while out:
- sent = self.socket.send( out )
- out = out[sent:]
+
+ self.pipe.send_all(requests)
return ids
def synchronous_get(self, requests, timeout=100000000):
queue = Queue.Queue()
- ids = self.send(requests, lambda i,x: queue.put(x))
+ ids = self.send(requests, queue.put)
id2 = ids[:]
res = {}
while ids:
@@ -189,7 +198,7 @@ class NetworkProxy(threading.Thread):
return self.status == 'connecting'
def is_up_to_date(self):
- return self.synchronous_get([('network.is_up_to_date',[])])[0]
+ return self.unanswered_requests == {}
def get_parameters(self):
return self.synchronous_get([('network.get_parameters',[])])[0]
@@ -199,6 +208,8 @@ class NetworkProxy(threading.Thread):
def stop(self):
self.running = False
+ if self.network:
+ self.network.stop()
def stop_daemon(self):
return self.send([('daemon.stop',[])], None)
@@ -215,4 +226,3 @@ class NetworkProxy(threading.Thread):
if callbacks:
[callback() for callback in callbacks]
- print_error("trigger_callback", event, len(callbacks))
diff --git a/lib/synchronizer.py b/lib/synchronizer.py
@@ -54,7 +54,7 @@ class WalletSynchronizer(threading.Thread):
messages = []
for addr in addresses:
messages.append(('blockchain.address.subscribe', [addr]))
- self.network.subscribe( messages, lambda i,r: self.queue.put(r))
+ self.network.send(messages, lambda r: self.queue.put(r))
def run(self):
with self.lock:
diff --git a/lib/util.py b/lib/util.py
@@ -223,3 +223,86 @@ def parse_json(message):
except:
j = None
return j, message[n+1:]
+
+
+
+
+class timeout(Exception):
+ pass
+
+import socket, json
+
+class SocketPipe:
+
+ def __init__(self, socket):
+ self.socket = socket
+ self.message = ''
+ self.set_timeout(0.1)
+
+ def set_timeout(self, t):
+ self.socket.settimeout(t)
+
+ def get(self):
+ while True:
+ response, self.message = parse_json(self.message)
+ if response:
+ return response
+ try:
+ data = self.socket.recv(1024)
+ except socket.timeout:
+ raise timeout
+ except:
+ data = ''
+ if not data:
+ self.socket.close()
+ return None
+ self.message += data
+
+ def send(self, request):
+ out = json.dumps(request) + '\n'
+ while out:
+ sent = self.socket.send( out )
+ out = out[sent:]
+
+ def send_all(self, requests):
+ out = ''.join(map(lambda x: json.dumps(x) + '\n', requests))
+ while out:
+ sent = self.socket.send( out )
+ out = out[sent:]
+
+
+import Queue
+
+class QueuePipe:
+
+ def __init__(self, send_queue=None, get_queue=None):
+ self.send_queue = send_queue if send_queue else Queue.Queue()
+ self.get_queue = get_queue if get_queue else Queue.Queue()
+ self.set_timeout(0.1)
+
+ def get(self):
+ try:
+ return self.get_queue.get(timeout=self.timeout)
+ except Queue.Empty:
+ raise timeout
+
+ def get_all(self):
+ responses = []
+ while True:
+ try:
+ r = self.get_queue.get_nowait()
+ responses.append(r)
+ except Queue.Empty:
+ break
+ return responses
+
+ def set_timeout(self, t):
+ self.timeout = t
+
+ def send(self, request):
+ self.send_queue.put(request)
+
+ def send_all(self, requests):
+ for request in requests:
+ self.send(request)
+
diff --git a/lib/wallet.py b/lib/wallet.py
@@ -791,7 +791,7 @@ class Abstract_Wallet(object):
self.network.send([('blockchain.transaction.broadcast', [str(tx)])], self.on_broadcast)
return tx.hash()
- def on_broadcast(self, i, r):
+ def on_broadcast(self, r):
self.tx_result = r.get('result')
self.tx_event.set()
diff --git a/scripts/block_headers b/scripts/block_headers
@@ -4,17 +4,27 @@
import time, electrum
-# 1. start the interface and wait for connection
-interface = electrum.Interface('ecdsa.net:50002:s')
-interface.start(wait = True)
-if not interface.is_connected:
- print "not connected"
- exit()
+# start network
+network = electrum.NetworkProxy(False)
+network.start()
+
+# wait until connected
+while network.is_connecting():
+ time.sleep(0.1)
+
+if not network.is_connected():
+ print_msg("daemon is not connected")
+ sys.exit(1)
# 2. send the subscription
-callback = lambda _,result: electrum.print_json(result.get('result'))
-interface.send([('blockchain.headers.subscribe',[])], callback)
+callback = lambda result: electrum.print_json(result.get('result'))
+network.send([('blockchain.headers.subscribe',[])], callback)
# 3. wait for results
-while interface.is_connected:
- time.sleep(1)
+while network.is_connected():
+ try:
+ time.sleep(1)
+ except:
+ break
+
+network.stop()