commit 9ee0614edb3c53d482bb6e95ebcd8b7b5ed7d951
parent ab41c6f940490883a7a2d6922e50aae7bf2bac7e
Author: ThomasV <thomasv@gitorious>
Date: Thu, 24 Jul 2014 10:59:13 +0200
make daemon usable with the GUI
Diffstat:
10 files changed, 332 insertions(+), 258 deletions(-)
diff --git a/electrum b/electrum
@@ -69,6 +69,7 @@ def arg_parser():
parser.add_option("-g", "--gui", dest="gui", help="User interface: qt, lite, gtk, text or stdio")
parser.add_option("-w", "--wallet", dest="wallet_path", help="wallet path (default: electrum.dat)")
parser.add_option("-o", "--offline", action="store_true", dest="offline", default=False, help="remain offline")
+ parser.add_option("-d", "--daemon", action="store_true", dest="daemon", default=False, help="use daemon")
parser.add_option("-C", "--concealed", action="store_true", dest="concealed", default=False, help="don't echo seed to console when restoring")
parser.add_option("-a", "--all", action="store_true", dest="show_all", default=False, help="show all addresses")
parser.add_option("-l", "--labels", action="store_true", dest="show_labels", default=False, help="show the labels of listed addresses")
@@ -109,11 +110,9 @@ def run_command(cmd, password=None, args=None):
if args is None:
args = [] # Do not use mutables as default values!
if cmd.requires_network and not options.offline:
- network = NetworkProxy(config)
- if not network.start(start_daemon= (True if cmd.name!='daemon' else False)):
- print "Daemon not running"
- sys.exit(1)
-
+ s = daemon_socket()
+ network = NetworkProxy(s, config)
+ network.start()
if wallet:
wallet.start_threads(network)
wallet.update()
@@ -133,6 +132,7 @@ def run_command(cmd, password=None, args=None):
if cmd.requires_network and not options.offline:
if wallet:
wallet.stop_threads()
+ network.stop()
if type(result) == str:
@@ -142,6 +142,28 @@ def run_command(cmd, password=None, args=None):
+def daemon_socket(start_daemon=True):
+ import socket
+ from electrum.daemon import DAEMON_PORT
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ daemon_port = config.get('daemon_port', DAEMON_PORT)
+ daemon_started = False
+ while True:
+ try:
+ s.connect(('', daemon_port))
+ return s
+ except socket.error:
+ if not start_daemon:
+ return False
+ elif not daemon_started:
+ import subprocess
+ logfile = open(os.path.join(config.path, 'daemon.log'),'w')
+ p = subprocess.Popen([__file__,"daemon","start"], stderr=logfile, stdout=logfile)
+ print "starting daemon (PID %d)"%p.pid
+ daemon_started = True
+ else:
+ time.sleep(0.1)
+
@@ -193,8 +215,13 @@ if __name__ == '__main__':
# network interface
if not options.offline:
- network = Network(config)
- network.start()
+ if options.daemon:
+ s = daemon_socket()
+ network = NetworkProxy(s, config)
+ network.start()
+ else:
+ network = Network(config)
+ network.start()
else:
network = None
@@ -209,6 +236,38 @@ if __name__ == '__main__':
time.sleep(0.1)
sys.exit(0)
+ if cmd == 'daemon':
+ arg = args[1]
+ if arg=='start':
+ from electrum import daemon, util
+ util.set_verbosity(True)
+ print_stderr( "Starting daemon [%s]"%config.get('server'))
+ server = daemon.NetworkServer(config)
+ try:
+ server.main_loop()
+ except KeyboardInterrupt:
+ print_msg("Ctrl C - Stopping server")
+ sys.exit(0)
+ elif arg in ['status','stop']:
+ s = daemon_socket(start_daemon=False)
+ if not s:
+ print_msg("Daemon not running")
+ sys.exit(1)
+ network = NetworkProxy(s, config)
+ network.start()
+ if arg == 'status':
+ print_json({
+ 'server':network.main_server(),
+ 'connected':network.is_connected()
+ })
+ elif arg == 'stop':
+ network.stop_daemon()
+ network.stop()
+ else:
+ print "unknown command \"%s\""% arg
+ sys.exit(0)
+
+
if cmd not in known_commands:
cmd = 'help'
@@ -217,7 +276,6 @@ if __name__ == '__main__':
# instanciate wallet for command-line
storage = WalletStorage(config)
-
if cmd.name in ['create', 'restore']:
if storage.file_exists:
sys.exit("Error: Remove the existing wallet first!")
@@ -428,6 +486,5 @@ if __name__ == '__main__':
else:
run_command(cmd, password, args)
-
time.sleep(0.1)
sys.exit(0)
diff --git a/gui/qt/lite_window.py b/gui/qt/lite_window.py
@@ -811,9 +811,9 @@ class MiniDriver(QObject):
def update(self):
if not self.network:
self.initializing()
- elif not self.network.interface:
- self.initializing()
- elif not self.network.interface.is_connected:
+ #elif not self.network.interface:
+ # self.initializing()
+ elif not self.network.is_connected():
self.connecting()
if self.g.wallet is None:
diff --git a/gui/qt/main_window.py b/gui/qt/main_window.py
@@ -471,9 +471,9 @@ class ElectrumWindow(QMainWindow):
if not self.wallet.up_to_date:
text = _("Synchronizing...")
icon = QIcon(":icons/status_waiting.png")
- elif self.network.server_lag > 1:
- text = _("Server is lagging (%d blocks)"%self.network.server_lag)
- icon = QIcon(":icons/status_lagging.png")
+ #elif self.network.server_lag > 1:
+ # text = _("Server is lagging (%d blocks)"%self.network.server_lag)
+ # icon = QIcon(":icons/status_lagging.png")
else:
c, u = self.wallet.get_account_balance(self.current_account)
text = _( "Balance" ) + ": %s "%( self.format_amount(c) ) + self.base_unit()
diff --git a/lib/__init__.py b/lib/__init__.py
@@ -14,4 +14,5 @@ from plugins import BasePlugin
from mnemonic import mn_encode as mnemonic_encode
from mnemonic import mn_decode as mnemonic_decode
from commands import Commands, known_commands
-from daemon import NetworkProxy, NetworkServer
+from daemon import NetworkServer
+from network_proxy import NetworkProxy
diff --git a/lib/commands.py b/lib/commands.py
@@ -19,7 +19,7 @@
import datetime
import time
import copy
-from util import print_msg, format_satoshis
+from util import print_msg, format_satoshis, print_stderr
from bitcoin import is_valid, hash_160_to_bc_address, hash_160
from decimal import Decimal
import bitcoin
@@ -105,7 +105,6 @@ register_command('verifymessage', 3,-1, False, False, False, 'Verifies a
register_command('encrypt', 2,-1, False, False, False, 'encrypt a message with pubkey','encrypt <pubkey> <message>')
register_command('decrypt', 2,-1, False, True, True, 'decrypt a message encrypted with pubkey','decrypt <pubkey> <message>')
-register_command('daemon', 1, 1, True, False, False, '<stop|status>')
register_command('getproof', 1, 1, True, False, False, 'get merkle proof', 'getproof <address>')
register_command('getutxoaddress', 2, 2, True, False, False, 'get the address of an unspent transaction output','getutxoaddress <txid> <pos>')
register_command('sweep', 2, 3, True, False, False, 'Sweep a private key.', 'sweep privkey addr [fee]')
@@ -133,17 +132,6 @@ class Commands:
def getaddresshistory(self, addr):
return self.network.synchronous_get([ ('blockchain.address.get_history',[addr]) ])[0]
- def daemon(self, arg):
- if arg=='stop':
- return self.network.stop()
- elif arg=='status':
- return {
- 'server':self.network.main_server(),
- 'connected':self.network.is_connected()
- }
- else:
- return "unknown command \"%s\""% arg
-
def listunspent(self):
l = copy.deepcopy(self.wallet.get_unspent_coins())
for i in l: i["value"] = str(Decimal(i["value"])/100000000)
diff --git a/lib/daemon.py b/lib/daemon.py
@@ -25,182 +25,29 @@ import traceback
import json
import Queue
from network import Network
-from util import print_msg, print_stderr
+from util import print_error, print_stderr
from simple_config import SimpleConfig
-DAEMON_PORT=8001
-
-class NetworkProxy(threading.Thread):
- # connects to daemon
- # sends requests, runs callbacks
-
- def __init__(self, config=None):
- if config is None:
- config = {} # Do not use mutables as default arguments!
- threading.Thread.__init__(self)
- self.daemon = True
- self.config = SimpleConfig(config) if type(config) == type({}) else config
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.daemon_port = config.get('daemon_port', DAEMON_PORT)
- self.message_id = 0
- self.unanswered_requests = {}
- self.subscriptions = {}
- self.debug = False
- self.lock = threading.Lock()
- self.pending_transactions_for_notifications = []
-
-
- def start(self, start_daemon=False):
- daemon_started = False
- while True:
- try:
- self.socket.connect(('', self.daemon_port))
- threading.Thread.start(self)
- return True
-
- except socket.error:
- if not start_daemon:
- return False
-
- elif not daemon_started:
- print_stderr( "Starting daemon [%s]"%self.config.get('server'))
- daemon_started = True
- pid = os.fork()
- if (pid == 0): # The first child.
- os.chdir("/")
- os.setsid()
- os.umask(0)
- pid2 = os.fork()
- if (pid2 == 0): # Second child
- server = NetworkServer(self.config)
- try:
- server.main_loop()
- except KeyboardInterrupt:
- print "Ctrl C - Stopping server"
- sys.exit(1)
- sys.exit(0)
- else:
- time.sleep(0.1)
-
-
-
- def parse_json(self, message):
- s = message.find('\n')
- if s==-1:
- return None, message
- j = json.loads( message[0:s] )
- return j, message[s+1:]
-
-
- def run(self):
- # read responses and trigger callbacks
- message = ''
- while True:
- try:
- data = self.socket.recv(1024)
- except:
- data = ''
- if not data:
- break
-
- message += data
- while True:
- response, message = self.parse_json(message)
- if response is not None:
- self.process(response)
- else:
- break
-
- print "NetworkProxy: exiting"
-
- def process(self, response):
- # runs callbacks
- if self.debug: print "<--", response
-
- msg_id = response.get('id')
- with self.lock:
- method, params, callback = self.unanswered_requests.pop(msg_id)
-
- result = response.get('result')
- callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id})
-
-
- def subscribe(self, messages, callback):
- # detect if it is a subscription
- with self.lock:
- if self.subscriptions.get(callback) is None:
- self.subscriptions[callback] = []
- for message in messages:
- if message not in self.subscriptions[callback]:
- self.subscriptions[callback].append(message)
-
- self.send( messages, callback )
-
-
- def send(self, messages, callback):
- """return the ids of the requests that we sent"""
- out = ''
- ids = []
- for m in messages:
- method, params = m
- request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
- self.unanswered_requests[self.message_id] = method, params, callback
- ids.append(self.message_id)
- if self.debug: print "-->", request
- self.message_id += 1
- out += request + '\n'
- while out:
- sent = self.socket.send( out )
- out = out[sent:]
- return ids
-
-
- def synchronous_get(self, requests, timeout=100000000):
- queue = Queue.Queue()
- ids = self.send(requests, lambda i,x: queue.put(x))
- id2 = ids[:]
- res = {}
- while ids:
- r = queue.get(True, timeout)
- _id = r.get('id')
- if _id in ids:
- ids.remove(_id)
- res[_id] = r.get('result')
- out = []
- for _id in id2:
- out.append(res[_id])
- return out
-
-
- def get_servers(self):
- return self.synchronous_get([('network.get_servers',[])])[0]
-
- def get_header(self, height):
- return self.synchronous_get([('network.get_header',[height])])[0]
-
- def get_local_height(self):
- return self.synchronous_get([('network.get_local_height',[])])[0]
-
- def is_connected(self):
- return self.synchronous_get([('network.is_connected',[])])[0]
-
- def is_up_to_date(self):
- return self.synchronous_get([('network.is_up_to_date',[])])[0]
-
- def main_server(self):
- return self.synchronous_get([('network.main_server',[])])[0]
-
- def stop(self):
- return self.synchronous_get([('daemon.shutdown',[])])[0]
-
-
- def trigger_callback(self, cb):
- pass
+"""
+The Network object is not aware of clients/subscribers
+It only does subscribe/unsubscribe to addresses
+Which client has wich address is managed by the daemon
+Network also reports status changes
+"""
+DAEMON_PORT=8001
+def parse_json(message):
+ n = message.find('\n')
+ if n==-1:
+ return None, message
+ try:
+ j = json.loads( message[0:n] )
+ except:
+ j = None
+ return j, message[n+1:]
@@ -208,12 +55,11 @@ class ClientThread(threading.Thread):
# read messages from client (socket), and sends them to Network
# responses are sent back on the same socket
- def __init__(self, server, network, socket):
+ def __init__(self, server, network, s):
threading.Thread.__init__(self)
self.server = server
self.daemon = True
- self.s = socket
- self.s.settimeout(0.1)
+ self.s = s
self.network = network
self.queue = Queue.Queue()
self.unanswered_requests = {}
@@ -221,6 +67,7 @@ class ClientThread(threading.Thread):
def run(self):
+ self.server.add_client(self)
message = ''
while True:
self.send_responses()
@@ -228,30 +75,25 @@ class ClientThread(threading.Thread):
data = self.s.recv(1024)
except socket.timeout:
continue
-
+ except:
+ data = ''
if not data:
break
message += data
-
while True:
- cmd, message = self.parse_json(message)
+ cmd, message = parse_json(message)
if not cmd:
break
self.process(cmd)
- #print "client thread terminating"
+ self.server.remove_client(self)
- def parse_json(self, message):
- n = message.find('\n')
- if n==-1:
- return None, message
- j = json.loads( message[0:n] )
- return j, message[n+1:]
def process(self, request):
- if self.debug: print "<--", request
+ if self.debug:
+ print_error("<--", request)
method = request['method']
params = request['params']
_id = request['id']
@@ -269,11 +111,6 @@ class ClientThread(threading.Thread):
self.queue.put(out)
return
- if method == 'daemon.shutdown':
- self.server.running = False
- self.queue.put({'id':_id, 'result':True})
- return
-
def cb(i,r):
_id = r.get('id')
if _id is not None:
@@ -295,8 +132,8 @@ class ClientThread(threading.Thread):
while out:
n = self.s.send(out)
out = out[n:]
- if self.debug: print "-->", r
-
+ if self.debug:
+ print_error("-->", r)
@@ -304,39 +141,61 @@ class NetworkServer:
def __init__(self, config):
network = Network(config)
- if not network.start(wait=True):
- print_msg("Not connected, aborting.")
- sys.exit(1)
+ network.start(wait=False)
self.network = network
- self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.daemon_port = config.get('daemon_port', DAEMON_PORT)
- self.server.bind(('', self.daemon_port))
- self.server.listen(5)
- self.server.settimeout(1)
+ self.socket.bind(('', self.daemon_port))
+ self.socket.listen(5)
+ self.socket.settimeout(1)
self.running = False
self.timeout = config.get('daemon_timeout', 60)
+ #
+ self.lock = threading.RLock()
+ self.clients = []
+ # need to know which client subscribed to which address
+ #
+ # report status
+ self.network.status_callback = self.on_status
+
+ def add_client(self, client):
+ with self.lock:
+ self.clients.append(client)
+
+ def remove_client(self, client):
+ with self.lock:
+ self.clients.remove(client)
+ print_error("client quit:", len(self.clients))
+
+ def on_status(self, status):
+ for client in self.clients:
+ client.queue.put({'method':'network.subscribe', 'status':status})
def main_loop(self):
self.running = True
t = time.time()
while self.running:
try:
- connection, address = self.server.accept()
+ connection, address = self.socket.accept()
except socket.timeout:
- if time.time() - t > self.timeout:
- break
+ if not self.clients:
+ if time.time() - t > self.timeout:
+ break
+ else:
+ t = time.time()
continue
- t = time.time()
client = ClientThread(self, self.network, connection)
client.start()
+ print_error("daemon: timed out")
if __name__ == '__main__':
- import simple_config
- config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'})
+ import simple_config, util
+ config = simple_config.SimpleConfig()
+ util.set_verbosity(True)
server = NetworkServer(config)
try:
server.main_loop()
diff --git a/lib/network.py b/lib/network.py
@@ -221,6 +221,7 @@ class Network(threading.Thread):
self.start_interfaces()
threading.Thread.start(self)
if wait:
+ raise
return self.wait_until_connected()
def wait_until_connected(self):
@@ -420,25 +421,3 @@ class Network(threading.Thread):
return self.blockchain.height()
-
- #def retrieve_transaction(self, tx_hash, tx_height=0):
- # import transaction
- # r = self.synchronous_get([ ('blockchain.transaction.get',[tx_hash, tx_height]) ])[0]
- # if r:
- # return transaction.Transaction(r)
-
-
-
-
-
-if __name__ == "__main__":
- network = NetworkProxy({})
- network.start()
- print network.get_servers()
-
- q = Queue.Queue()
- network.send([('blockchain.headers.subscribe',[])], q.put)
- while True:
- r = q.get(timeout=10000)
- print r
-
diff --git a/lib/network_proxy.py b/lib/network_proxy.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python
+#
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2014 Thomas Voegtlin
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import socket
+import time
+import sys
+import os
+import threading
+import traceback
+import json
+import Queue
+from network import Network
+from util import print_error, print_stderr
+from simple_config import SimpleConfig
+
+from daemon import parse_json, NetworkServer, DAEMON_PORT
+
+
+
+
+
+class NetworkProxy(threading.Thread):
+
+ def __init__(self, socket, config=None):
+ if config is None:
+ config = {} # Do not use mutables as default arguments!
+ threading.Thread.__init__(self)
+ self.config = SimpleConfig(config) if type(config) == type({}) else config
+ self.socket = socket
+ self.socket.settimeout(0.1)
+ self.message_id = 0
+ self.unanswered_requests = {}
+ self.subscriptions = {}
+ self.debug = False
+ self.lock = threading.Lock()
+ self.pending_transactions_for_notifications = []
+ self.banner = ''
+ self.callbacks = {}
+ self.running = True
+ self.daemon = True
+
+ def is_running(self):
+ return self.running
+
+ def run(self):
+ # read responses and trigger callbacks
+ message = ''
+ while self.is_running():
+ try:
+ data = self.socket.recv(1024)
+ except socket.timeout:
+ continue
+ except:
+ data = ''
+ if not data:
+ break
+ message += data
+ while True:
+ response, message = parse_json(message)
+ if response is not None:
+ self.process(response)
+ else:
+ break
+ # fixme: server does not detect if we don't call shutdown
+ self.socket.shutdown(2)
+ self.socket.close()
+ print_error("NetworkProxy thread terminating")
+
+ def process(self, response):
+ if self.debug:
+ print_error("<--", response)
+
+ if response.get('method') == 'network.subscribe':
+ status = response.get('status')
+ self.trigger_callback(status)
+ return
+
+ msg_id = response.get('id')
+ with self.lock:
+ method, params, callback = self.unanswered_requests.pop(msg_id)
+
+ result = response.get('result')
+ callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id})
+
+
+ def subscribe(self, messages, callback):
+ # detect if it is a subscription
+ with self.lock:
+ if self.subscriptions.get(callback) is None:
+ self.subscriptions[callback] = []
+ for message in messages:
+ if message not in self.subscriptions[callback]:
+ self.subscriptions[callback].append(message)
+
+ self.send( messages, callback )
+
+
+ def send(self, messages, callback):
+ """return the ids of the requests that we sent"""
+ with self.lock:
+ out = ''
+ ids = []
+ for m in messages:
+ method, params = m
+ request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
+ self.unanswered_requests[self.message_id] = method, params, callback
+ ids.append(self.message_id)
+ if self.debug:
+ print_error("-->", request)
+ self.message_id += 1
+ out += request + '\n'
+ while out:
+ sent = self.socket.send( out )
+ out = out[sent:]
+ return ids
+
+
+ def synchronous_get(self, requests, timeout=100000000):
+ queue = Queue.Queue()
+ ids = self.send(requests, lambda i,x: queue.put(x))
+ id2 = ids[:]
+ res = {}
+ while ids:
+ r = queue.get(True, timeout)
+ _id = r.get('id')
+ if _id in ids:
+ ids.remove(_id)
+ res[_id] = r.get('result')
+ else:
+ raise
+ out = []
+ for _id in id2:
+ out.append(res[_id])
+ return out
+
+
+ def get_servers(self):
+ return self.synchronous_get([('network.get_servers',[])])[0]
+
+ def get_header(self, height):
+ return self.synchronous_get([('network.get_header',[height])])[0]
+
+ def get_local_height(self):
+ return self.synchronous_get([('network.get_local_height',[])])[0]
+
+ def is_connected(self):
+ return self.synchronous_get([('network.is_connected',[])])[0]
+
+ def is_up_to_date(self):
+ return self.synchronous_get([('network.is_up_to_date',[])])[0]
+
+ def main_server(self):
+ return self.synchronous_get([('network.main_server',[])])[0]
+
+ def stop(self):
+ self.running = False
+
+
+ def register_callback(self, event, callback):
+ with self.lock:
+ if not self.callbacks.get(event):
+ self.callbacks[event] = []
+ self.callbacks[event].append(callback)
+
+
+ def trigger_callback(self, event):
+ with self.lock:
+ callbacks = self.callbacks.get(event,[])[:]
+ if callbacks:
+ [callback() for callback in callbacks]
+
+ print_error("trigger_callback", event, len(callbacks))
diff --git a/lib/synchronizer.py b/lib/synchronizer.py
@@ -56,8 +56,10 @@ class WalletSynchronizer(threading.Thread):
with self.lock:
self.running = True
while self.is_running():
- if not self.network.is_connected():
- self.network.wait_until_connected()
+ while not self.network.is_connected():
+ import time
+ time.sleep(5)
+ #self.network.wait_until_connected()
self.run_interface()
def run_interface(self):
diff --git a/setup.py b/setup.py
@@ -76,6 +76,7 @@ setup(
'electrum.mnemonic',
'electrum.msqr',
'electrum.network',
+ 'electrum.network_proxy',
'electrum.paymentrequest',
'electrum.paymentrequest_pb2',
'electrum.plugins',