commit 8f779f504f76a48899722203d045c1e3ac850d11
parent 35adc3231b297d03ad0a0534d65665ad14c0d9f6
Author: ThomasV <thomasv@electrum.org>
Date: Fri, 13 Jul 2018 17:05:04 +0200
LNWorker: connect to multiple peers.
save exceptions in aiosafe.
enable adding peer in GUI.
Diffstat:
4 files changed, 77 insertions(+), 33 deletions(-)
diff --git a/electrum/gui/qt/channels_list.py b/electrum/gui/qt/channels_list.py
@@ -87,7 +87,7 @@ class ChannelsList(MyTreeWidget):
push_amt_inp.setAmount(0)
h.addWidget(QLabel(_('Your Node ID')), 0, 0)
h.addWidget(local_nodeid, 0, 1)
- h.addWidget(QLabel(_('Remote Node ID')), 1, 0)
+ h.addWidget(QLabel(_('Remote Node ID or connection string')), 1, 0)
h.addWidget(remote_nodeid, 1, 1)
h.addWidget(QLabel('Local amount'), 2, 0)
h.addWidget(local_amt_inp, 2, 1)
@@ -97,19 +97,43 @@ class ChannelsList(MyTreeWidget):
vbox.addLayout(Buttons(CancelButton(d), OkButton(d)))
if not d.exec_():
return
- nodeid_hex = str(remote_nodeid.text())
local_amt = local_amt_inp.get_amount()
push_amt = push_amt_inp.get_amount()
+ connect_contents = str(remote_nodeid.text())
+ rest = None
+ try:
+ nodeid_hex, rest = connect_contents.split("@")
+ except ValueError:
+ nodeid_hex = connect_contents
try:
node_id = bfh(nodeid_hex)
+ assert len(node_id) == 33
except:
- self.parent.show_error(_('Invalid node ID'))
- return
- if node_id not in self.parent.wallet.lnworker.peers and node_id not in self.parent.network.lightning_nodes:
- self.parent.show_error(_('Unknown node:') + ' ' + nodeid_hex)
+ self.parent.show_error(_('Invalid node ID, must be 33 bytes and hexadecimal'))
return
- assert local_amt >= 200000
- assert local_amt >= push_amt
+ peer = self.parent.wallet.lnworker.peers.get(node_id)
+
+ if not peer:
+ known = node_id in self.parent.network.lightning_nodes
+ if rest is not None:
+ try:
+ host, port = rest.split(":")
+ except ValueError:
+ self.parent.show_error(_('Connection strings must be in <node_pubkey>@<host>:<port> format'))
+ elif known:
+ node = self.network.lightning_nodes.get(node_id)
+ host, port = node['addresses'][0]
+ else:
+ self.parent.show_error(_('Unknown node:') + ' ' + nodeid_hex)
+ return
+ try:
+ int(port)
+ except:
+ self.parent.show_error(_('Port number must be decimal'))
+ return
+
+ self.parent.wallet.lnworker.add_peer(host, port, node_id)
+
self.main_window.protect(self.open_channel, (node_id, local_amt, push_amt))
def open_channel(self, *args, **kwargs):
diff --git a/electrum/lnbase.py b/electrum/lnbase.py
@@ -267,21 +267,24 @@ def create_ephemeral_key(privkey):
def aiosafe(f):
+ # save exception in object.
+ # f must be a method of a PrintError instance.
+ # aiosafe calls should not be nested
async def f2(*args, **kwargs):
+ self = args[0]
try:
return await f(*args, **kwargs)
- except:
- # if the loop isn't stopped
- # run_forever in network.py would not return,
- # the asyncioThread would not die,
- # and we would block on shutdown
- asyncio.get_event_loop().stop()
- traceback.print_exc()
+ except BaseException as e:
+ self.print_msg("Exception in", f.__name__, ":", e.__class__.__name__, str(e))
+ self.exception = e
return f2
+
+
class Peer(PrintError):
def __init__(self, lnworker, host, port, pubkey, request_initial_sync=False):
+ self.exception = None # set by aiosafe
self.host = host
self.port = port
self.pubkey = pubkey
@@ -307,7 +310,7 @@ class Peer(PrintError):
self.attempted_route = {}
def diagnostic_name(self):
- return self.host
+ return 'lnbase:' + self.host
def ping_if_required(self):
if time.time() - self.ping_time > 120:
@@ -455,7 +458,7 @@ class Peer(PrintError):
'alias': alias,
'addresses': addresses
}
- self.print_error('node announcement', binascii.hexlify(pubkey), alias, addresses)
+ #self.print_error('node announcement', binascii.hexlify(pubkey), alias, addresses)
self.network.trigger_callback('ln_status')
def on_init(self, payload):
@@ -476,8 +479,7 @@ class Peer(PrintError):
else:
self.announcement_signatures[channel_id].put_nowait(payload)
- @aiosafe
- async def main_loop(self):
+ async def initialize(self):
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
await self.handshake()
# send init
@@ -486,6 +488,10 @@ class Peer(PrintError):
msg = await self.read_message()
self.process_message(msg)
self.initialized.set_result(True)
+
+ @aiosafe
+ async def main_loop(self):
+ await asyncio.wait_for(self.initialize(), 5)
# loop
while True:
self.ping_if_required()
diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py
@@ -70,7 +70,7 @@ class ChannelInfo(PrintError):
self.policy_node1 = ChannelInfoDirectedPolicy(msg_payload)
else:
self.policy_node2 = ChannelInfoDirectedPolicy(msg_payload)
- self.print_error('channel update', binascii.hexlify(self.channel_id).decode("ascii"), flags)
+ #self.print_error('channel update', binascii.hexlify(self.channel_id).decode("ascii"), flags)
def get_policy_for_node(self, node_id):
if node_id == self.node_id_1:
@@ -112,7 +112,7 @@ class ChannelDB(PrintError):
def on_channel_announcement(self, msg_payload):
short_channel_id = msg_payload['short_channel_id']
- self.print_error('channel announcement', binascii.hexlify(short_channel_id).decode("ascii"))
+ #self.print_error('channel announcement', binascii.hexlify(short_channel_id).decode("ascii"))
channel_info = ChannelInfo(msg_payload)
self._id_to_channel_info[short_channel_id] = channel_info
self._channels_for_node[channel_info.node_id_1].add(short_channel_id)
diff --git a/electrum/lnworker.py b/electrum/lnworker.py
@@ -5,12 +5,13 @@ import os
from decimal import Decimal
import threading
from collections import defaultdict
+import random
from . import constants
from .bitcoin import sha256, COIN
from .util import bh2u, bfh, PrintError
from .constants import set_testnet, set_simnet
-from .lnbase import Peer, privkey_to_pubkey
+from .lnbase import Peer, privkey_to_pubkey, aiosafe
from .lnaddr import lnencode, LnAddr, lndecode
from .ecc import der_sig_from_sig_string
from .transaction import Transaction
@@ -39,15 +40,13 @@ class LNWorker(PrintError):
self.peers = {}
self.channels = {x.channel_id: x for x in map(HTLCStateMachine, wallet.storage.get("channels", []))}
self.invoices = wallet.storage.get('lightning_invoices', {})
- peer_list = network.config.get('lightning_peers', node_list)
self.channel_state = {chan.channel_id: "DISCONNECTED" for chan in self.channels.values()}
for chan_id, chan in self.channels.items():
self.network.lnwatcher.watch_channel(chan, self.on_channel_utxos)
- for host, port, pubkey in peer_list:
- self.add_peer(host, int(port), bfh(pubkey))
# wait until we see confirmations
self.network.register_callback(self.on_network_update, ['updated', 'verified']) # thread safe
self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
+ asyncio.run_coroutine_threadsafe(self.main_loop(), asyncio.get_event_loop())
def channels_for_peer(self, node_id):
assert type(node_id) is bytes
@@ -118,15 +117,9 @@ class LNWorker(PrintError):
conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
peer.on_network_update(chan, conf)
- async def _open_channel_coroutine(self, node_id, amount_sat, push_sat, password):
- if node_id not in self.peers:
- node = self.network.lightning_nodes.get(node_id)
- if node is None:
- return "node not found, peers available are: " + str(self.network.lightning_nodes.keys())
- host, port = node['addresses'][0]
- self.add_peer(host, port, node_id)
+ async def _open_channel_coroutine(self, node_id, local_amount_sat, push_sat, password):
peer = self.peers[node_id]
- openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount_sat, push_sat * 1000, temp_channel_id=os.urandom(32))
+ openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, local_amount_sat + push_sat, push_sat * 1000, temp_channel_id=os.urandom(32))
self.save_channel(openingchannel)
self.network.lnwatcher.watch_channel(openingchannel, self.on_channel_utxos)
self.on_channels_updated()
@@ -192,3 +185,24 @@ class LNWorker(PrintError):
tx.add_signature_to_txin(0, none_idx, bh2u(remote_sig))
assert tx.is_complete()
return self.network.broadcast_transaction(tx)
+
+ @aiosafe
+ async def main_loop(self):
+ peer_list = self.config.get('lightning_peers', node_list)
+ for host, port, pubkey in peer_list:
+ self.add_peer(host, int(port), bfh(pubkey))
+ while True:
+ await asyncio.sleep(1)
+ for k, peer in list(self.peers.items()):
+ if peer.exception:
+ self.print_error("removing peer", peer.host)
+ self.peers.pop(k)
+ if len(self.peers) > 3:
+ continue
+ node_id = random.choice(list(self.network.lightning_nodes.keys()))
+ node = self.network.lightning_nodes.get(node_id)
+ addresses = node.get('addresses')
+ if addresses:
+ host, port = addresses[0]
+ self.print_error("trying node", bh2u(node_id))
+ self.add_peer(host, port, node_id)