commit 782cc4fe226a099332fe169aa52a4cd0febd1cc6
parent 6d8965401cba90565b358935979417b8a50281c9
Author: ThomasV <thomasv@gitorious>
Date: Sun, 18 Mar 2012 01:33:57 +0300
Merge branch 'master' of gitorious.org:electrum/electrum
Diffstat:
4 files changed, 230 insertions(+), 174 deletions(-)
diff --git a/client/blocks b/client/blocks
@@ -0,0 +1,12 @@
+#!/usr/bin/env python
+
+import socket, time, interface
+
+def cb(block_number):
+ print block_number
+
+i = interface.AsynchronousInterface('ecdsa.org', 50001, newblock_callback=cb)
+i.start_session([],"blocks")
+
+while True:
+ time.sleep(1)
diff --git a/client/electrum b/client/electrum
@@ -164,7 +164,11 @@ if __name__ == '__main__':
# open session
if cmd not in ['password', 'mktx', 'history', 'label', 'contacts', 'help', 'validateaddress', 'signmessage', 'verifymessage', 'eval', 'create', 'addresses', 'import']:
- interface.start_session(wallet)
+
+ addresses = wallet.all_addresses()
+ version = wallet.electrum_version
+ address_callback = wallet.retrieve_status_callback
+ interface.start_session(addresses, version, address_callback)
interface.update_wallet(wallet)
wallet.save()
diff --git a/client/interface.py b/client/interface.py
@@ -27,21 +27,32 @@ DEFAULT_SERVERS = ['ecdsa.org','electrum.novit.ro'] # list of default servers
class Interface:
- def __init__(self, host, port):
+ def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None):
self.host = host
self.port = port
+ self.address_callback = address_callback
+ self.history_callback = history_callback
+ self.newblock_callback = newblock_callback
self.servers = DEFAULT_SERVERS # actual list from IRC
self.rtime = 0
self.blocks = 0
self.message = ''
self.was_updated = True # fixme: use a semaphore
- self.is_up_to_date = False # True after the first poll
+ self.is_up_to_date = False
self.is_connected = False
self.disconnected_event = threading.Event()
self.disconnected_event.clear()
+ #only asynchrnous
+ self.addresses_waiting_for_status = []
+ self.addresses_waiting_for_history = []
+ self.tx_event = threading.Event()
+ self.up_to_date_event = threading.Event()
+ self.up_to_date_event.clear()
+
+
def send_tx(self, data):
out = self.handler('transaction.broadcast', data )
return out
@@ -49,84 +60,98 @@ class Interface:
def get_servers(self):
pass
- def start_session(self, wallet):
+ def start_session(self, addresses, version):
pass
-class NativeInterface(Interface):
- """This is the original Electrum protocol. It uses polling, and a non-persistent tcp connection"""
+ def handle_json_response(self, c):
+ #print c
+ msg_id = c.get('id')
+ result = c.get('result')
+ error = c.get('error')
+ if msg_id is None:
+ print "error: message without ID"
+ return
- def __init__(self, host, port):
- Interface.__init__(self, host, port)
+ method, params = self.messages[msg_id]
+ if error:
+ print "received error:", c, method, params
+ else:
+ self.handle_response(method, params, result)
- def start_session(self, wallet):
- addresses = wallet.all_addresses()
- version = wallet.electrum_version
- out = self.handler('session.new', [ version, addresses ] )
- self.session_id, self.message = ast.literal_eval( out )
- thread.start_new_thread(self.poll_thread, (wallet,))
- def update_session(self, addresses):
- out = self.handler('session.update', [ self.session_id, addresses ] )
- return out
- def handler(self, method, params = ''):
- import time
- cmds = {'session.new':'new_session',
- 'peers':'peers',
- 'session.poll':'poll',
- 'session.update':'update_session',
- 'transaction.broadcast':'tx',
- 'address.get_history':'h',
- 'address.subscribe':'address.subscribe'
- }
- cmd = cmds[method]
- if type(params) != type(''): params = repr( params )
- t1 = time.time()
- request = repr ( (cmd, params) ) + "#"
- s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
- s.settimeout(DEFAULT_TIMEOUT)
- s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) )
- s.send( request )
- out = ''
- while 1:
- msg = s.recv(1024)
- if msg: out += msg
- else: break
- s.close()
- self.rtime = time.time() - t1
- self.is_connected = True
- if cmd in[ 'peers','h']:
- out = ast.literal_eval( out )
- return out
+ def handle_response(self, method, params, result):
- def poll_interval(self):
- return 5
-
- def retrieve_history(self, address):
- out = self.handler('address.get_history', address )
- return out
-
- def get_history(self, addr, history_callback):
- data = self.retrieve_history(addr)
- apply(history_callback, (addr, data) )
- self.was_updated = True
+ if method == 'session.new':
+ self.session_id, self.message = ast.literal_eval( result )
+ self.was_updated = True
- def subscribe(self, addr, status_callback):
- status = self.handler('address.subscribe', [ self.session_id, addr ] )
- apply(status_callback, (addr, status) )
+ elif method == 'server.banner':
+ self.message = result
+ self.was_updated = True
- def update_wallet(self, wallet):
- while True:
- changed_addresses = self.poll()
+ elif method == 'session.poll':
+ blocks, changed_addresses = ast.literal_eval( result )
+ if blocks == -1: raise BaseException("session not found")
+ self.blocks = int(blocks)
if changed_addresses:
self.is_up_to_date = False
+ self.was_updated = True
+ for addr, status in changed_addresses.items():
+ apply(self.address_callback, (addr, status))
else:
self.is_up_to_date = True
- break
- for addr, status in changed_addresses.items():
- wallet.receive_status_callback(addr, status)
+ elif method == 'server.peers':
+ self.servers = map( lambda x:x[1], result )
+
+ elif method == 'address.subscribe':
+ addr = params[-1]
+ if addr in self.addresses_waiting_for_status:
+ self.addresses_waiting_for_status.remove(addr)
+ apply(self.address_callback,(addr, result))
+
+ elif method == 'address.get_history':
+ addr = params[0]
+ if addr in self.addresses_waiting_for_history:
+ self.addresses_waiting_for_history.remove(addr)
+ apply(self.history_callback, (addr, result))
+ self.was_updated = True
+
+ elif method == 'transaction.broadcast':
+ self.tx_result = result
+ self.tx_event.set()
+
+ elif method == 'numblocks.subscribe':
+ self.blocks = result
+ if self.newblock_callback: apply(self.newblock_callback,(result,))
+ else:
+ print "received message:", c, method, params
+
+
+
+class PollingInterface(Interface):
+ """ non-persistent connection. synchronous calls"""
+
+ def start_session(self, addresses, version):
+ self.handler([('session.new', [ version, addresses ])] )
+ thread.start_new_thread(self.poll_thread, ())
+
+ def poll_interval(self):
+ return 5
+
+ def get_history(self, address):
+ self.handler([('address.get_history', [address] )])
+
+ def subscribe(self, addresses):
+ for addr in addresses:
+ self.handler([('address.subscribe', [ self.session_id, addr ] )])
+
+ def update_wallet(self):
+ while True:
+ self.handler([('session.poll', self.session_id )])
+ if self.is_up_to_date: break
#if is_new or wallet.remote_url:
# self.was_updated = True
@@ -137,17 +162,10 @@ class NativeInterface(Interface):
#else:
# return False
- def poll(self):
- out = self.handler('session.poll', self.session_id )
- blocks, changed_addr = ast.literal_eval( out )
- if blocks == -1: raise BaseException("session not found")
- self.blocks = int(blocks)
- return changed_addr
-
- def poll_thread(self, wallet):
+ def poll_thread(self):
while self.is_connected:
try:
- self.update_wallet(wallet)
+ self.update_wallet()
time.sleep(self.poll_interval())
except socket.gaierror:
break
@@ -171,8 +189,7 @@ class NativeInterface(Interface):
for server in DEFAULT_SERVERS:
try:
self.peers_server = server
- out = self.handler('peers')
- self.servers = map( lambda x:x[1], out )
+ self.handler([('server.peers',[])])
# print "Received server list from %s" % self.peers_server, out
break
except socket.timeout:
@@ -186,53 +203,100 @@ class NativeInterface(Interface):
-class HttpInterface(NativeInterface):
- def handler(self, method, params = []):
+
+class NativeInterface(PollingInterface):
+
+ def handler(self, messages):
+ import time
+ cmds = {'session.new':'new_session',
+ 'server.peers':'peers',
+ 'session.poll':'poll',
+ 'transaction.broadcast':'tx',
+ 'address.get_history':'h',
+ 'address.subscribe':'address.subscribe'
+ }
+
+ for m in messages:
+ method, params = m
+ cmd = cmds[method]
+
+ if cmd=='h':
+ str_params = params[0]
+ elif type(params) != type(''):
+ str_params = repr( params )
+ else:
+ str_params = params
+ t1 = time.time()
+ request = repr ( (cmd, str_params) ) + "#"
+ s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
+ s.settimeout(DEFAULT_TIMEOUT)
+ s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) )
+ s.send( request )
+ out = ''
+ while 1:
+ msg = s.recv(1024)
+ if msg: out += msg
+ else: break
+ s.close()
+ self.rtime = time.time() - t1
+ self.is_connected = True
+ if cmd in[ 'peers','h']:
+ out = ast.literal_eval( out )
+
+ if out=='': out=None #fixme
+
+ self.handle_response(method, params, out)
+
+
+
+
+
+class HttpInterface(PollingInterface):
+
+ def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None):
+ Interface.__init__(self, host, port, address_callback, history_callback, newblock_callback)
+ self.message_id = 0
+ self.messages = {}
+
+ def handler(self, messages):
import urllib2, json, time
- if type(params) != type([]): params = [ params ]
- t1 = time.time()
- data = { 'method':method, 'id':'jsonrpc', 'params':params }
+
+ data = []
+ for m in messages:
+ method, params = m
+ if type(params) != type([]): params = [params]
+ t1 = time.time()
+ data.append( { 'method':method, 'id':self.message_id, 'params':params } )
+ self.messages[self.message_id] = (method, params)
+ self.message_id += 1
+
data_json = json.dumps(data)
- host = 'http://%s:%d'%( self.host if method!='peers' else self.peers_server, self.port )
+ host = 'http://%s:%d'%( self.host if method!='server.peers' else self.peers_server, self.port )
req = urllib2.Request(host, data_json, {'content-type': 'application/json'})
response_stream = urllib2.urlopen(req)
response = json.loads( response_stream.read() )
- out = response.get('result')
- if not out:
- print response
+
self.rtime = time.time() - t1
self.is_connected = True
- return out
+
+ for item in response:
+ self.handle_json_response(item)
import threading
-class TCPInterface(Interface):
- """json-rpc over persistent TCP connection"""
+class AsynchronousInterface(Interface):
+ """json-rpc over persistent TCP connection, asynchronous"""
- def __init__(self, host, port):
- Interface.__init__(self, host, port)
+ def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None):
+ Interface.__init__(self, host, port, address_callback, history_callback, newblock_callback)
self.message_id = 0
self.messages = {}
- self.tx_event = threading.Event()
- self.addresses_waiting_for_status = []
- self.addresses_waiting_for_history = []
- # up to date
- self.is_up_to_date = False
- self.up_to_date_event = threading.Event()
- self.up_to_date_event.clear()
-
- def send(self, method, params = []):
- request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
- self.messages[self.message_id] = (method, params)
- self.s.send( request + '\n' )
- self.message_id += 1
-
- def listen_thread(self, wallet):
+ def listen_thread(self):
try:
self.is_connected = True
out = ''
@@ -250,90 +314,59 @@ class TCPInterface(Interface):
c = out[0:s]
out = out[s+1:]
c = json.loads(c)
-
- #print c
- msg_id = c.get('id')
- result = c.get('result')
- error = c.get('error')
-
- if msg_id is None:
- print "error: message without ID"
- continue
-
- method, params = self.messages[msg_id]
-
- if method == 'server.banner':
- self.message = result
- self.was_updated = True
-
- elif method == 'server.peers':
- self.servers = map( lambda x:x[1], result )
-
- elif method == 'address.subscribe':
- addr = params[0]
- if addr in self.addresses_waiting_for_status:
- self.addresses_waiting_for_status.remove(addr)
- wallet.receive_status_callback(addr, result)
-
- elif method == 'address.get_history':
- addr = params[0]
- if addr in self.addresses_waiting_for_history:
- self.addresses_waiting_for_history.remove(addr)
- wallet.receive_history_callback(addr, result)
- self.was_updated = True
-
- elif method == 'transaction.broadcast':
- self.tx_result = result
- self.tx_event.set()
-
- elif method == 'numblocks.subscribe':
- self.blocks = result
-
- else:
- print "received message:", c
-
+ self.handle_json_response(c)
if self.addresses_waiting_for_status or self.addresses_waiting_for_history:
self.is_up_to_date = False
else:
self.is_up_to_date = True
self.up_to_date_event.set()
+
except:
traceback.print_exc(file=sys.stdout)
self.is_connected = False
self.disconnected_event.set()
- def update_wallet(self,wallet):
+ def update_wallet(self,cb):
self.up_to_date_event.wait()
+ def send(self, messages):
+ out = ''
+ for m in messages:
+ method, params = m
+ request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
+ self.messages[self.message_id] = (method, params)
+ self.message_id += 1
+ out += request + '\n'
+ self.s.send( out )
+
def send_tx(self, data):
self.tx_event.clear()
- self.send('transaction.broadcast', [data] )
+ self.send([('transaction.broadcast', [data])])
self.tx_event.wait()
return self.tx_result
- def subscribe(self, address, callback):
- self.send('address.subscribe', [address])
- self.addresses_waiting_for_status.append(address)
-
+ def subscribe(self, addresses):
+ messages = []
+ for addr in addresses:
+ messages.append(('address.subscribe', [addr]))
+ self.addresses_waiting_for_status.append(addr)
+ self.send(messages)
+
def get_servers(self):
- self.send('server.peers')
+ self.send([('server.peers',[])])
- def get_history(self, addr, callback):
- self.send('address.get_history', [addr])
- self.addresses_waiting_for_history.append(addr)
+ def get_history(self, addr):
+ self.send([('address.get_history', [addr])])
+ self.addresses_waiting_for_history.append(addr)
- def start_session(self, wallet):
+ def start_session(self, addresses, version):
self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
self.s.settimeout(1)
self.s.connect(( self.host, self.port))
- thread.start_new_thread(self.listen_thread, (wallet,))
- self.send('client.version', [wallet.electrum_version])
- self.send('server.banner')
- self.send('numblocks.subscribe')
- for address in wallet.all_addresses():
- self.subscribe(address, wallet.receive_status_callback)
-
+ thread.start_new_thread(self.listen_thread, ())
+ self.send([('client.version', [version]), ('server.banner',[]), ('numblocks.subscribe',[])])
+ self.subscribe(addresses)
@@ -345,16 +378,20 @@ def new_interface(wallet):
else:
host = random.choice( DEFAULT_SERVERS ) # random choice when the wallet is created
port = wallet.port
+ address_cb = wallet.receive_status_callback
+ history_cb = wallet.receive_history_callback
if port == 50000:
- interface = NativeInterface(host,port)
+ InterfaceClass = NativeInterface
elif port == 50001:
- interface = TCPInterface(host,port)
+ InterfaceClass = AsynchronousInterface
elif port in [80, 81, 8080, 8081]:
- interface = HttpInterface(host,port)
+ InterfaceClass = HttpInterface
else:
print "unknown port number: %d. using native protocol."%port
- interface = NativeInterface(host,port)
+ InterfaceClass = NativeInterface
+
+ interface = InterfaceClass(host, port, address_cb, history_cb)
return interface
@@ -362,7 +399,9 @@ def new_interface(wallet):
def loop_interfaces_thread(wallet):
while True:
try:
- wallet.interface.start_session(wallet)
+ addresses = wallet.all_addresses()
+ version = wallet.electrum_version
+ wallet.interface.start_session(addresses, version)
wallet.interface.get_servers()
wallet.interface.disconnected_event.wait()
diff --git a/client/wallet.py b/client/wallet.py
@@ -456,7 +456,7 @@ class Wallet:
def create_new_address(self, bool):
address = self.create_new_address_without_history(bool)
- self.interface.subscribe(address, self.receive_status_callback)
+ self.interface.subscribe([address])
return address
@@ -701,11 +701,12 @@ class Wallet:
def receive_status_callback(self, addr, status):
if self.status.get(addr) != status:
+ #print "updating status for", addr, repr(self.status.get(addr)), repr(status)
self.status[addr] = status
- self.interface.get_history(addr, self.receive_history_callback)
+ self.interface.get_history(addr)
def receive_history_callback(self, addr, data):
- #print "updating history for", addr
+ #print "updating history for", addr, repr(data)
self.history[addr] = data
self.synchronize()
self.update_tx_history()