electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

commit a40207cbbbee56c54875ebe3a27780878cbb9def
parent 50b4f785a9b6d212575767cbb050dd4318f9903b
Author: ThomasV <thomasv@electrum.org>
Date:   Mon,  4 Feb 2019 12:37:30 +0100

Refactor LNPeer in order to support HTLC forwarding:
 1. Do not perform channel updates in coroutines, because they would get executed in random order.
 2. After applying channel updates, wait only for the relevant commitment (local or remote) and not for both, because local and remote might be out of sync (BOLT 2).
 3. When waiting for a commitment, wait until a given ctn has been reached, because a queue cannot be shared by several coroutines

Diffstat:
Melectrum/lnchannel.py | 3+++
Melectrum/lnpeer.py | 116++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
2 files changed, 67 insertions(+), 52 deletions(-)

diff --git a/electrum/lnchannel.py b/electrum/lnchannel.py @@ -594,6 +594,9 @@ class Channel(PrintError): feerate = self.constraints.feerate return self.make_commitment(subject, this_point, ctn, feerate, False) + def get_current_ctn(self, subject): + return self.config[subject].ctn + def total_msat(self, direction): assert type(direction) is Direction sub = LOCAL if direction == SENT else REMOTE diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py @@ -68,8 +68,6 @@ class Peer(PrintError): self.channel_reestablished = defaultdict(asyncio.Future) self.funding_signed = defaultdict(asyncio.Queue) self.funding_created = defaultdict(asyncio.Queue) - self.revoke_and_ack = defaultdict(asyncio.Queue) - self.commitment_signed = defaultdict(asyncio.Queue) self.announcement_signatures = defaultdict(asyncio.Queue) self.closing_signed = defaultdict(asyncio.Queue) self.payment_preimages = defaultdict(asyncio.Queue) @@ -79,10 +77,11 @@ class Peer(PrintError): self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ self.attempted_route = {} self.orphan_channel_updates = OrderedDict() + self.pending_updates = defaultdict(bool) def send_message(self, message_name: str, **kwargs): assert type(message_name) is str - self.print_error("Sending '%s'"%message_name.upper()) + #self.print_error("Sending '%s'"%message_name.upper()) self.transport.send_bytes(encode_msg(message_name, **kwargs)) async def initialize(self): @@ -108,7 +107,7 @@ class Peer(PrintError): try: f = getattr(self, 'on_' + message_type) except AttributeError: - self.print_error("Received '%s'" % message_type.upper(), payload) + #self.print_error("Received '%s'" % message_type.upper(), payload) return # raw message is needed to check signature if message_type=='node_announcement': @@ -122,7 +121,7 @@ class Peer(PrintError): self.print_error("error", payload["data"].decode("ascii")) chan_id = payload.get("channel_id") for d in [ self.channel_accepted, self.funding_signed, - self.funding_created, self.revoke_and_ack, self.commitment_signed, + self.funding_created, self.announcement_signatures, self.closing_signed ]: if chan_id in d: d[chan_id].put_nowait({'error':payload['data']}) @@ -749,8 +748,7 @@ class Peer(PrintError): return h, node_signature, bitcoin_signature - @log_exceptions - async def on_update_fail_htlc(self, payload): + def on_update_fail_htlc(self, payload): channel_id = payload["channel_id"] htlc_id = int.from_bytes(payload["id"], "big") key = (channel_id, htlc_id) @@ -762,7 +760,7 @@ class Peer(PrintError): self.print_error("UPDATE_FAIL_HTLC. cannot decode! attempted route is MISSING. {}".format(key)) else: try: - await self._handle_error_code_from_failed_htlc(payload["reason"], route, channel_id, htlc_id) + self._handle_error_code_from_failed_htlc(payload["reason"], route, channel_id, htlc_id) except Exception: # exceptions are suppressed as failing to handle an error code # should not block us from removing the htlc @@ -770,10 +768,15 @@ class Peer(PrintError): # process update_fail_htlc on channel chan = self.channels[channel_id] chan.receive_fail_htlc(htlc_id) - await self.receive_and_revoke(chan) + local_ctn = chan.get_current_ctn(LOCAL) + asyncio.ensure_future(self._on_update_fail_htlc(chan, htlc_id, local_ctn)) + + @log_exceptions + async def _on_update_fail_htlc(self, chan, htlc_id, local_ctn): + await self.await_local(chan, local_ctn) self.network.trigger_callback('ln_message', self.lnworker, 'Payment failed', htlc_id) - async def _handle_error_code_from_failed_htlc(self, error_reason, route: List['RouteEdge'], channel_id, htlc_id): + def _handle_error_code_from_failed_htlc(self, error_reason, route: List['RouteEdge'], channel_id, htlc_id): chan = self.channels[channel_id] failure_msg, sender_idx = decode_onion_error(error_reason, [x.node_id for x in route], @@ -814,23 +817,22 @@ class Peer(PrintError): else: self.network.path_finder.blacklist.add(short_chan_id) - def send_commitment(self, chan: Channel): + def maybe_send_commitment(self, chan: Channel): + if not self.pending_updates[chan]: + return + self.print_error('send_commitment') sig_64, htlc_sigs = chan.sign_next_commitment() self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs)) - return len(htlc_sigs) + self.pending_updates[chan] = False - async def send_and_revoke(self, chan: Channel): - """ generic channel update flow """ - self.send_commitment(chan) - await self.receive_revoke_and_ack(chan) - await self.receive_commitment(chan) - self.send_revoke_and_ack(chan) + async def await_remote(self, chan: Channel, ctn: int): + self.maybe_send_commitment(chan) + while chan.get_current_ctn(REMOTE) <= ctn: + await asyncio.sleep(0.1) - async def receive_and_revoke(self, chan: Channel): - await self.receive_commitment(chan) - self.send_revoke_and_ack(chan) - self.send_commitment(chan) - await self.receive_revoke_and_ack(chan) + async def await_local(self, chan: Channel, ctn: int): + while chan.get_current_ctn(LOCAL) <= ctn: + await asyncio.sleep(0.1) async def pay(self, route: List['RouteEdge'], chan: Channel, amount_msat: int, payment_hash: bytes, min_final_cltv_expiry: int): @@ -845,6 +847,7 @@ class Peer(PrintError): # create htlc htlc = {'amount_msat':amount_msat, 'payment_hash':payment_hash, 'cltv_expiry':cltv} htlc_id = chan.add_htlc(htlc) + remote_ctn = chan.get_current_ctn(REMOTE) chan.onion_keys[htlc_id] = secret_key self.attempted_route[(chan.channel_id, htlc_id)] = route self.print_error(f"starting payment. route: {route}") @@ -855,14 +858,10 @@ class Peer(PrintError): amount_msat=amount_msat, payment_hash=payment_hash, onion_routing_packet=onion.to_bytes()) - await self.send_and_revoke(chan) + self.pending_updates[chan] = True + await self.await_remote(chan, remote_ctn) return UpdateAddHtlc(**htlc, htlc_id=htlc_id) - async def receive_revoke_and_ack(self, chan: Channel): - revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id].get() - chan.receive_revocation(RevokeAndAck(revoke_and_ack_msg["per_commitment_secret"], revoke_and_ack_msg["next_per_commitment_point"])) - self.lnworker.save_channel(chan) - def send_revoke_and_ack(self, chan: Channel): rev, _ = chan.revoke_current_commitment() self.lnworker.save_channel(chan) @@ -871,36 +870,34 @@ class Peer(PrintError): per_commitment_secret=rev.per_commitment_secret, next_per_commitment_point=rev.next_per_commitment_point) - async def receive_commitment(self, chan: Channel, commitment_signed_msg=None): - if commitment_signed_msg is None: - commitment_signed_msg = await self.commitment_signed[chan.channel_id].get() - data = commitment_signed_msg["htlc_signature"] - htlc_sigs = [data[i:i+64] for i in range(0, len(data), 64)] - chan.receive_new_commitment(commitment_signed_msg["signature"], htlc_sigs) - return len(htlc_sigs) - def on_commitment_signed(self, payload): - self.print_error("commitment_signed", payload) + self.print_error("on_commitment_signed") channel_id = payload['channel_id'] - self.commitment_signed[channel_id].put_nowait(payload) + chan = self.channels[channel_id] + data = payload["htlc_signature"] + htlc_sigs = [data[i:i+64] for i in range(0, len(data), 64)] + chan.receive_new_commitment(payload["signature"], htlc_sigs) + self.send_revoke_and_ack(chan) - @log_exceptions - async def on_update_fulfill_htlc(self, update_fulfill_htlc_msg): + def on_update_fulfill_htlc(self, update_fulfill_htlc_msg): self.print_error("update_fulfill") chan = self.channels[update_fulfill_htlc_msg["channel_id"]] preimage = update_fulfill_htlc_msg["payment_preimage"] htlc_id = int.from_bytes(update_fulfill_htlc_msg["id"], "big") chan.receive_htlc_settle(preimage, htlc_id) - await self.receive_and_revoke(chan) + local_ctn = chan.get_current_ctn(LOCAL) + asyncio.ensure_future(self._on_update_fulfill_htlc(chan, htlc_id, preimage, local_ctn)) + + @log_exceptions + async def _on_update_fulfill_htlc(self, chan, htlc_id, preimage, local_ctn): + await self.await_local(chan, local_ctn) self.network.trigger_callback('ln_message', self.lnworker, 'Payment sent', htlc_id) - # used in lightning-integration self.payment_preimages[sha256(preimage)].put_nowait(preimage) def on_update_fail_malformed_htlc(self, payload): self.print_error("error", payload["data"].decode("ascii")) - @log_exceptions - async def on_update_add_htlc(self, payload): + def on_update_add_htlc(self, payload): # no onion routing for the moment: we assume we are the end node self.print_error('on_update_add_htlc') # check if this in our list of requests @@ -919,7 +916,12 @@ class Peer(PrintError): # add htlc htlc = {'amount_msat': amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':cltv_expiry} htlc_id = chan.receive_htlc(htlc) - await self.receive_and_revoke(chan) + local_ctn = chan.get_current_ctn(LOCAL) + asyncio.ensure_future(self._on_update_add_htlc(chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion)) + + @log_exceptions + async def _on_update_add_htlc(self, chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion): + await self.await_local(chan, local_ctn) # Forward HTLC # FIXME: this is not robust to us going offline before payment is fulfilled if not processed_onion.are_we_final: @@ -936,6 +938,7 @@ class Peer(PrintError): next_amount_msat_htlc = int.from_bytes(dph.amt_to_forward, 'big') next_htlc = {'amount_msat':next_amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':next_cltv_expiry} next_htlc_id = next_chan.add_htlc(next_htlc) + next_remote_ctn = next_chan.get_current_ctn(REMOTE) next_peer.send_message( "update_add_htlc", channel_id=next_chan.channel_id, @@ -945,7 +948,8 @@ class Peer(PrintError): payment_hash=payment_hash, onion_routing_packet=processed_onion.next_packet.to_bytes() ) - await next_peer.send_and_revoke(next_chan) + next_peer.pending_updates[next_chan] = True + await next_peer.await_remote(next_chan, next_remote_ctn) # wait until we get paid preimage = await next_peer.payment_preimages[payment_hash].get() # fulfill the original htlc @@ -989,29 +993,35 @@ class Peer(PrintError): async def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes): chan.settle_htlc(preimage, htlc_id) + remote_ctn = chan.get_current_ctn(REMOTE) self.send_message("update_fulfill_htlc", channel_id=chan.channel_id, id=htlc_id, payment_preimage=preimage) - await self.send_and_revoke(chan) + self.pending_updates[chan] = True + await self.await_remote(chan, remote_ctn) self.network.trigger_callback('ln_message', self.lnworker, 'Payment received', htlc_id) async def fail_htlc(self, chan: Channel, htlc_id: int, onion_packet: OnionPacket, reason: OnionRoutingFailureMessage): self.print_error(f"failing received htlc {(bh2u(chan.channel_id), htlc_id)}. reason: {reason}") chan.fail_htlc(htlc_id) + remote_ctn = chan.get_current_ctn(REMOTE) error_packet = construct_onion_error(reason, onion_packet, our_onion_private_key=self.privkey) self.send_message("update_fail_htlc", channel_id=chan.channel_id, id=htlc_id, len=len(error_packet), reason=error_packet) - await self.send_and_revoke(chan) + self.pending_updates[chan] = True + await self.await_remote(chan, remote_ctn) def on_revoke_and_ack(self, payload): - self.print_error("got revoke_and_ack") + self.print_error("on_revoke_and_ack") channel_id = payload["channel_id"] - self.revoke_and_ack[channel_id].put_nowait(payload) + chan = self.channels[channel_id] + chan.receive_revocation(RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"])) + self.lnworker.save_channel(chan) def on_update_fee(self, payload): channel_id = payload["channel_id"] @@ -1036,10 +1046,12 @@ class Peer(PrintError): else: return chan.update_fee(feerate_per_kw, True) + remote_ctn = chan.get_current_ctn(REMOTE) self.send_message("update_fee", channel_id=chan.channel_id, feerate_per_kw=feerate_per_kw) - await self.send_and_revoke(chan) + self.pending_updates[chan] = True + await self.await_remote(chan, remote_ctn) def on_closing_signed(self, payload): chan_id = payload["channel_id"]