commit 308dc6aa6b2ae94d142374720ca0f4fdd200baa2
parent e68f318b12775fc26e7e3a3a45605bc5a1311f4c
Author: ThomasV <thomasv@electrum.org>
Date: Wed, 15 May 2019 12:30:19 +0200
use a single queue for gossip messages, so that they are processed in the correct order
Diffstat:
3 files changed, 60 insertions(+), 52 deletions(-)
diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
@@ -69,10 +69,8 @@ class Peer(Logger):
self.channel_db = lnworker.network.channel_db
self.ping_time = 0
self.reply_channel_range = asyncio.Queue()
- # gossip message queues
- self.channel_announcements = asyncio.Queue()
- self.channel_updates = asyncio.Queue()
- self.node_announcements = asyncio.Queue()
+ # gossip uses a single queue to preserve message order
+ self.gossip_queue = asyncio.Queue()
# channel messsage queues
self.shutdown_received = defaultdict(asyncio.Future)
self.channel_accepted = defaultdict(asyncio.Queue)
@@ -181,13 +179,13 @@ class Peer(Logger):
self.initialized.set()
def on_node_announcement(self, payload):
- self.node_announcements.put_nowait(payload)
+ self.gossip_queue.put_nowait(('node_announcement', payload))
def on_channel_announcement(self, payload):
- self.channel_announcements.put_nowait(payload)
+ self.gossip_queue.put_nowait(('channel_announcement', payload))
def on_channel_update(self, payload):
- self.channel_updates.put_nowait(payload)
+ self.gossip_queue.put_nowait(('channel_update', payload))
def on_announcement_signatures(self, payload):
channel_id = payload['channel_id']
@@ -212,39 +210,42 @@ class Peer(Logger):
async def main_loop(self):
async with aiorpcx.TaskGroup() as group:
await group.spawn(self._message_loop())
- await group.spawn(self._run_gossip())
- await group.spawn(self.verify_node_announcements())
- await group.spawn(self.verify_channel_announcements())
- await group.spawn(self.verify_channel_updates())
+ await group.spawn(self.query_gossip())
+ await group.spawn(self.process_gossip())
- async def verify_node_announcements(self):
+ @log_exceptions
+ async def process_gossip(self):
+ # verify in peer's TaskGroup so that we fail the connection
+ # forward to channel_db.gossip_queue
while True:
- payload = await self.node_announcements.get()
- pubkey = payload['node_id']
- signature = payload['signature']
- h = sha256d(payload['raw'][66:])
- if not ecc.verify_signature(pubkey, signature, h):
+ name, payload = await self.gossip_queue.get()
+ if name == 'node_announcement':
+ self.verify_node_announcement(payload)
+ elif name == 'channel_announcement':
+ self.verify_channel_announcement(payload)
+ elif name == 'channel_update':
+ pass
+ else:
+ raise Exception('unknown message')
+ self.channel_db.gossip_queue.put_nowait((name, payload))
+
+ def verify_node_announcement(self, payload):
+ pubkey = payload['node_id']
+ signature = payload['signature']
+ h = sha256d(payload['raw'][66:])
+ if not ecc.verify_signature(pubkey, signature, h):
+ raise Exception('signature failed')
+
+ def verify_channel_announcement(self, payload):
+ h = sha256d(payload['raw'][2+256:])
+ pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']]
+ sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']]
+ for pubkey, sig in zip(pubkeys, sigs):
+ if not ecc.verify_signature(pubkey, sig, h):
raise Exception('signature failed')
- self.channel_db.node_anns.append(payload)
-
- async def verify_channel_announcements(self):
- while True:
- payload = await self.channel_announcements.get()
- h = sha256d(payload['raw'][2+256:])
- pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']]
- sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']]
- for pubkey, sig in zip(pubkeys, sigs):
- if not ecc.verify_signature(pubkey, sig, h):
- raise Exception('signature failed')
- self.channel_db.chan_anns.append(payload)
-
- async def verify_channel_updates(self):
- while True:
- payload = await self.channel_updates.get()
- self.channel_db.chan_upds.append(payload)
@log_exceptions
- async def _run_gossip(self):
+ async def query_gossip(self):
await asyncio.wait_for(self.initialized.wait(), 10)
if self.lnworker == self.lnworker.network.lngossip:
ids, complete = await asyncio.wait_for(self.get_channel_range(), 10)
diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py
@@ -35,6 +35,7 @@ from collections import defaultdict
from typing import Sequence, List, Tuple, Optional, Dict, NamedTuple, TYPE_CHECKING, Set
import binascii
import base64
+import asyncio
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean
from sqlalchemy.orm.query import Query
@@ -223,20 +224,7 @@ class ChannelDB(SqlDB):
self._channel_updates_for_private_channels = {} # type: Dict[Tuple[bytes, bytes], dict]
self.ca_verifier = LNChannelVerifier(network, self)
self.update_counts()
- self.node_anns = []
- self.chan_anns = []
- self.chan_upds = []
-
- def process_gossip(self):
- if self.chan_anns:
- self.on_channel_announcement(self.chan_anns)
- self.chan_anns = []
- if self.chan_upds:
- self.on_channel_update(self.chan_upds)
- self.chan_upds = []
- if self.node_anns:
- self.on_node_announcement(self.node_anns)
- self.node_anns = []
+ self.gossip_queue = asyncio.Queue()
@sql
def update_counts(self):
diff --git a/electrum/lnworker.py b/electrum/lnworker.py
@@ -244,7 +244,7 @@ class LNGossip(LNWorker):
def start_network(self, network: 'Network'):
super().start_network(network)
- asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.gossip_task()), self.network.asyncio_loop)
+ asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.process_gossip()), self.network.asyncio_loop)
def add_new_ids(self, ids):
#if complete:
@@ -259,10 +259,29 @@ class LNGossip(LNWorker):
self.unknown_ids = set(l[N:])
return l[0:N]
- async def gossip_task(self):
+ @log_exceptions
+ async def process_gossip(self):
while True:
await asyncio.sleep(5)
- self.channel_db.process_gossip()
+ chan_anns = []
+ chan_upds = []
+ node_anns = []
+ while True:
+ name, payload = await self.channel_db.gossip_queue.get()
+ if name == 'channel_announcement':
+ chan_anns.append(payload)
+ elif name == 'channel_update':
+ chan_upds.append(payload)
+ elif name == 'node_announcement':
+ node_anns.append(payload)
+ else:
+ raise Exception('unknown message')
+ if self.channel_db.gossip_queue.empty():
+ break
+ self.channel_db.on_channel_announcement(chan_anns)
+ self.channel_db.on_channel_update(chan_upds)
+ self.channel_db.on_node_announcement(node_anns)
+ # refresh gui
known = self.channel_db.num_channels
unknown = len(self.unknown_ids)
self.logger.info(f'Channels: {known} of {known+unknown}')