commit 6e30894af6da909374cafc16223b99407c3850e5
parent 55e4aa2a4750b8600717becf49a2635867c5be8b
Author: ThomasV <thomasv@gitorious>
Date: Mon, 10 Mar 2014 16:16:27 +0100
daemon; initial commit
Diffstat:
5 files changed, 322 insertions(+), 64 deletions(-)
diff --git a/electrum b/electrum
@@ -114,15 +114,16 @@ def run_command(cmd, password=None, args=[]):
cmd_runner.password = password
if cmd.requires_network and not options.offline:
- cmd_runner.network = xmlrpclib.ServerProxy('http://localhost:8000')
while True:
try:
- if cmd_runner.network.ping() == 'pong':
- break
+ cmd_runner.network = NetworkProxy(config)
+ cmd_runner.network.start()
+ break
except socket.error:
if cmd.name != 'daemon':
- start_daemon()
+ print "starting daemon"
+ start_daemon(config)
else:
print "Daemon not running"
sys.exit(1)
@@ -133,6 +134,7 @@ def run_command(cmd, password=None, args=[]):
else:
cmd_runner.network = None
+
try:
result = func(*args[1:])
except Exception:
@@ -153,50 +155,7 @@ def run_command(cmd, password=None, args=[]):
-def start_server():
- network = Network(config)
- if not network.start(wait=True):
- print_msg("Not connected, aborting.")
- sys.exit(1)
- print_msg("Network daemon connected to " + network.interface.connection_msg)
- from SimpleXMLRPCServer import SimpleXMLRPCServer
- server = SimpleXMLRPCServer(('localhost',8000), allow_none=True, logRequests=False)
- server.network = network
- server.register_function(lambda: 'pong', 'ping')
- server.register_function(network.synchronous_get, 'synchronous_get')
- server.register_function(network.get_servers, 'get_servers')
- server.register_function(network.main_server, 'main_server')
- server.register_function(network.send, 'send')
- server.register_function(network.subscribe, 'subscribe')
- server.register_function(network.is_connected, 'is_connected')
- server.register_function(network.is_up_to_date, 'is_up_to_date')
- server.register_function(lambda: setattr(server,'running', False), 'stop')
- return server
-
-def start_daemon():
- pid = os.fork()
- if (pid == 0): # The first child.
- os.chdir("/")
- os.setsid()
- os.umask(0)
- pid2 = os.fork()
- if (pid2 == 0): # Second child
- server = start_server()
- server.running = True
- timeout = 60
- t0 = time.time()
- server.socket.settimeout(timeout)
- while server.running:
- server.handle_request()
- t = time.time()
- if t - t0 > 0.9*timeout:
- break
- if not server.network.is_connected():
- break
- t0 = t
- sys.exit(0)
- time.sleep(2)
if __name__ == '__main__':
diff --git a/lib/__init__.py b/lib/__init__.py
@@ -14,3 +14,4 @@ from plugins import BasePlugin
from mnemonic import mn_encode as mnemonic_encode
from mnemonic import mn_decode as mnemonic_decode
from commands import Commands, known_commands
+from daemon import start_daemon, NetworkProxy
diff --git a/lib/daemon.py b/lib/daemon.py
@@ -0,0 +1,307 @@
+#!/usr/bin/env python
+#
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2014 Thomas Voegtlin
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import socket
+import select
+import time
+import sys
+import os
+import threading
+import traceback
+import json
+import Queue
+from network import Network
+
+
+
+class NetworkProxy(threading.Thread):
+ # connects to daemon
+ # sends requests, runs callbacks
+
+ def __init__(self, config):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.config = config
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.socket.connect(('', 8000))
+ self.message_id = 0
+ self.unanswered_requests = {}
+ self.subscriptions = {}
+ self.debug = True
+ self.lock = threading.Lock()
+
+
+ def parse_json(self, message):
+ s = message.find('\n')
+ if s==-1:
+ return None, message
+ j = json.loads( message[0:s] )
+ return j, message[s+1:]
+
+
+ def run(self):
+ # read responses and trigger callbacks
+ message = ''
+ while True:
+ try:
+ data = self.socket.recv(1024)
+ except:
+ data = ''
+ if not data:
+ break
+
+ message += data
+ while True:
+ response, message = self.parse_json(message)
+ if response is not None:
+ self.process(response)
+ else:
+ break
+
+ print "NetworkProxy: exiting"
+
+
+ def process(self, response):
+ # runs callbacks
+ #print "<--", response
+
+ msg_id = response.get('id')
+ with self.lock:
+ method, params, callback = self.unanswered_requests.pop(msg_id)
+
+ result = response.get('result')
+ callback({'method':method, 'params':params, 'result':result, 'id':msg_id})
+
+
+ def send(self, messages, callback):
+ # detect if it is a subscription
+ with self.lock:
+ if self.subscriptions.get(callback) is None:
+ self.subscriptions[callback] = []
+ for message in messages:
+ if message not in self.subscriptions[callback]:
+ self.subscriptions[callback].append(message)
+
+ self.do_send( messages, callback )
+
+
+ def do_send(self, messages, callback):
+ """return the ids of the requests that we sent"""
+ out = ''
+ ids = []
+ for m in messages:
+ method, params = m
+ request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
+ self.unanswered_requests[self.message_id] = method, params, callback
+ ids.append(self.message_id)
+ # print "-->", request
+ self.message_id += 1
+ out += request + '\n'
+ while out:
+ sent = self.socket.send( out )
+ out = out[sent:]
+ return ids
+
+
+ def synchronous_get(self, requests, timeout=100000000):
+ queue = Queue.Queue()
+ ids = self.do_send(requests, queue.put)
+ id2 = ids[:]
+ res = {}
+ while ids:
+ r = queue.get(True, timeout)
+ _id = r.get('id')
+ if _id in ids:
+ ids.remove(_id)
+ res[_id] = r.get('result')
+ out = []
+ for _id in id2:
+ out.append(res[_id])
+ return out
+
+
+ def get_servers(self):
+ return self.synchronous_get([('network.getservers',[])])[0]
+
+ def stop(self):
+ return self.synchronous_get([('network.shutdown',[])])[0]
+
+
+
+
+
+
+class ClientThread(threading.Thread):
+ # read messages from client (socket), and sends them to Network
+ # responses are sent back on the same socket
+
+ def __init__(self, server, network, socket):
+ threading.Thread.__init__(self)
+ self.server = server
+ self.daemon = True
+ self.s = socket
+ self.s.settimeout(0.1)
+ self.network = network
+ self.queue = Queue.Queue()
+ self.unanswered_requests = {}
+
+
+ def run(self):
+ message = ''
+ while True:
+ self.send_responses()
+ try:
+ data = self.s.recv(1024)
+ except socket.timeout:
+ continue
+
+ if not data:
+ break
+ message += data
+
+ while True:
+ cmd, message = self.parse_json(message)
+ if not cmd:
+ break
+ self.process(cmd)
+
+ #print "client thread terminating"
+
+
+ def parse_json(self, message):
+ n = message.find('\n')
+ if n==-1:
+ return None, message
+ j = json.loads( message[0:n] )
+ return j, message[n+1:]
+
+
+ def process(self, request):
+ #print "<--", request
+ method = request['method']
+ params = request['params']
+ _id = request['id']
+
+ if method.startswith('network.'):
+ if method == 'network.shutdown':
+ self.server.running = False
+ r = {'id':_id, 'result':True}
+ elif method == 'network.getservers':
+ servers = self.network.get_servers()
+ r = {'id':_id, 'result':servers}
+ else:
+ r = {'id':_id, 'error':'unknown method'}
+ self.queue.put(r)
+ return
+
+ def cb(i,r):
+ _id = r.get('id')
+ if _id is not None:
+ my_id = self.unanswered_requests.pop(_id)
+ r['id'] = my_id
+ self.queue.put(r)
+
+ new_id = self.network.interface.send([(method, params)], cb) [0]
+ self.unanswered_requests[new_id] = _id
+
+
+ def send_responses(self):
+ while True:
+ try:
+ r = self.queue.get_nowait()
+ except Queue.Empty:
+ break
+ out = json.dumps(r) + '\n'
+ while out:
+ n = self.s.send(out)
+ out = out[n:]
+ #print "-->", r
+
+
+#Server:
+# start network() object
+# accept connections, forward requests
+
+
+class NetworkServer:
+
+ def __init__(self, config):
+ network = Network(config)
+ if not network.start(wait=True):
+ print_msg("Not connected, aborting.")
+ sys.exit(1)
+ self.network = network
+ self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.server.bind(('', 8000))
+ self.server.listen(5)
+ self.server.settimeout(1)
+ self.running = False
+ self.timeout = 60
+
+
+ def main_loop(self):
+ self.running = True
+ t = time.time()
+ while self.running:
+ try:
+ connection, address = self.server.accept()
+ except socket.timeout:
+ if time.time() - t > self.timeout:
+ break
+ continue
+ t = time.time()
+ client = ClientThread(self, self.network, connection)
+ client.start()
+ #print "Done."
+
+
+
+
+def start_daemon(config):
+ pid = os.fork()
+ if (pid == 0): # The first child.
+ os.chdir("/")
+ os.setsid()
+ os.umask(0)
+ pid2 = os.fork()
+ if (pid2 == 0): # Second child
+ server = NetworkServer(config)
+ try:
+ server.main_loop()
+ except KeyboardInterrupt:
+ print "Ctrl C - Stopping server"
+ sys.exit(1)
+
+ sys.exit(0)
+
+ # should use a signal
+ time.sleep(2)
+
+
+
+if __name__ == '__main__':
+ import simple_config
+ config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'})
+ server = NetworkServer(config)
+ try:
+ server.main_loop()
+ except KeyboardInterrupt:
+ print "Ctrl C - Stopping server"
+ sys.exit(1)
diff --git a/lib/network.py b/lib/network.py
@@ -413,24 +413,14 @@ class Network(threading.Thread):
-class NetworkProxy:
- # interface to the network object.
- # handle subscriptions and callbacks
- # the network object can be jsonrpc server
- def __init__(self, network):
- self.network = network
-
-
-
-
if __name__ == "__main__":
- import simple_config
- config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.org:50002:s'})
- network = Network(config)
+ network = NetworkProxy({})
network.start()
+ print network.get_servers()
- while 1:
- time.sleep(1)
-
-
+ q = Queue.Queue()
+ network.send([('blockchain.headers.subscribe',[])], q.put)
+ while True:
+ r = q.get(timeout=10000)
+ print r
diff --git a/setup.py b/setup.py
@@ -71,6 +71,7 @@ setup(
'electrum.blockchain',
'electrum.bmp',
'electrum.commands',
+ 'electrum.daemon',
'electrum.i18n',
'electrum.interface',
'electrum.mnemonic',