lnpeer.py (101473B)
1 #!/usr/bin/env python3 2 # 3 # Copyright (C) 2018 The Electrum developers 4 # Distributed under the MIT software license, see the accompanying 5 # file LICENCE or http://www.opensource.org/licenses/mit-license.php 6 7 import zlib 8 from collections import OrderedDict, defaultdict 9 import asyncio 10 import os 11 import time 12 from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set 13 from datetime import datetime 14 import functools 15 16 import aiorpcx 17 from aiorpcx import TaskGroup 18 19 from .crypto import sha256, sha256d 20 from . import bitcoin, util 21 from . import ecc 22 from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string 23 from . import constants 24 from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, SilentTaskGroup, 25 UnrelatedTransactionException) 26 from . import transaction 27 from .transaction import PartialTxOutput, match_script_against_template 28 from .logging import Logger 29 from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment, 30 process_onion_packet, OnionPacket, construct_onion_error, OnionRoutingFailure, 31 ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey, 32 OnionFailureCodeMetaFlag) 33 from .lnchannel import Channel, RevokeAndAck, RemoteCtnTooFarInFuture, ChannelState, PeerState 34 from . import lnutil 35 from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc, 36 RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore, 37 funding_output_script, get_per_commitment_secret_from_seed, 38 secret_to_pubkey, PaymentFailure, LnFeatures, 39 LOCAL, REMOTE, HTLCOwner, 40 ln_compare_features, privkey_to_pubkey, MIN_FINAL_CLTV_EXPIRY_ACCEPTED, 41 LightningPeerConnectionClosed, HandshakeFailed, 42 RemoteMisbehaving, ShortChannelID, 43 IncompatibleLightningFeatures, derive_payment_secret_from_payment_preimage, 44 LN_MAX_FUNDING_SAT, calc_fees_for_commitment_tx, 45 UpfrontShutdownScriptViolation) 46 from .lnutil import FeeUpdate, channel_id_from_funding_tx 47 from .lntransport import LNTransport, LNTransportBase 48 from .lnmsg import encode_msg, decode_msg 49 from .interface import GracefulDisconnect 50 from .lnrouter import fee_for_edge_msat 51 from .lnutil import ln_dummy_address 52 from .json_db import StoredDict 53 from .invoices import PR_PAID 54 55 if TYPE_CHECKING: 56 from .lnworker import LNGossip, LNWallet 57 from .lnrouter import LNPaymentRoute 58 from .transaction import PartialTransaction 59 60 61 LN_P2P_NETWORK_TIMEOUT = 20 62 63 64 class Peer(Logger): 65 LOGGING_SHORTCUT = 'P' 66 67 def __init__( 68 self, 69 lnworker: Union['LNGossip', 'LNWallet'], 70 pubkey: bytes, 71 transport: LNTransportBase, 72 *, is_channel_backup= False): 73 74 self.is_channel_backup = is_channel_backup 75 self._sent_init = False # type: bool 76 self._received_init = False # type: bool 77 self.initialized = asyncio.Future() 78 self.got_disconnected = asyncio.Event() 79 self.querying = asyncio.Event() 80 self.transport = transport 81 self.pubkey = pubkey # remote pubkey 82 self.lnworker = lnworker 83 self.privkey = self.transport.privkey # local privkey 84 self.features = self.lnworker.features # type: LnFeatures 85 self.their_features = LnFeatures(0) # type: LnFeatures 86 self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)] 87 assert self.node_ids[0] != self.node_ids[1] 88 self.network = lnworker.network 89 self.ping_time = 0 90 self.reply_channel_range = asyncio.Queue() 91 # gossip uses a single queue to preserve message order 92 self.gossip_queue = asyncio.Queue() 93 self.ordered_messages = ['accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed'] 94 self.ordered_message_queues = defaultdict(asyncio.Queue) # for messsage that are ordered 95 self.temp_id_to_id = {} # to forward error messages 96 self.funding_created_sent = set() # for channels in PREOPENING 97 self.funding_signed_sent = set() # for channels in PREOPENING 98 self.shutdown_received = {} # chan_id -> asyncio.Future() 99 self.announcement_signatures = defaultdict(asyncio.Queue) 100 self.orphan_channel_updates = OrderedDict() 101 Logger.__init__(self) 102 self.taskgroup = SilentTaskGroup() 103 # HTLCs offered by REMOTE, that we started removing but are still active: 104 self.received_htlcs_pending_removal = set() # type: Set[Tuple[Channel, int]] 105 self.received_htlc_removed_event = asyncio.Event() 106 self._htlc_switch_iterstart_event = asyncio.Event() 107 self._htlc_switch_iterdone_event = asyncio.Event() 108 109 def send_message(self, message_name: str, **kwargs): 110 assert type(message_name) is str 111 self.logger.debug(f"Sending {message_name.upper()}") 112 if message_name.upper() != "INIT" and not self.is_initialized(): 113 raise Exception("tried to send message before we are initialized") 114 raw_msg = encode_msg(message_name, **kwargs) 115 self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id")) 116 self.transport.send_bytes(raw_msg) 117 118 def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]): 119 is_commitment_signed = message_name == "commitment_signed" 120 if not (message_name.startswith("update_") or is_commitment_signed): 121 return 122 assert channel_id 123 chan = self.get_channel_by_id(channel_id) 124 if not chan: 125 raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}") 126 chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed) 127 if is_commitment_signed: 128 # saving now, to ensure replaying updates works (in case of channel reestablishment) 129 self.lnworker.save_channel(chan) 130 131 def maybe_set_initialized(self): 132 if self.initialized.done(): 133 return 134 if self._sent_init and self._received_init: 135 self.initialized.set_result(True) 136 137 def is_initialized(self) -> bool: 138 return (self.initialized.done() 139 and not self.initialized.cancelled() 140 and self.initialized.exception() is None 141 and self.initialized.result() is True) 142 143 async def initialize(self): 144 if isinstance(self.transport, LNTransport): 145 await self.transport.handshake() 146 features = self.features.for_init_message() 147 b = int.bit_length(features) 148 flen = b // 8 + int(bool(b % 8)) 149 self.send_message( 150 "init", gflen=0, flen=flen, 151 features=features, 152 init_tlvs={ 153 'networks': 154 {'chains': constants.net.rev_genesis_bytes()} 155 }) 156 self._sent_init = True 157 self.maybe_set_initialized() 158 159 @property 160 def channels(self) -> Dict[bytes, Channel]: 161 return self.lnworker.channels_for_peer(self.pubkey) 162 163 def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]: 164 # note: this is faster than self.channels.get(channel_id) 165 chan = self.lnworker.get_channel_by_id(channel_id) 166 if not chan: 167 return None 168 if chan.node_id != self.pubkey: 169 return None 170 return chan 171 172 def diagnostic_name(self): 173 return self.lnworker.__class__.__name__ + ', ' + self.transport.name() 174 175 def ping_if_required(self): 176 if time.time() - self.ping_time > 120: 177 self.send_message('ping', num_pong_bytes=4, byteslen=4) 178 self.ping_time = time.time() 179 180 def process_message(self, message): 181 message_type, payload = decode_msg(message) 182 # only process INIT if we are a backup 183 if self.is_channel_backup is True and message_type != 'init': 184 return 185 if message_type in self.ordered_messages: 186 chan_id = payload.get('channel_id') or payload["temporary_channel_id"] 187 self.ordered_message_queues[chan_id].put_nowait((message_type, payload)) 188 else: 189 if message_type != 'error' and 'channel_id' in payload: 190 chan = self.get_channel_by_id(payload['channel_id']) 191 if chan is None: 192 raise Exception('Got unknown '+ message_type) 193 args = (chan, payload) 194 else: 195 args = (payload,) 196 try: 197 f = getattr(self, 'on_' + message_type) 198 except AttributeError: 199 #self.logger.info("Received '%s'" % message_type.upper(), payload) 200 return 201 # raw message is needed to check signature 202 if message_type in ['node_announcement', 'channel_announcement', 'channel_update']: 203 payload['raw'] = message 204 execution_result = f(*args) 205 if asyncio.iscoroutinefunction(f): 206 asyncio.ensure_future(self.taskgroup.spawn(execution_result)) 207 208 def on_error(self, payload): 209 self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}") 210 chan_id = payload.get("channel_id") 211 if chan_id in self.temp_id_to_id: 212 chan_id = self.temp_id_to_id[chan_id] 213 self.ordered_message_queues[chan_id].put_nowait((None, {'error':payload['data']})) 214 215 def on_ping(self, payload): 216 l = payload['num_pong_bytes'] 217 self.send_message('pong', byteslen=l) 218 219 def on_pong(self, payload): 220 pass 221 222 async def wait_for_message(self, expected_name, channel_id): 223 q = self.ordered_message_queues[channel_id] 224 name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT) 225 if payload.get('error'): 226 raise Exception('Remote peer reported error [DO NOT TRUST THIS MESSAGE]: ' + repr(payload.get('error'))) 227 if name != expected_name: 228 raise Exception(f"Received unexpected '{name}'") 229 return payload 230 231 def on_init(self, payload): 232 if self._received_init: 233 self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT") 234 return 235 self.their_features = LnFeatures(int.from_bytes(payload['features'], byteorder="big")) 236 their_globalfeatures = int.from_bytes(payload['globalfeatures'], byteorder="big") 237 self.their_features |= their_globalfeatures 238 # check transitive dependencies for received features 239 if not self.their_features.validate_transitive_dependencies(): 240 raise GracefulDisconnect("remote did not set all dependencies for the features they sent") 241 # check if features are compatible, and set self.features to what we negotiated 242 try: 243 self.features = ln_compare_features(self.features, self.their_features) 244 except IncompatibleLightningFeatures as e: 245 self.initialized.set_exception(e) 246 raise GracefulDisconnect(f"{str(e)}") 247 # check that they are on the same chain as us, if provided 248 their_networks = payload["init_tlvs"].get("networks") 249 if their_networks: 250 their_chains = list(chunks(their_networks["chains"], 32)) 251 if constants.net.rev_genesis_bytes() not in their_chains: 252 raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})") 253 # all checks passed 254 self.lnworker.on_peer_successfully_established(self) 255 self._received_init = True 256 self.maybe_set_initialized() 257 258 def on_node_announcement(self, payload): 259 if self.lnworker.channel_db: 260 self.gossip_queue.put_nowait(('node_announcement', payload)) 261 262 def on_channel_announcement(self, payload): 263 if self.lnworker.channel_db: 264 self.gossip_queue.put_nowait(('channel_announcement', payload)) 265 266 def on_channel_update(self, payload): 267 self.maybe_save_remote_update(payload) 268 if self.lnworker.channel_db: 269 self.gossip_queue.put_nowait(('channel_update', payload)) 270 271 def maybe_save_remote_update(self, payload): 272 if not self.channels: 273 return 274 for chan in self.channels.values(): 275 if chan.short_channel_id == payload['short_channel_id']: 276 chan.set_remote_update(payload['raw']) 277 self.logger.info("saved remote_update") 278 break 279 else: 280 # Save (some bounded number of) orphan channel updates for later 281 # as it might be for our own direct channel with this peer 282 # (and we might not yet know the short channel id for that) 283 # Background: this code is here to deal with a bug in LND, 284 # see https://github.com/lightningnetwork/lnd/issues/3651 285 # and https://github.com/lightningnetwork/lightning-rfc/pull/657 286 # This code assumes gossip_queries is set. BOLT7: "if the 287 # gossip_queries feature is negotiated, [a node] MUST NOT 288 # send gossip it did not generate itself" 289 short_channel_id = ShortChannelID(payload['short_channel_id']) 290 self.logger.info(f'received orphan channel update {short_channel_id}') 291 self.orphan_channel_updates[short_channel_id] = payload 292 while len(self.orphan_channel_updates) > 25: 293 self.orphan_channel_updates.popitem(last=False) 294 295 def on_announcement_signatures(self, chan: Channel, payload): 296 if chan.config[LOCAL].was_announced: 297 h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan) 298 else: 299 self.announcement_signatures[chan.channel_id].put_nowait(payload) 300 301 def handle_disconnect(func): 302 @functools.wraps(func) 303 async def wrapper_func(self, *args, **kwargs): 304 try: 305 return await func(self, *args, **kwargs) 306 except GracefulDisconnect as e: 307 self.logger.log(e.log_level, f"Disconnecting: {repr(e)}") 308 except (LightningPeerConnectionClosed, IncompatibleLightningFeatures, 309 aiorpcx.socks.SOCKSError) as e: 310 self.logger.info(f"Disconnecting: {repr(e)}") 311 finally: 312 self.close_and_cleanup() 313 return wrapper_func 314 315 @ignore_exceptions # do not kill outer taskgroup 316 @log_exceptions 317 @handle_disconnect 318 async def main_loop(self): 319 async with self.taskgroup as group: 320 await group.spawn(self._message_loop()) 321 await group.spawn(self.htlc_switch()) 322 await group.spawn(self.query_gossip()) 323 await group.spawn(self.process_gossip()) 324 325 async def process_gossip(self): 326 while True: 327 await asyncio.sleep(5) 328 if not self.network.lngossip: 329 continue 330 chan_anns = [] 331 chan_upds = [] 332 node_anns = [] 333 while True: 334 name, payload = await self.gossip_queue.get() 335 if name == 'channel_announcement': 336 chan_anns.append(payload) 337 elif name == 'channel_update': 338 chan_upds.append(payload) 339 elif name == 'node_announcement': 340 node_anns.append(payload) 341 else: 342 raise Exception('unknown message') 343 if self.gossip_queue.empty(): 344 break 345 # verify in peer's TaskGroup so that we fail the connection 346 self.verify_channel_announcements(chan_anns) 347 self.verify_node_announcements(node_anns) 348 if self.network.lngossip: 349 await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds) 350 351 def verify_channel_announcements(self, chan_anns): 352 for payload in chan_anns: 353 h = sha256d(payload['raw'][2+256:]) 354 pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']] 355 sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']] 356 for pubkey, sig in zip(pubkeys, sigs): 357 if not ecc.verify_signature(pubkey, sig, h): 358 raise Exception('signature failed') 359 360 def verify_node_announcements(self, node_anns): 361 for payload in node_anns: 362 pubkey = payload['node_id'] 363 signature = payload['signature'] 364 h = sha256d(payload['raw'][66:]) 365 if not ecc.verify_signature(pubkey, signature, h): 366 raise Exception('signature failed') 367 368 async def query_gossip(self): 369 try: 370 await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT) 371 except Exception as e: 372 raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e 373 if self.lnworker == self.lnworker.network.lngossip: 374 try: 375 ids, complete = await asyncio.wait_for(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT) 376 except asyncio.TimeoutError as e: 377 raise GracefulDisconnect("query_channel_range timed out") from e 378 self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete)) 379 await self.lnworker.add_new_ids(ids) 380 while True: 381 todo = self.lnworker.get_ids_to_query() 382 if not todo: 383 await asyncio.sleep(1) 384 continue 385 await self.get_short_channel_ids(todo) 386 387 async def get_channel_range(self): 388 first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS 389 num_blocks = self.lnworker.network.get_local_height() - first_block 390 self.query_channel_range(first_block, num_blocks) 391 intervals = [] 392 ids = set() 393 # note: implementations behave differently... 394 # "sane implementation that follows BOLT-07" example: 395 # query_channel_range. <<< first_block 497000, num_blocks 79038 396 # on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True 397 # on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True 398 # on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True 399 # on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True 400 # lnd example: 401 # query_channel_range. <<< first_block 497000, num_blocks 79038 402 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False 403 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False 404 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False 405 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False 406 # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True 407 while True: 408 index, num, complete, _ids = await self.reply_channel_range.get() 409 ids.update(_ids) 410 intervals.append((index, index+num)) 411 intervals.sort() 412 while len(intervals) > 1: 413 a,b = intervals[0] 414 c,d = intervals[1] 415 if not (a <= c and a <= b and c <= d): 416 raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}") 417 if b >= c: 418 intervals = [(a,d)] + intervals[2:] 419 else: 420 break 421 if len(intervals) == 1 and complete: 422 a, b = intervals[0] 423 if a <= first_block and b >= first_block + num_blocks: 424 break 425 return ids, complete 426 427 def request_gossip(self, timestamp=0): 428 if timestamp == 0: 429 self.logger.info('requesting whole channel graph') 430 else: 431 self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).ctime()}') 432 self.send_message( 433 'gossip_timestamp_filter', 434 chain_hash=constants.net.rev_genesis_bytes(), 435 first_timestamp=timestamp, 436 timestamp_range=b'\xff'*4) 437 438 def query_channel_range(self, first_block, num_blocks): 439 self.logger.info(f'query channel range {first_block} {num_blocks}') 440 self.send_message( 441 'query_channel_range', 442 chain_hash=constants.net.rev_genesis_bytes(), 443 first_blocknum=first_block, 444 number_of_blocks=num_blocks) 445 446 def decode_short_ids(self, encoded): 447 if encoded[0] == 0: 448 decoded = encoded[1:] 449 elif encoded[0] == 1: 450 decoded = zlib.decompress(encoded[1:]) 451 else: 452 raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}') 453 ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)] 454 return ids 455 456 def on_reply_channel_range(self, payload): 457 first = payload['first_blocknum'] 458 num = payload['number_of_blocks'] 459 complete = bool(int.from_bytes(payload['complete'], 'big')) 460 encoded = payload['encoded_short_ids'] 461 ids = self.decode_short_ids(encoded) 462 #self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, num_ids {len(ids)}, complete {repr(payload['complete'])}") 463 self.reply_channel_range.put_nowait((first, num, complete, ids)) 464 465 async def get_short_channel_ids(self, ids): 466 self.logger.info(f'Querying {len(ids)} short_channel_ids') 467 assert not self.querying.is_set() 468 self.query_short_channel_ids(ids) 469 await self.querying.wait() 470 self.querying.clear() 471 472 def query_short_channel_ids(self, ids, compressed=True): 473 ids = sorted(ids) 474 s = b''.join(ids) 475 encoded = zlib.compress(s) if compressed else s 476 prefix = b'\x01' if compressed else b'\x00' 477 self.send_message( 478 'query_short_channel_ids', 479 chain_hash=constants.net.rev_genesis_bytes(), 480 len=1+len(encoded), 481 encoded_short_ids=prefix+encoded) 482 483 async def _message_loop(self): 484 try: 485 await asyncio.wait_for(self.initialize(), LN_P2P_NETWORK_TIMEOUT) 486 except (OSError, asyncio.TimeoutError, HandshakeFailed) as e: 487 raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e 488 async for msg in self.transport.read_messages(): 489 self.process_message(msg) 490 await asyncio.sleep(.01) 491 492 def on_reply_short_channel_ids_end(self, payload): 493 self.querying.set() 494 495 def close_and_cleanup(self): 496 try: 497 if self.transport: 498 self.transport.close() 499 except: 500 pass 501 self.lnworker.peer_closed(self) 502 self.got_disconnected.set() 503 504 def is_static_remotekey(self): 505 return self.features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT) 506 507 def is_upfront_shutdown_script(self): 508 return self.features.supports(LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT) 509 510 def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]: 511 if msg_identifier not in ['accept', 'open']: 512 raise ValueError("msg_identifier must be either 'accept' or 'open'") 513 514 uss_tlv = payload[msg_identifier + '_channel_tlvs'].get( 515 'upfront_shutdown_script') 516 517 if uss_tlv and self.is_upfront_shutdown_script(): 518 upfront_shutdown_script = uss_tlv['shutdown_scriptpubkey'] 519 else: 520 upfront_shutdown_script = b'' 521 self.logger.info(f"upfront shutdown script received: {upfront_shutdown_script}") 522 return upfront_shutdown_script 523 524 def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig: 525 channel_seed = os.urandom(32) 526 initial_msat = funding_sat * 1000 - push_msat if initiator == LOCAL else push_msat 527 528 static_remotekey = None 529 # sending empty bytes as the upfront_shutdown_script will give us the 530 # flexibility to decide an address at closing time 531 upfront_shutdown_script = b'' 532 533 if self.is_static_remotekey(): 534 wallet = self.lnworker.wallet 535 assert wallet.txin_type == 'p2wpkh' 536 addr = wallet.get_new_sweep_address_for_channel() 537 static_remotekey = bfh(wallet.get_public_key(addr)) 538 else: 539 static_remotekey = None 540 dust_limit_sat = bitcoin.DUST_LIMIT_DEFAULT_SAT_LEGACY 541 reserve_sat = max(funding_sat // 100, dust_limit_sat) 542 # for comparison of defaults, see 543 # https://github.com/ACINQ/eclair/blob/afa378fbb73c265da44856b4ad0f2128a88ae6c6/eclair-core/src/main/resources/reference.conf#L66 544 # https://github.com/ElementsProject/lightning/blob/0056dd75572a8857cff36fcbdb1a2295a1ac9253/lightningd/options.c#L657 545 # https://github.com/lightningnetwork/lnd/blob/56b61078c5b2be007d318673a5f3b40c6346883a/config.go#L81 546 local_config = LocalConfig.from_seed( 547 channel_seed=channel_seed, 548 static_remotekey=static_remotekey, 549 upfront_shutdown_script=upfront_shutdown_script, 550 to_self_delay=self.network.config.get('lightning_to_self_delay', 7 * 144), 551 dust_limit_sat=dust_limit_sat, 552 max_htlc_value_in_flight_msat=funding_sat * 1000, 553 max_accepted_htlcs=30, 554 initial_msat=initial_msat, 555 reserve_sat=reserve_sat, 556 funding_locked_received=False, 557 was_announced=False, 558 current_commitment_signature=None, 559 current_htlc_signatures=b'', 560 htlc_minimum_msat=1, 561 ) 562 local_config.validate_params(funding_sat=funding_sat) 563 return local_config 564 565 def temporarily_reserve_funding_tx_change_address(func): 566 # During the channel open flow, if we initiated, we might have used a change address 567 # of ours in the funding tx. The funding tx is not part of the wallet history 568 # at that point yet, but we should already consider this change address as 'used'. 569 @functools.wraps(func) 570 async def wrapper(self: 'Peer', *args, **kwargs): 571 funding_tx = kwargs['funding_tx'] # type: PartialTransaction 572 wallet = self.lnworker.wallet 573 change_addresses = [txout.address for txout in funding_tx.outputs() 574 if wallet.is_change(txout.address)] 575 for addr in change_addresses: 576 wallet.set_reserved_state_of_address(addr, reserved=True) 577 try: 578 return await func(self, *args, **kwargs) 579 finally: 580 for addr in change_addresses: 581 self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False) 582 return wrapper 583 584 @log_exceptions 585 @temporarily_reserve_funding_tx_change_address 586 async def channel_establishment_flow( 587 self, *, 588 funding_tx: 'PartialTransaction', 589 funding_sat: int, 590 push_msat: int, 591 temp_channel_id: bytes 592 ) -> Tuple[Channel, 'PartialTransaction']: 593 """Implements the channel opening flow. 594 595 -> open_channel message 596 <- accept_channel message 597 -> funding_created message 598 <- funding_signed message 599 600 Channel configurations are initialized in this method. 601 """ 602 # will raise if init fails 603 await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT) 604 # trampoline is not yet in features 605 if not self.lnworker.channel_db and not self.lnworker.is_trampoline_peer(self.pubkey): 606 raise Exception('Not a trampoline node: ' + str(self.their_features)) 607 608 feerate = self.lnworker.current_feerate_per_kw() 609 local_config = self.make_local_config(funding_sat, push_msat, LOCAL) 610 611 if funding_sat > LN_MAX_FUNDING_SAT: 612 raise Exception( 613 f"MUST set funding_satoshis to less than 2^24 satoshi. " 614 f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}") 615 if push_msat > 1000 * funding_sat: 616 raise Exception( 617 f"MUST set push_msat to equal or less than 1000 * funding_satoshis: " 618 f"{push_msat} msat > {1000 * funding_sat} msat") 619 if funding_sat < lnutil.MIN_FUNDING_SAT: 620 raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}") 621 622 # for the first commitment transaction 623 per_commitment_secret_first = get_per_commitment_secret_from_seed( 624 local_config.per_commitment_secret_seed, 625 RevocationStore.START_INDEX 626 ) 627 per_commitment_point_first = secret_to_pubkey( 628 int.from_bytes(per_commitment_secret_first, 'big')) 629 self.send_message( 630 "open_channel", 631 temporary_channel_id=temp_channel_id, 632 chain_hash=constants.net.rev_genesis_bytes(), 633 funding_satoshis=funding_sat, 634 push_msat=push_msat, 635 dust_limit_satoshis=local_config.dust_limit_sat, 636 feerate_per_kw=feerate, 637 max_accepted_htlcs=local_config.max_accepted_htlcs, 638 funding_pubkey=local_config.multisig_key.pubkey, 639 revocation_basepoint=local_config.revocation_basepoint.pubkey, 640 htlc_basepoint=local_config.htlc_basepoint.pubkey, 641 payment_basepoint=local_config.payment_basepoint.pubkey, 642 delayed_payment_basepoint=local_config.delayed_basepoint.pubkey, 643 first_per_commitment_point=per_commitment_point_first, 644 to_self_delay=local_config.to_self_delay, 645 max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat, 646 channel_flags=0x00, # not willing to announce channel 647 channel_reserve_satoshis=local_config.reserve_sat, 648 htlc_minimum_msat=local_config.htlc_minimum_msat, 649 open_channel_tlvs={ 650 'upfront_shutdown_script': 651 {'shutdown_scriptpubkey': local_config.upfront_shutdown_script} 652 } 653 ) 654 655 # <- accept_channel 656 payload = await self.wait_for_message('accept_channel', temp_channel_id) 657 remote_per_commitment_point = payload['first_per_commitment_point'] 658 funding_txn_minimum_depth = payload['minimum_depth'] 659 if funding_txn_minimum_depth <= 0: 660 raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}") 661 if funding_txn_minimum_depth > 30: 662 raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}") 663 664 upfront_shutdown_script = self.upfront_shutdown_script_from_payload( 665 payload, 'accept') 666 667 remote_config = RemoteConfig( 668 payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']), 669 multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]), 670 htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']), 671 delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']), 672 revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']), 673 to_self_delay=payload['to_self_delay'], 674 dust_limit_sat=payload['dust_limit_satoshis'], 675 max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'], 676 max_accepted_htlcs=payload["max_accepted_htlcs"], 677 initial_msat=push_msat, 678 reserve_sat=payload["channel_reserve_satoshis"], 679 htlc_minimum_msat=payload['htlc_minimum_msat'], 680 next_per_commitment_point=remote_per_commitment_point, 681 current_per_commitment_point=None, 682 upfront_shutdown_script=upfront_shutdown_script 683 ) 684 remote_config.validate_params(funding_sat=funding_sat) 685 # if channel_reserve_satoshis is less than dust_limit_satoshis within the open_channel message: 686 # MUST reject the channel. 687 if remote_config.reserve_sat < local_config.dust_limit_sat: 688 raise Exception("violated constraint: remote_config.reserve_sat < local_config.dust_limit_sat") 689 # if channel_reserve_satoshis from the open_channel message is less than dust_limit_satoshis: 690 # MUST reject the channel. 691 if local_config.reserve_sat < remote_config.dust_limit_sat: 692 raise Exception("violated constraint: local_config.reserve_sat < remote_config.dust_limit_sat") 693 694 # -> funding created 695 # replace dummy output in funding tx 696 redeem_script = funding_output_script(local_config, remote_config) 697 funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script) 698 funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat) 699 dummy_output = PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat) 700 funding_tx.outputs().remove(dummy_output) 701 funding_tx.add_outputs([funding_output]) 702 funding_tx.set_rbf(False) 703 if not funding_tx.is_segwit(): 704 raise Exception('Funding transaction is not segwit') 705 funding_txid = funding_tx.txid() 706 assert funding_txid 707 funding_index = funding_tx.outputs().index(funding_output) 708 # build remote commitment transaction 709 channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index) 710 outpoint = Outpoint(funding_txid, funding_index) 711 constraints = ChannelConstraints( 712 capacity=funding_sat, 713 is_initiator=True, 714 funding_txn_minimum_depth=funding_txn_minimum_depth 715 ) 716 chan_dict = self.create_channel_storage( 717 channel_id, outpoint, local_config, remote_config, constraints) 718 chan = Channel( 719 chan_dict, 720 sweep_address=self.lnworker.sweep_address, 721 lnworker=self.lnworker, 722 initial_feerate=feerate 723 ) 724 chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()] 725 if isinstance(self.transport, LNTransport): 726 chan.add_or_update_peer_addr(self.transport.peer_addr) 727 sig_64, _ = chan.sign_next_commitment() 728 self.temp_id_to_id[temp_channel_id] = channel_id 729 730 self.send_message("funding_created", 731 temporary_channel_id=temp_channel_id, 732 funding_txid=funding_txid_bytes, 733 funding_output_index=funding_index, 734 signature=sig_64) 735 self.funding_created_sent.add(channel_id) 736 737 # <- funding signed 738 payload = await self.wait_for_message('funding_signed', channel_id) 739 self.logger.info('received funding_signed') 740 remote_sig = payload['signature'] 741 chan.receive_new_commitment(remote_sig, []) 742 chan.open_with_first_pcp(remote_per_commitment_point, remote_sig) 743 chan.set_state(ChannelState.OPENING) 744 self.lnworker.add_new_channel(chan) 745 return chan, funding_tx 746 747 def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints): 748 chan_dict = { 749 "node_id": self.pubkey.hex(), 750 "channel_id": channel_id.hex(), 751 "short_channel_id": None, 752 "funding_outpoint": outpoint, 753 "remote_config": remote_config, 754 "local_config": local_config, 755 "constraints": constraints, 756 "remote_update": None, 757 "state": ChannelState.PREOPENING.name, 758 'onion_keys': {}, 759 'data_loss_protect_remote_pcp': {}, 760 "log": {}, 761 "revocation_store": {}, 762 "static_remotekey_enabled": self.is_static_remotekey(), # stored because it cannot be "downgraded", per BOLT2 763 } 764 return StoredDict(chan_dict, self.lnworker.db if self.lnworker else None, []) 765 766 async def on_open_channel(self, payload): 767 """Implements the channel acceptance flow. 768 769 <- open_channel message 770 -> accept_channel message 771 <- funding_created message 772 -> funding_signed message 773 774 Channel configurations are initialized in this method. 775 """ 776 # <- open_channel 777 if payload['chain_hash'] != constants.net.rev_genesis_bytes(): 778 raise Exception('wrong chain_hash') 779 funding_sat = payload['funding_satoshis'] 780 push_msat = payload['push_msat'] 781 feerate = payload['feerate_per_kw'] # note: we are not validating this 782 temp_chan_id = payload['temporary_channel_id'] 783 local_config = self.make_local_config(funding_sat, push_msat, REMOTE) 784 if funding_sat > LN_MAX_FUNDING_SAT: 785 raise Exception( 786 f"MUST set funding_satoshis to less than 2^24 satoshi. " 787 f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}") 788 if push_msat > 1000 * funding_sat: 789 raise Exception( 790 f"MUST set push_msat to equal or less than 1000 * funding_satoshis: " 791 f"{push_msat} msat > {1000 * funding_sat} msat") 792 if funding_sat < lnutil.MIN_FUNDING_SAT: 793 raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}") 794 795 upfront_shutdown_script = self.upfront_shutdown_script_from_payload( 796 payload, 'open') 797 798 remote_config = RemoteConfig( 799 payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']), 800 multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']), 801 htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']), 802 delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']), 803 revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']), 804 to_self_delay=payload['to_self_delay'], 805 dust_limit_sat=payload['dust_limit_satoshis'], 806 max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'], 807 max_accepted_htlcs=payload['max_accepted_htlcs'], 808 initial_msat=funding_sat * 1000 - push_msat, 809 reserve_sat=payload['channel_reserve_satoshis'], 810 htlc_minimum_msat=payload['htlc_minimum_msat'], 811 next_per_commitment_point=payload['first_per_commitment_point'], 812 current_per_commitment_point=None, 813 upfront_shutdown_script=upfront_shutdown_script, 814 ) 815 816 remote_config.validate_params(funding_sat=funding_sat) 817 # The receiving node MUST fail the channel if: 818 # the funder's amount for the initial commitment transaction is not 819 # sufficient for full fee payment. 820 if remote_config.initial_msat < calc_fees_for_commitment_tx( 821 num_htlcs=0, 822 feerate=feerate, 823 is_local_initiator=False)[REMOTE]: 824 raise Exception( 825 "the funder's amount for the initial commitment transaction " 826 "is not sufficient for full fee payment") 827 # The receiving node MUST fail the channel if: 828 # both to_local and to_remote amounts for the initial commitment transaction are 829 # less than or equal to channel_reserve_satoshis (see BOLT 3). 830 if (local_config.initial_msat <= 1000 * payload['channel_reserve_satoshis'] 831 and remote_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']): 832 raise Exception( 833 "both to_local and to_remote amounts for the initial commitment " 834 "transaction are less than or equal to channel_reserve_satoshis") 835 # note: we ignore payload['channel_flags'], which e.g. contains 'announce_channel'. 836 # Notably if the remote sets 'announce_channel' to True, we will ignore that too, 837 # but we will not play along with actually announcing the channel (so we keep it private). 838 839 # -> accept channel 840 # for the first commitment transaction 841 per_commitment_secret_first = get_per_commitment_secret_from_seed( 842 local_config.per_commitment_secret_seed, 843 RevocationStore.START_INDEX 844 ) 845 per_commitment_point_first = secret_to_pubkey( 846 int.from_bytes(per_commitment_secret_first, 'big')) 847 min_depth = 3 848 self.send_message( 849 'accept_channel', 850 temporary_channel_id=temp_chan_id, 851 dust_limit_satoshis=local_config.dust_limit_sat, 852 max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat, 853 channel_reserve_satoshis=local_config.reserve_sat, 854 htlc_minimum_msat=local_config.htlc_minimum_msat, 855 minimum_depth=min_depth, 856 to_self_delay=local_config.to_self_delay, 857 max_accepted_htlcs=local_config.max_accepted_htlcs, 858 funding_pubkey=local_config.multisig_key.pubkey, 859 revocation_basepoint=local_config.revocation_basepoint.pubkey, 860 payment_basepoint=local_config.payment_basepoint.pubkey, 861 delayed_payment_basepoint=local_config.delayed_basepoint.pubkey, 862 htlc_basepoint=local_config.htlc_basepoint.pubkey, 863 first_per_commitment_point=per_commitment_point_first, 864 accept_channel_tlvs={ 865 'upfront_shutdown_script': 866 {'shutdown_scriptpubkey': local_config.upfront_shutdown_script} 867 } 868 ) 869 870 # <- funding created 871 funding_created = await self.wait_for_message('funding_created', temp_chan_id) 872 873 # -> funding signed 874 funding_idx = funding_created['funding_output_index'] 875 funding_txid = bh2u(funding_created['funding_txid'][::-1]) 876 channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx) 877 constraints = ChannelConstraints( 878 capacity=funding_sat, 879 is_initiator=False, 880 funding_txn_minimum_depth=min_depth 881 ) 882 outpoint = Outpoint(funding_txid, funding_idx) 883 chan_dict = self.create_channel_storage( 884 channel_id, outpoint, local_config, remote_config, constraints) 885 chan = Channel( 886 chan_dict, 887 sweep_address=self.lnworker.sweep_address, 888 lnworker=self.lnworker, 889 initial_feerate=feerate 890 ) 891 chan.storage['init_timestamp'] = int(time.time()) 892 if isinstance(self.transport, LNTransport): 893 chan.add_or_update_peer_addr(self.transport.peer_addr) 894 remote_sig = funding_created['signature'] 895 chan.receive_new_commitment(remote_sig, []) 896 sig_64, _ = chan.sign_next_commitment() 897 self.send_message('funding_signed', 898 channel_id=channel_id, 899 signature=sig_64, 900 ) 901 self.funding_signed_sent.add(chan.channel_id) 902 chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig) 903 chan.set_state(ChannelState.OPENING) 904 self.lnworker.add_new_channel(chan) 905 906 async def trigger_force_close(self, channel_id: bytes): 907 await self.initialized 908 latest_point = secret_to_pubkey(42) # we need a valid point (BOLT2) 909 self.send_message( 910 "channel_reestablish", 911 channel_id=channel_id, 912 next_commitment_number=0, 913 next_revocation_number=0, 914 your_last_per_commitment_secret=0, 915 my_current_per_commitment_point=latest_point) 916 917 async def reestablish_channel(self, chan: Channel): 918 await self.initialized 919 chan_id = chan.channel_id 920 assert ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING 921 if chan.peer_state != PeerState.DISCONNECTED: 922 self.logger.info(f'reestablish_channel was called but channel {chan.get_id_for_log()} ' 923 f'already in peer_state {chan.peer_state!r}') 924 return 925 chan.peer_state = PeerState.REESTABLISHING 926 util.trigger_callback('channel', self.lnworker.wallet, chan) 927 # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side" 928 chan.hm.discard_unsigned_remote_updates() 929 # ctns 930 oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL) 931 latest_local_ctn = chan.get_latest_ctn(LOCAL) 932 next_local_ctn = chan.get_next_ctn(LOCAL) 933 oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE) 934 latest_remote_ctn = chan.get_latest_ctn(REMOTE) 935 next_remote_ctn = chan.get_next_ctn(REMOTE) 936 assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT) 937 # send message 938 if chan.is_static_remotekey_enabled(): 939 latest_secret, latest_point = chan.get_secret_and_point(LOCAL, 0) 940 else: 941 latest_secret, latest_point = chan.get_secret_and_point(LOCAL, latest_local_ctn) 942 if oldest_unrevoked_remote_ctn == 0: 943 last_rev_secret = 0 944 else: 945 last_rev_index = oldest_unrevoked_remote_ctn - 1 946 last_rev_secret = chan.revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index) 947 self.send_message( 948 "channel_reestablish", 949 channel_id=chan_id, 950 next_commitment_number=next_local_ctn, 951 next_revocation_number=oldest_unrevoked_remote_ctn, 952 your_last_per_commitment_secret=last_rev_secret, 953 my_current_per_commitment_point=latest_point) 954 self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with ' 955 f'(next_local_ctn={next_local_ctn}, ' 956 f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})') 957 while True: 958 try: 959 msg = await self.wait_for_message('channel_reestablish', chan_id) 960 break 961 except asyncio.TimeoutError: 962 self.logger.info('waiting to receive channel_reestablish...') 963 continue 964 their_next_local_ctn = msg["next_commitment_number"] 965 their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"] 966 their_local_pcp = msg.get("my_current_per_commitment_point") 967 their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret") 968 self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with ' 969 f'(their_next_local_ctn={their_next_local_ctn}, ' 970 f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})') 971 # sanity checks of received values 972 if their_next_local_ctn < 0: 973 raise RemoteMisbehaving(f"channel reestablish: their_next_local_ctn < 0") 974 if their_oldest_unrevoked_remote_ctn < 0: 975 raise RemoteMisbehaving(f"channel reestablish: their_oldest_unrevoked_remote_ctn < 0") 976 # Replay un-acked local updates (including commitment_signed) byte-for-byte. 977 # If we have sent them a commitment signature that they "lost" (due to disconnect), 978 # we need to make sure we replay the same local updates, as otherwise they could 979 # end up with two (or more) signed valid commitment transactions at the same ctn. 980 # Multiple valid ctxs at the same ctn is a major headache for pre-signing spending txns, 981 # e.g. for watchtowers, hence we must ensure these ctxs coincide. 982 # We replay the local updates even if they were not yet committed. 983 unacked = chan.hm.get_unacked_local_updates() 984 n_replayed_msgs = 0 985 for ctn, messages in unacked.items(): 986 if ctn < their_next_local_ctn: 987 # They claim to have received these messages and the corresponding 988 # commitment_signed, hence we must not replay them. 989 continue 990 for raw_upd_msg in messages: 991 self.transport.send_bytes(raw_upd_msg) 992 n_replayed_msgs += 1 993 self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {n_replayed_msgs} unacked messages') 994 995 we_are_ahead = False 996 they_are_ahead = False 997 # compare remote ctns 998 if next_remote_ctn != their_next_local_ctn: 999 if their_next_local_ctn == latest_remote_ctn and chan.hm.is_revack_pending(REMOTE): 1000 # We replayed the local updates (see above), which should have contained a commitment_signed 1001 # (due to is_revack_pending being true), and this should have remedied this situation. 1002 pass 1003 else: 1004 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): " 1005 f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}") 1006 if their_next_local_ctn < next_remote_ctn: 1007 we_are_ahead = True 1008 else: 1009 they_are_ahead = True 1010 # compare local ctns 1011 if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn: 1012 if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn: 1013 # A node: 1014 # if next_revocation_number is equal to the commitment number of the last revoke_and_ack 1015 # the receiving node sent, AND the receiving node hasn't already received a closing_signed: 1016 # MUST re-send the revoke_and_ack. 1017 last_secret, last_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1) 1018 next_secret, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1) 1019 self.send_message( 1020 "revoke_and_ack", 1021 channel_id=chan.channel_id, 1022 per_commitment_secret=last_secret, 1023 next_per_commitment_point=next_point) 1024 else: 1025 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): " 1026 f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}") 1027 if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn: 1028 we_are_ahead = True 1029 else: 1030 they_are_ahead = True 1031 # option_data_loss_protect 1032 def are_datalossprotect_fields_valid() -> bool: 1033 if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None: 1034 return False 1035 if their_oldest_unrevoked_remote_ctn > 0: 1036 our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1) 1037 else: 1038 assert their_oldest_unrevoked_remote_ctn == 0 1039 our_pcs = bytes(32) 1040 if our_pcs != their_claim_of_our_last_per_commitment_secret: 1041 self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): " 1042 f"(DLP) local PCS mismatch: {bh2u(our_pcs)} != {bh2u(their_claim_of_our_last_per_commitment_secret)}") 1043 return False 1044 if chan.is_static_remotekey_enabled(): 1045 return True 1046 try: 1047 __, our_remote_pcp = chan.get_secret_and_point(REMOTE, their_next_local_ctn - 1) 1048 except RemoteCtnTooFarInFuture: 1049 pass 1050 else: 1051 if our_remote_pcp != their_local_pcp: 1052 self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): " 1053 f"(DLP) remote PCP mismatch: {bh2u(our_remote_pcp)} != {bh2u(their_local_pcp)}") 1054 return False 1055 return True 1056 1057 if not are_datalossprotect_fields_valid(): 1058 raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid") 1059 1060 if they_are_ahead: 1061 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): " 1062 f"remote is ahead of us! They should force-close. Remote PCP: {bh2u(their_local_pcp)}") 1063 # data_loss_protect_remote_pcp is used in lnsweep 1064 chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp) 1065 self.lnworker.save_channel(chan) 1066 chan.peer_state = PeerState.BAD 1067 return 1068 elif we_are_ahead: 1069 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.") 1070 await self.lnworker.try_force_closing(chan_id) 1071 return 1072 1073 chan.peer_state = PeerState.GOOD 1074 if chan.is_funded() and their_next_local_ctn == next_local_ctn == 1: 1075 self.send_funding_locked(chan) 1076 # checks done 1077 if chan.is_funded() and chan.config[LOCAL].funding_locked_received: 1078 self.mark_open(chan) 1079 util.trigger_callback('channel', self.lnworker.wallet, chan) 1080 # if we have sent a previous shutdown, it must be retransmitted (Bolt2) 1081 if chan.get_state() == ChannelState.SHUTDOWN: 1082 await self.send_shutdown(chan) 1083 1084 def send_funding_locked(self, chan: Channel): 1085 channel_id = chan.channel_id 1086 per_commitment_secret_index = RevocationStore.START_INDEX - 1 1087 per_commitment_point_second = secret_to_pubkey(int.from_bytes( 1088 get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big')) 1089 # note: if funding_locked was not yet received, we might send it multiple times 1090 self.send_message("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second) 1091 if chan.is_funded() and chan.config[LOCAL].funding_locked_received: 1092 self.mark_open(chan) 1093 1094 def on_funding_locked(self, chan: Channel, payload): 1095 self.logger.info(f"on_funding_locked. channel: {bh2u(chan.channel_id)}") 1096 if not chan.config[LOCAL].funding_locked_received: 1097 their_next_point = payload["next_per_commitment_point"] 1098 chan.config[REMOTE].next_per_commitment_point = their_next_point 1099 chan.config[LOCAL].funding_locked_received = True 1100 self.lnworker.save_channel(chan) 1101 if chan.is_funded(): 1102 self.mark_open(chan) 1103 1104 def on_network_update(self, chan: Channel, funding_tx_depth: int): 1105 """ 1106 Only called when the channel is OPEN. 1107 1108 Runs on the Network thread. 1109 """ 1110 if not chan.config[LOCAL].was_announced and funding_tx_depth >= 6: 1111 # don't announce our channels 1112 # FIXME should this be a field in chan.local_state maybe? 1113 return 1114 chan.config[LOCAL].was_announced = True 1115 self.lnworker.save_channel(chan) 1116 coro = self.handle_announcements(chan) 1117 asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) 1118 1119 @log_exceptions 1120 async def handle_announcements(self, chan: Channel): 1121 h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan) 1122 announcement_signatures_msg = await self.announcement_signatures[chan.channel_id].get() 1123 remote_node_sig = announcement_signatures_msg["node_signature"] 1124 remote_bitcoin_sig = announcement_signatures_msg["bitcoin_signature"] 1125 if not ecc.verify_signature(chan.config[REMOTE].multisig_key.pubkey, remote_bitcoin_sig, h): 1126 raise Exception("bitcoin_sig invalid in announcement_signatures") 1127 if not ecc.verify_signature(self.pubkey, remote_node_sig, h): 1128 raise Exception("node_sig invalid in announcement_signatures") 1129 1130 node_sigs = [remote_node_sig, local_node_sig] 1131 bitcoin_sigs = [remote_bitcoin_sig, local_bitcoin_sig] 1132 bitcoin_keys = [chan.config[REMOTE].multisig_key.pubkey, chan.config[LOCAL].multisig_key.pubkey] 1133 1134 if self.node_ids[0] > self.node_ids[1]: 1135 node_sigs.reverse() 1136 bitcoin_sigs.reverse() 1137 node_ids = list(reversed(self.node_ids)) 1138 bitcoin_keys.reverse() 1139 else: 1140 node_ids = self.node_ids 1141 1142 self.send_message("channel_announcement", 1143 node_signatures_1=node_sigs[0], 1144 node_signatures_2=node_sigs[1], 1145 bitcoin_signature_1=bitcoin_sigs[0], 1146 bitcoin_signature_2=bitcoin_sigs[1], 1147 len=0, 1148 #features not set (defaults to zeros) 1149 chain_hash=constants.net.rev_genesis_bytes(), 1150 short_channel_id=chan.short_channel_id, 1151 node_id_1=node_ids[0], 1152 node_id_2=node_ids[1], 1153 bitcoin_key_1=bitcoin_keys[0], 1154 bitcoin_key_2=bitcoin_keys[1] 1155 ) 1156 1157 def mark_open(self, chan: Channel): 1158 assert chan.is_funded() 1159 # only allow state transition from "FUNDED" to "OPEN" 1160 old_state = chan.get_state() 1161 if old_state == ChannelState.OPEN: 1162 return 1163 if old_state != ChannelState.FUNDED: 1164 self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {repr(old_state)}") 1165 return 1166 assert chan.config[LOCAL].funding_locked_received 1167 chan.set_state(ChannelState.OPEN) 1168 util.trigger_callback('channel', self.lnworker.wallet, chan) 1169 # peer may have sent us a channel update for the incoming direction previously 1170 pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id) 1171 if pending_channel_update: 1172 chan.set_remote_update(pending_channel_update['raw']) 1173 self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})") 1174 forwarding_enabled = self.network.config.get('lightning_forward_payments', False) 1175 if forwarding_enabled: 1176 # send channel_update of outgoing edge to peer, 1177 # so that channel can be used to to receive payments 1178 self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})") 1179 chan_upd = chan.get_outgoing_gossip_channel_update() 1180 self.transport.send_bytes(chan_upd) 1181 1182 def send_announcement_signatures(self, chan: Channel): 1183 chan_ann = chan.construct_channel_announcement_without_sigs() 1184 preimage = chan_ann[256+2:] 1185 msg_hash = sha256d(preimage) 1186 bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).sign(msg_hash, sig_string_from_r_and_s) 1187 node_signature = ecc.ECPrivkey(self.privkey).sign(msg_hash, sig_string_from_r_and_s) 1188 self.send_message("announcement_signatures", 1189 channel_id=chan.channel_id, 1190 short_channel_id=chan.short_channel_id, 1191 node_signature=node_signature, 1192 bitcoin_signature=bitcoin_signature 1193 ) 1194 return msg_hash, node_signature, bitcoin_signature 1195 1196 def on_update_fail_htlc(self, chan: Channel, payload): 1197 htlc_id = payload["id"] 1198 reason = payload["reason"] 1199 self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}") 1200 chan.receive_fail_htlc(htlc_id, error_bytes=reason) # TODO handle exc and maybe fail channel (e.g. bad htlc_id) 1201 self.maybe_send_commitment(chan) 1202 1203 def maybe_send_commitment(self, chan: Channel): 1204 # REMOTE should revoke first before we can sign a new ctx 1205 if chan.hm.is_revack_pending(REMOTE): 1206 return 1207 # if there are no changes, we will not (and must not) send a new commitment 1208 if not chan.has_pending_changes(REMOTE): 1209 return 1210 self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.') 1211 sig_64, htlc_sigs = chan.sign_next_commitment() 1212 self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs)) 1213 1214 def pay(self, *, 1215 route: 'LNPaymentRoute', 1216 chan: Channel, 1217 amount_msat: int, 1218 total_msat: int, 1219 payment_hash: bytes, 1220 min_final_cltv_expiry: int, 1221 payment_secret: bytes = None, 1222 trampoline_onion=None) -> UpdateAddHtlc: 1223 1224 assert amount_msat > 0, "amount_msat is not greater zero" 1225 assert len(route) > 0 1226 if not chan.can_send_update_add_htlc(): 1227 raise PaymentFailure("Channel cannot send update_add_htlc") 1228 # add features learned during "init" for direct neighbour: 1229 route[0].node_features |= self.features 1230 local_height = self.network.get_local_height() 1231 final_cltv = local_height + min_final_cltv_expiry 1232 hops_data, amount_msat, cltv = calc_hops_data_for_payment( 1233 route, 1234 amount_msat, 1235 final_cltv, 1236 total_msat=total_msat, 1237 payment_secret=payment_secret) 1238 num_hops = len(hops_data) 1239 self.logger.info(f"lnpeer.pay len(route)={len(route)}") 1240 for i in range(len(route)): 1241 self.logger.info(f" {i}: edge={route[i].short_channel_id} hop_data={hops_data[i]!r}") 1242 assert final_cltv <= cltv, (final_cltv, cltv) 1243 session_key = os.urandom(32) # session_key 1244 # if we are forwarding a trampoline payment, add trampoline onion 1245 if trampoline_onion: 1246 self.logger.info(f'adding trampoline onion to final payload') 1247 trampoline_payload = hops_data[num_hops-2].payload 1248 trampoline_payload["trampoline_onion_packet"] = { 1249 "version": trampoline_onion.version, 1250 "public_key": trampoline_onion.public_key, 1251 "hops_data": trampoline_onion.hops_data, 1252 "hmac": trampoline_onion.hmac 1253 } 1254 # create onion packet 1255 payment_path_pubkeys = [x.node_id for x in route] 1256 onion = new_onion_packet(payment_path_pubkeys, session_key, hops_data, associated_data=payment_hash) # must use another sessionkey 1257 self.logger.info(f"starting payment. len(route)={len(hops_data)}.") 1258 # create htlc 1259 if cltv > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE: 1260 raise PaymentFailure(f"htlc expiry too far into future. (in {cltv-local_height} blocks)") 1261 htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_expiry=cltv, timestamp=int(time.time())) 1262 htlc = chan.add_htlc(htlc) 1263 chan.set_onion_key(htlc.htlc_id, session_key) # should it be the outer onion secret? 1264 self.logger.info(f"starting payment. htlc: {htlc}") 1265 self.send_message( 1266 "update_add_htlc", 1267 channel_id=chan.channel_id, 1268 id=htlc.htlc_id, 1269 cltv_expiry=htlc.cltv_expiry, 1270 amount_msat=htlc.amount_msat, 1271 payment_hash=htlc.payment_hash, 1272 onion_routing_packet=onion.to_bytes()) 1273 self.maybe_send_commitment(chan) 1274 return htlc 1275 1276 def send_revoke_and_ack(self, chan: Channel): 1277 self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}') 1278 rev = chan.revoke_current_commitment() 1279 self.lnworker.save_channel(chan) 1280 self.send_message("revoke_and_ack", 1281 channel_id=chan.channel_id, 1282 per_commitment_secret=rev.per_commitment_secret, 1283 next_per_commitment_point=rev.next_per_commitment_point) 1284 self.maybe_send_commitment(chan) 1285 1286 def on_commitment_signed(self, chan: Channel, payload): 1287 if chan.peer_state == PeerState.BAD: 1288 return 1289 self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.') 1290 # make sure there were changes to the ctx, otherwise the remote peer is misbehaving 1291 if not chan.has_pending_changes(LOCAL): 1292 # TODO if feerate changed A->B->A; so there were updates but the value is identical, 1293 # then it might be legal to send a commitment_signature 1294 # see https://github.com/lightningnetwork/lightning-rfc/pull/618 1295 raise RemoteMisbehaving('received commitment_signed without pending changes') 1296 # REMOTE should wait until we have revoked 1297 if chan.hm.is_revack_pending(LOCAL): 1298 raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx') 1299 data = payload["htlc_signature"] 1300 htlc_sigs = list(chunks(data, 64)) 1301 chan.receive_new_commitment(payload["signature"], htlc_sigs) 1302 self.send_revoke_and_ack(chan) 1303 1304 def on_update_fulfill_htlc(self, chan: Channel, payload): 1305 preimage = payload["payment_preimage"] 1306 payment_hash = sha256(preimage) 1307 htlc_id = payload["id"] 1308 self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}") 1309 chan.receive_htlc_settle(preimage, htlc_id) # TODO handle exc and maybe fail channel (e.g. bad htlc_id) 1310 self.lnworker.save_preimage(payment_hash, preimage) 1311 self.maybe_send_commitment(chan) 1312 1313 def on_update_fail_malformed_htlc(self, chan: Channel, payload): 1314 htlc_id = payload["id"] 1315 failure_code = payload["failure_code"] 1316 self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. " 1317 f"htlc_id {htlc_id}. failure_code={failure_code}") 1318 if failure_code & OnionFailureCodeMetaFlag.BADONION == 0: 1319 asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id)) 1320 raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}") 1321 reason = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"]) 1322 chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=reason) 1323 self.maybe_send_commitment(chan) 1324 1325 def on_update_add_htlc(self, chan: Channel, payload): 1326 payment_hash = payload["payment_hash"] 1327 htlc_id = payload["id"] 1328 cltv_expiry = payload["cltv_expiry"] 1329 amount_msat_htlc = payload["amount_msat"] 1330 onion_packet = payload["onion_routing_packet"] 1331 htlc = UpdateAddHtlc( 1332 amount_msat=amount_msat_htlc, 1333 payment_hash=payment_hash, 1334 cltv_expiry=cltv_expiry, 1335 timestamp=int(time.time()), 1336 htlc_id=htlc_id) 1337 self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc={str(htlc)}") 1338 if chan.get_state() != ChannelState.OPEN: 1339 raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}") 1340 if cltv_expiry > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX: 1341 asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id)) 1342 raise RemoteMisbehaving(f"received update_add_htlc with cltv_expiry > BLOCKHEIGHT_MAX. value was {cltv_expiry}") 1343 # add htlc 1344 chan.receive_htlc(htlc, onion_packet) 1345 util.trigger_callback('htlc_added', chan, htlc, RECEIVED) 1346 1347 def maybe_forward_htlc( 1348 self, *, 1349 htlc: UpdateAddHtlc, 1350 processed_onion: ProcessedOnionPacket) -> Tuple[bytes, int]: 1351 1352 # Forward HTLC 1353 # FIXME: there are critical safety checks MISSING here 1354 # - for example; atm we forward first and then persist "forwarding_info", 1355 # so if we segfault in-between and restart, we might forward an HTLC twice... 1356 # (same for trampoline forwarding) 1357 forwarding_enabled = self.network.config.get('lightning_forward_payments', False) 1358 if not forwarding_enabled: 1359 self.logger.info(f"forwarding is disabled. failing htlc.") 1360 raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'') 1361 chain = self.network.blockchain() 1362 if chain.is_tip_stale(): 1363 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'') 1364 try: 1365 next_chan_scid = processed_onion.hop_data.payload["short_channel_id"]["short_channel_id"] 1366 except: 1367 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1368 next_chan = self.lnworker.get_channel_by_short_id(next_chan_scid) 1369 local_height = chain.height() 1370 if next_chan is None: 1371 self.logger.info(f"cannot forward htlc. cannot find next_chan {next_chan_scid}") 1372 raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'') 1373 outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update()[2:] 1374 outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big") 1375 if not next_chan.can_send_update_add_htlc(): 1376 self.logger.info(f"cannot forward htlc. next_chan {next_chan_scid} cannot send ctx updates. " 1377 f"chan state {next_chan.get_state()!r}, peer state: {next_chan.peer_state!r}") 1378 data = outgoing_chan_upd_len + outgoing_chan_upd 1379 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data) 1380 try: 1381 next_cltv_expiry = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"] 1382 except: 1383 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1384 if htlc.cltv_expiry - next_cltv_expiry < next_chan.forwarding_cltv_expiry_delta: 1385 data = htlc.cltv_expiry.to_bytes(4, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd 1386 raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_CLTV_EXPIRY, data=data) 1387 if htlc.cltv_expiry - lnutil.MIN_FINAL_CLTV_EXPIRY_ACCEPTED <= local_height \ 1388 or next_cltv_expiry <= local_height: 1389 data = outgoing_chan_upd_len + outgoing_chan_upd 1390 raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_SOON, data=data) 1391 if max(htlc.cltv_expiry, next_cltv_expiry) > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE: 1392 raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_FAR, data=b'') 1393 try: 1394 next_amount_msat_htlc = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"] 1395 except: 1396 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1397 forwarding_fees = fee_for_edge_msat( 1398 forwarded_amount_msat=next_amount_msat_htlc, 1399 fee_base_msat=next_chan.forwarding_fee_base_msat, 1400 fee_proportional_millionths=next_chan.forwarding_fee_proportional_millionths) 1401 if htlc.amount_msat - next_amount_msat_htlc < forwarding_fees: 1402 data = next_amount_msat_htlc.to_bytes(8, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd 1403 raise OnionRoutingFailure(code=OnionFailureCode.FEE_INSUFFICIENT, data=data) 1404 self.logger.info(f'forwarding htlc to {next_chan.node_id}') 1405 next_htlc = UpdateAddHtlc( 1406 amount_msat=next_amount_msat_htlc, 1407 payment_hash=htlc.payment_hash, 1408 cltv_expiry=next_cltv_expiry, 1409 timestamp=int(time.time())) 1410 next_htlc = next_chan.add_htlc(next_htlc) 1411 next_peer = self.lnworker.peers[next_chan.node_id] 1412 try: 1413 next_peer.send_message( 1414 "update_add_htlc", 1415 channel_id=next_chan.channel_id, 1416 id=next_htlc.htlc_id, 1417 cltv_expiry=next_cltv_expiry, 1418 amount_msat=next_amount_msat_htlc, 1419 payment_hash=next_htlc.payment_hash, 1420 onion_routing_packet=processed_onion.next_packet.to_bytes() 1421 ) 1422 except BaseException as e: 1423 self.logger.info(f"failed to forward htlc: error sending message. {e}") 1424 data = outgoing_chan_upd_len + outgoing_chan_upd 1425 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data) 1426 return next_chan_scid, next_htlc.htlc_id 1427 1428 def maybe_forward_trampoline( 1429 self, *, 1430 chan: Channel, 1431 htlc: UpdateAddHtlc, 1432 trampoline_onion: ProcessedOnionPacket): 1433 1434 payload = trampoline_onion.hop_data.payload 1435 payment_hash = htlc.payment_hash 1436 payment_secret = os.urandom(32) 1437 try: 1438 outgoing_node_id = payload["outgoing_node_id"]["outgoing_node_id"] 1439 amt_to_forward = payload["amt_to_forward"]["amt_to_forward"] 1440 cltv_from_onion = payload["outgoing_cltv_value"]["outgoing_cltv_value"] 1441 if "invoice_features" in payload: 1442 self.logger.info('forward_trampoline: legacy') 1443 next_trampoline_onion = None 1444 invoice_features = payload["invoice_features"]["invoice_features"] 1445 invoice_routing_info = payload["invoice_routing_info"]["invoice_routing_info"] 1446 else: 1447 self.logger.info('forward_trampoline: end-to-end') 1448 invoice_features = LnFeatures.BASIC_MPP_OPT 1449 next_trampoline_onion = trampoline_onion.next_packet 1450 except Exception as e: 1451 self.logger.exception('') 1452 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1453 1454 # these are the fee/cltv paid by the sender 1455 # pay_to_node will raise if they are not sufficient 1456 trampoline_cltv_delta = htlc.cltv_expiry - cltv_from_onion 1457 trampoline_fee = htlc.amount_msat - amt_to_forward 1458 1459 @log_exceptions 1460 async def forward_trampoline_payment(): 1461 try: 1462 await self.lnworker.pay_to_node( 1463 node_pubkey=outgoing_node_id, 1464 payment_hash=payment_hash, 1465 payment_secret=payment_secret, 1466 amount_to_pay=amt_to_forward, 1467 min_cltv_expiry=cltv_from_onion, 1468 r_tags=[], 1469 invoice_features=invoice_features, 1470 fwd_trampoline_onion=next_trampoline_onion, 1471 fwd_trampoline_fee=trampoline_fee, 1472 fwd_trampoline_cltv_delta=trampoline_cltv_delta, 1473 attempts=1) 1474 except OnionRoutingFailure as e: 1475 # FIXME: cannot use payment_hash as key 1476 self.lnworker.trampoline_forwarding_failures[payment_hash] = e 1477 except PaymentFailure as e: 1478 # FIXME: adapt the error code 1479 error_reason = OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'') 1480 self.lnworker.trampoline_forwarding_failures[payment_hash] = error_reason 1481 1482 asyncio.ensure_future(forward_trampoline_payment()) 1483 1484 def maybe_fulfill_htlc( 1485 self, *, 1486 chan: Channel, 1487 htlc: UpdateAddHtlc, 1488 processed_onion: ProcessedOnionPacket, 1489 is_trampoline: bool = False) -> Tuple[Optional[bytes], Optional[OnionPacket]]: 1490 1491 """As a final recipient of an HTLC, decide if we should fulfill it. 1492 Return (preimage, trampoline_onion_packet) with at most a single element not None 1493 """ 1494 def log_fail_reason(reason: str): 1495 self.logger.info(f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. " 1496 f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}") 1497 1498 try: 1499 amt_to_forward = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"] 1500 except: 1501 log_fail_reason(f"'amt_to_forward' missing from onion") 1502 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1503 1504 # Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height. 1505 # We should not release the preimage for an HTLC that its sender could already time out as 1506 # then they might try to force-close and it becomes a race. 1507 chain = self.network.blockchain() 1508 if chain.is_tip_stale(): 1509 log_fail_reason(f"our chain tip is stale") 1510 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'') 1511 local_height = chain.height() 1512 exc_incorrect_or_unknown_pd = OnionRoutingFailure( 1513 code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS, 1514 data=amt_to_forward.to_bytes(8, byteorder="big") + local_height.to_bytes(4, byteorder="big")) 1515 if local_height + MIN_FINAL_CLTV_EXPIRY_ACCEPTED > htlc.cltv_expiry: 1516 log_fail_reason(f"htlc.cltv_expiry is unreasonably close") 1517 raise exc_incorrect_or_unknown_pd 1518 try: 1519 cltv_from_onion = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"] 1520 except: 1521 log_fail_reason(f"'outgoing_cltv_value' missing from onion") 1522 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00') 1523 1524 if not is_trampoline: 1525 if cltv_from_onion != htlc.cltv_expiry: 1526 log_fail_reason(f"cltv_from_onion != htlc.cltv_expiry") 1527 raise OnionRoutingFailure( 1528 code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY, 1529 data=htlc.cltv_expiry.to_bytes(4, byteorder="big")) 1530 try: 1531 total_msat = processed_onion.hop_data.payload["payment_data"]["total_msat"] 1532 except: 1533 total_msat = amt_to_forward # fall back to "amt_to_forward" 1534 1535 if not is_trampoline and amt_to_forward != htlc.amount_msat: 1536 log_fail_reason(f"amt_to_forward != htlc.amount_msat") 1537 raise OnionRoutingFailure( 1538 code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT, 1539 data=htlc.amount_msat.to_bytes(8, byteorder="big")) 1540 1541 try: 1542 payment_secret_from_onion = processed_onion.hop_data.payload["payment_data"]["payment_secret"] 1543 except: 1544 if total_msat > amt_to_forward: 1545 # payment_secret is required for MPP 1546 log_fail_reason(f"'payment_secret' missing from onion") 1547 raise exc_incorrect_or_unknown_pd 1548 # TODO fail here if invoice has set PAYMENT_SECRET_REQ 1549 payment_secret_from_onion = None 1550 1551 if total_msat > amt_to_forward: 1552 mpp_status = self.lnworker.check_received_mpp_htlc(payment_secret_from_onion, chan.short_channel_id, htlc, total_msat) 1553 if mpp_status is None: 1554 return None, None 1555 if mpp_status is False: 1556 log_fail_reason(f"MPP_TIMEOUT") 1557 raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'') 1558 assert mpp_status is True 1559 1560 # if there is a trampoline_onion, maybe_fulfill_htlc will be called again 1561 if processed_onion.trampoline_onion_packet: 1562 # TODO: we should check that all trampoline_onions are the same 1563 return None, processed_onion.trampoline_onion_packet 1564 1565 info = self.lnworker.get_payment_info(htlc.payment_hash) 1566 if info is None: 1567 log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}") 1568 raise exc_incorrect_or_unknown_pd 1569 preimage = self.lnworker.get_preimage(htlc.payment_hash) 1570 if payment_secret_from_onion: 1571 if payment_secret_from_onion != derive_payment_secret_from_payment_preimage(preimage): 1572 log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {derive_payment_secret_from_payment_preimage(preimage).hex()}') 1573 raise exc_incorrect_or_unknown_pd 1574 invoice_msat = info.amount_msat 1575 if not (invoice_msat is None or invoice_msat <= total_msat <= 2 * invoice_msat): 1576 log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}") 1577 raise exc_incorrect_or_unknown_pd 1578 self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}") 1579 self.lnworker.set_request_status(htlc.payment_hash, PR_PAID) 1580 return preimage, None 1581 1582 def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes): 1583 self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}") 1584 assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}" 1585 assert chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id) 1586 self.received_htlcs_pending_removal.add((chan, htlc_id)) 1587 chan.settle_htlc(preimage, htlc_id) 1588 self.send_message( 1589 "update_fulfill_htlc", 1590 channel_id=chan.channel_id, 1591 id=htlc_id, 1592 payment_preimage=preimage) 1593 1594 def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes): 1595 self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.") 1596 assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}" 1597 self.received_htlcs_pending_removal.add((chan, htlc_id)) 1598 chan.fail_htlc(htlc_id) 1599 self.send_message( 1600 "update_fail_htlc", 1601 channel_id=chan.channel_id, 1602 id=htlc_id, 1603 len=len(error_bytes), 1604 reason=error_bytes) 1605 1606 def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure): 1607 self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.") 1608 assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}" 1609 if not (reason.code & OnionFailureCodeMetaFlag.BADONION and len(reason.data) == 32): 1610 raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}") 1611 self.received_htlcs_pending_removal.add((chan, htlc_id)) 1612 chan.fail_htlc(htlc_id) 1613 self.send_message( 1614 "update_fail_malformed_htlc", 1615 channel_id=chan.channel_id, 1616 id=htlc_id, 1617 sha256_of_onion=reason.data, 1618 failure_code=reason.code) 1619 1620 def on_revoke_and_ack(self, chan: Channel, payload): 1621 if chan.peer_state == PeerState.BAD: 1622 return 1623 self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}') 1624 rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"]) 1625 chan.receive_revocation(rev) 1626 self.lnworker.save_channel(chan) 1627 self.maybe_send_commitment(chan) 1628 1629 def on_update_fee(self, chan: Channel, payload): 1630 feerate = payload["feerate_per_kw"] 1631 chan.update_fee(feerate, False) 1632 1633 async def maybe_update_fee(self, chan: Channel): 1634 """ 1635 called when our fee estimates change 1636 """ 1637 if not chan.can_send_ctx_updates(): 1638 return 1639 feerate_per_kw = self.lnworker.current_feerate_per_kw() 1640 if not chan.constraints.is_initiator: 1641 if constants.net is not constants.BitcoinRegtest: 1642 chan_feerate = chan.get_latest_feerate(LOCAL) 1643 ratio = chan_feerate / feerate_per_kw 1644 if ratio < 0.5: 1645 # Note that we trust the Electrum server about fee rates 1646 # Thus, automated force-closing might not be a good idea 1647 # Maybe we should display something in the GUI instead 1648 self.logger.warning( 1649 f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, " 1650 f"current recommended feerate is {feerate_per_kw} sat/kw, consider force closing!") 1651 return 1652 chan_fee = chan.get_next_feerate(REMOTE) 1653 if feerate_per_kw < chan_fee / 2: 1654 self.logger.info("FEES HAVE FALLEN") 1655 elif feerate_per_kw > chan_fee * 2: 1656 self.logger.info("FEES HAVE RISEN") 1657 else: 1658 return 1659 self.logger.info(f"(chan: {chan.get_id_for_log()}) current pending feerate {chan_fee}. " 1660 f"new feerate {feerate_per_kw}") 1661 chan.update_fee(feerate_per_kw, True) 1662 self.send_message( 1663 "update_fee", 1664 channel_id=chan.channel_id, 1665 feerate_per_kw=feerate_per_kw) 1666 self.maybe_send_commitment(chan) 1667 1668 @log_exceptions 1669 async def close_channel(self, chan_id: bytes): 1670 chan = self.channels[chan_id] 1671 self.shutdown_received[chan_id] = asyncio.Future() 1672 await self.send_shutdown(chan) 1673 payload = await self.shutdown_received[chan_id] 1674 txid = await self._shutdown(chan, payload, is_local=True) 1675 self.logger.info(f'({chan.get_id_for_log()}) Channel closed {txid}') 1676 return txid 1677 1678 async def on_shutdown(self, chan: Channel, payload): 1679 their_scriptpubkey = payload['scriptpubkey'] 1680 their_upfront_scriptpubkey = chan.config[REMOTE].upfront_shutdown_script 1681 1682 # BOLT-02 check if they use the upfront shutdown script they advertized 1683 if their_upfront_scriptpubkey: 1684 if not (their_scriptpubkey == their_upfront_scriptpubkey): 1685 raise UpfrontShutdownScriptViolation("remote didn't use upfront shutdown script it commited to in channel opening") 1686 1687 # BOLT-02 restrict the scriptpubkey to some templates: 1688 if not (match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0) 1689 or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2SH) 1690 or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2PKH)): 1691 raise Exception(f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}') 1692 chan_id = chan.channel_id 1693 if chan_id in self.shutdown_received: 1694 self.shutdown_received[chan_id].set_result(payload) 1695 else: 1696 chan = self.channels[chan_id] 1697 await self.send_shutdown(chan) 1698 txid = await self._shutdown(chan, payload, is_local=False) 1699 self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {txid}') 1700 1701 def can_send_shutdown(self, chan: Channel): 1702 if chan.get_state() >= ChannelState.OPENING: 1703 return True 1704 if chan.constraints.is_initiator and chan.channel_id in self.funding_created_sent: 1705 return True 1706 if not chan.constraints.is_initiator and chan.channel_id in self.funding_signed_sent: 1707 return True 1708 return False 1709 1710 async def send_shutdown(self, chan: Channel): 1711 if not self.can_send_shutdown(chan): 1712 raise Exception('cannot send shutdown') 1713 1714 if chan.config[LOCAL].upfront_shutdown_script: 1715 scriptpubkey = chan.config[LOCAL].upfront_shutdown_script 1716 else: 1717 scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address)) 1718 assert scriptpubkey 1719 1720 # wait until no more pending updates (bolt2) 1721 chan.set_can_send_ctx_updates(False) 1722 while chan.has_pending_changes(REMOTE): 1723 await asyncio.sleep(0.1) 1724 self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey) 1725 chan.set_state(ChannelState.SHUTDOWN) 1726 # can fullfill or fail htlcs. cannot add htlcs, because state != OPEN 1727 chan.set_can_send_ctx_updates(True) 1728 1729 @log_exceptions 1730 async def _shutdown(self, chan: Channel, payload, *, is_local: bool): 1731 # wait until no HTLCs remain in either commitment transaction 1732 while len(chan.hm.htlcs(LOCAL)) + len(chan.hm.htlcs(REMOTE)) > 0: 1733 self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...') 1734 await asyncio.sleep(1) 1735 # if no HTLCs remain, we must not send updates 1736 chan.set_can_send_ctx_updates(False) 1737 their_scriptpubkey = payload['scriptpubkey'] 1738 if chan.config[LOCAL].upfront_shutdown_script: 1739 our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script 1740 else: 1741 our_scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address)) 1742 assert our_scriptpubkey 1743 1744 # estimate fee of closing tx 1745 our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0) 1746 fee_rate = self.network.config.fee_per_kb() 1747 our_fee = fee_rate * closing_tx.estimated_size() // 1000 1748 # BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx 1749 max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE) 1750 our_fee = min(our_fee, max_fee) 1751 drop_remote = False 1752 def send_closing_signed(): 1753 our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=our_fee, drop_remote=drop_remote) 1754 self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=our_fee, signature=our_sig) 1755 def verify_signature(tx, sig): 1756 their_pubkey = chan.config[REMOTE].multisig_key.pubkey 1757 preimage_hex = tx.serialize_preimage(0) 1758 pre_hash = sha256d(bfh(preimage_hex)) 1759 return ecc.verify_signature(their_pubkey, sig, pre_hash) 1760 # the funder sends the first 'closing_signed' message 1761 if chan.constraints.is_initiator: 1762 send_closing_signed() 1763 # negotiate fee 1764 while True: 1765 # FIXME: the remote SHOULD send closing_signed, but some don't. 1766 cs_payload = await self.wait_for_message('closing_signed', chan.channel_id) 1767 their_fee = cs_payload['fee_satoshis'] 1768 if their_fee > max_fee: 1769 raise Exception(f'the proposed fee exceeds the base fee of the latest commitment transaction {is_local, their_fee, max_fee}') 1770 their_sig = cs_payload['signature'] 1771 # verify their sig: they might have dropped their output 1772 our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=False) 1773 if verify_signature(closing_tx, their_sig): 1774 drop_remote = False 1775 else: 1776 our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=True) 1777 if verify_signature(closing_tx, their_sig): 1778 drop_remote = True 1779 else: 1780 raise Exception('failed to verify their signature') 1781 # Agree if difference is lower or equal to one (see below) 1782 if abs(our_fee - their_fee) < 2: 1783 our_fee = their_fee 1784 break 1785 # this will be "strictly between" (as in BOLT2) previous values because of the above 1786 our_fee = (our_fee + their_fee) // 2 1787 # another round 1788 send_closing_signed() 1789 # the non-funder replies 1790 if not chan.constraints.is_initiator: 1791 send_closing_signed() 1792 # add signatures 1793 closing_tx.add_signature_to_txin( 1794 txin_idx=0, 1795 signing_pubkey=chan.config[LOCAL].multisig_key.pubkey.hex(), 1796 sig=bh2u(der_sig_from_sig_string(our_sig) + b'\x01')) 1797 closing_tx.add_signature_to_txin( 1798 txin_idx=0, 1799 signing_pubkey=chan.config[REMOTE].multisig_key.pubkey.hex(), 1800 sig=bh2u(der_sig_from_sig_string(their_sig) + b'\x01')) 1801 # save local transaction and set state 1802 try: 1803 self.lnworker.wallet.add_transaction(closing_tx) 1804 except UnrelatedTransactionException: 1805 pass # this can happen if (~all the balance goes to REMOTE) 1806 chan.set_state(ChannelState.CLOSING) 1807 # broadcast 1808 await self.network.try_broadcasting(closing_tx, 'closing') 1809 return closing_tx.txid() 1810 1811 async def htlc_switch(self): 1812 await self.initialized 1813 while True: 1814 self._htlc_switch_iterdone_event.set() 1815 self._htlc_switch_iterdone_event.clear() 1816 await asyncio.sleep(0.1) # TODO maybe make this partly event-driven 1817 self._htlc_switch_iterstart_event.set() 1818 self._htlc_switch_iterstart_event.clear() 1819 self.ping_if_required() 1820 self._maybe_cleanup_received_htlcs_pending_removal() 1821 for chan_id, chan in self.channels.items(): 1822 if not chan.can_send_ctx_updates(): 1823 continue 1824 self.maybe_send_commitment(chan) 1825 done = set() 1826 unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {}) 1827 for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarding_info) in unfulfilled.items(): 1828 if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id): 1829 continue 1830 htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id) 1831 error_reason = None # type: Optional[OnionRoutingFailure] 1832 error_bytes = None # type: Optional[bytes] 1833 preimage = None 1834 fw_info = None 1835 onion_packet_bytes = bytes.fromhex(onion_packet_hex) 1836 onion_packet = None 1837 try: 1838 onion_packet = OnionPacket.from_bytes(onion_packet_bytes) 1839 except OnionRoutingFailure as e: 1840 error_reason = e 1841 else: 1842 try: 1843 preimage, fw_info, error_bytes = await self.process_unfulfilled_htlc( 1844 chan=chan, 1845 htlc=htlc, 1846 forwarding_info=forwarding_info, 1847 onion_packet_bytes=onion_packet_bytes, 1848 onion_packet=onion_packet) 1849 except OnionRoutingFailure as e: 1850 error_bytes = construct_onion_error(e, onion_packet, our_onion_private_key=self.privkey) 1851 if fw_info: 1852 unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, fw_info 1853 elif preimage or error_reason or error_bytes: 1854 if preimage: 1855 await self.lnworker.enable_htlc_settle.wait() 1856 self.fulfill_htlc(chan, htlc.htlc_id, preimage) 1857 elif error_bytes: 1858 self.fail_htlc( 1859 chan=chan, 1860 htlc_id=htlc.htlc_id, 1861 error_bytes=error_bytes) 1862 else: 1863 self.fail_malformed_htlc( 1864 chan=chan, 1865 htlc_id=htlc.htlc_id, 1866 reason=error_reason) 1867 done.add(htlc_id) 1868 # cleanup 1869 for htlc_id in done: 1870 unfulfilled.pop(htlc_id) 1871 1872 def _maybe_cleanup_received_htlcs_pending_removal(self) -> None: 1873 done = set() 1874 for chan, htlc_id in self.received_htlcs_pending_removal: 1875 if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id): 1876 done.add((chan, htlc_id)) 1877 if done: 1878 for key in done: 1879 self.received_htlcs_pending_removal.remove(key) 1880 self.received_htlc_removed_event.set() 1881 self.received_htlc_removed_event.clear() 1882 1883 async def wait_one_htlc_switch_iteration(self) -> None: 1884 """Waits until the HTLC switch does a full iteration or the peer disconnects, 1885 whichever happens first. 1886 """ 1887 async def htlc_switch_iteration(): 1888 await self._htlc_switch_iterstart_event.wait() 1889 await self._htlc_switch_iterdone_event.wait() 1890 1891 async with TaskGroup(wait=any) as group: 1892 await group.spawn(htlc_switch_iteration()) 1893 await group.spawn(self.got_disconnected.wait()) 1894 1895 async def process_unfulfilled_htlc( 1896 self, *, 1897 chan: Channel, 1898 htlc: UpdateAddHtlc, 1899 forwarding_info: Tuple[str, int], 1900 onion_packet_bytes: bytes, 1901 onion_packet: OnionPacket) -> Tuple[Optional[bytes], Union[bool, None, Tuple[str, int]], Optional[bytes]]: 1902 """ 1903 return (preimage, fw_info, error_bytes) with at most a single element that is not None 1904 raise an OnionRoutingFailure if we need to fail the htlc 1905 """ 1906 payment_hash = htlc.payment_hash 1907 processed_onion = self.process_onion_packet( 1908 onion_packet, 1909 payment_hash=payment_hash, 1910 onion_packet_bytes=onion_packet_bytes) 1911 if processed_onion.are_we_final: 1912 preimage, trampoline_onion_packet = self.maybe_fulfill_htlc( 1913 chan=chan, 1914 htlc=htlc, 1915 processed_onion=processed_onion) 1916 # trampoline forwarding 1917 if trampoline_onion_packet: 1918 if not forwarding_info: 1919 trampoline_onion = self.process_onion_packet( 1920 trampoline_onion_packet, 1921 payment_hash=htlc.payment_hash, 1922 onion_packet_bytes=onion_packet_bytes, 1923 is_trampoline=True) 1924 if trampoline_onion.are_we_final: 1925 preimage, _ = self.maybe_fulfill_htlc( 1926 chan=chan, 1927 htlc=htlc, 1928 processed_onion=trampoline_onion, 1929 is_trampoline=True) 1930 else: 1931 await self.lnworker.enable_htlc_forwarding.wait() 1932 self.maybe_forward_trampoline( 1933 chan=chan, 1934 htlc=htlc, 1935 trampoline_onion=trampoline_onion) 1936 # return True so that this code gets executed only once 1937 return None, True, None 1938 else: 1939 preimage = self.lnworker.get_preimage(payment_hash) 1940 error_reason = self.lnworker.trampoline_forwarding_failures.pop(payment_hash, None) 1941 if error_reason: 1942 self.logger.info(f'trampoline forwarding failure: {error_reason.code_name()}') 1943 raise error_reason 1944 1945 elif not forwarding_info: 1946 await self.lnworker.enable_htlc_forwarding.wait() 1947 next_chan_id, next_htlc_id = self.maybe_forward_htlc( 1948 htlc=htlc, 1949 processed_onion=processed_onion) 1950 fw_info = (next_chan_id.hex(), next_htlc_id) 1951 return None, fw_info, None 1952 else: 1953 preimage = self.lnworker.get_preimage(payment_hash) 1954 next_chan_id_hex, htlc_id = forwarding_info 1955 next_chan = self.lnworker.get_channel_by_short_id(bytes.fromhex(next_chan_id_hex)) 1956 if next_chan: 1957 error_bytes, error_reason = next_chan.pop_fail_htlc_reason(htlc_id) 1958 if error_bytes: 1959 return None, None, error_bytes 1960 if error_reason: 1961 raise error_reason 1962 if preimage: 1963 return preimage, None, None 1964 return None, None, None 1965 1966 def process_onion_packet( 1967 self, 1968 onion_packet: OnionPacket, *, 1969 payment_hash: bytes, 1970 onion_packet_bytes: bytes, 1971 is_trampoline: bool = False) -> ProcessedOnionPacket: 1972 1973 failure_data = sha256(onion_packet_bytes) 1974 try: 1975 processed_onion = process_onion_packet( 1976 onion_packet, 1977 associated_data=payment_hash, 1978 our_onion_private_key=self.privkey, 1979 is_trampoline=is_trampoline) 1980 except UnsupportedOnionPacketVersion: 1981 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data) 1982 except InvalidOnionPubkey: 1983 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_KEY, data=failure_data) 1984 except InvalidOnionMac: 1985 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_HMAC, data=failure_data) 1986 except Exception as e: 1987 self.logger.info(f"error processing onion packet: {e!r}") 1988 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data) 1989 if self.network.config.get('test_fail_malformed_htlc'): 1990 raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data) 1991 if self.network.config.get('test_fail_htlcs_with_temp_node_failure'): 1992 raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'') 1993 return processed_onion