commit 328315f94baeccaa646c713a5626960709879c0a
parent ac26abfed3e9f4d36b4419c90ca9f48aa285c4ee
Author: ThomasV <thomasv@gitorious>
Date: Sun, 8 Sep 2013 17:23:01 +0200
separate blockchain and network
Diffstat:
8 files changed, 288 insertions(+), 243 deletions(-)
diff --git a/electrum b/electrum
@@ -128,18 +128,14 @@ if __name__ == '__main__':
#sys.exit("Error: Unknown GUI: " + gui_name )
# network interface
- interface = Interface(config, True)
- interface.start(wait = False)
- interface.send([('server.peers.subscribe',[])])
+ network = Network(config)
+ network.start()
+ #interface.send([('server.peers.subscribe',[])])
- blockchain = BlockchainVerifier(interface, config)
- blockchain.start()
-
- gui = gui.ElectrumGui(config, interface, blockchain)
+ gui = gui.ElectrumGui(config, network)
gui.main(url)
- interface.stop()
- blockchain.stop()
+ network.stop()
# we use daemon threads, their termination is enforced.
# this sleep command gives them time to terminate cleanly.
diff --git a/gui/gui_classic.py b/gui/gui_classic.py
@@ -568,8 +568,6 @@ class ElectrumWindow(QMainWindow):
self.config.set_key('io_dir', os.path.dirname(fileName), True)
return fileName
-
-
def close(self):
QMainWindow.close(self)
self.run_hook('close_main_window')
@@ -1367,7 +1365,7 @@ class ElectrumWindow(QMainWindow):
console.history = self.config.get("console-history",[])
console.history_index = len(console.history)
- console.updateNamespace({'wallet' : self.wallet, 'interface' : self.wallet.interface, 'gui':self})
+ console.updateNamespace({'wallet' : self.wallet, 'network' : self.wallet.network, 'gui':self})
console.updateNamespace({'util' : util, 'bitcoin':bitcoin})
c = commands.Commands(self.wallet, self.wallet.interface, lambda: self.console.set_json(True))
@@ -2258,10 +2256,11 @@ class OpenFileEventFilter(QObject):
class ElectrumGui:
- def __init__(self, config, interface, blockchain, app=None):
- self.interface = interface
+ def __init__(self, config, network, app=None):
+ self.network = network
+ #self.interface = interface
self.config = config
- self.blockchain = blockchain
+ #self.blockchain = network.blockchain
self.windows = []
self.efilter = OpenFileEventFilter(self.windows)
if app is None:
@@ -2281,7 +2280,7 @@ class ElectrumGui:
else:
wallet = Wallet(storage)
- wallet.start_threads(self.interface, self.blockchain)
+ wallet.start_threads(self.network)
s = Timer()
s.start()
diff --git a/lib/__init__.py b/lib/__init__.py
@@ -3,7 +3,7 @@ from util import format_satoshis, print_msg, print_json, print_error, set_verbos
from wallet import WalletSynchronizer, WalletStorage
from wallet_factory import WalletFactory as Wallet
from verifier import TxVerifier
-from blockchain import BlockchainVerifier
+from network import Network
from interface import Interface, pick_random_server, DEFAULT_SERVERS
from simple_config import SimpleConfig
import bitcoin
diff --git a/lib/blockchain.py b/lib/blockchain.py
@@ -22,10 +22,9 @@ from util import user_dir, appdata_dir, print_error
from bitcoin import *
-class BlockchainVerifier(threading.Thread):
- """ Simple Payment Verification """
+class Blockchain(threading.Thread):
- def __init__(self, interface, config):
+ def __init__(self, config):
threading.Thread.__init__(self)
self.daemon = True
self.config = config
@@ -34,112 +33,62 @@ class BlockchainVerifier(threading.Thread):
self.local_height = 0
self.running = False
self.headers_url = 'http://headers.electrum.org/blockchain_headers'
- self.interface = interface
- interface.register_channel('verifier')
self.set_local_height()
+ self.queue = Queue.Queue()
-
-
- def start_interfaces(self):
- import interface
- servers = interface.DEFAULT_SERVERS
- servers = interface.filter_protocol(servers,'s')
- print_error("using %d servers"% len(servers))
- self.interfaces = map ( lambda server: interface.Interface({'server':server} ), servers )
-
- for i in self.interfaces:
- i.start()
- # subscribe to block headers
- i.register_channel('verifier')
- i.register_channel('get_header')
- i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
- # note: each interface should send its results directly to a queue, instead of channels
- # pass the queue to the interface, so that several can share the same queue
-
-
- def get_new_response(self):
- # listen to interfaces, forward to verifier using the queue
- while self.is_running():
- for i in self.interfaces:
- try:
- r = i.get_response('verifier',timeout=0)
- except Queue.Empty:
- continue
-
- result = r.get('result')
- if result:
- return (i,result)
-
- time.sleep(1)
-
-
-
-
+
def stop(self):
with self.lock: self.running = False
- #self.interface.poke('verifier')
+
def is_running(self):
with self.lock: return self.running
- def request_header(self, i, h):
- print_error("requesting header %d from %s"%(h, i.server))
- i.send([ ('blockchain.block.get_header',[h])], 'get_header')
+ def run(self):
+ self.init_headers_file()
+ self.set_local_height()
+ print_error( "blocks:", self.local_height )
+
+ with self.lock:
+ self.running = True
+
+ while self.is_running():
- def retrieve_header(self, i):
- while True:
try:
- r = i.get_response('get_header',timeout=1)
+ i, result = self.queue.get()
except Queue.Empty:
- print_error('timeout')
continue
- if r.get('error'):
- print_error('Verifier received an error:', r)
- continue
-
- # 3. handle response
- method = r['method']
- params = r['params']
- result = r['result']
+ header= result.get('result')
+ #print_error( i.server, header )
+ height = header.get('block_height')
- if method == 'blockchain.block.get_header':
- return result
-
+ if height > self.local_height + 50:
+ self.get_chunks(i, header, height)
+ i.trigger_callback('updated')
- def get_chain(self, interface, final_header):
+ if height > self.local_height:
+ # get missing parts from interface (until it connects to my chain)
+ chain = self.get_chain( i, header )
- header = final_header
- chain = [ final_header ]
- requested_header = False
-
- while self.is_running():
+ # skip that server if the result is not consistent
+ if not chain: continue
+
+ # verify the chain
+ if self.verify_chain( chain ):
+ print_error("height:", height, i.server)
+ for header in chain:
+ self.save_header(header)
+ self.height = height
+ else:
+ print_error("error", i.server)
+ # todo: dismiss that server
- if requested_header:
- header = self.retrieve_header(interface)
- if not header: return
- chain = [ header ] + chain
- requested_header = False
+ i.trigger_callback('updated')
- height = header.get('block_height')
- previous_header = self.read_header(height -1)
- if not previous_header:
- self.request_header(interface, height - 1)
- requested_header = True
- continue
- # verify that it connects to my chain
- prev_hash = self.hash_header(previous_header)
- if prev_hash != header.get('prev_block_hash'):
- print_error("reorg")
- self.request_header(interface, height - 1)
- requested_header = True
- continue
- else:
- # the chain is complete
- return chain
def verify_chain(self, chain):
@@ -166,37 +115,6 @@ class BlockchainVerifier(threading.Thread):
return True
- def get_chunks(self, i, header, height):
- requested_chunks = []
- min_index = (self.local_height + 1)/2016
- max_index = (height + 1)/2016
- for n in range(min_index, max_index + 1):
- print_error( "requesting chunk", n )
- i.send([ ('blockchain.block.get_chunk',[n])], 'get_header')
- requested_chunks.append(n)
- break
-
- while requested_chunks:
- try:
- r = i.get_response('get_header',timeout=1)
- except Queue.Empty:
- continue
- if not r: continue
-
- if r.get('error'):
- print_error('Verifier received an error:', r)
- continue
-
- # 3. handle response
- method = r['method']
- params = r['params']
- result = r['result']
-
- if method == 'blockchain.block.get_chunk':
- index = params[0]
- self.verify_chunk(index, result)
- requested_chunks.remove(index)
-
def verify_chunk(self, index, hexdata):
data = hexdata.decode('hex')
@@ -259,8 +177,6 @@ class BlockchainVerifier(threading.Thread):
return True
-
-
def header_to_string(self, res):
s = int_to_hex(res.get('version'),4) \
+ rev_hex(res.get('prev_block_hash')) \
@@ -383,65 +299,100 @@ class BlockchainVerifier(threading.Thread):
return new_bits, new_target
+ def request_header(self, i, h):
+ print_error("requesting header %d from %s"%(h, i.server))
+ i.send([ ('blockchain.block.get_header',[h])], 'get_header')
+ def retrieve_header(self, i):
+ while True:
+ try:
+ r = i.get_response('get_header',timeout=1)
+ except Queue.Empty:
+ print_error('timeout')
+ continue
- def run(self):
- self.start_interfaces()
-
- self.init_headers_file()
- self.set_local_height()
- print_error( "blocks:", self.local_height )
+ if r.get('error'):
+ print_error('Verifier received an error:', r)
+ continue
- with self.lock:
- self.running = True
+ # 3. handle response
+ method = r['method']
+ params = r['params']
+ result = r['result']
+
+ if method == 'blockchain.block.get_header':
+ return result
+
+
+
+ def get_chain(self, interface, final_header):
+ header = final_header
+ chain = [ final_header ]
+ requested_header = False
+
while self.is_running():
- i, header = self.get_new_response()
-
+ if requested_header:
+ header = self.retrieve_header(interface)
+ if not header: return
+ chain = [ header ] + chain
+ requested_header = False
+
height = header.get('block_height')
+ previous_header = self.read_header(height -1)
+ if not previous_header:
+ self.request_header(interface, height - 1)
+ requested_header = True
+ continue
- if height > self.local_height + 50:
- self.get_chunks(i, header, height)
- self.interface.trigger_callback('updated')
+ # verify that it connects to my chain
+ prev_hash = self.hash_header(previous_header)
+ if prev_hash != header.get('prev_block_hash'):
+ print_error("reorg")
+ self.request_header(interface, height - 1)
+ requested_header = True
+ continue
- if height > self.local_height:
- # get missing parts from interface (until it connects to my chain)
- chain = self.get_chain( i, header )
+ else:
+ # the chain is complete
+ return chain
- # skip that server if the result is not consistent
- if not chain: continue
-
- # verify the chain
- if self.verify_chain( chain ):
- print_error("height:", height, i.server)
- for header in chain:
- self.save_header(header)
- self.height = height
- else:
- print_error("error", i.server)
- # todo: dismiss that server
- self.interface.trigger_callback('updated')
-
+ def get_chunks(self, i, header, height):
+ requested_chunks = []
+ min_index = (self.local_height + 1)/2016
+ max_index = (height + 1)/2016
+ for n in range(min_index, max_index + 1):
+ print_error( "requesting chunk", n )
+ i.send([ ('blockchain.block.get_chunk',[n])], 'get_header')
+ requested_chunks.append(n)
+ break
+ while requested_chunks:
+ try:
+ r = i.get_response('get_header',timeout=1)
+ except Queue.Empty:
+ continue
+ if not r: continue
+ if r.get('error'):
+ print_error('Verifier received an error:', r)
+ continue
-if __name__ == "__main__":
- import interface, simple_config
-
- config = simple_config.SimpleConfig({'verbose':True})
+ # 3. handle response
+ method = r['method']
+ params = r['params']
+ result = r['result']
+
+ if method == 'blockchain.block.get_chunk':
+ index = params[0]
+ self.verify_chunk(index, result)
+ requested_chunks.remove(index)
- i0 = interface.Interface()
- i0.start()
- bv = BlockchainVerifier(i0, config)
- bv.start()
- # listen to interfaces, forward to verifier using the queue
- while 1:
- time.sleep(1)
diff --git a/lib/interface.py b/lib/interface.py
@@ -66,18 +66,21 @@ def pick_random_server():
class Interface(threading.Thread):
+
def register_callback(self, event, callback):
with self.lock:
if not self.callbacks.get(event):
self.callbacks[event] = []
self.callbacks[event].append(callback)
+
def trigger_callback(self, event):
with self.lock:
callbacks = self.callbacks.get(event,[])[:]
if callbacks:
[callback() for callback in callbacks]
+
def init_server(self, host, port, proxy=None, use_ssl=True):
self.host = host
self.port = port
@@ -188,16 +191,19 @@ class Interface(threading.Thread):
return
response_queue = self.responses[channel]
- response_queue.put({'method':method, 'params':params, 'result':result, 'id':msg_id})
+ response_queue.put((self, {'method':method, 'params':params, 'result':result, 'id':msg_id}))
def get_response(self, channel='default', block=True, timeout=10000000000):
- return self.responses[channel].get(block, timeout)
+ i, r = self.responses[channel].get(block, timeout)
+ return r
- def register_channel(self, channel):
+ def register_channel(self, channel, queue=None):
+ if queue is None:
+ queue = Queue.Queue()
with self.lock:
- self.responses[channel] = Queue.Queue()
+ self.responses[channel] = queue
def poke(self, channel):
self.responses[channel].put(None)
@@ -418,7 +424,7 @@ class Interface(threading.Thread):
- def __init__(self, config=None, loop=False):
+ def __init__(self, config=None):
self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's'))
self.proxy = None
@@ -428,7 +434,6 @@ class Interface(threading.Thread):
threading.Thread.__init__(self)
self.daemon = True
- self.loop = loop
self.config = config
self.connect_event = threading.Event()
@@ -457,32 +462,11 @@ class Interface(threading.Thread):
if self.config.get('auto_cycle') is None:
self.config.set_key('auto_cycle', True, False)
- if not self.is_connected and self.config.get('auto_cycle'):
- print_msg("Using random server...")
- servers = filter_protocol(DEFAULT_SERVERS, 's')
- while servers:
- server = random.choice( servers )
- servers.remove(server)
- print server
- self.config.set_key('server', server, False)
- self.init_with_server(self.config)
- if self.is_connected: break
-
- if not self.is_connected:
- print 'no server available'
- self.connect_event.set() # to finish start
- self.server = 'ecdsa.org:50001:t'
- self.proxy = None
- return
+ if not self.is_connected:
+ self.connect_event.set()
+ return
self.connect_event.set()
- if self.is_connected:
- self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
- self.send([('server.banner',[])])
- self.trigger_callback('connected')
- else:
- self.trigger_callback('notconnected')
- #print_error("Failed to connect " + self.connection_msg)
def init_with_server(self, config):
@@ -532,12 +516,6 @@ class Interface(threading.Thread):
return out
- def resend_subscriptions(self):
- for channel, messages in self.subscriptions.items():
- if messages:
- self.send(messages, channel)
-
-
def parse_proxy_options(self, s):
if type(s) == type({}): return s # fixme: type should be fixed
@@ -625,26 +603,24 @@ class Interface(threading.Thread):
return out
- def start(self, wait=True):
+ def start(self, queue):
+ self.queue = queue
threading.Thread.start(self)
- if wait:
- # wait until connection is established
- self.connect_event.wait()
- if not self.is_connected:
- return False
- return True
+
+
def run(self):
- while True:
- self.init_interface()
- if self.is_connected:
- self.resend_subscriptions()
- self.run_tcp() if self.protocol in 'st' else self.run_http()
+ self.init_interface()
+ if self.is_connected:
+ self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
+ self.change_status()
+ self.run_tcp() if self.protocol in 'st' else self.run_http()
+ self.change_status()
+
+ def change_status(self):
+ self.queue.put(self)
- self.trigger_callback('disconnected')
- if not self.loop: break
- time.sleep(5)
diff --git a/lib/network.py b/lib/network.py
@@ -0,0 +1,121 @@
+import interface
+from blockchain import Blockchain
+import threading, time, Queue, os, sys, shutil
+from util import user_dir, appdata_dir, print_error
+from bitcoin import *
+
+
+class Network(threading.Thread):
+
+ def __init__(self, config):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.config = config
+ self.lock = threading.Lock()
+ self.blockchain = Blockchain(config)
+ self.interfaces = {}
+ self.queue = Queue.Queue()
+ self.default_server = self.config.get('server')
+ self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s')
+
+
+
+ def start_interfaces(self):
+
+ for server in self.servers_list:
+ self.interfaces[server] = interface.Interface({'server':server})
+
+ for i in self.interfaces.values():
+ i.start(self.queue)
+
+ if self.default_server:
+ self.interface = interface.Interface({'server':self.default_server})
+ self.interface.start(self.queue)
+ else:
+ self.interface = self.interfaces[0]
+
+
+
+
+
+
+ def run(self):
+ self.blockchain.start()
+ self.start_interfaces()
+
+ with self.lock:
+ self.running = True
+
+ while self.is_running():
+ i = self.queue.get()
+
+ if i.is_connected:
+ i.register_channel('verifier', self.blockchain.queue)
+ i.register_channel('get_header')
+ i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
+ if i == self.interface:
+ i.send([('server.banner',[])])
+ i.send([('server.peers.subscribe',[])])
+ else:
+ self.interfaces.pop(i.server)
+ if i == self.interface:
+ if self.default_server is None:
+ print_msg("Using random server...")
+ server = random.choice( self.servers_list )
+ self.interface = interface.Interface({'server':self.default_server})
+ else:
+ #i.trigger_callback('disconnected')
+ pass
+
+ def on_peers(self, resut):
+ pass
+
+ def on_banner(self, result):
+ pass
+
+ def stop(self):
+ with self.lock: self.running = False
+
+ def is_running(self):
+ with self.lock: return self.running
+
+
+ def resend_subscriptions(self):
+ for channel, messages in self.subscriptions.items():
+ if messages:
+ self.send(messages, channel)
+
+
+ def auto_cycle(self):
+ if not self.is_connected and self.config.get('auto_cycle'):
+ print_msg("Using random server...")
+ servers = filter_protocol(DEFAULT_SERVERS, 's')
+ while servers:
+ server = random.choice( servers )
+ servers.remove(server)
+ print server
+ self.config.set_key('server', server, False)
+ self.init_with_server(self.config)
+ if self.is_connected: break
+
+ if not self.is_connected:
+ print 'no server available'
+ self.connect_event.set() # to finish start
+ self.server = 'ecdsa.org:50001:t'
+ self.proxy = None
+ return
+
+
+
+
+if __name__ == "__main__":
+ import simple_config
+ config = simple_config.SimpleConfig({'verbose':True})
+ network = Network(config)
+ network.start()
+
+ while 1:
+ time.sleep(1)
+
+
+
diff --git a/lib/wallet.py b/lib/wallet.py
@@ -1343,10 +1343,11 @@ class Wallet:
return True
- def start_threads(self, interface, blockchain):
+ def start_threads(self, network):
from verifier import TxVerifier
- self.interface = interface
- self.verifier = TxVerifier(interface, blockchain, self.storage)
+ self.network = network
+ self.interface = network.interface
+ self.verifier = TxVerifier(self.interface, network.blockchain, self.storage)
self.verifier.start()
self.set_verifier(self.verifier)
self.synchronizer = WalletSynchronizer(self)
@@ -1370,7 +1371,7 @@ class WalletSynchronizer(threading.Thread):
wallet.synchronizer = self
self.interface = self.wallet.interface
self.interface.register_channel('synchronizer')
- self.wallet.interface.register_callback('connected', lambda: self.wallet.set_up_to_date(False))
+ #self.wallet.network.register_callback('connected', lambda: self.wallet.set_up_to_date(False))
self.was_updated = True
self.running = False
self.lock = threading.Lock()
diff --git a/setup.py b/setup.py
@@ -53,22 +53,23 @@ setup(name = "Electrum",
package_dir = {'electrum': 'lib', 'electrum_gui': 'gui', 'electrum_plugins':'plugins'},
scripts= ['electrum'],
data_files = data_files,
- py_modules = ['electrum.version',
- 'electrum.wallet',
- 'electrum.wallet_bitkey',
- 'electrum.wallet_factory',
- 'electrum.interface',
+ py_modules = ['electrum.account',
+ 'electrum.bitcoin',
'electrum.blockchain',
'electrum.commands',
+ 'electrum.interface',
'electrum.mnemonic',
+ 'electrum.msqr',
+ 'electrum.network',
'electrum.simple_config',
'electrum.socks',
- 'electrum.msqr',
- 'electrum.util',
- 'electrum.account',
- 'electrum.bitcoin',
'electrum.transaction',
+ 'electrum.util',
+ 'electrum.version',
'electrum.verifier',
+ 'electrum.wallet',
+ 'electrum.wallet_bitkey',
+ 'electrum.wallet_factory',
'electrum_gui.gui_gtk',
'electrum_gui.qt_console',
'electrum_gui.gui_classic',