commit b64c42b1eb48789146afc8218a26b2d341205e25
parent 01491dd1d07f35508973e36f43d228e835468fea
Author: Neil Booth <kyuupichan@gmail.com>
Date: Wed, 26 Aug 2015 17:07:36 +0900
Make the verifier a thread job instead of a thread
The verifier, like the synchronizer, now runs as part of the
network proxy thread.
Diffstat:
3 files changed, 40 insertions(+), 55 deletions(-)
diff --git a/lib/network_proxy.py b/lib/network_proxy.py
@@ -66,7 +66,7 @@ class NetworkProxy(util.DaemonThread):
def run(self):
while self.is_running():
- self.run_jobs() # Synchronizer, for now
+ self.run_jobs() # Synchronizer and Verifier
try:
response = self.pipe.get()
except util.timeout:
@@ -185,9 +185,6 @@ class NetworkProxy(util.DaemonThread):
def get_interfaces(self):
return self.interfaces
- def get_header(self, height):
- return self.synchronous_get([('network.get_header', [height])])[0]
-
def get_local_height(self):
return self.blockchain_height
diff --git a/lib/verifier.py b/lib/verifier.py
@@ -17,63 +17,52 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import threading
-import Queue
-
-
-import util
+from util import ThreadJob
+from functools import partial
from bitcoin import *
-class SPV(util.DaemonThread):
+class SPV(ThreadJob):
""" Simple Payment Verification """
def __init__(self, network, wallet):
- util.DaemonThread.__init__(self)
self.wallet = wallet
self.network = network
self.merkle_roots = {} # hashed by me
- self.queue = Queue.Queue()
+ self.requested_merkle = set()
def run(self):
- requested_merkle = set()
- while self.is_running():
- unverified = self.wallet.get_unverified_txs()
- for (tx_hash, tx_height) in unverified:
- if tx_hash not in self.merkle_roots and tx_hash not in requested_merkle:
- if self.network.send([ ('blockchain.transaction.get_merkle',[tx_hash, tx_height]) ], self.queue.put):
- self.print_error('requested merkle', tx_hash)
- requested_merkle.add(tx_hash)
- try:
- r = self.queue.get(timeout=0.1)
- except Queue.Empty:
- continue
- if not r:
- continue
-
- if r.get('error'):
- self.print_error('Verifier received an error:', r)
- continue
-
- # 3. handle response
- method = r['method']
- params = r['params']
- result = r['result']
-
- if method == 'blockchain.transaction.get_merkle':
- tx_hash = params[0]
- self.verify_merkle(tx_hash, result)
-
- self.print_error("stopped")
-
+ unverified = self.wallet.get_unverified_txs()
+ for (tx_hash, tx_height) in unverified:
+ if tx_hash not in self.merkle_roots and tx_hash not in self.requested_merkle:
+ request = ('blockchain.transaction.get_merkle',
+ [tx_hash, tx_height])
+ if self.network.send([request], self.merkle_response):
+ self.print_error('requested merkle', tx_hash)
+ self.requested_merkle.add(tx_hash)
+
+ def merkle_response(self, r):
+ if r.get('error'):
+ self.print_error('received an error:', r)
+ return
- def verify_merkle(self, tx_hash, result):
- tx_height = result.get('block_height')
- pos = result.get('pos')
- merkle_root = self.hash_merkle_root(result['merkle'], tx_hash, pos)
- header = self.network.get_header(tx_height)
- if not header: return
- if header.get('merkle_root') != merkle_root:
+ params = r['params']
+ result = r['result']
+
+ # Get the header asynchronously - as a thread job we cannot block
+ tx_hash = params[0]
+ request = ('network.get_header',[result.get('block_height')])
+ self.network.send([request], partial(self.verify, tx_hash, result))
+
+ def verify(self, tx_hash, merkle, header):
+ '''Verify the hash of the server-provided merkle branch to a
+ transaction matches the merkle root of its block
+ '''
+ tx_height = merkle.get('block_height')
+ pos = merkle.get('pos')
+ merkle_root = self.hash_merkle_root(merkle['merkle'], tx_hash, pos)
+ header = header.get('result')
+ if not header or header.get('merkle_root') != merkle_root:
self.print_error("merkle verification failed for", tx_hash)
return
diff --git a/lib/wallet.py b/lib/wallet.py
@@ -1086,9 +1086,7 @@ class Abstract_Wallet(object):
return True
return False
- def set_verifier(self, verifier):
- self.verifier = verifier
-
+ def prepare_for_verifier(self):
# review transactions that are in the history
for addr, hist in self.history.items():
for tx_hash, tx_height in hist:
@@ -1107,9 +1105,9 @@ class Abstract_Wallet(object):
from verifier import SPV
self.network = network
if self.network is not None:
+ self.prepare_for_verifier()
self.verifier = SPV(self.network, self)
- self.verifier.start()
- self.set_verifier(self.verifier)
+ network.add_job(self.verifier)
self.synchronizer = Synchronizer(self, network)
network.add_job(self.synchronizer)
else:
@@ -1118,9 +1116,10 @@ class Abstract_Wallet(object):
def stop_threads(self):
if self.network:
- self.verifier.stop()
self.network.remove_job(self.synchronizer)
+ self.network.remove_job(self.verifier)
self.synchronizer = None
+ self.verifier = None
self.storage.put('stored_height', self.get_local_height(), True)
def restore(self, cb):