electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

lnpeer.py (101473B)


      1 #!/usr/bin/env python3
      2 #
      3 # Copyright (C) 2018 The Electrum developers
      4 # Distributed under the MIT software license, see the accompanying
      5 # file LICENCE or http://www.opensource.org/licenses/mit-license.php
      6 
      7 import zlib
      8 from collections import OrderedDict, defaultdict
      9 import asyncio
     10 import os
     11 import time
     12 from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set
     13 from datetime import datetime
     14 import functools
     15 
     16 import aiorpcx
     17 from aiorpcx import TaskGroup
     18 
     19 from .crypto import sha256, sha256d
     20 from . import bitcoin, util
     21 from . import ecc
     22 from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string
     23 from . import constants
     24 from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, SilentTaskGroup,
     25                    UnrelatedTransactionException)
     26 from . import transaction
     27 from .transaction import PartialTxOutput, match_script_against_template
     28 from .logging import Logger
     29 from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment,
     30                       process_onion_packet, OnionPacket, construct_onion_error, OnionRoutingFailure,
     31                       ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey,
     32                       OnionFailureCodeMetaFlag)
     33 from .lnchannel import Channel, RevokeAndAck, RemoteCtnTooFarInFuture, ChannelState, PeerState
     34 from . import lnutil
     35 from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc,
     36                      RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore,
     37                      funding_output_script, get_per_commitment_secret_from_seed,
     38                      secret_to_pubkey, PaymentFailure, LnFeatures,
     39                      LOCAL, REMOTE, HTLCOwner,
     40                      ln_compare_features, privkey_to_pubkey, MIN_FINAL_CLTV_EXPIRY_ACCEPTED,
     41                      LightningPeerConnectionClosed, HandshakeFailed,
     42                      RemoteMisbehaving, ShortChannelID,
     43                      IncompatibleLightningFeatures, derive_payment_secret_from_payment_preimage,
     44                      LN_MAX_FUNDING_SAT, calc_fees_for_commitment_tx,
     45                      UpfrontShutdownScriptViolation)
     46 from .lnutil import FeeUpdate, channel_id_from_funding_tx
     47 from .lntransport import LNTransport, LNTransportBase
     48 from .lnmsg import encode_msg, decode_msg
     49 from .interface import GracefulDisconnect
     50 from .lnrouter import fee_for_edge_msat
     51 from .lnutil import ln_dummy_address
     52 from .json_db import StoredDict
     53 from .invoices import PR_PAID
     54 
     55 if TYPE_CHECKING:
     56     from .lnworker import LNGossip, LNWallet
     57     from .lnrouter import LNPaymentRoute
     58     from .transaction import PartialTransaction
     59 
     60 
     61 LN_P2P_NETWORK_TIMEOUT = 20
     62 
     63 
     64 class Peer(Logger):
     65     LOGGING_SHORTCUT = 'P'
     66 
     67     def __init__(
     68             self,
     69             lnworker: Union['LNGossip', 'LNWallet'],
     70             pubkey: bytes,
     71             transport: LNTransportBase,
     72             *, is_channel_backup= False):
     73 
     74         self.is_channel_backup = is_channel_backup
     75         self._sent_init = False  # type: bool
     76         self._received_init = False  # type: bool
     77         self.initialized = asyncio.Future()
     78         self.got_disconnected = asyncio.Event()
     79         self.querying = asyncio.Event()
     80         self.transport = transport
     81         self.pubkey = pubkey  # remote pubkey
     82         self.lnworker = lnworker
     83         self.privkey = self.transport.privkey  # local privkey
     84         self.features = self.lnworker.features  # type: LnFeatures
     85         self.their_features = LnFeatures(0)  # type: LnFeatures
     86         self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
     87         assert self.node_ids[0] != self.node_ids[1]
     88         self.network = lnworker.network
     89         self.ping_time = 0
     90         self.reply_channel_range = asyncio.Queue()
     91         # gossip uses a single queue to preserve message order
     92         self.gossip_queue = asyncio.Queue()
     93         self.ordered_messages = ['accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed']
     94         self.ordered_message_queues = defaultdict(asyncio.Queue) # for messsage that are ordered
     95         self.temp_id_to_id = {}   # to forward error messages
     96         self.funding_created_sent = set() # for channels in PREOPENING
     97         self.funding_signed_sent = set()  # for channels in PREOPENING
     98         self.shutdown_received = {} # chan_id -> asyncio.Future()
     99         self.announcement_signatures = defaultdict(asyncio.Queue)
    100         self.orphan_channel_updates = OrderedDict()
    101         Logger.__init__(self)
    102         self.taskgroup = SilentTaskGroup()
    103         # HTLCs offered by REMOTE, that we started removing but are still active:
    104         self.received_htlcs_pending_removal = set()  # type: Set[Tuple[Channel, int]]
    105         self.received_htlc_removed_event = asyncio.Event()
    106         self._htlc_switch_iterstart_event = asyncio.Event()
    107         self._htlc_switch_iterdone_event = asyncio.Event()
    108 
    109     def send_message(self, message_name: str, **kwargs):
    110         assert type(message_name) is str
    111         self.logger.debug(f"Sending {message_name.upper()}")
    112         if message_name.upper() != "INIT" and not self.is_initialized():
    113             raise Exception("tried to send message before we are initialized")
    114         raw_msg = encode_msg(message_name, **kwargs)
    115         self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id"))
    116         self.transport.send_bytes(raw_msg)
    117 
    118     def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]):
    119         is_commitment_signed = message_name == "commitment_signed"
    120         if not (message_name.startswith("update_") or is_commitment_signed):
    121             return
    122         assert channel_id
    123         chan = self.get_channel_by_id(channel_id)
    124         if not chan:
    125             raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}")
    126         chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed)
    127         if is_commitment_signed:
    128             # saving now, to ensure replaying updates works (in case of channel reestablishment)
    129             self.lnworker.save_channel(chan)
    130 
    131     def maybe_set_initialized(self):
    132         if self.initialized.done():
    133             return
    134         if self._sent_init and self._received_init:
    135             self.initialized.set_result(True)
    136 
    137     def is_initialized(self) -> bool:
    138         return (self.initialized.done()
    139                 and not self.initialized.cancelled()
    140                 and self.initialized.exception() is None
    141                 and self.initialized.result() is True)
    142 
    143     async def initialize(self):
    144         if isinstance(self.transport, LNTransport):
    145             await self.transport.handshake()
    146         features = self.features.for_init_message()
    147         b = int.bit_length(features)
    148         flen = b // 8 + int(bool(b % 8))
    149         self.send_message(
    150             "init", gflen=0, flen=flen,
    151             features=features,
    152             init_tlvs={
    153                 'networks':
    154                 {'chains': constants.net.rev_genesis_bytes()}
    155             })
    156         self._sent_init = True
    157         self.maybe_set_initialized()
    158 
    159     @property
    160     def channels(self) -> Dict[bytes, Channel]:
    161         return self.lnworker.channels_for_peer(self.pubkey)
    162 
    163     def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
    164         # note: this is faster than self.channels.get(channel_id)
    165         chan = self.lnworker.get_channel_by_id(channel_id)
    166         if not chan:
    167             return None
    168         if chan.node_id != self.pubkey:
    169             return None
    170         return chan
    171 
    172     def diagnostic_name(self):
    173         return self.lnworker.__class__.__name__ + ', ' + self.transport.name()
    174 
    175     def ping_if_required(self):
    176         if time.time() - self.ping_time > 120:
    177             self.send_message('ping', num_pong_bytes=4, byteslen=4)
    178             self.ping_time = time.time()
    179 
    180     def process_message(self, message):
    181         message_type, payload = decode_msg(message)
    182         # only process INIT if we are a backup
    183         if self.is_channel_backup is True and message_type != 'init':
    184             return
    185         if message_type in self.ordered_messages:
    186             chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
    187             self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
    188         else:
    189             if message_type != 'error' and 'channel_id' in payload:
    190                 chan = self.get_channel_by_id(payload['channel_id'])
    191                 if chan is None:
    192                     raise Exception('Got unknown '+ message_type)
    193                 args = (chan, payload)
    194             else:
    195                 args = (payload,)
    196             try:
    197                 f = getattr(self, 'on_' + message_type)
    198             except AttributeError:
    199                 #self.logger.info("Received '%s'" % message_type.upper(), payload)
    200                 return
    201             # raw message is needed to check signature
    202             if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
    203                 payload['raw'] = message
    204             execution_result = f(*args)
    205             if asyncio.iscoroutinefunction(f):
    206                 asyncio.ensure_future(self.taskgroup.spawn(execution_result))
    207 
    208     def on_error(self, payload):
    209         self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}")
    210         chan_id = payload.get("channel_id")
    211         if chan_id in self.temp_id_to_id:
    212             chan_id = self.temp_id_to_id[chan_id]
    213         self.ordered_message_queues[chan_id].put_nowait((None, {'error':payload['data']}))
    214 
    215     def on_ping(self, payload):
    216         l = payload['num_pong_bytes']
    217         self.send_message('pong', byteslen=l)
    218 
    219     def on_pong(self, payload):
    220         pass
    221 
    222     async def wait_for_message(self, expected_name, channel_id):
    223         q = self.ordered_message_queues[channel_id]
    224         name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT)
    225         if payload.get('error'):
    226             raise Exception('Remote peer reported error [DO NOT TRUST THIS MESSAGE]: ' + repr(payload.get('error')))
    227         if name != expected_name:
    228             raise Exception(f"Received unexpected '{name}'")
    229         return payload
    230 
    231     def on_init(self, payload):
    232         if self._received_init:
    233             self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
    234             return
    235         self.their_features = LnFeatures(int.from_bytes(payload['features'], byteorder="big"))
    236         their_globalfeatures = int.from_bytes(payload['globalfeatures'], byteorder="big")
    237         self.their_features |= their_globalfeatures
    238         # check transitive dependencies for received features
    239         if not self.their_features.validate_transitive_dependencies():
    240             raise GracefulDisconnect("remote did not set all dependencies for the features they sent")
    241         # check if features are compatible, and set self.features to what we negotiated
    242         try:
    243             self.features = ln_compare_features(self.features, self.their_features)
    244         except IncompatibleLightningFeatures as e:
    245             self.initialized.set_exception(e)
    246             raise GracefulDisconnect(f"{str(e)}")
    247         # check that they are on the same chain as us, if provided
    248         their_networks = payload["init_tlvs"].get("networks")
    249         if their_networks:
    250             their_chains = list(chunks(their_networks["chains"], 32))
    251             if constants.net.rev_genesis_bytes() not in their_chains:
    252                 raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})")
    253         # all checks passed
    254         self.lnworker.on_peer_successfully_established(self)
    255         self._received_init = True
    256         self.maybe_set_initialized()
    257 
    258     def on_node_announcement(self, payload):
    259         if self.lnworker.channel_db:
    260             self.gossip_queue.put_nowait(('node_announcement', payload))
    261 
    262     def on_channel_announcement(self, payload):
    263         if self.lnworker.channel_db:
    264             self.gossip_queue.put_nowait(('channel_announcement', payload))
    265 
    266     def on_channel_update(self, payload):
    267         self.maybe_save_remote_update(payload)
    268         if self.lnworker.channel_db:
    269             self.gossip_queue.put_nowait(('channel_update', payload))
    270 
    271     def maybe_save_remote_update(self, payload):
    272         if not self.channels:
    273             return
    274         for chan in self.channels.values():
    275             if chan.short_channel_id == payload['short_channel_id']:
    276                 chan.set_remote_update(payload['raw'])
    277                 self.logger.info("saved remote_update")
    278                 break
    279         else:
    280             # Save (some bounded number of) orphan channel updates for later
    281             # as it might be for our own direct channel with this peer
    282             # (and we might not yet know the short channel id for that)
    283             # Background: this code is here to deal with a bug in LND,
    284             # see https://github.com/lightningnetwork/lnd/issues/3651
    285             # and https://github.com/lightningnetwork/lightning-rfc/pull/657
    286             # This code assumes gossip_queries is set. BOLT7: "if the
    287             # gossip_queries feature is negotiated, [a node] MUST NOT
    288             # send gossip it did not generate itself"
    289             short_channel_id = ShortChannelID(payload['short_channel_id'])
    290             self.logger.info(f'received orphan channel update {short_channel_id}')
    291             self.orphan_channel_updates[short_channel_id] = payload
    292             while len(self.orphan_channel_updates) > 25:
    293                 self.orphan_channel_updates.popitem(last=False)
    294 
    295     def on_announcement_signatures(self, chan: Channel, payload):
    296         if chan.config[LOCAL].was_announced:
    297             h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan)
    298         else:
    299             self.announcement_signatures[chan.channel_id].put_nowait(payload)
    300 
    301     def handle_disconnect(func):
    302         @functools.wraps(func)
    303         async def wrapper_func(self, *args, **kwargs):
    304             try:
    305                 return await func(self, *args, **kwargs)
    306             except GracefulDisconnect as e:
    307                 self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
    308             except (LightningPeerConnectionClosed, IncompatibleLightningFeatures,
    309                     aiorpcx.socks.SOCKSError) as e:
    310                 self.logger.info(f"Disconnecting: {repr(e)}")
    311             finally:
    312                 self.close_and_cleanup()
    313         return wrapper_func
    314 
    315     @ignore_exceptions  # do not kill outer taskgroup
    316     @log_exceptions
    317     @handle_disconnect
    318     async def main_loop(self):
    319         async with self.taskgroup as group:
    320             await group.spawn(self._message_loop())
    321             await group.spawn(self.htlc_switch())
    322             await group.spawn(self.query_gossip())
    323             await group.spawn(self.process_gossip())
    324 
    325     async def process_gossip(self):
    326         while True:
    327             await asyncio.sleep(5)
    328             if not self.network.lngossip:
    329                 continue
    330             chan_anns = []
    331             chan_upds = []
    332             node_anns = []
    333             while True:
    334                 name, payload = await self.gossip_queue.get()
    335                 if name == 'channel_announcement':
    336                     chan_anns.append(payload)
    337                 elif name == 'channel_update':
    338                     chan_upds.append(payload)
    339                 elif name == 'node_announcement':
    340                     node_anns.append(payload)
    341                 else:
    342                     raise Exception('unknown message')
    343                 if self.gossip_queue.empty():
    344                     break
    345             # verify in peer's TaskGroup so that we fail the connection
    346             self.verify_channel_announcements(chan_anns)
    347             self.verify_node_announcements(node_anns)
    348             if self.network.lngossip:
    349                 await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds)
    350 
    351     def verify_channel_announcements(self, chan_anns):
    352         for payload in chan_anns:
    353             h = sha256d(payload['raw'][2+256:])
    354             pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']]
    355             sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']]
    356             for pubkey, sig in zip(pubkeys, sigs):
    357                 if not ecc.verify_signature(pubkey, sig, h):
    358                     raise Exception('signature failed')
    359 
    360     def verify_node_announcements(self, node_anns):
    361         for payload in node_anns:
    362             pubkey = payload['node_id']
    363             signature = payload['signature']
    364             h = sha256d(payload['raw'][66:])
    365             if not ecc.verify_signature(pubkey, signature, h):
    366                 raise Exception('signature failed')
    367 
    368     async def query_gossip(self):
    369         try:
    370             await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
    371         except Exception as e:
    372             raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e
    373         if self.lnworker == self.lnworker.network.lngossip:
    374             try:
    375                 ids, complete = await asyncio.wait_for(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT)
    376             except asyncio.TimeoutError as e:
    377                 raise GracefulDisconnect("query_channel_range timed out") from e
    378             self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
    379             await self.lnworker.add_new_ids(ids)
    380             while True:
    381                 todo = self.lnworker.get_ids_to_query()
    382                 if not todo:
    383                     await asyncio.sleep(1)
    384                     continue
    385                 await self.get_short_channel_ids(todo)
    386 
    387     async def get_channel_range(self):
    388         first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
    389         num_blocks = self.lnworker.network.get_local_height() - first_block
    390         self.query_channel_range(first_block, num_blocks)
    391         intervals = []
    392         ids = set()
    393         # note: implementations behave differently...
    394         # "sane implementation that follows BOLT-07" example:
    395         #   query_channel_range. <<< first_block 497000, num_blocks 79038
    396         #   on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True
    397         #   on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True
    398         #   on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True
    399         #   on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True
    400         # lnd example:
    401         #   query_channel_range. <<< first_block 497000, num_blocks 79038
    402         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
    403         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
    404         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
    405         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
    406         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True
    407         while True:
    408             index, num, complete, _ids = await self.reply_channel_range.get()
    409             ids.update(_ids)
    410             intervals.append((index, index+num))
    411             intervals.sort()
    412             while len(intervals) > 1:
    413                 a,b = intervals[0]
    414                 c,d = intervals[1]
    415                 if not (a <= c and a <= b and c <= d):
    416                     raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}")
    417                 if b >= c:
    418                     intervals = [(a,d)] + intervals[2:]
    419                 else:
    420                     break
    421             if len(intervals) == 1 and complete:
    422                 a, b = intervals[0]
    423                 if a <= first_block and b >= first_block + num_blocks:
    424                     break
    425         return ids, complete
    426 
    427     def request_gossip(self, timestamp=0):
    428         if timestamp == 0:
    429             self.logger.info('requesting whole channel graph')
    430         else:
    431             self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).ctime()}')
    432         self.send_message(
    433             'gossip_timestamp_filter',
    434             chain_hash=constants.net.rev_genesis_bytes(),
    435             first_timestamp=timestamp,
    436             timestamp_range=b'\xff'*4)
    437 
    438     def query_channel_range(self, first_block, num_blocks):
    439         self.logger.info(f'query channel range {first_block} {num_blocks}')
    440         self.send_message(
    441             'query_channel_range',
    442             chain_hash=constants.net.rev_genesis_bytes(),
    443             first_blocknum=first_block,
    444             number_of_blocks=num_blocks)
    445 
    446     def decode_short_ids(self, encoded):
    447         if encoded[0] == 0:
    448             decoded = encoded[1:]
    449         elif encoded[0] == 1:
    450             decoded = zlib.decompress(encoded[1:])
    451         else:
    452             raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}')
    453         ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)]
    454         return ids
    455 
    456     def on_reply_channel_range(self, payload):
    457         first = payload['first_blocknum']
    458         num = payload['number_of_blocks']
    459         complete = bool(int.from_bytes(payload['complete'], 'big'))
    460         encoded = payload['encoded_short_ids']
    461         ids = self.decode_short_ids(encoded)
    462         #self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, num_ids {len(ids)}, complete {repr(payload['complete'])}")
    463         self.reply_channel_range.put_nowait((first, num, complete, ids))
    464 
    465     async def get_short_channel_ids(self, ids):
    466         self.logger.info(f'Querying {len(ids)} short_channel_ids')
    467         assert not self.querying.is_set()
    468         self.query_short_channel_ids(ids)
    469         await self.querying.wait()
    470         self.querying.clear()
    471 
    472     def query_short_channel_ids(self, ids, compressed=True):
    473         ids = sorted(ids)
    474         s = b''.join(ids)
    475         encoded = zlib.compress(s) if compressed else s
    476         prefix = b'\x01' if compressed else b'\x00'
    477         self.send_message(
    478             'query_short_channel_ids',
    479             chain_hash=constants.net.rev_genesis_bytes(),
    480             len=1+len(encoded),
    481             encoded_short_ids=prefix+encoded)
    482 
    483     async def _message_loop(self):
    484         try:
    485             await asyncio.wait_for(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
    486         except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
    487             raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
    488         async for msg in self.transport.read_messages():
    489             self.process_message(msg)
    490             await asyncio.sleep(.01)
    491 
    492     def on_reply_short_channel_ids_end(self, payload):
    493         self.querying.set()
    494 
    495     def close_and_cleanup(self):
    496         try:
    497             if self.transport:
    498                 self.transport.close()
    499         except:
    500             pass
    501         self.lnworker.peer_closed(self)
    502         self.got_disconnected.set()
    503 
    504     def is_static_remotekey(self):
    505         return self.features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT)
    506 
    507     def is_upfront_shutdown_script(self):
    508         return self.features.supports(LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT)
    509 
    510     def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]:
    511         if msg_identifier not in ['accept', 'open']:
    512             raise ValueError("msg_identifier must be either 'accept' or 'open'")
    513 
    514         uss_tlv = payload[msg_identifier + '_channel_tlvs'].get(
    515             'upfront_shutdown_script')
    516 
    517         if uss_tlv and self.is_upfront_shutdown_script():
    518             upfront_shutdown_script = uss_tlv['shutdown_scriptpubkey']
    519         else:
    520             upfront_shutdown_script = b''
    521         self.logger.info(f"upfront shutdown script received: {upfront_shutdown_script}")
    522         return upfront_shutdown_script
    523 
    524     def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig:
    525         channel_seed = os.urandom(32)
    526         initial_msat = funding_sat * 1000 - push_msat if initiator == LOCAL else push_msat
    527 
    528         static_remotekey = None
    529         # sending empty bytes as the upfront_shutdown_script will give us the
    530         # flexibility to decide an address at closing time
    531         upfront_shutdown_script = b''
    532 
    533         if self.is_static_remotekey():
    534             wallet = self.lnworker.wallet
    535             assert wallet.txin_type == 'p2wpkh'
    536             addr = wallet.get_new_sweep_address_for_channel()
    537             static_remotekey = bfh(wallet.get_public_key(addr))
    538         else:
    539             static_remotekey = None
    540         dust_limit_sat = bitcoin.DUST_LIMIT_DEFAULT_SAT_LEGACY
    541         reserve_sat = max(funding_sat // 100, dust_limit_sat)
    542         # for comparison of defaults, see
    543         # https://github.com/ACINQ/eclair/blob/afa378fbb73c265da44856b4ad0f2128a88ae6c6/eclair-core/src/main/resources/reference.conf#L66
    544         # https://github.com/ElementsProject/lightning/blob/0056dd75572a8857cff36fcbdb1a2295a1ac9253/lightningd/options.c#L657
    545         # https://github.com/lightningnetwork/lnd/blob/56b61078c5b2be007d318673a5f3b40c6346883a/config.go#L81
    546         local_config = LocalConfig.from_seed(
    547             channel_seed=channel_seed,
    548             static_remotekey=static_remotekey,
    549             upfront_shutdown_script=upfront_shutdown_script,
    550             to_self_delay=self.network.config.get('lightning_to_self_delay', 7 * 144),
    551             dust_limit_sat=dust_limit_sat,
    552             max_htlc_value_in_flight_msat=funding_sat * 1000,
    553             max_accepted_htlcs=30,
    554             initial_msat=initial_msat,
    555             reserve_sat=reserve_sat,
    556             funding_locked_received=False,
    557             was_announced=False,
    558             current_commitment_signature=None,
    559             current_htlc_signatures=b'',
    560             htlc_minimum_msat=1,
    561         )
    562         local_config.validate_params(funding_sat=funding_sat)
    563         return local_config
    564 
    565     def temporarily_reserve_funding_tx_change_address(func):
    566         # During the channel open flow, if we initiated, we might have used a change address
    567         # of ours in the funding tx. The funding tx is not part of the wallet history
    568         # at that point yet, but we should already consider this change address as 'used'.
    569         @functools.wraps(func)
    570         async def wrapper(self: 'Peer', *args, **kwargs):
    571             funding_tx = kwargs['funding_tx']  # type: PartialTransaction
    572             wallet = self.lnworker.wallet
    573             change_addresses = [txout.address for txout in funding_tx.outputs()
    574                                 if wallet.is_change(txout.address)]
    575             for addr in change_addresses:
    576                 wallet.set_reserved_state_of_address(addr, reserved=True)
    577             try:
    578                 return await func(self, *args, **kwargs)
    579             finally:
    580                 for addr in change_addresses:
    581                     self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False)
    582         return wrapper
    583 
    584     @log_exceptions
    585     @temporarily_reserve_funding_tx_change_address
    586     async def channel_establishment_flow(
    587             self, *,
    588             funding_tx: 'PartialTransaction',
    589             funding_sat: int,
    590             push_msat: int,
    591             temp_channel_id: bytes
    592     ) -> Tuple[Channel, 'PartialTransaction']:
    593         """Implements the channel opening flow.
    594 
    595         -> open_channel message
    596         <- accept_channel message
    597         -> funding_created message
    598         <- funding_signed message
    599 
    600         Channel configurations are initialized in this method.
    601         """
    602         # will raise if init fails
    603         await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
    604         # trampoline is not yet in features
    605         if not self.lnworker.channel_db and not self.lnworker.is_trampoline_peer(self.pubkey):
    606             raise Exception('Not a trampoline node: ' + str(self.their_features))
    607 
    608         feerate = self.lnworker.current_feerate_per_kw()
    609         local_config = self.make_local_config(funding_sat, push_msat, LOCAL)
    610 
    611         if funding_sat > LN_MAX_FUNDING_SAT:
    612             raise Exception(
    613                 f"MUST set funding_satoshis to less than 2^24 satoshi. "
    614                 f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
    615         if push_msat > 1000 * funding_sat:
    616             raise Exception(
    617                 f"MUST set push_msat to equal or less than 1000 * funding_satoshis: "
    618                 f"{push_msat} msat > {1000 * funding_sat} msat")
    619         if funding_sat < lnutil.MIN_FUNDING_SAT:
    620             raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")
    621 
    622         # for the first commitment transaction
    623         per_commitment_secret_first = get_per_commitment_secret_from_seed(
    624             local_config.per_commitment_secret_seed,
    625             RevocationStore.START_INDEX
    626         )
    627         per_commitment_point_first = secret_to_pubkey(
    628             int.from_bytes(per_commitment_secret_first, 'big'))
    629         self.send_message(
    630             "open_channel",
    631             temporary_channel_id=temp_channel_id,
    632             chain_hash=constants.net.rev_genesis_bytes(),
    633             funding_satoshis=funding_sat,
    634             push_msat=push_msat,
    635             dust_limit_satoshis=local_config.dust_limit_sat,
    636             feerate_per_kw=feerate,
    637             max_accepted_htlcs=local_config.max_accepted_htlcs,
    638             funding_pubkey=local_config.multisig_key.pubkey,
    639             revocation_basepoint=local_config.revocation_basepoint.pubkey,
    640             htlc_basepoint=local_config.htlc_basepoint.pubkey,
    641             payment_basepoint=local_config.payment_basepoint.pubkey,
    642             delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
    643             first_per_commitment_point=per_commitment_point_first,
    644             to_self_delay=local_config.to_self_delay,
    645             max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
    646             channel_flags=0x00,  # not willing to announce channel
    647             channel_reserve_satoshis=local_config.reserve_sat,
    648             htlc_minimum_msat=local_config.htlc_minimum_msat,
    649             open_channel_tlvs={
    650                 'upfront_shutdown_script':
    651                     {'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
    652             }
    653         )
    654 
    655         # <- accept_channel
    656         payload = await self.wait_for_message('accept_channel', temp_channel_id)
    657         remote_per_commitment_point = payload['first_per_commitment_point']
    658         funding_txn_minimum_depth = payload['minimum_depth']
    659         if funding_txn_minimum_depth <= 0:
    660             raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}")
    661         if funding_txn_minimum_depth > 30:
    662             raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}")
    663 
    664         upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
    665             payload, 'accept')
    666 
    667         remote_config = RemoteConfig(
    668             payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
    669             multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]),
    670             htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
    671             delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
    672             revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
    673             to_self_delay=payload['to_self_delay'],
    674             dust_limit_sat=payload['dust_limit_satoshis'],
    675             max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
    676             max_accepted_htlcs=payload["max_accepted_htlcs"],
    677             initial_msat=push_msat,
    678             reserve_sat=payload["channel_reserve_satoshis"],
    679             htlc_minimum_msat=payload['htlc_minimum_msat'],
    680             next_per_commitment_point=remote_per_commitment_point,
    681             current_per_commitment_point=None,
    682             upfront_shutdown_script=upfront_shutdown_script
    683         )
    684         remote_config.validate_params(funding_sat=funding_sat)
    685         # if channel_reserve_satoshis is less than dust_limit_satoshis within the open_channel message:
    686         #     MUST reject the channel.
    687         if remote_config.reserve_sat < local_config.dust_limit_sat:
    688             raise Exception("violated constraint: remote_config.reserve_sat < local_config.dust_limit_sat")
    689         # if channel_reserve_satoshis from the open_channel message is less than dust_limit_satoshis:
    690         #     MUST reject the channel.
    691         if local_config.reserve_sat < remote_config.dust_limit_sat:
    692             raise Exception("violated constraint: local_config.reserve_sat < remote_config.dust_limit_sat")
    693 
    694         # -> funding created
    695         # replace dummy output in funding tx
    696         redeem_script = funding_output_script(local_config, remote_config)
    697         funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script)
    698         funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat)
    699         dummy_output = PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat)
    700         funding_tx.outputs().remove(dummy_output)
    701         funding_tx.add_outputs([funding_output])
    702         funding_tx.set_rbf(False)
    703         if not funding_tx.is_segwit():
    704             raise Exception('Funding transaction is not segwit')
    705         funding_txid = funding_tx.txid()
    706         assert funding_txid
    707         funding_index = funding_tx.outputs().index(funding_output)
    708         # build remote commitment transaction
    709         channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index)
    710         outpoint = Outpoint(funding_txid, funding_index)
    711         constraints = ChannelConstraints(
    712             capacity=funding_sat,
    713             is_initiator=True,
    714             funding_txn_minimum_depth=funding_txn_minimum_depth
    715         )
    716         chan_dict = self.create_channel_storage(
    717             channel_id, outpoint, local_config, remote_config, constraints)
    718         chan = Channel(
    719             chan_dict,
    720             sweep_address=self.lnworker.sweep_address,
    721             lnworker=self.lnworker,
    722             initial_feerate=feerate
    723         )
    724         chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()]
    725         if isinstance(self.transport, LNTransport):
    726             chan.add_or_update_peer_addr(self.transport.peer_addr)
    727         sig_64, _ = chan.sign_next_commitment()
    728         self.temp_id_to_id[temp_channel_id] = channel_id
    729 
    730         self.send_message("funding_created",
    731             temporary_channel_id=temp_channel_id,
    732             funding_txid=funding_txid_bytes,
    733             funding_output_index=funding_index,
    734             signature=sig_64)
    735         self.funding_created_sent.add(channel_id)
    736 
    737         # <- funding signed
    738         payload = await self.wait_for_message('funding_signed', channel_id)
    739         self.logger.info('received funding_signed')
    740         remote_sig = payload['signature']
    741         chan.receive_new_commitment(remote_sig, [])
    742         chan.open_with_first_pcp(remote_per_commitment_point, remote_sig)
    743         chan.set_state(ChannelState.OPENING)
    744         self.lnworker.add_new_channel(chan)
    745         return chan, funding_tx
    746 
    747     def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints):
    748         chan_dict = {
    749             "node_id": self.pubkey.hex(),
    750             "channel_id": channel_id.hex(),
    751             "short_channel_id": None,
    752             "funding_outpoint": outpoint,
    753             "remote_config": remote_config,
    754             "local_config": local_config,
    755             "constraints": constraints,
    756             "remote_update": None,
    757             "state": ChannelState.PREOPENING.name,
    758             'onion_keys': {},
    759             'data_loss_protect_remote_pcp': {},
    760             "log": {},
    761             "revocation_store": {},
    762             "static_remotekey_enabled": self.is_static_remotekey(), # stored because it cannot be "downgraded", per BOLT2
    763         }
    764         return StoredDict(chan_dict, self.lnworker.db if self.lnworker else None, [])
    765 
    766     async def on_open_channel(self, payload):
    767         """Implements the channel acceptance flow.
    768 
    769         <- open_channel message
    770         -> accept_channel message
    771         <- funding_created message
    772         -> funding_signed message
    773 
    774         Channel configurations are initialized in this method.
    775         """
    776         # <- open_channel
    777         if payload['chain_hash'] != constants.net.rev_genesis_bytes():
    778             raise Exception('wrong chain_hash')
    779         funding_sat = payload['funding_satoshis']
    780         push_msat = payload['push_msat']
    781         feerate = payload['feerate_per_kw']  # note: we are not validating this
    782         temp_chan_id = payload['temporary_channel_id']
    783         local_config = self.make_local_config(funding_sat, push_msat, REMOTE)
    784         if funding_sat > LN_MAX_FUNDING_SAT:
    785             raise Exception(
    786                 f"MUST set funding_satoshis to less than 2^24 satoshi. "
    787                 f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
    788         if push_msat > 1000 * funding_sat:
    789             raise Exception(
    790                 f"MUST set push_msat to equal or less than 1000 * funding_satoshis: "
    791                 f"{push_msat} msat > {1000 * funding_sat} msat")
    792         if funding_sat < lnutil.MIN_FUNDING_SAT:
    793             raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")
    794 
    795         upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
    796             payload, 'open')
    797 
    798         remote_config = RemoteConfig(
    799             payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
    800             multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']),
    801             htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
    802             delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
    803             revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
    804             to_self_delay=payload['to_self_delay'],
    805             dust_limit_sat=payload['dust_limit_satoshis'],
    806             max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
    807             max_accepted_htlcs=payload['max_accepted_htlcs'],
    808             initial_msat=funding_sat * 1000 - push_msat,
    809             reserve_sat=payload['channel_reserve_satoshis'],
    810             htlc_minimum_msat=payload['htlc_minimum_msat'],
    811             next_per_commitment_point=payload['first_per_commitment_point'],
    812             current_per_commitment_point=None,
    813             upfront_shutdown_script=upfront_shutdown_script,
    814         )
    815 
    816         remote_config.validate_params(funding_sat=funding_sat)
    817         # The receiving node MUST fail the channel if:
    818         #     the funder's amount for the initial commitment transaction is not
    819         #     sufficient for full fee payment.
    820         if remote_config.initial_msat < calc_fees_for_commitment_tx(
    821                 num_htlcs=0,
    822                 feerate=feerate,
    823                 is_local_initiator=False)[REMOTE]:
    824             raise Exception(
    825                 "the funder's amount for the initial commitment transaction "
    826                 "is not sufficient for full fee payment")
    827         # The receiving node MUST fail the channel if:
    828         #     both to_local and to_remote amounts for the initial commitment transaction are
    829         #     less than or equal to channel_reserve_satoshis (see BOLT 3).
    830         if (local_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']
    831                 and remote_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']):
    832             raise Exception(
    833                 "both to_local and to_remote amounts for the initial commitment "
    834                 "transaction are less than or equal to channel_reserve_satoshis")
    835         # note: we ignore payload['channel_flags'],  which e.g. contains 'announce_channel'.
    836         #       Notably if the remote sets 'announce_channel' to True, we will ignore that too,
    837         #       but we will not play along with actually announcing the channel (so we keep it private).
    838 
    839         # -> accept channel
    840         # for the first commitment transaction
    841         per_commitment_secret_first = get_per_commitment_secret_from_seed(
    842             local_config.per_commitment_secret_seed,
    843             RevocationStore.START_INDEX
    844         )
    845         per_commitment_point_first = secret_to_pubkey(
    846             int.from_bytes(per_commitment_secret_first, 'big'))
    847         min_depth = 3
    848         self.send_message(
    849             'accept_channel',
    850             temporary_channel_id=temp_chan_id,
    851             dust_limit_satoshis=local_config.dust_limit_sat,
    852             max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
    853             channel_reserve_satoshis=local_config.reserve_sat,
    854             htlc_minimum_msat=local_config.htlc_minimum_msat,
    855             minimum_depth=min_depth,
    856             to_self_delay=local_config.to_self_delay,
    857             max_accepted_htlcs=local_config.max_accepted_htlcs,
    858             funding_pubkey=local_config.multisig_key.pubkey,
    859             revocation_basepoint=local_config.revocation_basepoint.pubkey,
    860             payment_basepoint=local_config.payment_basepoint.pubkey,
    861             delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
    862             htlc_basepoint=local_config.htlc_basepoint.pubkey,
    863             first_per_commitment_point=per_commitment_point_first,
    864             accept_channel_tlvs={
    865                 'upfront_shutdown_script':
    866                     {'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
    867             }
    868         )
    869 
    870         # <- funding created
    871         funding_created = await self.wait_for_message('funding_created', temp_chan_id)
    872 
    873         # -> funding signed
    874         funding_idx = funding_created['funding_output_index']
    875         funding_txid = bh2u(funding_created['funding_txid'][::-1])
    876         channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx)
    877         constraints = ChannelConstraints(
    878             capacity=funding_sat,
    879             is_initiator=False,
    880             funding_txn_minimum_depth=min_depth
    881         )
    882         outpoint = Outpoint(funding_txid, funding_idx)
    883         chan_dict = self.create_channel_storage(
    884             channel_id, outpoint, local_config, remote_config, constraints)
    885         chan = Channel(
    886             chan_dict,
    887             sweep_address=self.lnworker.sweep_address,
    888             lnworker=self.lnworker,
    889             initial_feerate=feerate
    890         )
    891         chan.storage['init_timestamp'] = int(time.time())
    892         if isinstance(self.transport, LNTransport):
    893             chan.add_or_update_peer_addr(self.transport.peer_addr)
    894         remote_sig = funding_created['signature']
    895         chan.receive_new_commitment(remote_sig, [])
    896         sig_64, _ = chan.sign_next_commitment()
    897         self.send_message('funding_signed',
    898             channel_id=channel_id,
    899             signature=sig_64,
    900         )
    901         self.funding_signed_sent.add(chan.channel_id)
    902         chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
    903         chan.set_state(ChannelState.OPENING)
    904         self.lnworker.add_new_channel(chan)
    905 
    906     async def trigger_force_close(self, channel_id: bytes):
    907         await self.initialized
    908         latest_point = secret_to_pubkey(42) # we need a valid point (BOLT2)
    909         self.send_message(
    910             "channel_reestablish",
    911             channel_id=channel_id,
    912             next_commitment_number=0,
    913             next_revocation_number=0,
    914             your_last_per_commitment_secret=0,
    915             my_current_per_commitment_point=latest_point)
    916 
    917     async def reestablish_channel(self, chan: Channel):
    918         await self.initialized
    919         chan_id = chan.channel_id
    920         assert ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING
    921         if chan.peer_state != PeerState.DISCONNECTED:
    922             self.logger.info(f'reestablish_channel was called but channel {chan.get_id_for_log()} '
    923                              f'already in peer_state {chan.peer_state!r}')
    924             return
    925         chan.peer_state = PeerState.REESTABLISHING
    926         util.trigger_callback('channel', self.lnworker.wallet, chan)
    927         # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
    928         chan.hm.discard_unsigned_remote_updates()
    929         # ctns
    930         oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
    931         latest_local_ctn = chan.get_latest_ctn(LOCAL)
    932         next_local_ctn = chan.get_next_ctn(LOCAL)
    933         oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
    934         latest_remote_ctn = chan.get_latest_ctn(REMOTE)
    935         next_remote_ctn = chan.get_next_ctn(REMOTE)
    936         assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT)
    937         # send message
    938         if chan.is_static_remotekey_enabled():
    939             latest_secret, latest_point = chan.get_secret_and_point(LOCAL, 0)
    940         else:
    941             latest_secret, latest_point = chan.get_secret_and_point(LOCAL, latest_local_ctn)
    942         if oldest_unrevoked_remote_ctn == 0:
    943             last_rev_secret = 0
    944         else:
    945             last_rev_index = oldest_unrevoked_remote_ctn - 1
    946             last_rev_secret = chan.revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index)
    947         self.send_message(
    948             "channel_reestablish",
    949             channel_id=chan_id,
    950             next_commitment_number=next_local_ctn,
    951             next_revocation_number=oldest_unrevoked_remote_ctn,
    952             your_last_per_commitment_secret=last_rev_secret,
    953             my_current_per_commitment_point=latest_point)
    954         self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with '
    955                          f'(next_local_ctn={next_local_ctn}, '
    956                          f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')
    957         while True:
    958             try:
    959                 msg = await self.wait_for_message('channel_reestablish', chan_id)
    960                 break
    961             except asyncio.TimeoutError:
    962                 self.logger.info('waiting to receive channel_reestablish...')
    963                 continue
    964         their_next_local_ctn = msg["next_commitment_number"]
    965         their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"]
    966         their_local_pcp = msg.get("my_current_per_commitment_point")
    967         their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret")
    968         self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with '
    969                          f'(their_next_local_ctn={their_next_local_ctn}, '
    970                          f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})')
    971         # sanity checks of received values
    972         if their_next_local_ctn < 0:
    973             raise RemoteMisbehaving(f"channel reestablish: their_next_local_ctn < 0")
    974         if their_oldest_unrevoked_remote_ctn < 0:
    975             raise RemoteMisbehaving(f"channel reestablish: their_oldest_unrevoked_remote_ctn < 0")
    976         # Replay un-acked local updates (including commitment_signed) byte-for-byte.
    977         # If we have sent them a commitment signature that they "lost" (due to disconnect),
    978         # we need to make sure we replay the same local updates, as otherwise they could
    979         # end up with two (or more) signed valid commitment transactions at the same ctn.
    980         # Multiple valid ctxs at the same ctn is a major headache for pre-signing spending txns,
    981         # e.g. for watchtowers, hence we must ensure these ctxs coincide.
    982         # We replay the local updates even if they were not yet committed.
    983         unacked = chan.hm.get_unacked_local_updates()
    984         n_replayed_msgs = 0
    985         for ctn, messages in unacked.items():
    986             if ctn < their_next_local_ctn:
    987                 # They claim to have received these messages and the corresponding
    988                 # commitment_signed, hence we must not replay them.
    989                 continue
    990             for raw_upd_msg in messages:
    991                 self.transport.send_bytes(raw_upd_msg)
    992                 n_replayed_msgs += 1
    993         self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {n_replayed_msgs} unacked messages')
    994 
    995         we_are_ahead = False
    996         they_are_ahead = False
    997         # compare remote ctns
    998         if next_remote_ctn != their_next_local_ctn:
    999             if their_next_local_ctn == latest_remote_ctn and chan.hm.is_revack_pending(REMOTE):
   1000                 # We replayed the local updates (see above), which should have contained a commitment_signed
   1001                 # (due to is_revack_pending being true), and this should have remedied this situation.
   1002                 pass
   1003             else:
   1004                 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
   1005                                     f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}")
   1006                 if their_next_local_ctn < next_remote_ctn:
   1007                     we_are_ahead = True
   1008                 else:
   1009                     they_are_ahead = True
   1010         # compare local ctns
   1011         if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn:
   1012             if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn:
   1013                 # A node:
   1014                 #    if next_revocation_number is equal to the commitment number of the last revoke_and_ack
   1015                 #    the receiving node sent, AND the receiving node hasn't already received a closing_signed:
   1016                 #        MUST re-send the revoke_and_ack.
   1017                 last_secret, last_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1)
   1018                 next_secret, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1)
   1019                 self.send_message(
   1020                     "revoke_and_ack",
   1021                     channel_id=chan.channel_id,
   1022                     per_commitment_secret=last_secret,
   1023                     next_per_commitment_point=next_point)
   1024             else:
   1025                 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
   1026                                     f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}")
   1027                 if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn:
   1028                     we_are_ahead = True
   1029                 else:
   1030                     they_are_ahead = True
   1031         # option_data_loss_protect
   1032         def are_datalossprotect_fields_valid() -> bool:
   1033             if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None:
   1034                 return False
   1035             if their_oldest_unrevoked_remote_ctn > 0:
   1036                 our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1)
   1037             else:
   1038                 assert their_oldest_unrevoked_remote_ctn == 0
   1039                 our_pcs = bytes(32)
   1040             if our_pcs != their_claim_of_our_last_per_commitment_secret:
   1041                 self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): "
   1042                                   f"(DLP) local PCS mismatch: {bh2u(our_pcs)} != {bh2u(their_claim_of_our_last_per_commitment_secret)}")
   1043                 return False
   1044             if chan.is_static_remotekey_enabled():
   1045                 return True
   1046             try:
   1047                 __, our_remote_pcp = chan.get_secret_and_point(REMOTE, their_next_local_ctn - 1)
   1048             except RemoteCtnTooFarInFuture:
   1049                 pass
   1050             else:
   1051                 if our_remote_pcp != their_local_pcp:
   1052                     self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): "
   1053                                       f"(DLP) remote PCP mismatch: {bh2u(our_remote_pcp)} != {bh2u(their_local_pcp)}")
   1054                     return False
   1055             return True
   1056 
   1057         if not are_datalossprotect_fields_valid():
   1058             raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid")
   1059 
   1060         if they_are_ahead:
   1061             self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
   1062                                 f"remote is ahead of us! They should force-close. Remote PCP: {bh2u(their_local_pcp)}")
   1063             # data_loss_protect_remote_pcp is used in lnsweep
   1064             chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp)
   1065             self.lnworker.save_channel(chan)
   1066             chan.peer_state = PeerState.BAD
   1067             return
   1068         elif we_are_ahead:
   1069             self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.")
   1070             await self.lnworker.try_force_closing(chan_id)
   1071             return
   1072 
   1073         chan.peer_state = PeerState.GOOD
   1074         if chan.is_funded() and their_next_local_ctn == next_local_ctn == 1:
   1075             self.send_funding_locked(chan)
   1076         # checks done
   1077         if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
   1078             self.mark_open(chan)
   1079         util.trigger_callback('channel', self.lnworker.wallet, chan)
   1080         # if we have sent a previous shutdown, it must be retransmitted (Bolt2)
   1081         if chan.get_state() == ChannelState.SHUTDOWN:
   1082             await self.send_shutdown(chan)
   1083 
   1084     def send_funding_locked(self, chan: Channel):
   1085         channel_id = chan.channel_id
   1086         per_commitment_secret_index = RevocationStore.START_INDEX - 1
   1087         per_commitment_point_second = secret_to_pubkey(int.from_bytes(
   1088             get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big'))
   1089         # note: if funding_locked was not yet received, we might send it multiple times
   1090         self.send_message("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second)
   1091         if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
   1092             self.mark_open(chan)
   1093 
   1094     def on_funding_locked(self, chan: Channel, payload):
   1095         self.logger.info(f"on_funding_locked. channel: {bh2u(chan.channel_id)}")
   1096         if not chan.config[LOCAL].funding_locked_received:
   1097             their_next_point = payload["next_per_commitment_point"]
   1098             chan.config[REMOTE].next_per_commitment_point = their_next_point
   1099             chan.config[LOCAL].funding_locked_received = True
   1100             self.lnworker.save_channel(chan)
   1101         if chan.is_funded():
   1102             self.mark_open(chan)
   1103 
   1104     def on_network_update(self, chan: Channel, funding_tx_depth: int):
   1105         """
   1106         Only called when the channel is OPEN.
   1107 
   1108         Runs on the Network thread.
   1109         """
   1110         if not chan.config[LOCAL].was_announced and funding_tx_depth >= 6:
   1111             # don't announce our channels
   1112             # FIXME should this be a field in chan.local_state maybe?
   1113             return
   1114             chan.config[LOCAL].was_announced = True
   1115             self.lnworker.save_channel(chan)
   1116             coro = self.handle_announcements(chan)
   1117             asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
   1118 
   1119     @log_exceptions
   1120     async def handle_announcements(self, chan: Channel):
   1121         h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan)
   1122         announcement_signatures_msg = await self.announcement_signatures[chan.channel_id].get()
   1123         remote_node_sig = announcement_signatures_msg["node_signature"]
   1124         remote_bitcoin_sig = announcement_signatures_msg["bitcoin_signature"]
   1125         if not ecc.verify_signature(chan.config[REMOTE].multisig_key.pubkey, remote_bitcoin_sig, h):
   1126             raise Exception("bitcoin_sig invalid in announcement_signatures")
   1127         if not ecc.verify_signature(self.pubkey, remote_node_sig, h):
   1128             raise Exception("node_sig invalid in announcement_signatures")
   1129 
   1130         node_sigs = [remote_node_sig, local_node_sig]
   1131         bitcoin_sigs = [remote_bitcoin_sig, local_bitcoin_sig]
   1132         bitcoin_keys = [chan.config[REMOTE].multisig_key.pubkey, chan.config[LOCAL].multisig_key.pubkey]
   1133 
   1134         if self.node_ids[0] > self.node_ids[1]:
   1135             node_sigs.reverse()
   1136             bitcoin_sigs.reverse()
   1137             node_ids = list(reversed(self.node_ids))
   1138             bitcoin_keys.reverse()
   1139         else:
   1140             node_ids = self.node_ids
   1141 
   1142         self.send_message("channel_announcement",
   1143             node_signatures_1=node_sigs[0],
   1144             node_signatures_2=node_sigs[1],
   1145             bitcoin_signature_1=bitcoin_sigs[0],
   1146             bitcoin_signature_2=bitcoin_sigs[1],
   1147             len=0,
   1148             #features not set (defaults to zeros)
   1149             chain_hash=constants.net.rev_genesis_bytes(),
   1150             short_channel_id=chan.short_channel_id,
   1151             node_id_1=node_ids[0],
   1152             node_id_2=node_ids[1],
   1153             bitcoin_key_1=bitcoin_keys[0],
   1154             bitcoin_key_2=bitcoin_keys[1]
   1155         )
   1156 
   1157     def mark_open(self, chan: Channel):
   1158         assert chan.is_funded()
   1159         # only allow state transition from "FUNDED" to "OPEN"
   1160         old_state = chan.get_state()
   1161         if old_state == ChannelState.OPEN:
   1162             return
   1163         if old_state != ChannelState.FUNDED:
   1164             self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {repr(old_state)}")
   1165             return
   1166         assert chan.config[LOCAL].funding_locked_received
   1167         chan.set_state(ChannelState.OPEN)
   1168         util.trigger_callback('channel', self.lnworker.wallet, chan)
   1169         # peer may have sent us a channel update for the incoming direction previously
   1170         pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id)
   1171         if pending_channel_update:
   1172             chan.set_remote_update(pending_channel_update['raw'])
   1173         self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})")
   1174         forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
   1175         if forwarding_enabled:
   1176             # send channel_update of outgoing edge to peer,
   1177             # so that channel can be used to to receive payments
   1178             self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})")
   1179             chan_upd = chan.get_outgoing_gossip_channel_update()
   1180             self.transport.send_bytes(chan_upd)
   1181 
   1182     def send_announcement_signatures(self, chan: Channel):
   1183         chan_ann = chan.construct_channel_announcement_without_sigs()
   1184         preimage = chan_ann[256+2:]
   1185         msg_hash = sha256d(preimage)
   1186         bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).sign(msg_hash, sig_string_from_r_and_s)
   1187         node_signature = ecc.ECPrivkey(self.privkey).sign(msg_hash, sig_string_from_r_and_s)
   1188         self.send_message("announcement_signatures",
   1189             channel_id=chan.channel_id,
   1190             short_channel_id=chan.short_channel_id,
   1191             node_signature=node_signature,
   1192             bitcoin_signature=bitcoin_signature
   1193         )
   1194         return msg_hash, node_signature, bitcoin_signature
   1195 
   1196     def on_update_fail_htlc(self, chan: Channel, payload):
   1197         htlc_id = payload["id"]
   1198         reason = payload["reason"]
   1199         self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
   1200         chan.receive_fail_htlc(htlc_id, error_bytes=reason)  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
   1201         self.maybe_send_commitment(chan)
   1202 
   1203     def maybe_send_commitment(self, chan: Channel):
   1204         # REMOTE should revoke first before we can sign a new ctx
   1205         if chan.hm.is_revack_pending(REMOTE):
   1206             return
   1207         # if there are no changes, we will not (and must not) send a new commitment
   1208         if not chan.has_pending_changes(REMOTE):
   1209             return
   1210         self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.')
   1211         sig_64, htlc_sigs = chan.sign_next_commitment()
   1212         self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
   1213 
   1214     def pay(self, *,
   1215             route: 'LNPaymentRoute',
   1216             chan: Channel,
   1217             amount_msat: int,
   1218             total_msat: int,
   1219             payment_hash: bytes,
   1220             min_final_cltv_expiry: int,
   1221             payment_secret: bytes = None,
   1222             trampoline_onion=None) -> UpdateAddHtlc:
   1223 
   1224         assert amount_msat > 0, "amount_msat is not greater zero"
   1225         assert len(route) > 0
   1226         if not chan.can_send_update_add_htlc():
   1227             raise PaymentFailure("Channel cannot send update_add_htlc")
   1228         # add features learned during "init" for direct neighbour:
   1229         route[0].node_features |= self.features
   1230         local_height = self.network.get_local_height()
   1231         final_cltv = local_height + min_final_cltv_expiry
   1232         hops_data, amount_msat, cltv = calc_hops_data_for_payment(
   1233             route,
   1234             amount_msat,
   1235             final_cltv,
   1236             total_msat=total_msat,
   1237             payment_secret=payment_secret)
   1238         num_hops = len(hops_data)
   1239         self.logger.info(f"lnpeer.pay len(route)={len(route)}")
   1240         for i in range(len(route)):
   1241             self.logger.info(f"  {i}: edge={route[i].short_channel_id} hop_data={hops_data[i]!r}")
   1242         assert final_cltv <= cltv, (final_cltv, cltv)
   1243         session_key = os.urandom(32) # session_key
   1244         # if we are forwarding a trampoline payment, add trampoline onion
   1245         if trampoline_onion:
   1246             self.logger.info(f'adding trampoline onion to final payload')
   1247             trampoline_payload = hops_data[num_hops-2].payload
   1248             trampoline_payload["trampoline_onion_packet"] = {
   1249                 "version": trampoline_onion.version,
   1250                 "public_key": trampoline_onion.public_key,
   1251                 "hops_data": trampoline_onion.hops_data,
   1252                 "hmac": trampoline_onion.hmac
   1253             }
   1254         # create onion packet
   1255         payment_path_pubkeys = [x.node_id for x in route]
   1256         onion = new_onion_packet(payment_path_pubkeys, session_key, hops_data, associated_data=payment_hash) # must use another sessionkey
   1257         self.logger.info(f"starting payment. len(route)={len(hops_data)}.")
   1258         # create htlc
   1259         if cltv > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
   1260             raise PaymentFailure(f"htlc expiry too far into future. (in {cltv-local_height} blocks)")
   1261         htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_expiry=cltv, timestamp=int(time.time()))
   1262         htlc = chan.add_htlc(htlc)
   1263         chan.set_onion_key(htlc.htlc_id, session_key) # should it be the outer onion secret?
   1264         self.logger.info(f"starting payment. htlc: {htlc}")
   1265         self.send_message(
   1266             "update_add_htlc",
   1267             channel_id=chan.channel_id,
   1268             id=htlc.htlc_id,
   1269             cltv_expiry=htlc.cltv_expiry,
   1270             amount_msat=htlc.amount_msat,
   1271             payment_hash=htlc.payment_hash,
   1272             onion_routing_packet=onion.to_bytes())
   1273         self.maybe_send_commitment(chan)
   1274         return htlc
   1275 
   1276     def send_revoke_and_ack(self, chan: Channel):
   1277         self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}')
   1278         rev = chan.revoke_current_commitment()
   1279         self.lnworker.save_channel(chan)
   1280         self.send_message("revoke_and_ack",
   1281             channel_id=chan.channel_id,
   1282             per_commitment_secret=rev.per_commitment_secret,
   1283             next_per_commitment_point=rev.next_per_commitment_point)
   1284         self.maybe_send_commitment(chan)
   1285 
   1286     def on_commitment_signed(self, chan: Channel, payload):
   1287         if chan.peer_state == PeerState.BAD:
   1288             return
   1289         self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.')
   1290         # make sure there were changes to the ctx, otherwise the remote peer is misbehaving
   1291         if not chan.has_pending_changes(LOCAL):
   1292             # TODO if feerate changed A->B->A; so there were updates but the value is identical,
   1293             #      then it might be legal to send a commitment_signature
   1294             #      see https://github.com/lightningnetwork/lightning-rfc/pull/618
   1295             raise RemoteMisbehaving('received commitment_signed without pending changes')
   1296         # REMOTE should wait until we have revoked
   1297         if chan.hm.is_revack_pending(LOCAL):
   1298             raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx')
   1299         data = payload["htlc_signature"]
   1300         htlc_sigs = list(chunks(data, 64))
   1301         chan.receive_new_commitment(payload["signature"], htlc_sigs)
   1302         self.send_revoke_and_ack(chan)
   1303 
   1304     def on_update_fulfill_htlc(self, chan: Channel, payload):
   1305         preimage = payload["payment_preimage"]
   1306         payment_hash = sha256(preimage)
   1307         htlc_id = payload["id"]
   1308         self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
   1309         chan.receive_htlc_settle(preimage, htlc_id)  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
   1310         self.lnworker.save_preimage(payment_hash, preimage)
   1311         self.maybe_send_commitment(chan)
   1312 
   1313     def on_update_fail_malformed_htlc(self, chan: Channel, payload):
   1314         htlc_id = payload["id"]
   1315         failure_code = payload["failure_code"]
   1316         self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. "
   1317                          f"htlc_id {htlc_id}. failure_code={failure_code}")
   1318         if failure_code & OnionFailureCodeMetaFlag.BADONION == 0:
   1319             asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
   1320             raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}")
   1321         reason = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"])
   1322         chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=reason)
   1323         self.maybe_send_commitment(chan)
   1324 
   1325     def on_update_add_htlc(self, chan: Channel, payload):
   1326         payment_hash = payload["payment_hash"]
   1327         htlc_id = payload["id"]
   1328         cltv_expiry = payload["cltv_expiry"]
   1329         amount_msat_htlc = payload["amount_msat"]
   1330         onion_packet = payload["onion_routing_packet"]
   1331         htlc = UpdateAddHtlc(
   1332             amount_msat=amount_msat_htlc,
   1333             payment_hash=payment_hash,
   1334             cltv_expiry=cltv_expiry,
   1335             timestamp=int(time.time()),
   1336             htlc_id=htlc_id)
   1337         self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc={str(htlc)}")
   1338         if chan.get_state() != ChannelState.OPEN:
   1339             raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}")
   1340         if cltv_expiry > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX:
   1341             asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
   1342             raise RemoteMisbehaving(f"received update_add_htlc with cltv_expiry > BLOCKHEIGHT_MAX. value was {cltv_expiry}")
   1343         # add htlc
   1344         chan.receive_htlc(htlc, onion_packet)
   1345         util.trigger_callback('htlc_added', chan, htlc, RECEIVED)
   1346 
   1347     def maybe_forward_htlc(
   1348             self, *,
   1349             htlc: UpdateAddHtlc,
   1350             processed_onion: ProcessedOnionPacket) -> Tuple[bytes, int]:
   1351 
   1352         # Forward HTLC
   1353         # FIXME: there are critical safety checks MISSING here
   1354         #        - for example; atm we forward first and then persist "forwarding_info",
   1355         #          so if we segfault in-between and restart, we might forward an HTLC twice...
   1356         #          (same for trampoline forwarding)
   1357         forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
   1358         if not forwarding_enabled:
   1359             self.logger.info(f"forwarding is disabled. failing htlc.")
   1360             raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')
   1361         chain = self.network.blockchain()
   1362         if chain.is_tip_stale():
   1363             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
   1364         try:
   1365             next_chan_scid = processed_onion.hop_data.payload["short_channel_id"]["short_channel_id"]
   1366         except:
   1367             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
   1368         next_chan = self.lnworker.get_channel_by_short_id(next_chan_scid)
   1369         local_height = chain.height()
   1370         if next_chan is None:
   1371             self.logger.info(f"cannot forward htlc. cannot find next_chan {next_chan_scid}")
   1372             raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
   1373         outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update()[2:]
   1374         outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big")
   1375         if not next_chan.can_send_update_add_htlc():
   1376             self.logger.info(f"cannot forward htlc. next_chan {next_chan_scid} cannot send ctx updates. "
   1377                              f"chan state {next_chan.get_state()!r}, peer state: {next_chan.peer_state!r}")
   1378             data = outgoing_chan_upd_len + outgoing_chan_upd
   1379             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data)
   1380         try:
   1381             next_cltv_expiry = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
   1382         except:
   1383             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
   1384         if htlc.cltv_expiry - next_cltv_expiry < next_chan.forwarding_cltv_expiry_delta:
   1385             data = htlc.cltv_expiry.to_bytes(4, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd
   1386             raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_CLTV_EXPIRY, data=data)
   1387         if htlc.cltv_expiry - lnutil.MIN_FINAL_CLTV_EXPIRY_ACCEPTED <= local_height \
   1388                 or next_cltv_expiry <= local_height:
   1389             data = outgoing_chan_upd_len + outgoing_chan_upd
   1390             raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_SOON, data=data)
   1391         if max(htlc.cltv_expiry, next_cltv_expiry) > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
   1392             raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_FAR, data=b'')
   1393         try:
   1394             next_amount_msat_htlc = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
   1395         except:
   1396             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
   1397         forwarding_fees = fee_for_edge_msat(
   1398             forwarded_amount_msat=next_amount_msat_htlc,
   1399             fee_base_msat=next_chan.forwarding_fee_base_msat,
   1400             fee_proportional_millionths=next_chan.forwarding_fee_proportional_millionths)
   1401         if htlc.amount_msat - next_amount_msat_htlc < forwarding_fees:
   1402             data = next_amount_msat_htlc.to_bytes(8, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd
   1403             raise OnionRoutingFailure(code=OnionFailureCode.FEE_INSUFFICIENT, data=data)
   1404         self.logger.info(f'forwarding htlc to {next_chan.node_id}')
   1405         next_htlc = UpdateAddHtlc(
   1406             amount_msat=next_amount_msat_htlc,
   1407             payment_hash=htlc.payment_hash,
   1408             cltv_expiry=next_cltv_expiry,
   1409             timestamp=int(time.time()))
   1410         next_htlc = next_chan.add_htlc(next_htlc)
   1411         next_peer = self.lnworker.peers[next_chan.node_id]
   1412         try:
   1413             next_peer.send_message(
   1414                 "update_add_htlc",
   1415                 channel_id=next_chan.channel_id,
   1416                 id=next_htlc.htlc_id,
   1417                 cltv_expiry=next_cltv_expiry,
   1418                 amount_msat=next_amount_msat_htlc,
   1419                 payment_hash=next_htlc.payment_hash,
   1420                 onion_routing_packet=processed_onion.next_packet.to_bytes()
   1421             )
   1422         except BaseException as e:
   1423             self.logger.info(f"failed to forward htlc: error sending message. {e}")
   1424             data = outgoing_chan_upd_len + outgoing_chan_upd
   1425             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data)
   1426         return next_chan_scid, next_htlc.htlc_id
   1427 
   1428     def maybe_forward_trampoline(
   1429             self, *,
   1430             chan: Channel,
   1431             htlc: UpdateAddHtlc,
   1432             trampoline_onion: ProcessedOnionPacket):
   1433 
   1434         payload = trampoline_onion.hop_data.payload
   1435         payment_hash = htlc.payment_hash
   1436         payment_secret = os.urandom(32)
   1437         try:
   1438             outgoing_node_id = payload["outgoing_node_id"]["outgoing_node_id"]
   1439             amt_to_forward = payload["amt_to_forward"]["amt_to_forward"]
   1440             cltv_from_onion = payload["outgoing_cltv_value"]["outgoing_cltv_value"]
   1441             if "invoice_features" in payload:
   1442                 self.logger.info('forward_trampoline: legacy')
   1443                 next_trampoline_onion = None
   1444                 invoice_features = payload["invoice_features"]["invoice_features"]
   1445                 invoice_routing_info = payload["invoice_routing_info"]["invoice_routing_info"]
   1446             else:
   1447                 self.logger.info('forward_trampoline: end-to-end')
   1448                 invoice_features = LnFeatures.BASIC_MPP_OPT
   1449                 next_trampoline_onion = trampoline_onion.next_packet
   1450         except Exception as e:
   1451             self.logger.exception('')
   1452             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
   1453 
   1454         # these are the fee/cltv paid by the sender
   1455         # pay_to_node will raise if they are not sufficient
   1456         trampoline_cltv_delta = htlc.cltv_expiry - cltv_from_onion
   1457         trampoline_fee = htlc.amount_msat - amt_to_forward
   1458 
   1459         @log_exceptions
   1460         async def forward_trampoline_payment():
   1461             try:
   1462                 await self.lnworker.pay_to_node(
   1463                     node_pubkey=outgoing_node_id,
   1464                     payment_hash=payment_hash,
   1465                     payment_secret=payment_secret,
   1466                     amount_to_pay=amt_to_forward,
   1467                     min_cltv_expiry=cltv_from_onion,
   1468                     r_tags=[],
   1469                     invoice_features=invoice_features,
   1470                     fwd_trampoline_onion=next_trampoline_onion,
   1471                     fwd_trampoline_fee=trampoline_fee,
   1472                     fwd_trampoline_cltv_delta=trampoline_cltv_delta,
   1473                     attempts=1)
   1474             except OnionRoutingFailure as e:
   1475                 # FIXME: cannot use payment_hash as key
   1476                 self.lnworker.trampoline_forwarding_failures[payment_hash] = e
   1477             except PaymentFailure as e:
   1478                 # FIXME: adapt the error code
   1479                 error_reason = OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
   1480                 self.lnworker.trampoline_forwarding_failures[payment_hash] = error_reason
   1481 
   1482         asyncio.ensure_future(forward_trampoline_payment())
   1483 
   1484     def maybe_fulfill_htlc(
   1485             self, *,
   1486             chan: Channel,
   1487             htlc: UpdateAddHtlc,
   1488             processed_onion: ProcessedOnionPacket,
   1489             is_trampoline: bool = False) -> Tuple[Optional[bytes], Optional[OnionPacket]]:
   1490 
   1491         """As a final recipient of an HTLC, decide if we should fulfill it.
   1492         Return (preimage, trampoline_onion_packet) with at most a single element not None
   1493         """
   1494         def log_fail_reason(reason: str):
   1495             self.logger.info(f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. "
   1496                              f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")
   1497 
   1498         try:
   1499             amt_to_forward = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
   1500         except:
   1501             log_fail_reason(f"'amt_to_forward' missing from onion")
   1502             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
   1503 
   1504         # Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height.
   1505         # We should not release the preimage for an HTLC that its sender could already time out as
   1506         # then they might try to force-close and it becomes a race.
   1507         chain = self.network.blockchain()
   1508         if chain.is_tip_stale():
   1509             log_fail_reason(f"our chain tip is stale")
   1510             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
   1511         local_height = chain.height()
   1512         exc_incorrect_or_unknown_pd = OnionRoutingFailure(
   1513             code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
   1514             data=amt_to_forward.to_bytes(8, byteorder="big") + local_height.to_bytes(4, byteorder="big"))
   1515         if local_height + MIN_FINAL_CLTV_EXPIRY_ACCEPTED > htlc.cltv_expiry:
   1516             log_fail_reason(f"htlc.cltv_expiry is unreasonably close")
   1517             raise exc_incorrect_or_unknown_pd
   1518         try:
   1519             cltv_from_onion = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
   1520         except:
   1521             log_fail_reason(f"'outgoing_cltv_value' missing from onion")
   1522             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
   1523 
   1524         if not is_trampoline:
   1525             if cltv_from_onion != htlc.cltv_expiry:
   1526                 log_fail_reason(f"cltv_from_onion != htlc.cltv_expiry")
   1527                 raise OnionRoutingFailure(
   1528                     code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY,
   1529                     data=htlc.cltv_expiry.to_bytes(4, byteorder="big"))
   1530         try:
   1531             total_msat = processed_onion.hop_data.payload["payment_data"]["total_msat"]
   1532         except:
   1533             total_msat = amt_to_forward # fall back to "amt_to_forward"
   1534 
   1535         if not is_trampoline and amt_to_forward != htlc.amount_msat:
   1536             log_fail_reason(f"amt_to_forward != htlc.amount_msat")
   1537             raise OnionRoutingFailure(
   1538                 code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT,
   1539                 data=htlc.amount_msat.to_bytes(8, byteorder="big"))
   1540 
   1541         try:
   1542             payment_secret_from_onion = processed_onion.hop_data.payload["payment_data"]["payment_secret"]
   1543         except:
   1544             if total_msat > amt_to_forward:
   1545                 # payment_secret is required for MPP
   1546                 log_fail_reason(f"'payment_secret' missing from onion")
   1547                 raise exc_incorrect_or_unknown_pd
   1548             # TODO fail here if invoice has set PAYMENT_SECRET_REQ
   1549             payment_secret_from_onion = None
   1550 
   1551         if total_msat > amt_to_forward:
   1552             mpp_status = self.lnworker.check_received_mpp_htlc(payment_secret_from_onion, chan.short_channel_id, htlc, total_msat)
   1553             if mpp_status is None:
   1554                 return None, None
   1555             if mpp_status is False:
   1556                 log_fail_reason(f"MPP_TIMEOUT")
   1557                 raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'')
   1558             assert mpp_status is True
   1559 
   1560         # if there is a trampoline_onion, maybe_fulfill_htlc will be called again
   1561         if processed_onion.trampoline_onion_packet:
   1562             # TODO: we should check that all trampoline_onions are the same
   1563             return None, processed_onion.trampoline_onion_packet
   1564 
   1565         info = self.lnworker.get_payment_info(htlc.payment_hash)
   1566         if info is None:
   1567             log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}")
   1568             raise exc_incorrect_or_unknown_pd
   1569         preimage = self.lnworker.get_preimage(htlc.payment_hash)
   1570         if payment_secret_from_onion:
   1571             if payment_secret_from_onion != derive_payment_secret_from_payment_preimage(preimage):
   1572                 log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {derive_payment_secret_from_payment_preimage(preimage).hex()}')
   1573                 raise exc_incorrect_or_unknown_pd
   1574         invoice_msat = info.amount_msat
   1575         if not (invoice_msat is None or invoice_msat <= total_msat <= 2 * invoice_msat):
   1576             log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}")
   1577             raise exc_incorrect_or_unknown_pd
   1578         self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}")
   1579         self.lnworker.set_request_status(htlc.payment_hash, PR_PAID)
   1580         return preimage, None
   1581 
   1582     def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
   1583         self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
   1584         assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
   1585         assert chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
   1586         self.received_htlcs_pending_removal.add((chan, htlc_id))
   1587         chan.settle_htlc(preimage, htlc_id)
   1588         self.send_message(
   1589             "update_fulfill_htlc",
   1590             channel_id=chan.channel_id,
   1591             id=htlc_id,
   1592             payment_preimage=preimage)
   1593 
   1594     def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes):
   1595         self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
   1596         assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
   1597         self.received_htlcs_pending_removal.add((chan, htlc_id))
   1598         chan.fail_htlc(htlc_id)
   1599         self.send_message(
   1600             "update_fail_htlc",
   1601             channel_id=chan.channel_id,
   1602             id=htlc_id,
   1603             len=len(error_bytes),
   1604             reason=error_bytes)
   1605 
   1606     def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure):
   1607         self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
   1608         assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
   1609         if not (reason.code & OnionFailureCodeMetaFlag.BADONION and len(reason.data) == 32):
   1610             raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}")
   1611         self.received_htlcs_pending_removal.add((chan, htlc_id))
   1612         chan.fail_htlc(htlc_id)
   1613         self.send_message(
   1614             "update_fail_malformed_htlc",
   1615             channel_id=chan.channel_id,
   1616             id=htlc_id,
   1617             sha256_of_onion=reason.data,
   1618             failure_code=reason.code)
   1619 
   1620     def on_revoke_and_ack(self, chan: Channel, payload):
   1621         if chan.peer_state == PeerState.BAD:
   1622             return
   1623         self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}')
   1624         rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"])
   1625         chan.receive_revocation(rev)
   1626         self.lnworker.save_channel(chan)
   1627         self.maybe_send_commitment(chan)
   1628 
   1629     def on_update_fee(self, chan: Channel, payload):
   1630         feerate = payload["feerate_per_kw"]
   1631         chan.update_fee(feerate, False)
   1632 
   1633     async def maybe_update_fee(self, chan: Channel):
   1634         """
   1635         called when our fee estimates change
   1636         """
   1637         if not chan.can_send_ctx_updates():
   1638             return
   1639         feerate_per_kw = self.lnworker.current_feerate_per_kw()
   1640         if not chan.constraints.is_initiator:
   1641             if constants.net is not constants.BitcoinRegtest:
   1642                 chan_feerate = chan.get_latest_feerate(LOCAL)
   1643                 ratio = chan_feerate / feerate_per_kw
   1644                 if ratio < 0.5:
   1645                     # Note that we trust the Electrum server about fee rates
   1646                     # Thus, automated force-closing might not be a good idea
   1647                     # Maybe we should display something in the GUI instead
   1648                     self.logger.warning(
   1649                         f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, "
   1650                         f"current recommended feerate is {feerate_per_kw} sat/kw, consider force closing!")
   1651             return
   1652         chan_fee = chan.get_next_feerate(REMOTE)
   1653         if feerate_per_kw < chan_fee / 2:
   1654             self.logger.info("FEES HAVE FALLEN")
   1655         elif feerate_per_kw > chan_fee * 2:
   1656             self.logger.info("FEES HAVE RISEN")
   1657         else:
   1658             return
   1659         self.logger.info(f"(chan: {chan.get_id_for_log()}) current pending feerate {chan_fee}. "
   1660                          f"new feerate {feerate_per_kw}")
   1661         chan.update_fee(feerate_per_kw, True)
   1662         self.send_message(
   1663             "update_fee",
   1664             channel_id=chan.channel_id,
   1665             feerate_per_kw=feerate_per_kw)
   1666         self.maybe_send_commitment(chan)
   1667 
   1668     @log_exceptions
   1669     async def close_channel(self, chan_id: bytes):
   1670         chan = self.channels[chan_id]
   1671         self.shutdown_received[chan_id] = asyncio.Future()
   1672         await self.send_shutdown(chan)
   1673         payload = await self.shutdown_received[chan_id]
   1674         txid = await self._shutdown(chan, payload, is_local=True)
   1675         self.logger.info(f'({chan.get_id_for_log()}) Channel closed {txid}')
   1676         return txid
   1677 
   1678     async def on_shutdown(self, chan: Channel, payload):
   1679         their_scriptpubkey = payload['scriptpubkey']
   1680         their_upfront_scriptpubkey = chan.config[REMOTE].upfront_shutdown_script
   1681 
   1682         # BOLT-02 check if they use the upfront shutdown script they advertized
   1683         if their_upfront_scriptpubkey:
   1684             if not (their_scriptpubkey == their_upfront_scriptpubkey):
   1685                 raise UpfrontShutdownScriptViolation("remote didn't use upfront shutdown script it commited to in channel opening")
   1686 
   1687         # BOLT-02 restrict the scriptpubkey to some templates:
   1688         if not (match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0)
   1689                 or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2SH)
   1690                 or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2PKH)):
   1691             raise Exception(f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}')
   1692         chan_id = chan.channel_id
   1693         if chan_id in self.shutdown_received:
   1694             self.shutdown_received[chan_id].set_result(payload)
   1695         else:
   1696             chan = self.channels[chan_id]
   1697             await self.send_shutdown(chan)
   1698             txid = await self._shutdown(chan, payload, is_local=False)
   1699             self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {txid}')
   1700 
   1701     def can_send_shutdown(self, chan: Channel):
   1702         if chan.get_state() >= ChannelState.OPENING:
   1703             return True
   1704         if chan.constraints.is_initiator and chan.channel_id in self.funding_created_sent:
   1705             return True
   1706         if not chan.constraints.is_initiator and chan.channel_id in self.funding_signed_sent:
   1707             return True
   1708         return False
   1709 
   1710     async def send_shutdown(self, chan: Channel):
   1711         if not self.can_send_shutdown(chan):
   1712             raise Exception('cannot send shutdown')
   1713 
   1714         if chan.config[LOCAL].upfront_shutdown_script:
   1715             scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
   1716         else:
   1717             scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
   1718         assert scriptpubkey
   1719 
   1720         # wait until no more pending updates (bolt2)
   1721         chan.set_can_send_ctx_updates(False)
   1722         while chan.has_pending_changes(REMOTE):
   1723             await asyncio.sleep(0.1)
   1724         self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey)
   1725         chan.set_state(ChannelState.SHUTDOWN)
   1726         # can fullfill or fail htlcs. cannot add htlcs, because state != OPEN
   1727         chan.set_can_send_ctx_updates(True)
   1728 
   1729     @log_exceptions
   1730     async def _shutdown(self, chan: Channel, payload, *, is_local: bool):
   1731         # wait until no HTLCs remain in either commitment transaction
   1732         while len(chan.hm.htlcs(LOCAL)) + len(chan.hm.htlcs(REMOTE)) > 0:
   1733             self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...')
   1734             await asyncio.sleep(1)
   1735         # if no HTLCs remain, we must not send updates
   1736         chan.set_can_send_ctx_updates(False)
   1737         their_scriptpubkey = payload['scriptpubkey']
   1738         if chan.config[LOCAL].upfront_shutdown_script:
   1739             our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
   1740         else:
   1741             our_scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
   1742         assert our_scriptpubkey
   1743 
   1744         # estimate fee of closing tx
   1745         our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0)
   1746         fee_rate = self.network.config.fee_per_kb()
   1747         our_fee = fee_rate * closing_tx.estimated_size() // 1000
   1748         # BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx
   1749         max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE)
   1750         our_fee = min(our_fee, max_fee)
   1751         drop_remote = False
   1752         def send_closing_signed():
   1753             our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=our_fee, drop_remote=drop_remote)
   1754             self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=our_fee, signature=our_sig)
   1755         def verify_signature(tx, sig):
   1756             their_pubkey = chan.config[REMOTE].multisig_key.pubkey
   1757             preimage_hex = tx.serialize_preimage(0)
   1758             pre_hash = sha256d(bfh(preimage_hex))
   1759             return ecc.verify_signature(their_pubkey, sig, pre_hash)
   1760         # the funder sends the first 'closing_signed' message
   1761         if chan.constraints.is_initiator:
   1762             send_closing_signed()
   1763         # negotiate fee
   1764         while True:
   1765             # FIXME: the remote SHOULD send closing_signed, but some don't.
   1766             cs_payload = await self.wait_for_message('closing_signed', chan.channel_id)
   1767             their_fee = cs_payload['fee_satoshis']
   1768             if their_fee > max_fee:
   1769                 raise Exception(f'the proposed fee exceeds the base fee of the latest commitment transaction {is_local, their_fee, max_fee}')
   1770             their_sig = cs_payload['signature']
   1771             # verify their sig: they might have dropped their output
   1772             our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=False)
   1773             if verify_signature(closing_tx, their_sig):
   1774                 drop_remote = False
   1775             else:
   1776                 our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=True)
   1777                 if verify_signature(closing_tx, their_sig):
   1778                     drop_remote = True
   1779                 else:
   1780                     raise Exception('failed to verify their signature')
   1781             # Agree if difference is lower or equal to one (see below)
   1782             if abs(our_fee - their_fee) < 2:
   1783                 our_fee = their_fee
   1784                 break
   1785             # this will be "strictly between" (as in BOLT2) previous values because of the above
   1786             our_fee = (our_fee + their_fee) // 2
   1787             # another round
   1788             send_closing_signed()
   1789         # the non-funder replies
   1790         if not chan.constraints.is_initiator:
   1791             send_closing_signed()
   1792         # add signatures
   1793         closing_tx.add_signature_to_txin(
   1794             txin_idx=0,
   1795             signing_pubkey=chan.config[LOCAL].multisig_key.pubkey.hex(),
   1796             sig=bh2u(der_sig_from_sig_string(our_sig) + b'\x01'))
   1797         closing_tx.add_signature_to_txin(
   1798             txin_idx=0,
   1799             signing_pubkey=chan.config[REMOTE].multisig_key.pubkey.hex(),
   1800             sig=bh2u(der_sig_from_sig_string(their_sig) + b'\x01'))
   1801         # save local transaction and set state
   1802         try:
   1803             self.lnworker.wallet.add_transaction(closing_tx)
   1804         except UnrelatedTransactionException:
   1805             pass  # this can happen if (~all the balance goes to REMOTE)
   1806         chan.set_state(ChannelState.CLOSING)
   1807         # broadcast
   1808         await self.network.try_broadcasting(closing_tx, 'closing')
   1809         return closing_tx.txid()
   1810 
   1811     async def htlc_switch(self):
   1812         await self.initialized
   1813         while True:
   1814             self._htlc_switch_iterdone_event.set()
   1815             self._htlc_switch_iterdone_event.clear()
   1816             await asyncio.sleep(0.1)  # TODO maybe make this partly event-driven
   1817             self._htlc_switch_iterstart_event.set()
   1818             self._htlc_switch_iterstart_event.clear()
   1819             self.ping_if_required()
   1820             self._maybe_cleanup_received_htlcs_pending_removal()
   1821             for chan_id, chan in self.channels.items():
   1822                 if not chan.can_send_ctx_updates():
   1823                     continue
   1824                 self.maybe_send_commitment(chan)
   1825                 done = set()
   1826                 unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {})
   1827                 for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarding_info) in unfulfilled.items():
   1828                     if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
   1829                         continue
   1830                     htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id)
   1831                     error_reason = None  # type: Optional[OnionRoutingFailure]
   1832                     error_bytes = None  # type: Optional[bytes]
   1833                     preimage = None
   1834                     fw_info = None
   1835                     onion_packet_bytes = bytes.fromhex(onion_packet_hex)
   1836                     onion_packet = None
   1837                     try:
   1838                         onion_packet = OnionPacket.from_bytes(onion_packet_bytes)
   1839                     except OnionRoutingFailure as e:
   1840                         error_reason = e
   1841                     else:
   1842                         try:
   1843                             preimage, fw_info, error_bytes = await self.process_unfulfilled_htlc(
   1844                                 chan=chan,
   1845                                 htlc=htlc,
   1846                                 forwarding_info=forwarding_info,
   1847                                 onion_packet_bytes=onion_packet_bytes,
   1848                                 onion_packet=onion_packet)
   1849                         except OnionRoutingFailure as e:
   1850                             error_bytes = construct_onion_error(e, onion_packet, our_onion_private_key=self.privkey)
   1851                     if fw_info:
   1852                         unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, fw_info
   1853                     elif preimage or error_reason or error_bytes:
   1854                         if preimage:
   1855                             await self.lnworker.enable_htlc_settle.wait()
   1856                             self.fulfill_htlc(chan, htlc.htlc_id, preimage)
   1857                         elif error_bytes:
   1858                             self.fail_htlc(
   1859                                 chan=chan,
   1860                                 htlc_id=htlc.htlc_id,
   1861                                 error_bytes=error_bytes)
   1862                         else:
   1863                             self.fail_malformed_htlc(
   1864                                 chan=chan,
   1865                                 htlc_id=htlc.htlc_id,
   1866                                 reason=error_reason)
   1867                         done.add(htlc_id)
   1868                 # cleanup
   1869                 for htlc_id in done:
   1870                     unfulfilled.pop(htlc_id)
   1871 
   1872     def _maybe_cleanup_received_htlcs_pending_removal(self) -> None:
   1873         done = set()
   1874         for chan, htlc_id in self.received_htlcs_pending_removal:
   1875             if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
   1876                 done.add((chan, htlc_id))
   1877         if done:
   1878             for key in done:
   1879                 self.received_htlcs_pending_removal.remove(key)
   1880             self.received_htlc_removed_event.set()
   1881             self.received_htlc_removed_event.clear()
   1882 
   1883     async def wait_one_htlc_switch_iteration(self) -> None:
   1884         """Waits until the HTLC switch does a full iteration or the peer disconnects,
   1885         whichever happens first.
   1886         """
   1887         async def htlc_switch_iteration():
   1888             await self._htlc_switch_iterstart_event.wait()
   1889             await self._htlc_switch_iterdone_event.wait()
   1890 
   1891         async with TaskGroup(wait=any) as group:
   1892             await group.spawn(htlc_switch_iteration())
   1893             await group.spawn(self.got_disconnected.wait())
   1894 
   1895     async def process_unfulfilled_htlc(
   1896             self, *,
   1897             chan: Channel,
   1898             htlc: UpdateAddHtlc,
   1899             forwarding_info: Tuple[str, int],
   1900             onion_packet_bytes: bytes,
   1901             onion_packet: OnionPacket) -> Tuple[Optional[bytes], Union[bool, None, Tuple[str, int]], Optional[bytes]]:
   1902         """
   1903         return (preimage, fw_info, error_bytes) with at most a single element that is not None
   1904         raise an OnionRoutingFailure if we need to fail the htlc
   1905         """
   1906         payment_hash = htlc.payment_hash
   1907         processed_onion = self.process_onion_packet(
   1908             onion_packet,
   1909             payment_hash=payment_hash,
   1910             onion_packet_bytes=onion_packet_bytes)
   1911         if processed_onion.are_we_final:
   1912             preimage, trampoline_onion_packet = self.maybe_fulfill_htlc(
   1913                 chan=chan,
   1914                 htlc=htlc,
   1915                 processed_onion=processed_onion)
   1916             # trampoline forwarding
   1917             if trampoline_onion_packet:
   1918                 if not forwarding_info:
   1919                     trampoline_onion = self.process_onion_packet(
   1920                         trampoline_onion_packet,
   1921                         payment_hash=htlc.payment_hash,
   1922                         onion_packet_bytes=onion_packet_bytes,
   1923                         is_trampoline=True)
   1924                     if trampoline_onion.are_we_final:
   1925                         preimage, _ = self.maybe_fulfill_htlc(
   1926                             chan=chan,
   1927                             htlc=htlc,
   1928                             processed_onion=trampoline_onion,
   1929                             is_trampoline=True)
   1930                     else:
   1931                         await self.lnworker.enable_htlc_forwarding.wait()
   1932                         self.maybe_forward_trampoline(
   1933                             chan=chan,
   1934                             htlc=htlc,
   1935                             trampoline_onion=trampoline_onion)
   1936                         # return True so that this code gets executed only once
   1937                         return None, True, None
   1938                 else:
   1939                     preimage = self.lnworker.get_preimage(payment_hash)
   1940                     error_reason = self.lnworker.trampoline_forwarding_failures.pop(payment_hash, None)
   1941                     if error_reason:
   1942                         self.logger.info(f'trampoline forwarding failure: {error_reason.code_name()}')
   1943                         raise error_reason
   1944 
   1945         elif not forwarding_info:
   1946             await self.lnworker.enable_htlc_forwarding.wait()
   1947             next_chan_id, next_htlc_id = self.maybe_forward_htlc(
   1948                 htlc=htlc,
   1949                 processed_onion=processed_onion)
   1950             fw_info = (next_chan_id.hex(), next_htlc_id)
   1951             return None, fw_info, None
   1952         else:
   1953             preimage = self.lnworker.get_preimage(payment_hash)
   1954             next_chan_id_hex, htlc_id = forwarding_info
   1955             next_chan = self.lnworker.get_channel_by_short_id(bytes.fromhex(next_chan_id_hex))
   1956             if next_chan:
   1957                 error_bytes, error_reason = next_chan.pop_fail_htlc_reason(htlc_id)
   1958                 if error_bytes:
   1959                     return None, None, error_bytes
   1960                 if error_reason:
   1961                     raise error_reason
   1962         if preimage:
   1963             return preimage, None, None
   1964         return None, None, None
   1965 
   1966     def process_onion_packet(
   1967             self,
   1968             onion_packet: OnionPacket, *,
   1969             payment_hash: bytes,
   1970             onion_packet_bytes: bytes,
   1971             is_trampoline: bool = False) -> ProcessedOnionPacket:
   1972 
   1973         failure_data = sha256(onion_packet_bytes)
   1974         try:
   1975             processed_onion = process_onion_packet(
   1976                 onion_packet,
   1977                 associated_data=payment_hash,
   1978                 our_onion_private_key=self.privkey,
   1979                 is_trampoline=is_trampoline)
   1980         except UnsupportedOnionPacketVersion:
   1981             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
   1982         except InvalidOnionPubkey:
   1983             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_KEY, data=failure_data)
   1984         except InvalidOnionMac:
   1985             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_HMAC, data=failure_data)
   1986         except Exception as e:
   1987             self.logger.info(f"error processing onion packet: {e!r}")
   1988             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
   1989         if self.network.config.get('test_fail_malformed_htlc'):
   1990             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
   1991         if self.network.config.get('test_fail_htlcs_with_temp_node_failure'):
   1992             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
   1993         return processed_onion