commit 0e42fd9f1738a05f9059f00fa3b9d9868326e7e0
parent 1011245c5e29a128edf4629d024c4cf345190282
Author: ThomasV <thomasv@electrum.org>
Date: Mon, 13 May 2019 22:33:56 +0200
parallelize short_channel_id requests
Diffstat:
4 files changed, 82 insertions(+), 62 deletions(-)
diff --git a/electrum/gui/qt/lightning_dialog.py b/electrum/gui/qt/lightning_dialog.py
@@ -78,8 +78,11 @@ class LightningDialog(QDialog):
# channel_db
network_w = QWidget()
network_vbox = QVBoxLayout(network_w)
+ self.num_peers = QLabel('')
+ network_vbox.addWidget(self.num_peers)
self.status = QLabel('')
network_vbox.addWidget(self.status)
+ network_vbox.addStretch(1)
# local
local_w = QWidget()
vbox_local = QVBoxLayout(local_w)
@@ -105,14 +108,17 @@ class LightningDialog(QDialog):
b.clicked.connect(self.on_close)
vbox.addLayout(Buttons(b))
self.watcher_list.update()
- self.gui_object.timer.timeout.connect(self.update_status)
+ self.network.register_callback(self.update_status, ['ln_status'])
- def update_status(self):
+ def update_status(self, event):
if self.network.lngossip is None:
return
channel_db = self.network.channel_db
num_peers = sum([p.initialized.is_set() for p in self.network.lngossip.peers.values()])
- msg = _('{} peers, {} nodes, {} channels.').format(num_peers, channel_db.num_nodes, channel_db.num_channels)
+ self.num_peers.setText(f'{num_peers} peers, {channel_db.num_nodes} nodes')
+ known = channel_db.num_channels
+ unknown = len(self.network.lngossip.unknown_ids)
+ msg = _(f'Channels: {known} of {known + unknown}')
self.status.setText(msg)
def on_close(self):
diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
@@ -83,7 +83,6 @@ class Peer(Logger):
self.recv_commitment_for_ctn_last = defaultdict(lambda: None) # type: Dict[Channel, Optional[int]]
self._local_changed_events = defaultdict(asyncio.Event)
self._remote_changed_events = defaultdict(asyncio.Event)
- self.receiving_channels = False
Logger.__init__(self)
def send_message(self, message_name: str, **kwargs):
@@ -197,7 +196,7 @@ class Peer(Logger):
try:
return await func(self, *args, **kwargs)
except Exception as e:
- self.logger.info("Disconnecting: {}".format(e))
+ self.logger.info("Disconnecting: {}".format(repr(e)))
finally:
self.close_and_cleanup()
return wrapper_func
@@ -207,8 +206,47 @@ class Peer(Logger):
async def main_loop(self):
async with aiorpcx.TaskGroup() as group:
await group.spawn(self._message_loop())
- # kill group if the peer times out
- await group.spawn(asyncio.wait_for(self.initialized.wait(), 10))
+ await group.spawn(self._run_gossip())
+
+ @log_exceptions
+ async def _run_gossip(self):
+ await asyncio.wait_for(self.initialized.wait(), 5)
+ if self.lnworker == self.lnworker.network.lngossip:
+ ids, complete = await asyncio.wait_for(self.get_channel_range(), 10)
+ self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
+ self.lnworker.add_new_ids(ids)
+ while True:
+ todo = self.lnworker.get_ids_to_query()
+ if not todo:
+ await asyncio.sleep(1)
+ continue
+ await self.querying_lock.acquire()
+ self.logger.info(f'Querying {len(todo)} short_channel_ids')
+ self.query_short_channel_ids(todo)
+
+ async def get_channel_range(self):
+ req_index = self.lnworker.first_block
+ req_num = self.lnworker.network.get_local_height() - req_index
+ self.query_channel_range(req_index, req_num)
+ intervals = []
+ ids = set()
+ while True:
+ index, num, complete, _ids = await self.reply_channel_range.get()
+ ids.update(_ids)
+ intervals.append((index, index+num))
+ intervals.sort()
+ while len(intervals) > 1:
+ a,b = intervals[0]
+ c,d = intervals[1]
+ if b == c:
+ intervals = [(a,d)] + intervals[2:]
+ else:
+ break
+ if len(intervals) == 1:
+ a, b = intervals[0]
+ if a <= req_index and b >= req_index + req_num:
+ break
+ return ids, complete
def request_gossip(self, timestamp=0):
if timestamp == 0:
@@ -222,7 +260,7 @@ class Peer(Logger):
timestamp_range=b'\xff'*4)
def query_channel_range(self, index, num):
- self.logger.info(f'query channel range')
+ self.logger.info(f'query channel range {index} {num}')
self.send_message(
'query_channel_range',
chain_hash=constants.net.rev_genesis_bytes(),
@@ -250,9 +288,7 @@ class Peer(Logger):
ids = self.decode_short_ids(encoded)
self.reply_channel_range.put_nowait((first, num, complete, ids))
- async def query_short_channel_ids(self, ids, compressed=True):
- await self.querying_lock.acquire()
- #self.logger.info('querying {} short_channel_ids'.format(len(ids)))
+ def query_short_channel_ids(self, ids, compressed=True):
s = b''.join(ids)
encoded = zlib.compress(s) if compressed else s
prefix = b'\x01' if compressed else b'\x00'
@@ -282,11 +318,7 @@ class Peer(Logger):
self.transport.close()
except:
pass
- for chan in self.channels.values():
- if chan.get_state() != 'FORCE_CLOSING':
- chan.set_state('DISCONNECTED')
- self.network.trigger_callback('channel', chan)
- self.lnworker.peers.pop(self.pubkey)
+ self.lnworker.peer_closed(self)
def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig:
# key derivation
diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py
@@ -359,7 +359,6 @@ class ChannelDB(SqlDB):
self.DBSession.commit()
self._update_counts()
self.logger.info('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads)))
- self.network.trigger_callback('ln_status')
@sql
def get_last_timestamp(self):
@@ -457,7 +456,6 @@ class ChannelDB(SqlDB):
self.DBSession.add(new_addr)
self.DBSession.commit()
self._update_counts()
- self.network.trigger_callback('ln_status')
def get_routing_policy_for_channel(self, start_node_id: bytes,
short_channel_id: bytes) -> Optional[bytes]:
diff --git a/electrum/lnworker.py b/electrum/lnworker.py
@@ -101,6 +101,7 @@ class LNWorker(Logger):
self.network.trigger_callback('ln_status')
await asyncio.start_server(cb, addr, int(port))
+ @log_exceptions
async def main_loop(self):
while True:
await asyncio.sleep(1)
@@ -165,7 +166,7 @@ class LNWorker(Logger):
host, port = self.choose_preferred_address(addrs)
peer = LNPeerAddr(host, port, bytes.fromhex(node.node_id))
if peer in self._last_tried_peer: continue
- self.logger.info('taking random ln peer from our channel db')
+ #self.logger.info('taking random ln peer from our channel db')
return [peer]
# TODO remove this. For some reason the dns seeds seem to ignore the realm byte
@@ -237,62 +238,38 @@ class LNGossip(LNWorker):
node = BIP32Node.from_rootseed(seed, xtype='standard')
xprv = node.to_xprv()
super().__init__(xprv)
+ self.localfeatures |= LnLocalFeatures.GOSSIP_QUERIES_OPT
self.localfeatures |= LnLocalFeatures.GOSSIP_QUERIES_REQ
+ self.unknown_ids = set()
def start_network(self, network: 'Network'):
super().start_network(network)
asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.gossip_task()), self.network.asyncio_loop)
- async def gossip_task(self):
- req_index = self.first_block
- req_num = self.network.get_local_height() - req_index
- while len(self.peers) == 0:
- await asyncio.sleep(1)
- continue
- # todo: parallelize over peers
- peer = list(self.peers.values())[0]
- await peer.initialized.wait()
- # send channels_range query. peer will reply with several intervals
- peer.query_channel_range(req_index, req_num)
- intervals = []
- ids = set()
- # wait until requested range is covered
- while True:
- index, num, complete, _ids = await peer.reply_channel_range.get()
- ids.update(_ids)
- intervals.append((index, index+num))
- intervals.sort()
- while len(intervals) > 1:
- a,b = intervals[0]
- c,d = intervals[1]
- if b == c:
- intervals = [(a,d)] + intervals[2:]
- else:
- break
- if len(intervals) == 1:
- a, b = intervals[0]
- if a <= req_index and b >= req_index + req_num:
- break
- self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
- # TODO: filter results by date of last channel update, purge DB
+ def add_new_ids(self, ids):
#if complete:
# self.channel_db.purge_unknown_channels(ids)
known = self.channel_db.compare_channels(ids)
- unknown = list(ids - set(known))
- total = len(unknown)
- N = 500
- while unknown:
- self.channel_db.process_gossip()
- await peer.query_short_channel_ids(unknown[0:N])
- unknown = unknown[N:]
- self.logger.info(f'Querying channels: {total - len(unknown)}/{total}. Count: {self.channel_db.num_channels}')
+ new = ids - set(known)
+ self.unknown_ids.update(new)
- # request gossip fromm current time
- now = int(time.time())
- peer.request_gossip(now)
+ def get_ids_to_query(self):
+ N = 250
+ l = list(self.unknown_ids)
+ self.unknown_ids = set(l[N:])
+ return l[0:N]
+
+ async def gossip_task(self):
while True:
await asyncio.sleep(5)
self.channel_db.process_gossip()
+ known = self.channel_db.num_channels
+ unknown = len(self.unknown_ids)
+ self.logger.info(f'Channels: {known} of {known+unknown}')
+ self.network.trigger_callback('ln_status')
+
+ def peer_closed(self, peer):
+ self.peers.pop(peer.pubkey)
class LNWallet(LNWorker):
@@ -343,6 +320,13 @@ class LNWallet(LNWorker):
]:
asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(coro), self.network.asyncio_loop)
+ def peer_closed(self, peer):
+ for chan in self.channels_for_peer(peer.pubkey).values():
+ if chan.get_state() != 'FORCE_CLOSING':
+ chan.set_state('DISCONNECTED')
+ self.network.trigger_callback('channel', chan)
+ self.peers.pop(peer.pubkey)
+
def payment_completed(self, chan: Channel, direction: Direction,
htlc: UpdateAddHtlc):
chan_id = chan.channel_id