commit 2a1b717a21c2344e78cd35522ff72d36bee78ddc
parent 748662dedede8f79459125b11517a110f01b7359
Author: Neil Booth <kyuupichan@gmail.com>
Date: Mon, 31 Aug 2015 17:44:50 +0900
Fix scripts/peers
Needed an almost complete rewrite now we use select.
Diffstat:
2 files changed, 70 insertions(+), 60 deletions(-)
diff --git a/lib/__init__.py b/lib/__init__.py
@@ -3,7 +3,7 @@ from util import format_satoshis, print_msg, print_json, print_error, set_verbos
from wallet import Synchronizer, WalletStorage
from wallet import Wallet, Imported_Wallet
from network import Network, DEFAULT_SERVERS, DEFAULT_PORTS, pick_random_server
-from interface import Interface
+from interface import Connection, Interface
from simple_config import SimpleConfig, get_config, set_config
import bitcoin
import account
diff --git a/scripts/util.py b/scripts/util.py
@@ -1,71 +1,81 @@
-import time, electrum, Queue
-from electrum import Interface, SimpleConfig
+import select, time, electrum, Queue
+from electrum import Connection, Interface, SimpleConfig
from electrum.network import filter_protocol, parse_servers
+from collections import defaultdict
# electrum.util.set_verbosity(1)
+def get_interfaces(servers, timeout=10):
+ '''Returns a map of servers to connected interfaces. If any
+ connections fail or timeout, they will be missing from the map.
+ '''
+ socket_queue = Queue.Queue()
+ config = SimpleConfig()
+ connecting = {}
+ for server in servers:
+ if server not in connecting:
+ connecting[server] = Connection(server, socket_queue, config.path)
+ interfaces = {}
+ timeout = time.time() + timeout
+ while time.time() < timeout:
+ try:
+ server, socket = socket_queue.get(True, 1)
+ except Queue.Empty:
+ continue
+ connecting.pop(server)
+ if socket:
+ interfaces[server] = Interface(server, socket)
+ return interfaces
+
+def wait_on_interfaces(interfaces, timeout=10):
+ '''Return a map of servers to a list of (request, response) tuples.
+ Waits timeout seconds, or until each interface has a response'''
+ result = defaultdict(list)
+ timeout = time.time() + timeout
+ while len(result) < len(interfaces) and time.time() < timeout:
+ rin = [i for i in interfaces.values()]
+ win = [i for i in interfaces.values() if i.unsent_requests]
+ rout, wout, xout = select.select(rin, win, [], 1)
+ for interface in wout:
+ interface.send_requests()
+ for interface in rout:
+ notifications, responses = interface.get_responses()
+ if responses:
+ result[interface.server].extend(responses)
+ return result
def get_peers():
- # 1. start interface and wait for connection
- q = Queue.Queue()
- interface = electrum.Interface('ecdsa.net:110:s', q)
- interface.start()
- i, r = q.get()
- if not interface.is_connected():
- raise BaseException("not connected")
+ peers = []
+ # 1. get connected interfaces
+ server = 'ecdsa.net:110:s'
+ interfaces = get_interfaces([server])
+ if not interfaces:
+ print "No connection to", server
+ return []
# 2. get list of peers
- interface.send_request({'id':0, 'method':'server.peers.subscribe','params':[]})
- i, r = q.get(timeout=10000)
- peers = parse_servers(r.get('result'))
- peers = filter_protocol(peers,'s')
- i.stop()
+ interface = interfaces[server]
+ interface.queue_request({'id':0, 'method': 'server.peers.subscribe',
+ 'params': []})
+ responses = wait_on_interfaces(interfaces)
+ responses = responses.get(server)
+ if responses:
+ response = responses[0][1] # One response, (req, response) tuple
+ peers = parse_servers(response.get('result'))
+ peers = filter_protocol(peers,'s')
return peers
def send_request(peers, request):
print "Contacting %d servers"%len(peers)
- # start interfaces
- q2 = Queue.Queue()
- config = SimpleConfig()
- interfaces = map(lambda server: Interface(server, q2, config), peers)
- reached_servers = []
- for i in interfaces:
- i.start()
- t0 = time.time()
- while peers:
- try:
- i, r = q2.get(timeout=1)
- except:
- if time.time() - t0 > 10:
- print "timeout"
- break
- else:
- continue
- if i.server in peers:
- peers.remove(i.server)
- if i.is_connected():
- reached_servers.append(i)
- else:
- print "Connection failed:", i.server
-
- print "%d servers could be reached"%len(reached_servers)
-
- results_queue = Queue.Queue()
- for i in reached_servers:
- i.send_request(request, results_queue)
- results = {}
- t0 = time.time()
- while reached_servers:
- try:
- i, r = results_queue.get(timeout=1)
- except:
- if time.time() - t0 > 10:
- break
- else:
- continue
- results[i.server] = r.get('result')
- reached_servers.remove(i)
- i.stop()
-
- for i in reached_servers:
- print i.server, "did not answer"
+ interfaces = get_interfaces(peers)
+ print "%d servers could be reached" % len(interfaces)
+ for peer in peers:
+ if not peer in interfaces:
+ print "Connection failed:", peer
+ for i in interfaces.values():
+ i.queue_request(request)
+ responses = wait_on_interfaces(interfaces)
+ for peer in interfaces:
+ if not peer in responses:
+ print peer, "did not answer"
+ results = dict(zip(responses.keys(), [t[0][1] for t in responses.values()]))
print "%d answers"%len(results)
return results