commit ac884eb3c34a42bd93aebc966a7dd0d764057f76
parent 28c5825f415ba115f57dbf060743734013a8ac07
Author: ThomasV <thomasv@electrum.org>
Date: Sun, 23 Feb 2020 17:18:45 +0100
lnpeer: Use a single queue per channel for messages that are ordered.
Forward error messages with 'temporary_channel_id' to the correct channel_id
Diffstat:
M | electrum/lnpeer.py | | | 94 | ++++++++++++++++++++++++++++++------------------------------------------------- |
1 file changed, 35 insertions(+), 59 deletions(-)
diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
@@ -82,15 +82,11 @@ class Peer(Logger):
self.reply_channel_range = asyncio.Queue()
# gossip uses a single queue to preserve message order
self.gossip_queue = asyncio.Queue()
- # channel messsage queues
+ self.ordered_messages = ['accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed']
+ self.ordered_message_queues = defaultdict(asyncio.Queue) # for messsage that are ordered
+ self.temp_id_to_id = {} # to forward error messages
self.shutdown_received = defaultdict(asyncio.Future)
- self.channel_accepted = defaultdict(asyncio.Queue)
- self.channel_reestablished = defaultdict(asyncio.Queue)
- self.funding_signed = defaultdict(asyncio.Queue)
- self.funding_created = defaultdict(asyncio.Queue)
self.announcement_signatures = defaultdict(asyncio.Queue)
- self.closing_signed = defaultdict(asyncio.Queue)
- #
self.orphan_channel_updates = OrderedDict()
self._local_changed_events = defaultdict(asyncio.Event)
self._remote_changed_events = defaultdict(asyncio.Event)
@@ -147,26 +143,28 @@ class Peer(Logger):
def process_message(self, message):
message_type, payload = decode_msg(message)
- try:
- f = getattr(self, 'on_' + message_type)
- except AttributeError:
- #self.logger.info("Received '%s'" % message_type.upper(), payload)
- return
- # raw message is needed to check signature
- if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
- payload['raw'] = message
- execution_result = f(payload)
- if asyncio.iscoroutinefunction(f):
- asyncio.ensure_future(execution_result)
+ if message_type in self.ordered_messages:
+ chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
+ self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
+ else:
+ try:
+ f = getattr(self, 'on_' + message_type)
+ except AttributeError:
+ #self.logger.info("Received '%s'" % message_type.upper(), payload)
+ return
+ # raw message is needed to check signature
+ if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
+ payload['raw'] = message
+ execution_result = f(payload)
+ if asyncio.iscoroutinefunction(f):
+ asyncio.ensure_future(execution_result)
def on_error(self, payload):
self.logger.info(f"on_error: {payload['data'].decode('ascii')}")
chan_id = payload.get("channel_id")
- for d in [ self.channel_accepted, self.funding_signed,
- self.funding_created, self.channel_reestablished,
- self.announcement_signatures, self.closing_signed ]:
- if chan_id in d:
- d[chan_id].put_nowait({'error':payload['data']})
+ if chan_id in self.temp_id_to_id:
+ chan_id = self.temp_id_to_id[chan_id]
+ self.ordered_message_queues[chan_id].put_nowait((None, {'error':payload['data']}))
def on_ping(self, payload):
l = int.from_bytes(payload['num_pong_bytes'], 'big')
@@ -175,21 +173,14 @@ class Peer(Logger):
def on_pong(self, payload):
pass
- def on_accept_channel(self, payload):
- temp_chan_id = payload["temporary_channel_id"]
- if temp_chan_id not in self.channel_accepted:
- raise Exception("Got unknown accept_channel")
- self.channel_accepted[temp_chan_id].put_nowait(payload)
-
- def on_funding_signed(self, payload):
- channel_id = payload['channel_id']
- if channel_id not in self.funding_signed: raise Exception("Got unknown funding_signed")
- self.funding_signed[channel_id].put_nowait(payload)
-
- def on_funding_created(self, payload):
- channel_id = payload['temporary_channel_id']
- if channel_id not in self.funding_created: raise Exception("Got unknown funding_created")
- self.funding_created[channel_id].put_nowait(payload)
+ async def wait_for_message(self, expected_name, channel_id):
+ q = self.ordered_message_queues[channel_id]
+ name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT)
+ if payload.get('error'):
+ raise Exception('Remote peer reported error: ' + repr(payload.get('error')))
+ if name != expected_name:
+ raise Exception(f"Received unexpected '{name}'")
+ return payload
def on_init(self, payload):
if self._received_init:
@@ -533,9 +524,7 @@ class Peer(Logger):
channel_reserve_satoshis=local_config.reserve_sat,
htlc_minimum_msat=1,
)
- payload = await asyncio.wait_for(self.channel_accepted[temp_channel_id].get(), LN_P2P_NETWORK_TIMEOUT)
- if payload.get('error'):
- raise Exception('Remote Lightning peer reported error: ' + repr(payload.get('error')))
+ payload = await self.wait_for_message('accept_channel', temp_channel_id)
remote_per_commitment_point = payload['first_per_commitment_point']
funding_txn_minimum_depth = int.from_bytes(payload['minimum_depth'], 'big')
if funding_txn_minimum_depth <= 0:
@@ -601,12 +590,13 @@ class Peer(Logger):
lnworker=self.lnworker,
initial_feerate=feerate)
sig_64, _ = chan.sign_next_commitment()
+ self.temp_id_to_id[temp_channel_id] = channel_id
self.send_message("funding_created",
temporary_channel_id=temp_channel_id,
funding_txid=funding_txid_bytes,
funding_output_index=funding_index,
signature=sig_64)
- payload = await asyncio.wait_for(self.funding_signed[channel_id].get(), LN_P2P_NETWORK_TIMEOUT)
+ payload = await self.wait_for_message('funding_signed', channel_id)
self.logger.info('received funding_signed')
remote_sig = payload['signature']
chan.receive_new_commitment(remote_sig, [])
@@ -666,7 +656,7 @@ class Peer(Logger):
htlc_basepoint=local_config.htlc_basepoint.pubkey,
first_per_commitment_point=per_commitment_point_first,
)
- funding_created = await self.funding_created[temp_chan_id].get()
+ funding_created = await self.wait_for_message('funding_created', temp_chan_id)
funding_idx = int.from_bytes(funding_created['funding_output_index'], 'big')
funding_txid = bh2u(funding_created['funding_txid'][::-1])
channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx)
@@ -715,14 +705,6 @@ class Peer(Logger):
raise Exception(f'reserve too high: {remote_reserve_sat}, funding_sat: {funding_sat}')
return remote_reserve_sat
- def on_channel_reestablish(self, payload):
- chan_id = payload["channel_id"]
- chan = self.channels.get(chan_id)
- if not chan:
- self.logger.info(f"Received unknown channel_reestablish {bh2u(chan_id)} {payload}")
- raise Exception('Unknown channel_reestablish')
- self.channel_reestablished[chan_id].put_nowait(payload)
-
@log_exceptions
async def reestablish_channel(self, chan: Channel):
await self.initialized
@@ -765,8 +747,7 @@ class Peer(Logger):
self.logger.info(f'channel_reestablish: sent channel_reestablish with '
f'(next_local_ctn={next_local_ctn}, '
f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')
-
- msg = await self.channel_reestablished[chan_id].get()
+ msg = await self.wait_for_message('channel_reestablish', chan_id)
their_next_local_ctn = int.from_bytes(msg["next_local_commitment_number"], 'big')
their_oldest_unrevoked_remote_ctn = int.from_bytes(msg["next_remote_revocation_number"], 'big')
their_local_pcp = msg.get("my_current_per_commitment_point")
@@ -1356,11 +1337,6 @@ class Peer(Logger):
feerate_per_kw=feerate_per_kw)
await self.await_remote(chan, remote_ctn)
- def on_closing_signed(self, payload):
- chan_id = payload["channel_id"]
- if chan_id not in self.closing_signed: raise Exception("Got unknown closing_signed")
- self.closing_signed[chan_id].put_nowait(payload)
-
@log_exceptions
async def close_channel(self, chan_id: bytes):
chan = self.channels[chan_id]
@@ -1404,7 +1380,7 @@ class Peer(Logger):
our_sig, closing_tx = chan.make_closing_tx(scriptpubkey, payload['scriptpubkey'], fee_sat=our_fee)
self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=our_fee, signature=our_sig)
# FIXME: the remote SHOULD send closing_signed, but some don't.
- cs_payload = await asyncio.wait_for(self.closing_signed[chan.channel_id].get(), LN_P2P_NETWORK_TIMEOUT)
+ cs_payload = await self.wait_for_message('closing_signed', chan.channel_id)
their_fee = int.from_bytes(cs_payload['fee_satoshis'], 'big')
their_sig = cs_payload['signature']
if our_fee == their_fee: