commit 72688a5cfa25066d1fd6eee646b874725359ff04
parent 58f9ab3492dfb9f7401f92db6b75f97819e1e5d8
Author: ThomasV <thomasv@gitorious>
Date: Fri, 13 Mar 2015 23:04:29 +0100
clean implementation of daemon threads
Diffstat:
8 files changed, 53 insertions(+), 111 deletions(-)
diff --git a/lib/blockchain.py b/lib/blockchain.py
@@ -19,74 +19,53 @@
import threading, time, Queue, os, sys, shutil
from util import user_dir, print_error
+import util
from bitcoin import *
-class Blockchain(threading.Thread):
+class Blockchain(util.DaemonThread):
def __init__(self, config, network):
- threading.Thread.__init__(self)
- self.daemon = True
+ util.DaemonThread.__init__(self)
self.config = config
self.network = network
self.lock = threading.Lock()
self.local_height = 0
- self.running = False
self.headers_url = 'http://headers.electrum.org/blockchain_headers'
self.set_local_height()
self.queue = Queue.Queue()
-
def height(self):
return self.local_height
-
- def stop(self):
- with self.lock: self.running = False
-
-
- def is_running(self):
- with self.lock: return self.running
-
-
def run(self):
self.init_headers_file()
self.set_local_height()
print_error( "blocks:", self.local_height )
- with self.lock:
- self.running = True
-
while self.is_running():
-
try:
- result = self.queue.get()
+ result = self.queue.get(timeout=0.1)
except Queue.Empty:
continue
-
- if not result: continue
-
+ if not result:
+ continue
i, header = result
- if not header: continue
-
+ if not header:
+ continue
height = header.get('block_height')
-
if height <= self.local_height:
continue
-
if height > self.local_height + 50:
if not self.get_and_verify_chunks(i, header, height):
continue
-
if height > self.local_height:
# get missing parts from interface (until it connects to my chain)
chain = self.get_chain( i, header )
-
# skip that server if the result is not consistent
if not chain:
print_error('e')
continue
-
# verify the chain
if self.verify_chain( chain ):
print_error("height:", height, i.server)
@@ -96,13 +75,9 @@ class Blockchain(threading.Thread):
print_error("error", i.server)
# todo: dismiss that server
continue
-
-
self.network.new_blockchain_height(height, i)
-
-
def verify_chain(self, chain):
first_header = chain[0]
diff --git a/lib/daemon.py b/lib/daemon.py
@@ -66,18 +66,17 @@ def get_daemon(config, start_daemon=True):
-class ClientThread(threading.Thread):
+class ClientThread(util.DaemonThread):
def __init__(self, server, s):
- threading.Thread.__init__(self)
+ util.DaemonThread.__init__(self)
self.server = server
- self.daemon = True
self.client_pipe = util.SocketPipe(s)
self.response_queue = Queue.Queue()
self.server.add_client(self)
def reading_thread(self):
- while self.running:
+ while self.is_running():
try:
request = self.client_pipe.get()
except util.timeout:
@@ -91,9 +90,8 @@ class ClientThread(threading.Thread):
self.server.send_request(self, request)
def run(self):
- self.running = True
threading.Thread(target=self.reading_thread).start()
- while self.running:
+ while self.is_running():
try:
response = self.response_queue.get(timeout=0.1)
except Queue.Empty:
@@ -109,11 +107,10 @@ class ClientThread(threading.Thread):
-class NetworkServer(threading.Thread):
+class NetworkServer(util.DaemonThread):
def __init__(self, config):
- threading.Thread.__init__(self)
- self.daemon = True
+ util.DaemonThread.__init__(self)
self.debug = False
self.config = config
self.network = Network(config)
@@ -128,18 +125,6 @@ class NetworkServer(threading.Thread):
self.request_id = 0
self.requests = {}
- def is_running(self):
- with self.lock:
- return self.running
-
- def stop(self):
- with self.lock:
- self.running = False
-
- def start(self):
- self.running = True
- threading.Thread.start(self)
-
def add_client(self, client):
for key in ['status','banner','updated','servers','interfaces']:
value = self.network.get_status_value(key)
diff --git a/lib/network.py b/lib/network.py
@@ -120,20 +120,18 @@ def serialize_server(host, port, protocol):
return str(':'.join([host, port, protocol]))
-class Network(threading.Thread):
+class Network(util.DaemonThread):
def __init__(self, config=None):
if config is None:
config = {} # Do not use mutables as default values!
- threading.Thread.__init__(self)
- self.daemon = True
+ util.DaemonThread.__init__(self)
self.config = SimpleConfig(config) if type(config) == type({}) else config
self.lock = threading.Lock()
self.num_server = 8 if not self.config.get('oneserver') else 0
self.blockchain = Blockchain(self.config, self)
self.interfaces = {}
self.queue = Queue.Queue()
- self.running = False
# Server for addresses and transactions
self.default_server = self.config.get('server')
# Sanitize default server
@@ -270,10 +268,9 @@ class Network(threading.Thread):
self.response_queue = response_queue
self.start_interfaces()
t = threading.Thread(target=self.process_requests_thread)
- t.daemon = True
t.start()
self.blockchain.start()
- threading.Thread.start(self)
+ util.DaemonThread.start(self)
def set_proxy(self, proxy):
self.proxy = proxy
@@ -540,15 +537,6 @@ class Network(threading.Thread):
self.addresses[addr] = result
self.response_queue.put(r)
- def stop(self):
- self.print_error("stopping network")
- with self.lock:
- self.running = False
-
- def is_running(self):
- with self.lock:
- return self.running
-
def get_header(self, tx_height):
return self.blockchain.read_header(tx_height)
diff --git a/lib/network_proxy.py b/lib/network_proxy.py
@@ -33,13 +33,13 @@ from daemon import NetworkServer
-class NetworkProxy(threading.Thread):
+class NetworkProxy(util.DaemonThread):
def __init__(self, socket, config=None):
if config is None:
config = {} # Do not use mutables as default arguments!
- threading.Thread.__init__(self)
+ util.DaemonThread.__init__(self)
self.config = SimpleConfig(config) if type(config) == type({}) else config
self.message_id = 0
self.unanswered_requests = {}
@@ -48,8 +48,6 @@ class NetworkProxy(threading.Thread):
self.lock = threading.Lock()
self.pending_transactions_for_notifications = []
self.callbacks = {}
- self.running = True
- self.daemon = True
if socket:
self.pipe = util.SocketPipe(socket)
@@ -70,8 +68,6 @@ class NetworkProxy(threading.Thread):
self.server_height = 0
self.interfaces = []
- def is_running(self):
- return self.running
def run(self):
while self.is_running():
@@ -213,9 +209,6 @@ class NetworkProxy(threading.Thread):
def set_parameters(self, *args):
return self.synchronous_get([('network.set_parameters', args)])[0]
- def stop(self):
- self.running = False
-
def stop_daemon(self):
return self.send([('daemon.stop',[])], None)
diff --git a/lib/synchronizer.py b/lib/synchronizer.py
@@ -22,31 +22,22 @@ import time
import Queue
import bitcoin
+import util
from util import print_error
from transaction import Transaction
-class WalletSynchronizer(threading.Thread):
+class WalletSynchronizer(util.DaemonThread):
def __init__(self, wallet, network):
- threading.Thread.__init__(self)
- self.daemon = True
+ util.DaemonThread.__init__(self)
self.wallet = wallet
self.network = network
self.was_updated = True
- self.running = False
self.lock = threading.Lock()
self.queue = Queue.Queue()
self.address_queue = Queue.Queue()
- def stop(self):
- with self.lock:
- self.running = False
-
- def is_running(self):
- with self.lock:
- return self.running
-
def add(self, address):
self.address_queue.put(address)
@@ -57,8 +48,6 @@ class WalletSynchronizer(threading.Thread):
self.network.send(messages, self.queue.put)
def run(self):
- with self.lock:
- self.running = True
while self.is_running():
while not self.network.is_connected():
time.sleep(0.1)
diff --git a/lib/util.py b/lib/util.py
@@ -4,6 +4,7 @@ import shutil
from datetime import datetime
import urlparse
import urllib
+import threading
class NotEnoughFunds(Exception): pass
@@ -20,6 +21,30 @@ class MyEncoder(json.JSONEncoder):
return super(MyEncoder, self).default(obj)
+class DaemonThread(threading.Thread):
+ """ daemon thread that terminates cleanly """
+
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self.parent_thread = threading.currentThread()
+ self.running = False
+ self.running_lock = threading.Lock()
+
+ def start(self):
+ with self.running_lock:
+ self.running = True
+ return threading.Thread.start(self)
+
+ def is_running(self):
+ with self.running_lock:
+ return self.running and self.parent_thread.is_alive()
+
+ def stop(self):
+ with self.running_lock:
+ self.running = False
+
+
+
is_verbose = False
def set_verbosity(b):
global is_verbose
diff --git a/lib/verifier.py b/lib/verifier.py
@@ -18,28 +18,27 @@
import threading, time, Queue, os, sys, shutil
+
+import util
from util import user_dir, print_error
from bitcoin import *
-class TxVerifier(threading.Thread):
+class TxVerifier(util.DaemonThread):
""" Simple Payment Verification """
def __init__(self, network, storage):
- threading.Thread.__init__(self)
- self.daemon = True
+ util.DaemonThread.__init__(self)
self.storage = storage
self.network = network
self.transactions = {} # requested verifications (with height sent by the requestor)
self.verified_tx = storage.get('verified_tx3',{}) # height, timestamp of verified transactions
self.merkle_roots = storage.get('merkle_roots',{}) # hashed by me
self.lock = threading.Lock()
- self.running = False
self.queue = Queue.Queue()
-
def get_confirmations(self, tx):
""" return the number of confirmations of a monitored transaction. """
with self.lock:
@@ -47,11 +46,9 @@ class TxVerifier(threading.Thread):
height, timestamp, pos = self.verified_tx[tx]
conf = (self.network.get_local_height() - height + 1)
if conf <= 0: timestamp = None
-
elif tx in self.transactions:
conf = -1
timestamp = None
-
else:
conf = 0
timestamp = None
@@ -87,17 +84,8 @@ class TxVerifier(threading.Thread):
if tx_hash not in self.transactions.keys():
self.transactions[tx_hash] = tx_height
- def stop(self):
- with self.lock: self.running = False
-
- def is_running(self):
- with self.lock: return self.running
-
def run(self):
- with self.lock:
- self.running = True
requested_merkle = []
-
while self.is_running():
# request missing tx
for tx_hash, tx_height in self.transactions.items():
diff --git a/lib/wallet.py b/lib/wallet.py
@@ -134,9 +134,8 @@ class WalletStorage(object):
def write(self):
s = json.dumps(self.data, indent=4, sort_keys=True)
- f = open(self.path,"w")
- f.write(s)
- f.close()
+ with open(self.path,"w") as f:
+ f.write(s)
if 'ANDROID_DATA' not in os.environ:
import stat
os.chmod(self.path,stat.S_IREAD | stat.S_IWRITE)