commit 2da0c0b77e109ba48321519870e78e223f291403
parent 1212982fec1b93332a736447cb6092472af945d2
Author: ThomasV <thomasv@gitorious>
Date: Sun, 21 Oct 2012 02:57:31 +0200
big refactoring of the interface
addition of the wallet verifier class for SPV
Diffstat:
9 files changed, 411 insertions(+), 245 deletions(-)
diff --git a/electrum b/electrum
@@ -36,9 +36,9 @@ except ImportError:
sys.exit("Error: AES does not seem to be installed. Try 'sudo pip install slowaes'")
try:
- from lib import Wallet, WalletSynchronizer, format_satoshis, mnemonic, SimpleConfig, pick_random_server
+ from lib import Wallet, Interface, WalletSynchronizer, WalletVerifier, format_satoshis, mnemonic, SimpleConfig, pick_random_server
except ImportError:
- from electrum import Wallet, WalletSynchronizer, format_satoshis, mnemonic, SimpleConfig, pick_random_server
+ from electrum import Wallet, Interface, WalletSynchronizer, WalletVerifier, format_satoshis, mnemonic, SimpleConfig, pick_random_server
from decimal import Decimal
@@ -185,8 +185,11 @@ if __name__ == '__main__':
sys.exit("Error: Unknown GUI: " + pref_gui )
gui = gui.ElectrumGui(wallet, config)
- interface = WalletSynchronizer(wallet, config, True, gui.server_list_changed)
- interface.start()
+ wallet.interface = Interface(config, True, gui.server_list_changed)
+ wallet.interface.start()
+
+ WalletSynchronizer(wallet, config).start()
+ WalletVerifier(wallet, config).start()
try:
found = config.wallet_file_exists
diff --git a/lib/__init__.py b/lib/__init__.py
@@ -1,4 +1,5 @@
-from wallet import Wallet, format_satoshis
-from interface import WalletSynchronizer, Interface, pick_random_server, DEFAULT_SERVERS
+from util import format_satoshis
+from wallet import Wallet, WalletSynchronizer, WalletVerifier
+from interface import Interface, pick_random_server, DEFAULT_SERVERS
from simple_config import SimpleConfig
import bitcoin
diff --git a/lib/gui_lite.py b/lib/gui_lite.py
@@ -800,7 +800,7 @@ class MiniDriver(QObject):
self.wallet = wallet
self.window = window
- self.wallet.register_callback(self.update_callback)
+ self.wallet.interface.register_callback(self.update_callback)
self.state = None
diff --git a/lib/gui_qt.py b/lib/gui_qt.py
@@ -207,7 +207,7 @@ class ElectrumWindow(QMainWindow):
QMainWindow.__init__(self)
self.wallet = wallet
self.config = config
- self.wallet.register_callback(self.update_callback)
+ self.wallet.interface.register_callback(self.update_callback)
self.detailed_view = config.get('qt_detailed_view', False)
@@ -1577,7 +1577,7 @@ class ElectrumGui:
wallet.init_mpk( wallet.seed )
wallet.up_to_date_event.clear()
wallet.up_to_date = False
- wallet.interface.poke()
+ wallet.interface.poke('synchronizer')
waiting_dialog(waiting)
# run a dialog indicating the seed, ask the user to remember it
ElectrumWindow.show_seed_dialog(wallet)
@@ -1589,7 +1589,7 @@ class ElectrumGui:
wallet.init_mpk( wallet.seed )
wallet.up_to_date_event.clear()
wallet.up_to_date = False
- wallet.interface.poke()
+ wallet.interface.poke('synchronizer')
waiting_dialog(waiting)
if wallet.is_found():
# history and addressbook
diff --git a/lib/interface.py b/lib/interface.py
@@ -28,11 +28,11 @@ DEFAULT_TIMEOUT = 5
DEFAULT_SERVERS = [
'electrum.novit.ro:50001:t',
'electrum.pdmc.net:50001:t',
- #'ecdsa.org:50002:s',
+ 'ecdsa.org:50001:t',
'electrum.bitcoins.sk:50001:t',
'uncle-enzo.info:50001:t',
'electrum.bytesized-hosting.com:50001:t',
- 'california.stratum.bitcoin.cz:50001:t',
+ 'electrum.bitcoin.cz:50001:t',
'electrum.bitfoo.org:50001:t'
]
@@ -42,24 +42,22 @@ proxy_modes = ['socks4', 'socks5', 'http']
def pick_random_server():
return random.choice( DEFAULT_SERVERS )
-def pick_random_interface(config):
- servers = DEFAULT_SERVERS
- while servers:
- server = random.choice( servers )
- servers.remove(server)
- config.set_key('server', server, False)
- i = Interface(config)
- if i.is_connected:
- return i
- raise BaseException('no server available')
-class InterfaceAncestor(threading.Thread):
+class Interface(threading.Thread):
- def __init__(self, host, port, proxy=None, use_ssl=True):
- threading.Thread.__init__(self)
- self.daemon = True
+ def register_callback(self, update_callback):
+ with self.lock:
+ self.update_callbacks.append(update_callback)
+
+ def trigger_callbacks(self):
+ with self.lock:
+ callbacks = self.update_callbacks[:]
+ [update() for update in callbacks]
+
+
+ def init_server(self, host, port, proxy=None, use_ssl=True):
self.host = host
self.port = port
self.proxy = proxy
@@ -74,13 +72,9 @@ class InterfaceAncestor(threading.Thread):
#json
self.message_id = 0
- self.responses = Queue.Queue()
self.unanswered_requests = {}
- def poke(self):
- # push a fake response so that the getting thread exits its loop
- self.responses.put(None)
def queue_json_response(self, c):
@@ -95,12 +89,19 @@ class InterfaceAncestor(threading.Thread):
return
if msg_id is not None:
- method, params = self.unanswered_requests.pop(msg_id)
+ with self.lock:
+ method, params, channel = self.unanswered_requests.pop(msg_id)
result = c.get('result')
else:
- # notification
+ # notification. we should find the channel(s)..
method = c.get('method')
params = c.get('params')
+ with self.lock:
+ for k,v in self.subscriptions.items():
+ if (method, params) in v:
+ channel = k
+ else:
+ raise
if method == 'blockchain.numblocks.subscribe':
result = params[0]
@@ -111,32 +112,29 @@ class InterfaceAncestor(threading.Thread):
result = params[1]
params = [addr]
- self.responses.put({'method':method, 'params':params, 'result':result, 'id':msg_id})
-
-
+ response_queue = self.responses[channel]
+ response_queue.put({'method':method, 'params':params, 'result':result, 'id':msg_id})
- def subscribe(self, addresses):
- messages = []
- for addr in addresses:
- messages.append(('blockchain.address.subscribe', [addr]))
- self.send(messages)
+ def get_response(self, channel='default', block=True, timeout=10000000000):
+ return self.responses[channel].get(block, timeout)
+ def register_channel(self, channel):
+ with self.lock:
+ self.responses[channel] = Queue.Queue()
+ def poke(self, channel):
+ self.responses[channel].put(None)
-class HttpStratumInterface(InterfaceAncestor):
- """ non-persistent connection. synchronous calls"""
- def __init__(self, host, port, proxy=None, use_ssl=True):
- InterfaceAncestor.__init__(self, host, port, proxy, use_ssl)
+ def init_http(self, host, port, proxy=None, use_ssl=True):
+ self.init_server(host, port, proxy, use_ssl)
self.session_id = None
self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port )
- def get_history(self, address):
- self.send([('blockchain.address.get_history', [address] )])
- def run(self):
+ def run_http(self):
self.is_connected = True
while self.is_connected:
try:
@@ -152,13 +150,13 @@ class HttpStratumInterface(InterfaceAncestor):
break
self.is_connected = False
- self.poke()
def poll(self):
self.send([])
- def send(self, messages):
+
+ def send_http(self, messages, channel='default'):
import urllib2, json, time, cookielib
if self.proxy:
@@ -177,7 +175,7 @@ class HttpStratumInterface(InterfaceAncestor):
method, params = m
if type(params) != type([]): params = [params]
data.append( { 'method':method, 'id':self.message_id, 'params':params } )
- self.unanswered_requests[self.message_id] = method, params
+ self.unanswered_requests[self.message_id] = method, params, channel
self.message_id += 1
if data:
@@ -221,14 +219,9 @@ class HttpStratumInterface(InterfaceAncestor):
-class TcpStratumInterface(InterfaceAncestor):
- """json-rpc over persistent TCP connection, asynchronous"""
+ def init_tcp(self, host, port, proxy=None, use_ssl=True):
+ self.init_server(host, port, proxy, use_ssl)
- def __init__(self, host, port, proxy=None, use_ssl=True):
- InterfaceAncestor.__init__(self, host, port, proxy, use_ssl)
- self.init_socket()
-
- def init_socket(self):
import ssl
global proxy_modes
self.connection_msg = "%s:%d"%(self.host,self.port)
@@ -251,17 +244,18 @@ class TcpStratumInterface(InterfaceAncestor):
s.settimeout(60)
self.s = s
self.is_connected = True
- self.send([('server.version', [ELECTRUM_VERSION])])
except:
self.is_connected = False
self.s = None
- def run(self):
+
+ def run_tcp(self):
try:
out = ''
while self.is_connected:
try: msg = self.s.recv(1024)
except socket.timeout:
+ print "timeout"
# ping the server with server.version, as a real ping does not exist yet
self.send([('server.version', [ELECTRUM_VERSION])])
continue
@@ -283,17 +277,16 @@ class TcpStratumInterface(InterfaceAncestor):
traceback.print_exc(file=sys.stdout)
self.is_connected = False
- print "Poking"
- self.poke()
- def send(self, messages):
+
+ def send_tcp(self, messages, channel='default'):
"""return the ids of the requests that we sent"""
out = ''
ids = []
for m in messages:
method, params = m
request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
- self.unanswered_requests[self.message_id] = method, params
+ self.unanswered_requests[self.message_id] = method, params, channel
ids.append(self.message_id)
# uncomment to debug
# print "-->",request
@@ -304,18 +297,55 @@ class TcpStratumInterface(InterfaceAncestor):
out = out[sent:]
return ids
- def get_history(self, addr):
- self.send([('blockchain.address.get_history', [addr])])
-
-class Interface(TcpStratumInterface, HttpStratumInterface):
-
- def __init__(self, config = None):
+ def __init__(self, config=None, loop=False, servers_loaded_callback=None):
if config is None:
from simple_config import SimpleConfig
config = SimpleConfig()
+
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.loop = loop
+ self.config = config
+ self.servers_loaded_callback = servers_loaded_callback
+
+ self.subscriptions = {}
+ self.responses = {}
+ self.responses['default'] = Queue.Queue()
+
+ self.update_callbacks = []
+ self.lock = threading.Lock()
+ self.init_interface()
+
+
+
+ def init_interface(self):
+ if self.config.get('server'):
+ self.init_with_server(self.config)
+ else:
+ print "Using random server..."
+ servers = DEFAULT_SERVERS
+ while servers:
+ server = random.choice( servers )
+ servers.remove(server)
+ self.config.set_key('server', server, False)
+ self.init_with_server(self.config)
+ if self.is_connected: break
+
+ if not servers:
+ raise BaseException('no server available')
+
+ if self.is_connected:
+ print "Connected to " + self.connection_msg
+ self.send([('server.version', [ELECTRUM_VERSION])])
+ #self.send([('server.banner',[])], 'synchronizer')
+ else:
+ print_error("Failed to connect " + self.connection_msg)
+
+
+ def init_with_server(self, config):
s = config.get('server')
host, port, protocol = s.split(':')
@@ -327,24 +357,41 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
#print protocol, host, port
if protocol in 'st':
- TcpStratumInterface.__init__(self, host, port, proxy, use_ssl=(protocol=='s'))
+ self.init_tcp(host, port, proxy, use_ssl=(protocol=='s'))
elif protocol in 'gh':
- HttpStratumInterface.__init__(self, host, port, proxy, use_ssl=(protocol=='g'))
+ self.init_http(host, port, proxy, use_ssl=(protocol=='g'))
else:
raise BaseException('Unknown protocol: %s'%protocol)
- def run(self):
- if self.protocol in 'st':
- TcpStratumInterface.run(self)
- else:
- HttpStratumInterface.run(self)
+ def send(self, messages, channel='default'):
+
+ sub = []
+ for message in messages:
+ m, v = message
+ if m[-10:] == '.subscribe':
+ sub.append(message)
+
+ if sub:
+ with self.lock:
+ if self.subscriptions.get(channel) is None:
+ self.subscriptions[channel] = []
+ self.subscriptions[channel] += sub
- def send(self, messages):
if self.protocol in 'st':
- return TcpStratumInterface.send(self, messages)
+ with self.lock:
+ out = self.send_tcp(messages, channel)
else:
- return HttpStratumInterface.send(self, messages)
+ # do not use lock, http is synchronous
+ out = self.send_http(messages, channel)
+
+ return out
+
+ def resend_subscriptions(self):
+ for channel, messages in self.subscriptions.items():
+ if messages:
+ self.send(messages, channel)
+
def parse_proxy_options(self, s):
@@ -377,12 +424,30 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
print "changing server:", server, proxy
self.server = server
self.proxy = proxy
+ if self.protocol in 'st':
+ self.s.shutdown(socket.SHUT_RDWR)
+ self.s.close()
self.is_connected = False # this exits the polling loop
- self.poke()
- def is_up_to_date(self):
- return self.responses.empty() and not self.unanswered_requests
+ def is_empty(self, channel):
+ q = self.responses.get(channel)
+ if q:
+ return q.empty()
+ else:
+ return True
+
+
+ def get_pending_requests(self, channel):
+ result = []
+ with self.lock:
+ for k, v in self.unanswered_requests.items():
+ a, b, c = v
+ if c == channel: result.append(k)
+ return result
+
+ def is_up_to_date(self, channel):
+ return self.is_empty(channel) and not self.get_pending_requests(channel)
def synchronous_get(self, requests, timeout=100000000):
@@ -391,7 +456,7 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
id2 = ids[:]
res = {}
while ids:
- r = self.responses.get(True, timeout)
+ r = self.responses['default'].get(True, timeout)
_id = r.get('id')
if _id in ids:
ids.remove(_id)
@@ -403,130 +468,15 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
+ def run(self):
+ while True:
+ self.run_tcp() if self.protocol in 'st' else self.run_http()
+ self.trigger_callbacks()
+ if not self.loop: break
-class WalletSynchronizer(threading.Thread):
-
- def __init__(self, wallet, config, loop=False, servers_loaded_callback=None):
- threading.Thread.__init__(self)
- self.daemon = True
- self.wallet = wallet
- self.loop = loop
- self.config = config
- self.init_interface()
- self.servers_loaded_callback = servers_loaded_callback
-
- def init_interface(self):
- if self.config.get('server'):
- self.interface = Interface(self.config)
- else:
- print "Using random server..."
- self.interface = pick_random_interface(self.config)
-
- if self.interface.is_connected:
- print "Connected to " + self.interface.connection_msg
- else:
- print_error("Failed to connect " + self.interface.connection_msg)
-
- self.wallet.interface = self.interface
-
- def handle_response(self, r):
- if r is None:
- return
-
- method = r['method']
- params = r['params']
- result = r['result']
-
- if method == 'server.banner':
- self.wallet.banner = result
- self.wallet.was_updated = True
-
- elif method == 'server.peers.subscribe':
- servers = []
- for item in result:
- s = []
- host = item[1]
- ports = []
- version = None
- if len(item) > 2:
- for v in item[2]:
- if re.match("[stgh]\d+", v):
- ports.append((v[0], v[1:]))
- if re.match("v(.?)+", v):
- version = v[1:]
- if ports and version:
- servers.append((host, ports))
- self.interface.servers = servers
- # servers_loaded_callback is None for commands, but should
- # NEVER be None when using the GUI.
- if self.servers_loaded_callback is not None:
- self.servers_loaded_callback()
-
- elif method == 'blockchain.address.subscribe':
- addr = params[0]
- self.wallet.receive_status_callback(addr, result)
-
- elif method == 'blockchain.address.get_history':
- addr = params[0]
- self.wallet.receive_history_callback(addr, result)
- self.wallet.was_updated = True
-
- elif method == 'blockchain.transaction.broadcast':
- self.wallet.tx_result = result
- self.wallet.tx_event.set()
-
- elif method == 'blockchain.numblocks.subscribe':
- self.wallet.blocks = result
- self.wallet.was_updated = True
-
- elif method == 'server.version':
- pass
-
- else:
- print_error("Error: Unknown message:" + method + ", " + params + ", " + result)
-
-
- def start_interface(self):
- self.interface.start()
- if self.interface.is_connected:
- self.wallet.start_session(self.interface)
-
+ time.sleep(5)
+ self.init_interface()
+ self.resend_subscriptions()
- def run(self):
- import socket, time
- self.start_interface()
- while True:
- while self.interface.is_connected:
- new_addresses = self.wallet.synchronize()
- if new_addresses:
- self.interface.subscribe(new_addresses)
-
- if self.interface.is_up_to_date():
- if not self.wallet.up_to_date:
- self.wallet.up_to_date = True
- self.wallet.was_updated = True
- self.wallet.up_to_date_event.set()
- else:
- if self.wallet.up_to_date:
- self.wallet.up_to_date = False
- self.wallet.was_updated = True
-
- if self.wallet.was_updated:
- self.wallet.trigger_callbacks()
- self.wallet.was_updated = False
-
- response = self.interface.responses.get()
- self.handle_response(response)
-
- self.wallet.trigger_callbacks()
- if self.loop:
- time.sleep(5)
- # Server has been changed. Copy callback for new interface.
- self.proxy = self.interface.proxy
- self.init_interface()
- self.start_interface()
- continue
- else:
- break
diff --git a/lib/wallet.py b/lib/wallet.py
@@ -28,6 +28,7 @@ import threading
import random
import aes
import ecdsa
+import Queue
from ecdsa.util import string_to_number, number_to_string
from util import print_error, user_dir, format_satoshis
@@ -50,7 +51,6 @@ class Wallet:
self.config = config
self.electrum_version = ELECTRUM_VERSION
- self.update_callbacks = []
# saved fields
self.seed_version = config.get('seed_version', SEED_VERSION)
@@ -94,16 +94,6 @@ class Wallet:
raise ValueError("This wallet seed is deprecated. Please run upgrade.py for a diagnostic.")
- def register_callback(self, update_callback):
- with self.lock:
- self.update_callbacks.append(update_callback)
-
- def trigger_callbacks(self):
- with self.lock:
- callbacks = self.update_callbacks[:]
- [update() for update in callbacks]
-
-
def import_key(self, keypair, password):
address, key = keypair.split(':')
if not self.is_valid(address):
@@ -480,7 +470,8 @@ class Wallet:
return s
def get_status(self, address):
- h = self.history.get(address)
+ with self.lock:
+ h = self.history.get(address)
if not h:
status = None
else:
@@ -490,11 +481,6 @@ class Wallet:
status = status + ':%d'% len(h)
return status
- def receive_status_callback(self, addr, status):
- with self.lock:
- if self.get_status(addr) != status:
- #print "updating status for", addr, status
- self.interface.get_history(addr)
def receive_history_callback(self, addr, data):
#print "updating history for", addr
@@ -504,10 +490,26 @@ class Wallet:
self.save()
def get_tx_history(self):
- lines = self.tx_history.values()
+ with self.lock:
+ lines = self.tx_history.values()
lines = sorted(lines, key=operator.itemgetter("timestamp"))
return lines
+ def get_tx_hashes(self):
+ with self.lock:
+ hashes = self.tx_history.keys()
+ return hashes
+
+ def get_transactions_at_height(self, height):
+ with self.lock:
+ values = self.tx_history.values()[:]
+
+ out = []
+ for tx in values:
+ if tx['height'] == height:
+ out.append(tx['tx_hash'])
+ return out
+
def update_tx_history(self):
self.tx_history= {}
for addr in self.all_addresses():
@@ -751,12 +753,6 @@ class Wallet:
self.up_to_date_event.wait(10000000000)
- def start_session(self, interface):
- self.interface = interface
- self.interface.send([('server.banner',[]), ('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])])
- self.interface.subscribe(self.all_addresses())
-
-
def freeze(self,addr):
if addr in self.all_addresses() and addr not in self.frozen_addresses:
self.unprioritize(addr)
@@ -816,3 +812,223 @@ class Wallet:
for k, v in s.items():
self.config.set_key(k,v)
self.config.save()
+
+
+
+
+
+
+class WalletSynchronizer(threading.Thread):
+
+
+ def __init__(self, wallet, config):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.wallet = wallet
+ self.interface = self.wallet.interface
+ self.interface.register_channel('synchronizer')
+
+
+ def synchronize_wallet(self):
+ new_addresses = self.wallet.synchronize()
+ if new_addresses:
+ self.subscribe_to_addresses(new_addresses)
+
+ if self.interface.is_up_to_date('synchronizer'):
+ if not self.wallet.up_to_date:
+ self.wallet.up_to_date = True
+ self.wallet.was_updated = True
+ self.wallet.up_to_date_event.set()
+ else:
+ if self.wallet.up_to_date:
+ self.wallet.up_to_date = False
+ self.wallet.was_updated = True
+
+ if self.wallet.was_updated:
+ self.interface.trigger_callbacks()
+ self.wallet.was_updated = False
+
+
+ def subscribe_to_addresses(self, addresses):
+ messages = []
+ for addr in addresses:
+ messages.append(('blockchain.address.subscribe', [addr]))
+ self.interface.send( messages, 'synchronizer')
+
+
+ def run(self):
+
+ # subscriptions
+ self.interface.send([('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])], 'synchronizer')
+ self.subscribe_to_addresses(self.wallet.all_addresses())
+
+ while True:
+ # 1. send new requests
+ self.synchronize_wallet()
+
+ # 2. get a response
+ r = self.interface.get_response('synchronizer')
+ if not r: continue
+
+ # 3. handle response
+ method = r['method']
+ params = r['params']
+ result = r['result']
+
+ if method == 'blockchain.address.subscribe':
+ addr = params[0]
+ if self.wallet.get_status(addr) != result:
+ self.interface.send([('blockchain.address.get_history', [address] )])
+
+ elif method == 'blockchain.address.get_history':
+ addr = params[0]
+ self.wallet.receive_history_callback(addr, result)
+ self.wallet.was_updated = True
+
+ elif method == 'blockchain.transaction.broadcast':
+ self.wallet.tx_result = result
+ self.wallet.tx_event.set()
+
+ elif method == 'blockchain.numblocks.subscribe':
+ self.wallet.blocks = result
+ self.wallet.was_updated = True
+
+ elif method == 'server.banner':
+ self.wallet.banner = result
+ self.wallet.was_updated = True
+
+ elif method == 'server.peers.subscribe':
+ servers = []
+ for item in result:
+ s = []
+ host = item[1]
+ ports = []
+ version = None
+ if len(item) > 2:
+ for v in item[2]:
+ if re.match("[stgh]\d+", v):
+ ports.append((v[0], v[1:]))
+ if re.match("v(.?)+", v):
+ version = v[1:]
+ if ports and version:
+ servers.append((host, ports))
+ self.interface.servers = servers
+
+ # servers_loaded_callback is None for commands, but should
+ # NEVER be None when using the GUI.
+ #if self.servers_loaded_callback is not None:
+ # self.servers_loaded_callback()
+
+ elif method == 'server.version':
+ pass
+
+ else:
+ print_error("Error: Unknown message:" + method + ", " + params + ", " + result)
+
+
+encode = lambda x: x[::-1].encode('hex')
+decode = lambda x: x.decode('hex')[::-1]
+from bitcoin import Hash, rev_hex, int_to_hex
+
+class WalletVerifier(threading.Thread):
+
+ def __init__(self, wallet, config):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.wallet = wallet
+ self.interface = self.wallet.interface
+ self.interface.register_channel('verifier')
+ self.validated = []
+ self.merkle_roots = {}
+ self.headers = {}
+ self.lock = threading.Lock()
+
+ def run(self):
+ requested = []
+
+ while True:
+ txlist = self.wallet.get_tx_hashes()
+ for tx in txlist:
+ if tx not in requested:
+ requested.append(tx)
+ self.request_merkle(tx)
+ break
+ try:
+ r = self.interface.get_response('verifier',timeout=1)
+ except Queue.Empty:
+ continue
+
+ # 3. handle response
+ method = r['method']
+ params = r['params']
+ result = r['result']
+
+ if method == 'blockchain.transaction.get_merkle':
+ tx_hash = params[0]
+ tx_height = result.get('block_height')
+ self.merkle_roots[tx_hash] = self.hash_merkle_root(result['merkle'], tx_hash)
+ # if we already have the header, check merkle root directly
+ header = self.headers.get(tx_height)
+ if header:
+ self.validated.append(tx_hash)
+ assert header.get('merkle_root') == self.merkle_roots[tx_hash]
+ self.request_headers(tx_height)
+
+ elif method == 'blockchain.block.get_header':
+ self.validate_header(result)
+
+
+ def request_merkle(self, tx_hash):
+ self.interface.send([ ('blockchain.transaction.get_merkle',[tx_hash]) ], 'verifier')
+
+
+ def request_headers(self, tx_height, delta=10):
+ headers_requests = []
+ for height in range(tx_height-delta,tx_height+delta): # we might can request blocks that do not exist yet
+ if height not in self.headers:
+ headers_requests.append( ('blockchain.block.get_header',[height]) )
+ self.interface.send(headers_requests,'verifier')
+
+
+ def validate_header(self, header):
+ """ if there is a previous or a next block in the list, check the hash"""
+ height = header.get('block_height')
+ with self.lock:
+ self.headers[height] = header # detect conflicts
+ prev_header = next_header = None
+ if height-1 in self.headers:
+ prev_header = self.headers[height-1]
+ if height+1 in self.headers:
+ next_header = self.headers[height+1]
+
+ if prev_header:
+ prev_hash = self.hash_header(prev_header)
+ assert prev_hash == header.get('prev_block_hash')
+ if next_header:
+ _hash = self.hash_header(header)
+ assert _hash == next_header.get('prev_block_hash')
+
+ # check if there are transactions at that height
+ for tx_hash in self.wallet.get_transactions_at_height(height):
+ if tx_hash in self.validated: continue
+ # check if we already have the merkle root
+ merkle_root = self.merkle_roots.get(tx_hash)
+ if merkle_root:
+ self.validated.append(tx_hash)
+ assert header.get('merkle_root') == merkle_root
+
+ def hash_header(self, res):
+ header = int_to_hex(res.get('version'),4) \
+ + rev_hex(res.get('prev_block_hash')) \
+ + rev_hex(res.get('merkle_root')) \
+ + int_to_hex(int(res.get('timestamp')),4) \
+ + int_to_hex(int(res.get('bits')),4) \
+ + int_to_hex(int(res.get('nonce')),4)
+ return rev_hex(Hash(header.decode('hex')).encode('hex'))
+
+ def hash_merkle_root(self, merkle_s, target_hash):
+ h = decode(target_hash)
+ for item in merkle_s:
+ is_left = item[0] == 'L'
+ h = Hash( h + decode(item[1:]) ) if is_left else Hash( decode(item[1:]) + h )
+ return encode(h)
diff --git a/scripts/blocks b/scripts/blocks
@@ -8,7 +8,7 @@ i.send([('blockchain.numblocks.subscribe',[])])
while True:
try:
- r = i.responses.get(True, 100000000000)
+ r = i.get_response()
except KeyboardInterrupt:
break
if r.get('method') == 'blockchain.numblocks.subscribe':
diff --git a/scripts/peers b/scripts/peers
@@ -2,10 +2,10 @@
from electrum import Interface
-i = Interface({'server':'electrum.novit.ro:50001:t'})
+i = Interface({'server':'ecdsa.org:50001:t'})
i.start()
i.send([('server.peers.subscribe',[])])
while True:
- r = i.responses.get(True, 100000000000)
+ r = i.get_response()
print r.get('result')
diff --git a/scripts/servers b/scripts/servers
@@ -5,12 +5,12 @@ import time, Queue
servers = DEFAULT_SERVERS
interfaces = map ( lambda server: Interface({'server':server} ), servers )
-results = []
for i in interfaces:
if i.is_connected:
i.start()
i.send([('blockchain.numblocks.subscribe',[])])
+ i.status = "timed out"
else:
servers.remove(i.server)
i.status = "unreachable"
@@ -18,29 +18,25 @@ for i in interfaces:
for i in interfaces:
while True:
try:
- r = i.responses.get(True,1)
+ r = i.get_response(timeout=1)
except Queue.Empty:
break
if r.get('method') == 'blockchain.numblocks.subscribe':
- results.append((i.host, r.get('result')))
- i.status = "ok"
servers.remove(i.server)
+ i.status = "ok"
+ i.blocks = r.get('result')
break
-for s in servers:
- i.status = "timed out"
from collections import defaultdict
d = defaultdict(int)
-for e in results:
- d[e[1]] += 1
+for i in interfaces:
+ if i.status == 'ok':
+ d[i.blocks] += 1
v = d.values()
numblocks = d.keys()[v.index(max(v))]
for i in interfaces:
- print i.host, i.status
-
-for s,n in results:
- print "%30s %d "%(s, n), "ok" if abs(n-numblocks)<2 else "lagging"
+ print "%30s %s "%(i.host, i.status) #, "ok" if abs(n-numblocks)<2 else "lagging"