lnworker.py (99688B)
1 # Copyright (C) 2018 The Electrum developers 2 # Distributed under the MIT software license, see the accompanying 3 # file LICENCE or http://www.opensource.org/licenses/mit-license.php 4 5 import asyncio 6 import os 7 from decimal import Decimal 8 import random 9 import time 10 from typing import (Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING, 11 NamedTuple, Union, Mapping, Any, Iterable) 12 import threading 13 import socket 14 import aiohttp 15 import json 16 from datetime import datetime, timezone 17 from functools import partial 18 from collections import defaultdict 19 import concurrent 20 from concurrent import futures 21 import urllib.parse 22 23 import dns.resolver 24 import dns.exception 25 from aiorpcx import run_in_thread, TaskGroup, NetAddress, ignore_after 26 27 from . import constants, util 28 from . import keystore 29 from .util import profiler, chunks 30 from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER 31 from .util import NetworkRetryManager, JsonRPCClient 32 from .lnutil import LN_MAX_FUNDING_SAT 33 from .keystore import BIP32_KeyStore 34 from .bitcoin import COIN 35 from .transaction import Transaction 36 from .crypto import sha256 37 from .bip32 import BIP32Node 38 from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions 39 from .util import ignore_exceptions, make_aiohttp_session, SilentTaskGroup 40 from .util import timestamp_to_datetime, random_shuffled_copy 41 from .util import MyEncoder, is_private_netaddress 42 from .logging import Logger 43 from .lntransport import LNTransport, LNResponderTransport 44 from .lnpeer import Peer, LN_P2P_NETWORK_TIMEOUT 45 from .lnaddr import lnencode, LnAddr, lndecode 46 from .ecc import der_sig_from_sig_string 47 from .lnchannel import Channel 48 from .lnchannel import ChannelState, PeerState 49 from .lnrater import LNRater 50 from . import lnutil 51 from .lnutil import funding_output_script 52 from .bitcoin import redeem_script_to_address 53 from .lnutil import (Outpoint, LNPeerAddr, 54 get_compressed_pubkey_from_bech32, extract_nodeid, 55 PaymentFailure, split_host_port, ConnStringFormatError, 56 generate_keypair, LnKeyFamily, LOCAL, REMOTE, 57 MIN_FINAL_CLTV_EXPIRY_FOR_INVOICE, 58 NUM_MAX_EDGES_IN_PAYMENT_PATH, SENT, RECEIVED, HTLCOwner, 59 UpdateAddHtlc, Direction, LnFeatures, ShortChannelID, 60 HtlcLog, derive_payment_secret_from_payment_preimage, 61 NoPathFound) 62 from .lnutil import ln_dummy_address, ln_compare_features, IncompatibleLightningFeatures 63 from .lnrouter import TrampolineEdge 64 from .transaction import PartialTxOutput, PartialTransaction, PartialTxInput 65 from .lnonion import OnionFailureCode, process_onion_packet, OnionPacket, OnionRoutingFailure 66 from .lnmsg import decode_msg 67 from .i18n import _ 68 from .lnrouter import (RouteEdge, LNPaymentRoute, LNPaymentPath, is_route_sane_to_use, 69 NoChannelPolicy, LNPathInconsistent) 70 from .address_synchronizer import TX_HEIGHT_LOCAL 71 from . import lnsweep 72 from .lnwatcher import LNWalletWatcher 73 from .crypto import pw_encode_with_version_and_mac, pw_decode_with_version_and_mac 74 from .lnutil import ChannelBackupStorage 75 from .lnchannel import ChannelBackup 76 from .channel_db import UpdateStatus 77 from .channel_db import get_mychannel_info, get_mychannel_policy 78 from .submarine_swaps import SwapManager 79 from .channel_db import ChannelInfo, Policy 80 from .mpp_split import suggest_splits 81 from .trampoline import create_trampoline_route_and_onion 82 83 if TYPE_CHECKING: 84 from .network import Network 85 from .wallet import Abstract_Wallet 86 from .channel_db import ChannelDB 87 from .simple_config import SimpleConfig 88 89 90 SAVED_PR_STATUS = [PR_PAID, PR_UNPAID] # status that are persisted 91 92 93 NUM_PEERS_TARGET = 4 94 95 96 FALLBACK_NODE_LIST_TESTNET = ( 97 LNPeerAddr(host='203.132.95.10', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')), 98 LNPeerAddr(host='2401:d002:4402:0:bf1d:986a:7598:6d49', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')), 99 LNPeerAddr(host='50.116.3.223', port=9734, pubkey=bfh('03236a685d30096b26692dce0cf0fa7c8528bdf61dbf5363a3ef6d5c92733a3016')), 100 LNPeerAddr(host='3.16.119.191', port=9735, pubkey=bfh('03d5e17a3c213fe490e1b0c389f8cfcfcea08a29717d50a9f453735e0ab2a7c003')), 101 LNPeerAddr(host='34.250.234.192', port=9735, pubkey=bfh('03933884aaf1d6b108397e5efe5c86bcf2d8ca8d2f700eda99db9214fc2712b134')), 102 LNPeerAddr(host='88.99.209.230', port=9735, pubkey=bfh('0260d9119979caedc570ada883ff614c6efb93f7f7382e25d73ecbeba0b62df2d7')), 103 LNPeerAddr(host='160.16.233.215', port=9735, pubkey=bfh('023ea0a53af875580899da0ab0a21455d9c19160c4ea1b7774c9d4be6810b02d2c')), 104 LNPeerAddr(host='197.155.6.173', port=9735, pubkey=bfh('0269a94e8b32c005e4336bfb743c08a6e9beb13d940d57c479d95c8e687ccbdb9f')), 105 LNPeerAddr(host='2c0f:fb18:406::4', port=9735, pubkey=bfh('0269a94e8b32c005e4336bfb743c08a6e9beb13d940d57c479d95c8e687ccbdb9f')), 106 LNPeerAddr(host='163.172.94.64', port=9735, pubkey=bfh('030f0bf260acdbd3edcad84d7588ec7c5df4711e87e6a23016f989b8d3a4147230')), 107 LNPeerAddr(host='23.237.77.12', port=9735, pubkey=bfh('02312627fdf07fbdd7e5ddb136611bdde9b00d26821d14d94891395452f67af248')), 108 LNPeerAddr(host='197.155.6.172', port=9735, pubkey=bfh('02ae2f22b02375e3e9b4b4a2db4f12e1b50752b4062dbefd6e01332acdaf680379')), 109 LNPeerAddr(host='2c0f:fb18:406::3', port=9735, pubkey=bfh('02ae2f22b02375e3e9b4b4a2db4f12e1b50752b4062dbefd6e01332acdaf680379')), 110 LNPeerAddr(host='23.239.23.44', port=9740, pubkey=bfh('034fe52e98a0e9d3c21b767e1b371881265d8c7578c21f5afd6d6438da10348b36')), 111 LNPeerAddr(host='2600:3c01::f03c:91ff:fe05:349c', port=9740, pubkey=bfh('034fe52e98a0e9d3c21b767e1b371881265d8c7578c21f5afd6d6438da10348b36')), 112 ) 113 114 FALLBACK_NODE_LIST_MAINNET = [ 115 LNPeerAddr(host='172.81.181.3', port=9735, pubkey=bfh('0214382bdce7750dfcb8126df8e2b12de38536902dc36abcebdaeefdeca1df8284')), 116 LNPeerAddr(host='35.230.100.60', port=9735, pubkey=bfh('023f5e3582716bed96f6f26cfcd8037e07474d7b4743afdc8b07e692df63464d7e')), 117 LNPeerAddr(host='40.69.71.114', port=9735, pubkey=bfh('028303182c9885da93b3b25c9621d22cf34475e63c123942e402ab530c0556e675')), 118 LNPeerAddr(host='94.177.171.73', port=9735, pubkey=bfh('0276e09a267592e7451a939c932cf685f0754de382a3ca85d2fb3a864d4c365ad5')), 119 LNPeerAddr(host='34.236.113.58', port=9735, pubkey=bfh('02fa50c72ee1e2eb5f1b6d9c3032080c4c864373c4201dfa2966aa34eee1051f97')), 120 LNPeerAddr(host='52.50.244.44', port=9735, pubkey=bfh('030c3f19d742ca294a55c00376b3b355c3c90d61c6b6b39554dbc7ac19b141c14f')), 121 LNPeerAddr(host='157.245.68.47', port=9735, pubkey=bfh('03c2abfa93eacec04721c019644584424aab2ba4dff3ac9bdab4e9c97007491dda')), 122 LNPeerAddr(host='18.221.23.28', port=9735, pubkey=bfh('03abf6f44c355dec0d5aa155bdbdd6e0c8fefe318eff402de65c6eb2e1be55dc3e')), 123 LNPeerAddr(host='52.224.178.244', port=9735, pubkey=bfh('026b105ac13212c48714c6be9b11577a9ce10f10e1c88a45ce217e6331209faf8b')), 124 LNPeerAddr(host='34.239.230.56', port=9735, pubkey=bfh('03864ef025fde8fb587d989186ce6a4a186895ee44a926bfc370e2c366597a3f8f')), 125 LNPeerAddr(host='46.229.165.136', port=9735, pubkey=bfh('0390b5d4492dc2f5318e5233ab2cebf6d48914881a33ef6a9c6bcdbb433ad986d0')), 126 LNPeerAddr(host='157.230.28.160', port=9735, pubkey=bfh('0279c22ed7a068d10dc1a38ae66d2d6461e269226c60258c021b1ddcdfe4b00bc4')), 127 LNPeerAddr(host='74.108.13.152', port=9735, pubkey=bfh('0331f80652fb840239df8dc99205792bba2e559a05469915804c08420230e23c7c')), 128 LNPeerAddr(host='167.172.44.148', port=9735, pubkey=bfh('0395033b252c6f40e3756984162d68174e2bd8060a129c0d3462a9370471c6d28f')), 129 LNPeerAddr(host='138.68.14.104', port=9735, pubkey=bfh('03bb88ccc444534da7b5b64b4f7b15e1eccb18e102db0e400d4b9cfe93763aa26d')), 130 LNPeerAddr(host='3.124.63.44', port=9735, pubkey=bfh('0242a4ae0c5bef18048fbecf995094b74bfb0f7391418d71ed394784373f41e4f3')), 131 LNPeerAddr(host='2001:470:8:2e1::43', port=9735, pubkey=bfh('03baa70886d9200af0ffbd3f9e18d96008331c858456b16e3a9b41e735c6208fef')), 132 LNPeerAddr(host='2601:186:c100:6bcd:219:d1ff:fe75:dc2f', port=9735, pubkey=bfh('0298f6074a454a1f5345cb2a7c6f9fce206cd0bf675d177cdbf0ca7508dd28852f')), 133 LNPeerAddr(host='2001:41d0:e:734::1', port=9735, pubkey=bfh('03a503d8e30f2ff407096d235b5db63b4fcf3f89a653acb6f43d3fc492a7674019')), 134 LNPeerAddr(host='2a01:4f9:2b:2254::2', port=9735, pubkey=bfh('02f3069a342ae2883a6f29e275f06f28a56a6ea2e2d96f5888a3266444dcf542b6')), 135 LNPeerAddr(host='2a02:8070:24c1:100:528c:2997:6dbc:a054', port=9735, pubkey=bfh('02a45def9ae014fdd2603dd7033d157faa3a55a72b06a63ae22ef46d9fafdc6e8d')), 136 LNPeerAddr(host='2600:3c01::f03c:91ff:fe05:349c', port=9736, pubkey=bfh('02731b798b39a09f9f14e90ee601afb6ebb796d6e5797de14582a978770b33700f')), 137 LNPeerAddr(host='2a00:8a60:e012:a00::21', port=9735, pubkey=bfh('027ce055380348d7812d2ae7745701c9f93e70c1adeb2657f053f91df4f2843c71')), 138 LNPeerAddr(host='2604:a880:400:d1::8bd:1001', port=9735, pubkey=bfh('03649c72a4816f0cd546f84aafbd657e92a30ab474de7ab795e8b5650a427611f7')), 139 LNPeerAddr(host='2a01:4f8:c0c:7b31::1', port=9735, pubkey=bfh('02c16cca44562b590dd279c942200bdccfd4f990c3a69fad620c10ef2f8228eaff')), 140 LNPeerAddr(host='2001:41d0:1:b40d::1', port=9735, pubkey=bfh('026726a4b043d413b45b334876d17b8a98848129604429ec65532ba286a42efeac')), 141 ] 142 143 144 from .trampoline import trampolines_by_id, hardcoded_trampoline_nodes, is_hardcoded_trampoline 145 146 147 class PaymentInfo(NamedTuple): 148 payment_hash: bytes 149 amount_msat: Optional[int] 150 direction: int 151 status: int 152 153 154 class ErrorAddingPeer(Exception): pass 155 156 157 # set some feature flags as baseline for both LNWallet and LNGossip 158 # note that e.g. DATA_LOSS_PROTECT is needed for LNGossip as many peers require it 159 BASE_FEATURES = LnFeatures(0)\ 160 | LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT\ 161 | LnFeatures.OPTION_STATIC_REMOTEKEY_OPT\ 162 | LnFeatures.VAR_ONION_OPT\ 163 | LnFeatures.PAYMENT_SECRET_OPT\ 164 | LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT 165 166 # we do not want to receive unrequested gossip (see lnpeer.maybe_save_remote_update) 167 LNWALLET_FEATURES = BASE_FEATURES\ 168 | LnFeatures.OPTION_DATA_LOSS_PROTECT_REQ\ 169 | LnFeatures.OPTION_STATIC_REMOTEKEY_REQ\ 170 | LnFeatures.GOSSIP_QUERIES_REQ\ 171 | LnFeatures.BASIC_MPP_OPT\ 172 | LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT 173 174 LNGOSSIP_FEATURES = BASE_FEATURES\ 175 | LnFeatures.GOSSIP_QUERIES_OPT\ 176 | LnFeatures.GOSSIP_QUERIES_REQ 177 178 179 class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): 180 181 def __init__(self, xprv, features: LnFeatures): 182 Logger.__init__(self) 183 NetworkRetryManager.__init__( 184 self, 185 max_retry_delay_normal=3600, 186 init_retry_delay_normal=600, 187 max_retry_delay_urgent=300, 188 init_retry_delay_urgent=4, 189 ) 190 self.lock = threading.RLock() 191 self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY) 192 self._peers = {} # type: Dict[bytes, Peer] # pubkey -> Peer # needs self.lock 193 self.taskgroup = SilentTaskGroup() 194 self.listen_server = None # type: Optional[asyncio.AbstractServer] 195 self.features = features 196 self.network = None # type: Optional[Network] 197 self.config = None # type: Optional[SimpleConfig] 198 self.stopping_soon = False # whether we are being shut down 199 200 util.register_callback(self.on_proxy_changed, ['proxy_set']) 201 202 @property 203 def channel_db(self): 204 return self.network.channel_db if self.network else None 205 206 @property 207 def peers(self) -> Mapping[bytes, Peer]: 208 """Returns a read-only copy of peers.""" 209 with self.lock: 210 return self._peers.copy() 211 212 def channels_for_peer(self, node_id: bytes) -> Dict[bytes, Channel]: 213 return {} 214 215 def get_node_alias(self, node_id: bytes) -> Optional[str]: 216 """Returns the alias of the node, or None if unknown.""" 217 node_alias = None 218 if self.channel_db: 219 node_info = self.channel_db.get_node_info_for_node_id(node_id) 220 if node_info: 221 node_alias = node_info.alias 222 else: 223 for k, v in hardcoded_trampoline_nodes().items(): 224 if v.pubkey == node_id: 225 node_alias = k 226 break 227 return node_alias 228 229 async def maybe_listen(self): 230 # FIXME: only one LNWorker can listen at a time (single port) 231 listen_addr = self.config.get('lightning_listen') 232 if listen_addr: 233 self.logger.info(f'lightning_listen enabled. will try to bind: {listen_addr!r}') 234 try: 235 netaddr = NetAddress.from_string(listen_addr) 236 except Exception as e: 237 self.logger.error(f"failed to parse config key 'lightning_listen'. got: {e!r}") 238 return 239 addr = str(netaddr.host) 240 async def cb(reader, writer): 241 transport = LNResponderTransport(self.node_keypair.privkey, reader, writer) 242 try: 243 node_id = await transport.handshake() 244 except Exception as e: 245 self.logger.info(f'handshake failure from incoming connection: {e!r}') 246 return 247 peer = Peer(self, node_id, transport) 248 with self.lock: 249 self._peers[node_id] = peer 250 await self.taskgroup.spawn(peer.main_loop()) 251 try: 252 self.listen_server = await asyncio.start_server(cb, addr, netaddr.port) 253 except OSError as e: 254 self.logger.error(f"cannot listen for lightning p2p. error: {e!r}") 255 256 @ignore_exceptions # don't kill outer taskgroup 257 async def main_loop(self): 258 self.logger.info("starting taskgroup.") 259 try: 260 async with self.taskgroup as group: 261 await group.spawn(self._maintain_connectivity()) 262 except asyncio.CancelledError: 263 raise 264 except Exception as e: 265 self.logger.exception("taskgroup died.") 266 finally: 267 self.logger.info("taskgroup stopped.") 268 269 async def _maintain_connectivity(self): 270 while True: 271 await asyncio.sleep(1) 272 if self.stopping_soon: 273 return 274 now = time.time() 275 if len(self._peers) >= NUM_PEERS_TARGET: 276 continue 277 peers = await self._get_next_peers_to_try() 278 for peer in peers: 279 if self._can_retry_addr(peer, now=now): 280 try: 281 await self._add_peer(peer.host, peer.port, peer.pubkey) 282 except ErrorAddingPeer as e: 283 self.logger.info(f"failed to add peer: {peer}. exc: {e!r}") 284 285 async def _add_peer(self, host: str, port: int, node_id: bytes) -> Peer: 286 if node_id in self._peers: 287 return self._peers[node_id] 288 port = int(port) 289 peer_addr = LNPeerAddr(host, port, node_id) 290 self._trying_addr_now(peer_addr) 291 self.logger.info(f"adding peer {peer_addr}") 292 if node_id == self.node_keypair.pubkey: 293 raise ErrorAddingPeer("cannot connect to self") 294 transport = LNTransport(self.node_keypair.privkey, peer_addr, 295 proxy=self.network.proxy) 296 peer = Peer(self, node_id, transport) 297 await self.taskgroup.spawn(peer.main_loop()) 298 with self.lock: 299 self._peers[node_id] = peer 300 return peer 301 302 def peer_closed(self, peer: Peer) -> None: 303 with self.lock: 304 self._peers.pop(peer.pubkey, None) 305 306 def num_peers(self) -> int: 307 return sum([p.is_initialized() for p in self.peers.values()]) 308 309 def start_network(self, network: 'Network'): 310 assert network 311 self.network = network 312 self.config = network.config 313 self._add_peers_from_config() 314 asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop) 315 316 async def stop(self): 317 if self.listen_server: 318 self.listen_server.close() 319 util.unregister_callback(self.on_proxy_changed) 320 await self.taskgroup.cancel_remaining() 321 322 def _add_peers_from_config(self): 323 peer_list = self.config.get('lightning_peers', []) 324 for host, port, pubkey in peer_list: 325 asyncio.run_coroutine_threadsafe( 326 self._add_peer(host, int(port), bfh(pubkey)), 327 self.network.asyncio_loop) 328 329 def is_good_peer(self, peer: LNPeerAddr) -> bool: 330 # the purpose of this method is to filter peers that advertise the desired feature bits 331 # it is disabled for now, because feature bits published in node announcements seem to be unreliable 332 return True 333 node_id = peer.pubkey 334 node = self.channel_db._nodes.get(node_id) 335 if not node: 336 return False 337 try: 338 ln_compare_features(self.features, node.features) 339 except IncompatibleLightningFeatures: 340 return False 341 #self.logger.info(f'is_good {peer.host}') 342 return True 343 344 def on_peer_successfully_established(self, peer: Peer) -> None: 345 if isinstance(peer.transport, LNTransport): 346 peer_addr = peer.transport.peer_addr 347 # reset connection attempt count 348 self._on_connection_successfully_established(peer_addr) 349 # add into channel db 350 if self.channel_db: 351 self.channel_db.add_recent_peer(peer_addr) 352 # save network address into channels we might have with peer 353 for chan in peer.channels.values(): 354 chan.add_or_update_peer_addr(peer_addr) 355 356 async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]: 357 now = time.time() 358 await self.channel_db.data_loaded.wait() 359 # first try from recent peers 360 recent_peers = self.channel_db.get_recent_peers() 361 for peer in recent_peers: 362 if not peer: 363 continue 364 if peer.pubkey in self._peers: 365 continue 366 if not self._can_retry_addr(peer, now=now): 367 continue 368 if not self.is_good_peer(peer): 369 continue 370 return [peer] 371 # try random peer from graph 372 unconnected_nodes = self.channel_db.get_200_randomly_sorted_nodes_not_in(self.peers.keys()) 373 if unconnected_nodes: 374 for node_id in unconnected_nodes: 375 addrs = self.channel_db.get_node_addresses(node_id) 376 if not addrs: 377 continue 378 host, port, timestamp = self.choose_preferred_address(list(addrs)) 379 try: 380 peer = LNPeerAddr(host, port, node_id) 381 except ValueError: 382 continue 383 if not self._can_retry_addr(peer, now=now): 384 continue 385 if not self.is_good_peer(peer): 386 continue 387 #self.logger.info('taking random ln peer from our channel db') 388 return [peer] 389 390 # getting desperate... let's try hardcoded fallback list of peers 391 if constants.net in (constants.BitcoinTestnet, ): 392 fallback_list = FALLBACK_NODE_LIST_TESTNET 393 elif constants.net in (constants.BitcoinMainnet, ): 394 fallback_list = FALLBACK_NODE_LIST_MAINNET 395 else: 396 return [] # regtest?? 397 398 fallback_list = [peer for peer in fallback_list if self._can_retry_addr(peer, now=now)] 399 if fallback_list: 400 return [random.choice(fallback_list)] 401 402 # last resort: try dns seeds (BOLT-10) 403 return await run_in_thread(self._get_peers_from_dns_seeds) 404 405 def _get_peers_from_dns_seeds(self) -> Sequence[LNPeerAddr]: 406 # NOTE: potentially long blocking call, do not run directly on asyncio event loop. 407 # Return several peers to reduce the number of dns queries. 408 if not constants.net.LN_DNS_SEEDS: 409 return [] 410 dns_seed = random.choice(constants.net.LN_DNS_SEEDS) 411 self.logger.info('asking dns seed "{}" for ln peers'.format(dns_seed)) 412 try: 413 # note: this might block for several seconds 414 # this will include bech32-encoded-pubkeys and ports 415 srv_answers = resolve_dns_srv('r{}.{}'.format( 416 constants.net.LN_REALM_BYTE, dns_seed)) 417 except dns.exception.DNSException as e: 418 self.logger.info(f'failed querying (1) dns seed "{dns_seed}" for ln peers: {repr(e)}') 419 return [] 420 random.shuffle(srv_answers) 421 num_peers = 2 * NUM_PEERS_TARGET 422 srv_answers = srv_answers[:num_peers] 423 # we now have pubkeys and ports but host is still needed 424 peers = [] 425 for srv_ans in srv_answers: 426 try: 427 # note: this might block for several seconds 428 answers = dns.resolver.resolve(srv_ans['host']) 429 except dns.exception.DNSException as e: 430 self.logger.info(f'failed querying (2) dns seed "{dns_seed}" for ln peers: {repr(e)}') 431 continue 432 try: 433 ln_host = str(answers[0]) 434 port = int(srv_ans['port']) 435 bech32_pubkey = srv_ans['host'].split('.')[0] 436 pubkey = get_compressed_pubkey_from_bech32(bech32_pubkey) 437 peers.append(LNPeerAddr(ln_host, port, pubkey)) 438 except Exception as e: 439 self.logger.info(f'error with parsing peer from dns seed: {repr(e)}') 440 continue 441 self.logger.info(f'got {len(peers)} ln peers from dns seed') 442 return peers 443 444 @staticmethod 445 def choose_preferred_address(addr_list: Sequence[Tuple[str, int, int]]) -> Tuple[str, int, int]: 446 assert len(addr_list) >= 1 447 # choose first one that is an IP 448 for host, port, timestamp in addr_list: 449 if is_ip_address(host): 450 return host, port, timestamp 451 # otherwise choose one at random 452 # TODO maybe filter out onion if not on tor? 453 choice = random.choice(addr_list) 454 return choice 455 456 def on_proxy_changed(self, event, *args): 457 for peer in self.peers.values(): 458 peer.close_and_cleanup() 459 self._clear_addr_retry_times() 460 461 @log_exceptions 462 async def add_peer(self, connect_str: str) -> Peer: 463 node_id, rest = extract_nodeid(connect_str) 464 peer = self._peers.get(node_id) 465 if not peer: 466 if rest is not None: 467 host, port = split_host_port(rest) 468 else: 469 if not self.channel_db: 470 addr = trampolines_by_id().get(node_id) 471 if not addr: 472 raise ConnStringFormatError(_('Address unknown for node:') + ' ' + bh2u(node_id)) 473 host, port = addr.host, addr.port 474 else: 475 addrs = self.channel_db.get_node_addresses(node_id) 476 if not addrs: 477 raise ConnStringFormatError(_('Don\'t know any addresses for node:') + ' ' + bh2u(node_id)) 478 host, port, timestamp = self.choose_preferred_address(list(addrs)) 479 port = int(port) 480 # Try DNS-resolving the host (if needed). This is simply so that 481 # the caller gets a nice exception if it cannot be resolved. 482 try: 483 await asyncio.get_event_loop().getaddrinfo(host, port) 484 except socket.gaierror: 485 raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)')) 486 # add peer 487 peer = await self._add_peer(host, port, node_id) 488 return peer 489 490 491 class LNGossip(LNWorker): 492 max_age = 14*24*3600 493 LOGGING_SHORTCUT = 'g' 494 495 def __init__(self): 496 seed = os.urandom(32) 497 node = BIP32Node.from_rootseed(seed, xtype='standard') 498 xprv = node.to_xprv() 499 super().__init__(xprv, LNGOSSIP_FEATURES) 500 self.unknown_ids = set() 501 502 def start_network(self, network: 'Network'): 503 assert network 504 super().start_network(network) 505 asyncio.run_coroutine_threadsafe(self.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop) 506 507 async def maintain_db(self): 508 await self.channel_db.data_loaded.wait() 509 while True: 510 if len(self.unknown_ids) == 0: 511 self.channel_db.prune_old_policies(self.max_age) 512 self.channel_db.prune_orphaned_channels() 513 await asyncio.sleep(120) 514 515 async def add_new_ids(self, ids: Iterable[bytes]): 516 known = self.channel_db.get_channel_ids() 517 new = set(ids) - set(known) 518 self.unknown_ids.update(new) 519 util.trigger_callback('unknown_channels', len(self.unknown_ids)) 520 util.trigger_callback('gossip_peers', self.num_peers()) 521 util.trigger_callback('ln_gossip_sync_progress') 522 523 def get_ids_to_query(self) -> Sequence[bytes]: 524 N = 500 525 l = list(self.unknown_ids) 526 self.unknown_ids = set(l[N:]) 527 util.trigger_callback('unknown_channels', len(self.unknown_ids)) 528 util.trigger_callback('ln_gossip_sync_progress') 529 return l[0:N] 530 531 def get_sync_progress_estimate(self) -> Tuple[Optional[int], Optional[int], Optional[int]]: 532 """Estimates the gossip synchronization process and returns the number 533 of synchronized channels, the total channels in the network and a 534 rescaled percentage of the synchronization process.""" 535 if self.num_peers() == 0: 536 return None, None, None 537 nchans_with_0p, nchans_with_1p, nchans_with_2p = self.channel_db.get_num_channels_partitioned_by_policy_count() 538 num_db_channels = nchans_with_0p + nchans_with_1p + nchans_with_2p 539 # some channels will never have two policies (only one is in gossip?...) 540 # so if we have at least 1 policy for a channel, we consider that channel "complete" here 541 current_est = num_db_channels - nchans_with_0p 542 total_est = len(self.unknown_ids) + num_db_channels 543 544 progress = current_est / total_est if total_est and current_est else 0 545 progress_percent = (1.0 / 0.95 * progress) * 100 546 progress_percent = min(progress_percent, 100) 547 progress_percent = round(progress_percent) 548 # take a minimal number of synchronized channels to get a more accurate 549 # percentage estimate 550 if current_est < 200: 551 progress_percent = 0 552 return current_est, total_est, progress_percent 553 554 async def process_gossip(self, chan_anns, node_anns, chan_upds): 555 await self.channel_db.data_loaded.wait() 556 self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}') 557 # note: data processed in chunks to avoid taking sql lock for too long 558 # channel announcements 559 for chan_anns_chunk in chunks(chan_anns, 300): 560 self.channel_db.add_channel_announcement(chan_anns_chunk) 561 # node announcements 562 for node_anns_chunk in chunks(node_anns, 100): 563 self.channel_db.add_node_announcement(node_anns_chunk) 564 # channel updates 565 for chan_upds_chunk in chunks(chan_upds, 1000): 566 categorized_chan_upds = self.channel_db.add_channel_updates( 567 chan_upds_chunk, max_age=self.max_age) 568 orphaned = categorized_chan_upds.orphaned 569 if orphaned: 570 self.logger.info(f'adding {len(orphaned)} unknown channel ids') 571 orphaned_ids = [c['short_channel_id'] for c in orphaned] 572 await self.add_new_ids(orphaned_ids) 573 if categorized_chan_upds.good: 574 self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}') 575 576 577 class LNWallet(LNWorker): 578 579 lnwatcher: Optional['LNWalletWatcher'] 580 MPP_EXPIRY = 120 581 TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS = 3 # seconds 582 583 def __init__(self, wallet: 'Abstract_Wallet', xprv): 584 self.wallet = wallet 585 self.db = wallet.db 586 Logger.__init__(self) 587 LNWorker.__init__(self, xprv, LNWALLET_FEATURES) 588 self.config = wallet.config 589 self.lnwatcher = None 590 self.lnrater: LNRater = None 591 self.payments = self.db.get_dict('lightning_payments') # RHASH -> amount, direction, is_paid 592 self.preimages = self.db.get_dict('lightning_preimages') # RHASH -> preimage 593 # note: this sweep_address is only used as fallback; as it might result in address-reuse 594 self.sweep_address = wallet.get_new_sweep_address_for_channel() 595 self.logs = defaultdict(list) # type: Dict[str, List[HtlcLog]] # key is RHASH # (not persisted) 596 # used in tests 597 self.enable_htlc_settle = asyncio.Event() 598 self.enable_htlc_settle.set() 599 self.enable_htlc_forwarding = asyncio.Event() 600 self.enable_htlc_forwarding.set() 601 602 # note: accessing channels (besides simple lookup) needs self.lock! 603 self._channels = {} # type: Dict[bytes, Channel] 604 channels = self.db.get_dict("channels") 605 for channel_id, c in random_shuffled_copy(channels.items()): 606 self._channels[bfh(channel_id)] = Channel(c, sweep_address=self.sweep_address, lnworker=self) 607 608 self._channel_backups = {} # type: Dict[bytes, Channel] 609 channel_backups = self.db.get_dict("channel_backups") 610 for channel_id, cb in random_shuffled_copy(channel_backups.items()): 611 self._channel_backups[bfh(channel_id)] = ChannelBackup(cb, sweep_address=self.sweep_address, lnworker=self) 612 613 self.sent_htlcs = defaultdict(asyncio.Queue) # type: Dict[bytes, asyncio.Queue[HtlcLog]] 614 self.sent_htlcs_routes = dict() # (RHASH, scid, htlc_id) -> route, payment_secret, amount_msat, bucket_msat 615 self.sent_buckets = dict() # payment_secret -> (amount_sent, amount_failed) 616 self.received_mpp_htlcs = dict() # RHASH -> mpp_status, htlc_set 617 618 self.swap_manager = SwapManager(wallet=self.wallet, lnworker=self) 619 # detect inflight payments 620 self.inflight_payments = set() # (not persisted) keys of invoices that are in PR_INFLIGHT state 621 for payment_hash in self.get_payments(status='inflight').keys(): 622 self.set_invoice_status(payment_hash.hex(), PR_INFLIGHT) 623 624 self.trampoline_forwarding_failures = {} # todo: should be persisted 625 626 @property 627 def channels(self) -> Mapping[bytes, Channel]: 628 """Returns a read-only copy of channels.""" 629 with self.lock: 630 return self._channels.copy() 631 632 @property 633 def channel_backups(self) -> Mapping[bytes, Channel]: 634 """Returns a read-only copy of channels.""" 635 with self.lock: 636 return self._channel_backups.copy() 637 638 def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]: 639 return self._channels.get(channel_id, None) 640 641 def diagnostic_name(self): 642 return self.wallet.diagnostic_name() 643 644 @ignore_exceptions 645 @log_exceptions 646 async def sync_with_local_watchtower(self): 647 watchtower = self.network.local_watchtower 648 if watchtower: 649 while True: 650 for chan in self.channels.values(): 651 await self.sync_channel_with_watchtower(chan, watchtower.sweepstore) 652 await asyncio.sleep(5) 653 654 @ignore_exceptions 655 @log_exceptions 656 async def sync_with_remote_watchtower(self): 657 while True: 658 # periodically poll if the user updated 'watchtower_url' 659 await asyncio.sleep(5) 660 watchtower_url = self.config.get('watchtower_url') 661 if not watchtower_url: 662 continue 663 parsed_url = urllib.parse.urlparse(watchtower_url) 664 if not (parsed_url.scheme == 'https' or is_private_netaddress(parsed_url.hostname)): 665 self.logger.warning(f"got watchtower URL for remote tower but we won't use it! " 666 f"can only use HTTPS (except if private IP): not using {watchtower_url!r}") 667 continue 668 # try to sync with the remote watchtower 669 try: 670 async with make_aiohttp_session(proxy=self.network.proxy) as session: 671 watchtower = JsonRPCClient(session, watchtower_url) 672 watchtower.add_method('get_ctn') 673 watchtower.add_method('add_sweep_tx') 674 for chan in self.channels.values(): 675 await self.sync_channel_with_watchtower(chan, watchtower) 676 except aiohttp.client_exceptions.ClientConnectorError: 677 self.logger.info(f'could not contact remote watchtower {watchtower_url}') 678 679 async def sync_channel_with_watchtower(self, chan: Channel, watchtower): 680 outpoint = chan.funding_outpoint.to_str() 681 addr = chan.get_funding_address() 682 current_ctn = chan.get_oldest_unrevoked_ctn(REMOTE) 683 watchtower_ctn = await watchtower.get_ctn(outpoint, addr) 684 for ctn in range(watchtower_ctn + 1, current_ctn): 685 sweeptxs = chan.create_sweeptxs(ctn) 686 for tx in sweeptxs: 687 await watchtower.add_sweep_tx(outpoint, ctn, tx.inputs()[0].prevout.to_str(), tx.serialize()) 688 689 def start_network(self, network: 'Network'): 690 assert network 691 self.network = network 692 self.config = network.config 693 self.lnwatcher = LNWalletWatcher(self, network) 694 self.lnwatcher.start_network(network) 695 self.swap_manager.start_network(network=network, lnwatcher=self.lnwatcher) 696 self.lnrater = LNRater(self, network) 697 698 for chan in self.channels.values(): 699 self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) 700 for cb in self.channel_backups.values(): 701 self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address()) 702 703 for coro in [ 704 self.maybe_listen(), 705 self.lnwatcher.on_network_update('network_updated'), # shortcut (don't block) if funding tx locked and verified 706 self.reestablish_peers_and_channels(), 707 self.sync_with_local_watchtower(), 708 self.sync_with_remote_watchtower(), 709 ]: 710 tg_coro = self.taskgroup.spawn(coro) 711 asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop) 712 713 async def stop(self): 714 self.stopping_soon = True 715 if self.listen_server: # stop accepting new peers 716 self.listen_server.close() 717 async with ignore_after(self.TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS): 718 await self.wait_for_received_pending_htlcs_to_get_removed() 719 await LNWorker.stop(self) 720 if self.lnwatcher: 721 await self.lnwatcher.stop() 722 self.lnwatcher = None 723 724 async def wait_for_received_pending_htlcs_to_get_removed(self): 725 assert self.stopping_soon is True 726 # We try to fail pending MPP HTLCs, and wait a bit for them to get removed. 727 # Note: even without MPP, if we just failed/fulfilled an HTLC, it is good 728 # to wait a bit for it to become irrevocably removed. 729 # Note: we don't wait for *all htlcs* to get removed, only for those 730 # that we can already fail/fulfill. e.g. forwarded htlcs cannot be removed 731 async with TaskGroup() as group: 732 for peer in self.peers.values(): 733 await group.spawn(peer.wait_one_htlc_switch_iteration()) 734 while True: 735 if all(not peer.received_htlcs_pending_removal for peer in self.peers.values()): 736 break 737 async with TaskGroup(wait=any) as group: 738 for peer in self.peers.values(): 739 await group.spawn(peer.received_htlc_removed_event.wait()) 740 741 def peer_closed(self, peer): 742 for chan in self.channels_for_peer(peer.pubkey).values(): 743 chan.peer_state = PeerState.DISCONNECTED 744 util.trigger_callback('channel', self.wallet, chan) 745 super().peer_closed(peer) 746 747 def get_payments(self, *, status=None): 748 # return one item per payment_hash 749 # note: with AMP we will have several channels per payment 750 out = defaultdict(list) 751 for chan in self.channels.values(): 752 d = chan.get_payments(status=status) 753 for k, v in d.items(): 754 out[k] += v 755 return out 756 757 def get_payment_value(self, info: Optional['PaymentInfo'], plist): 758 amount_msat = 0 759 fee_msat = None 760 for chan_id, htlc, _direction, _status in plist: 761 amount_msat += int(_direction) * htlc.amount_msat 762 if _direction == SENT and info and info.amount_msat: 763 fee_msat = (fee_msat or 0) - info.amount_msat - amount_msat 764 timestamp = min([htlc.timestamp for chan_id, htlc, _direction, _status in plist]) 765 return amount_msat, fee_msat, timestamp 766 767 def get_lightning_history(self): 768 out = {} 769 for payment_hash, plist in self.get_payments(status='settled').items(): 770 if len(plist) == 0: 771 continue 772 key = payment_hash.hex() 773 info = self.get_payment_info(payment_hash) 774 amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist) 775 if info is not None: 776 label = self.wallet.get_label(key) 777 direction = ('sent' if info.direction == SENT else 'received') if len(plist)==1 else 'self-payment' 778 else: 779 direction = 'forwarding' 780 label = _('Forwarding') 781 preimage = self.get_preimage(payment_hash).hex() 782 item = { 783 'type': 'payment', 784 'label': label, 785 'timestamp': timestamp or 0, 786 'date': timestamp_to_datetime(timestamp), 787 'direction': direction, 788 'amount_msat': amount_msat, 789 'fee_msat': fee_msat, 790 'payment_hash': key, 791 'preimage': preimage, 792 } 793 # add group_id to swap transactions 794 swap = self.swap_manager.get_swap(payment_hash) 795 if swap: 796 if swap.is_reverse: 797 item['group_id'] = swap.spending_txid 798 item['group_label'] = 'Reverse swap' + ' ' + self.config.format_amount_and_units(swap.lightning_amount) 799 else: 800 item['group_id'] = swap.funding_txid 801 item['group_label'] = 'Forward swap' + ' ' + self.config.format_amount_and_units(swap.onchain_amount) 802 # done 803 out[payment_hash] = item 804 return out 805 806 def get_onchain_history(self): 807 current_height = self.wallet.get_local_height() 808 out = {} 809 # add funding events 810 for chan in self.channels.values(): 811 item = chan.get_funding_height() 812 if item is None: 813 continue 814 funding_txid, funding_height, funding_timestamp = item 815 tx_height = self.lnwatcher.get_tx_height(funding_txid) 816 item = { 817 'channel_id': bh2u(chan.channel_id), 818 'type': 'channel_opening', 819 'label': self.wallet.get_label_for_txid(funding_txid) or (_('Open channel') + ' ' + chan.get_id_for_log()), 820 'txid': funding_txid, 821 'amount_msat': chan.balance(LOCAL, ctn=0), 822 'direction': 'received', 823 'timestamp': tx_height.timestamp, 824 'fee_sat': None, 825 'fee_msat': None, 826 'height': tx_height.height, 827 'confirmations': tx_height.conf, 828 } 829 out[funding_txid] = item 830 item = chan.get_closing_height() 831 if item is None: 832 continue 833 closing_txid, closing_height, closing_timestamp = item 834 tx_height = self.lnwatcher.get_tx_height(closing_txid) 835 item = { 836 'channel_id': bh2u(chan.channel_id), 837 'txid': closing_txid, 838 'label': self.wallet.get_label_for_txid(closing_txid) or (_('Close channel') + ' ' + chan.get_id_for_log()), 839 'type': 'channel_closure', 840 'amount_msat': -chan.balance_minus_outgoing_htlcs(LOCAL), 841 'direction': 'sent', 842 'timestamp': tx_height.timestamp, 843 'fee_sat': None, 844 'fee_msat': None, 845 'height': tx_height.height, 846 'confirmations': tx_height.conf, 847 } 848 out[closing_txid] = item 849 # add info about submarine swaps 850 settled_payments = self.get_payments(status='settled') 851 for payment_hash_hex, swap in self.swap_manager.swaps.items(): 852 txid = swap.spending_txid if swap.is_reverse else swap.funding_txid 853 if txid is None: 854 continue 855 payment_hash = bytes.fromhex(payment_hash_hex) 856 if payment_hash in settled_payments: 857 plist = settled_payments[payment_hash] 858 info = self.get_payment_info(payment_hash) 859 amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist) 860 else: 861 amount_msat = 0 862 label = 'Reverse swap' if swap.is_reverse else 'Forward swap' 863 delta = current_height - swap.locktime 864 if not swap.is_redeemed and swap.spending_txid is None and delta < 0: 865 label += f' (refundable in {-delta} blocks)' # fixme: only if unspent 866 out[txid] = { 867 'txid': txid, 868 'group_id': txid, 869 'amount_msat': 0, 870 #'amount_msat': amount_msat, # must not be added 871 'type': 'swap', 872 'label': self.wallet.get_label_for_txid(txid) or label, 873 } 874 return out 875 876 def get_history(self): 877 out = list(self.get_lightning_history().values()) + list(self.get_onchain_history().values()) 878 # sort by timestamp 879 out.sort(key=lambda x: (x.get('timestamp') or float("inf"))) 880 balance_msat = 0 881 for item in out: 882 balance_msat += item['amount_msat'] 883 item['balance_msat'] = balance_msat 884 return out 885 886 def channel_peers(self) -> List[bytes]: 887 node_ids = [chan.node_id for chan in self.channels.values() if not chan.is_closed()] 888 return node_ids 889 890 def channels_for_peer(self, node_id): 891 assert type(node_id) is bytes 892 return {chan_id: chan for (chan_id, chan) in self.channels.items() 893 if chan.node_id == node_id} 894 895 def channel_state_changed(self, chan: Channel): 896 if type(chan) is Channel: 897 self.save_channel(chan) 898 util.trigger_callback('channel', self.wallet, chan) 899 900 def save_channel(self, chan: Channel): 901 assert type(chan) is Channel 902 if chan.config[REMOTE].next_per_commitment_point == chan.config[REMOTE].current_per_commitment_point: 903 raise Exception("Tried to save channel with next_point == current_point, this should not happen") 904 self.wallet.save_db() 905 util.trigger_callback('channel', self.wallet, chan) 906 907 def channel_by_txo(self, txo: str) -> Optional[Channel]: 908 for chan in self.channels.values(): 909 if chan.funding_outpoint.to_str() == txo: 910 return chan 911 for chan in self.channel_backups.values(): 912 if chan.funding_outpoint.to_str() == txo: 913 return chan 914 915 async def on_channel_update(self, chan: Channel): 916 if type(chan) is ChannelBackup: 917 util.trigger_callback('channel', self.wallet, chan) 918 return 919 920 if chan.get_state() == ChannelState.OPEN and chan.should_be_closed_due_to_expiring_htlcs(self.network.get_local_height()): 921 self.logger.info(f"force-closing due to expiring htlcs") 922 await self.try_force_closing(chan.channel_id) 923 924 elif chan.get_state() == ChannelState.FUNDED: 925 peer = self._peers.get(chan.node_id) 926 if peer and peer.is_initialized(): 927 peer.send_funding_locked(chan) 928 929 elif chan.get_state() == ChannelState.OPEN: 930 peer = self._peers.get(chan.node_id) 931 if peer: 932 await peer.maybe_update_fee(chan) 933 conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf 934 peer.on_network_update(chan, conf) 935 936 elif chan.get_state() == ChannelState.FORCE_CLOSING: 937 force_close_tx = chan.force_close_tx() 938 txid = force_close_tx.txid() 939 height = self.lnwatcher.get_tx_height(txid).height 940 if height == TX_HEIGHT_LOCAL: 941 self.logger.info('REBROADCASTING CLOSING TX') 942 await self.network.try_broadcasting(force_close_tx, 'force-close') 943 944 @log_exceptions 945 async def _open_channel_coroutine( 946 self, *, 947 connect_str: str, 948 funding_tx: PartialTransaction, 949 funding_sat: int, 950 push_sat: int, 951 password: Optional[str]) -> Tuple[Channel, PartialTransaction]: 952 953 peer = await self.add_peer(connect_str) 954 coro = peer.channel_establishment_flow( 955 funding_tx=funding_tx, 956 funding_sat=funding_sat, 957 push_msat=push_sat * 1000, 958 temp_channel_id=os.urandom(32)) 959 chan, funding_tx = await asyncio.wait_for(coro, LN_P2P_NETWORK_TIMEOUT) 960 util.trigger_callback('channels_updated', self.wallet) 961 self.wallet.add_transaction(funding_tx) # save tx as local into the wallet 962 self.wallet.sign_transaction(funding_tx, password) 963 self.wallet.set_label(funding_tx.txid(), _('Open channel')) 964 if funding_tx.is_complete(): 965 await self.network.try_broadcasting(funding_tx, 'open_channel') 966 return chan, funding_tx 967 968 def add_channel(self, chan: Channel): 969 with self.lock: 970 self._channels[chan.channel_id] = chan 971 self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) 972 973 def add_new_channel(self, chan: Channel): 974 self.add_channel(chan) 975 channels_db = self.db.get_dict('channels') 976 channels_db[chan.channel_id.hex()] = chan.storage 977 for addr in chan.get_wallet_addresses_channel_might_want_reserved(): 978 self.wallet.set_reserved_state_of_address(addr, reserved=True) 979 try: 980 self.save_channel(chan) 981 self.wallet.save_backup() 982 except: 983 chan.set_state(ChannelState.REDEEMED) 984 self.remove_channel(chan.channel_id) 985 raise 986 987 def mktx_for_open_channel(self, *, coins: Sequence[PartialTxInput], funding_sat: int, 988 fee_est=None) -> PartialTransaction: 989 dummy_address = ln_dummy_address() 990 outputs = [PartialTxOutput.from_address_and_value(dummy_address, funding_sat)] 991 tx = self.wallet.make_unsigned_transaction( 992 coins=coins, 993 outputs=outputs, 994 fee=fee_est) 995 tx.set_rbf(False) 996 return tx 997 998 def open_channel(self, *, connect_str: str, funding_tx: PartialTransaction, 999 funding_sat: int, push_amt_sat: int, password: str = None) -> Tuple[Channel, PartialTransaction]: 1000 if funding_sat > LN_MAX_FUNDING_SAT: 1001 raise Exception(_("Requested channel capacity is over protocol allowed maximum.")) 1002 coro = self._open_channel_coroutine( 1003 connect_str=connect_str, funding_tx=funding_tx, funding_sat=funding_sat, 1004 push_sat=push_amt_sat, password=password) 1005 fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) 1006 try: 1007 chan, funding_tx = fut.result() 1008 except concurrent.futures.TimeoutError: 1009 raise Exception(_("open_channel timed out")) 1010 return chan, funding_tx 1011 1012 def get_channel_by_short_id(self, short_channel_id: bytes) -> Optional[Channel]: 1013 for chan in self.channels.values(): 1014 if chan.short_channel_id == short_channel_id: 1015 return chan 1016 1017 def create_routes_from_invoice(self, amount_msat: int, decoded_invoice: LnAddr, *, full_path=None): 1018 return self.create_routes_for_payment( 1019 amount_msat=amount_msat, 1020 final_total_msat=amount_msat, 1021 invoice_pubkey=decoded_invoice.pubkey.serialize(), 1022 min_cltv_expiry=decoded_invoice.get_min_final_cltv_expiry(), 1023 r_tags=decoded_invoice.get_routing_info('r'), 1024 invoice_features=decoded_invoice.get_features(), 1025 trampoline_fee_level=0, 1026 use_two_trampolines=False, 1027 payment_hash=decoded_invoice.paymenthash, 1028 payment_secret=decoded_invoice.payment_secret, 1029 full_path=full_path) 1030 1031 @log_exceptions 1032 async def pay_invoice( 1033 self, invoice: str, *, 1034 amount_msat: int = None, 1035 attempts: int = 1, 1036 full_path: LNPaymentPath = None) -> Tuple[bool, List[HtlcLog]]: 1037 1038 lnaddr = self._check_invoice(invoice, amount_msat=amount_msat) 1039 min_cltv_expiry = lnaddr.get_min_final_cltv_expiry() 1040 payment_hash = lnaddr.paymenthash 1041 key = payment_hash.hex() 1042 payment_secret = lnaddr.payment_secret 1043 invoice_pubkey = lnaddr.pubkey.serialize() 1044 invoice_features = lnaddr.get_features() 1045 r_tags = lnaddr.get_routing_info('r') 1046 amount_to_pay = lnaddr.get_amount_msat() 1047 status = self.get_payment_status(payment_hash) 1048 if status == PR_PAID: 1049 raise PaymentFailure(_("This invoice has been paid already")) 1050 if status == PR_INFLIGHT: 1051 raise PaymentFailure(_("A payment was already initiated for this invoice")) 1052 if payment_hash in self.get_payments(status='inflight'): 1053 raise PaymentFailure(_("A previous attempt to pay this invoice did not clear")) 1054 info = PaymentInfo(payment_hash, amount_to_pay, SENT, PR_UNPAID) 1055 self.save_payment_info(info) 1056 self.wallet.set_label(key, lnaddr.get_description()) 1057 1058 self.set_invoice_status(key, PR_INFLIGHT) 1059 try: 1060 await self.pay_to_node( 1061 node_pubkey=invoice_pubkey, 1062 payment_hash=payment_hash, 1063 payment_secret=payment_secret, 1064 amount_to_pay=amount_to_pay, 1065 min_cltv_expiry=min_cltv_expiry, 1066 r_tags=r_tags, 1067 invoice_features=invoice_features, 1068 attempts=attempts, 1069 full_path=full_path) 1070 success = True 1071 except PaymentFailure as e: 1072 self.logger.exception('') 1073 success = False 1074 reason = str(e) 1075 if success: 1076 self.set_invoice_status(key, PR_PAID) 1077 util.trigger_callback('payment_succeeded', self.wallet, key) 1078 else: 1079 self.set_invoice_status(key, PR_UNPAID) 1080 util.trigger_callback('payment_failed', self.wallet, key, reason) 1081 log = self.logs[key] 1082 return success, log 1083 1084 async def pay_to_node( 1085 self, *, 1086 node_pubkey: bytes, 1087 payment_hash: bytes, 1088 payment_secret: Optional[bytes], 1089 amount_to_pay: int, # in msat 1090 min_cltv_expiry: int, 1091 r_tags, 1092 invoice_features: int, 1093 attempts: int = 1, 1094 full_path: LNPaymentPath = None, 1095 fwd_trampoline_onion=None, 1096 fwd_trampoline_fee=None, 1097 fwd_trampoline_cltv_delta=None) -> None: 1098 1099 if fwd_trampoline_onion: 1100 # todo: compare to the fee of the actual route we found 1101 if fwd_trampoline_fee < 1000: 1102 raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, data=b'') 1103 if fwd_trampoline_cltv_delta < 576: 1104 raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'') 1105 1106 self.logs[payment_hash.hex()] = log = [] 1107 trampoline_fee_level = 0 # only used for trampoline payments 1108 use_two_trampolines = True # only used for pay to legacy 1109 1110 amount_inflight = 0 # what we sent in htlcs (that receiver gets, without fees) 1111 while True: 1112 amount_to_send = amount_to_pay - amount_inflight 1113 if amount_to_send > 0: 1114 # 1. create a set of routes for remaining amount. 1115 # note: path-finding runs in a separate thread so that we don't block the asyncio loop 1116 # graph updates might occur during the computation 1117 routes = await run_in_thread(partial( 1118 self.create_routes_for_payment, 1119 amount_msat=amount_to_send, 1120 final_total_msat=amount_to_pay, 1121 invoice_pubkey=node_pubkey, 1122 min_cltv_expiry=min_cltv_expiry, 1123 r_tags=r_tags, 1124 invoice_features=invoice_features, 1125 full_path=full_path, 1126 payment_hash=payment_hash, 1127 payment_secret=payment_secret, 1128 trampoline_fee_level=trampoline_fee_level, 1129 use_two_trampolines=use_two_trampolines, 1130 fwd_trampoline_onion=fwd_trampoline_onion)) 1131 # 2. send htlcs 1132 for route, amount_msat, total_msat, amount_receiver_msat, cltv_delta, bucket_payment_secret, trampoline_onion in routes: 1133 amount_inflight += amount_receiver_msat 1134 if amount_inflight > amount_to_pay: # safety belts 1135 raise Exception(f"amount_inflight={amount_inflight} > amount_to_pay={amount_to_pay}") 1136 await self.pay_to_route( 1137 route=route, 1138 amount_msat=amount_msat, 1139 total_msat=total_msat, 1140 amount_receiver_msat=amount_receiver_msat, 1141 payment_hash=payment_hash, 1142 payment_secret=bucket_payment_secret, 1143 min_cltv_expiry=cltv_delta, 1144 trampoline_onion=trampoline_onion) 1145 util.trigger_callback('invoice_status', self.wallet, payment_hash.hex()) 1146 # 3. await a queue 1147 self.logger.info(f"amount inflight {amount_inflight}") 1148 htlc_log = await self.sent_htlcs[payment_hash].get() 1149 amount_inflight -= htlc_log.amount_msat 1150 if amount_inflight < 0: 1151 raise Exception(f"amount_inflight={amount_inflight} < 0") 1152 log.append(htlc_log) 1153 if htlc_log.success: 1154 return 1155 # htlc failed 1156 if len(log) >= attempts: 1157 raise PaymentFailure('Giving up after %d attempts'%len(log)) 1158 # if we get a tmp channel failure, it might work to split the amount and try more routes 1159 # if we get a channel update, we might retry the same route and amount 1160 route = htlc_log.route 1161 sender_idx = htlc_log.sender_idx 1162 failure_msg = htlc_log.failure_msg 1163 code, data = failure_msg.code, failure_msg.data 1164 self.logger.info(f"UPDATE_FAIL_HTLC {repr(code)} {data}") 1165 self.logger.info(f"error reported by {bh2u(route[sender_idx].node_id)}") 1166 if code == OnionFailureCode.MPP_TIMEOUT: 1167 raise PaymentFailure(failure_msg.code_name()) 1168 # trampoline 1169 if self.channel_db is None: 1170 if code == OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT: 1171 # todo: parse the node parameters here (not returned by eclair yet) 1172 trampoline_fee_level += 1 1173 continue 1174 elif use_two_trampolines: 1175 use_two_trampolines = False 1176 else: 1177 raise PaymentFailure(failure_msg.code_name()) 1178 else: 1179 self.handle_error_code_from_failed_htlc(route, sender_idx, failure_msg, code, data) 1180 1181 async def pay_to_route( 1182 self, *, 1183 route: LNPaymentRoute, 1184 amount_msat: int, 1185 total_msat: int, 1186 amount_receiver_msat:int, 1187 payment_hash: bytes, 1188 payment_secret: Optional[bytes], 1189 min_cltv_expiry: int, 1190 trampoline_onion: bytes = None) -> None: 1191 1192 # send a single htlc 1193 short_channel_id = route[0].short_channel_id 1194 chan = self.get_channel_by_short_id(short_channel_id) 1195 peer = self._peers.get(route[0].node_id) 1196 if not peer: 1197 raise Exception('Dropped peer') 1198 await peer.initialized 1199 htlc = peer.pay( 1200 route=route, 1201 chan=chan, 1202 amount_msat=amount_msat, 1203 total_msat=total_msat, 1204 payment_hash=payment_hash, 1205 min_final_cltv_expiry=min_cltv_expiry, 1206 payment_secret=payment_secret, 1207 trampoline_onion=trampoline_onion) 1208 1209 key = (payment_hash, short_channel_id, htlc.htlc_id) 1210 self.sent_htlcs_routes[key] = route, payment_secret, amount_msat, total_msat, amount_receiver_msat 1211 # if we sent MPP to a trampoline, add item to sent_buckets 1212 if not self.channel_db and amount_msat != total_msat: 1213 if payment_secret not in self.sent_buckets: 1214 self.sent_buckets[payment_secret] = (0, 0) 1215 amount_sent, amount_failed = self.sent_buckets[payment_secret] 1216 amount_sent += amount_receiver_msat 1217 self.sent_buckets[payment_secret] = amount_sent, amount_failed 1218 util.trigger_callback('htlc_added', chan, htlc, SENT) 1219 1220 1221 def handle_error_code_from_failed_htlc(self, route, sender_idx, failure_msg, code, data): 1222 # handle some specific error codes 1223 failure_codes = { 1224 OnionFailureCode.TEMPORARY_CHANNEL_FAILURE: 0, 1225 OnionFailureCode.AMOUNT_BELOW_MINIMUM: 8, 1226 OnionFailureCode.FEE_INSUFFICIENT: 8, 1227 OnionFailureCode.INCORRECT_CLTV_EXPIRY: 4, 1228 OnionFailureCode.EXPIRY_TOO_SOON: 0, 1229 OnionFailureCode.CHANNEL_DISABLED: 2, 1230 } 1231 blacklist = False 1232 update = False 1233 if code in failure_codes: 1234 offset = failure_codes[code] 1235 channel_update_len = int.from_bytes(data[offset:offset+2], byteorder="big") 1236 channel_update_as_received = data[offset+2: offset+2+channel_update_len] 1237 payload = self._decode_channel_update_msg(channel_update_as_received) 1238 if payload is None: 1239 self.logger.info(f'could not decode channel_update for failed htlc: {channel_update_as_received.hex()}') 1240 blacklist = True 1241 else: 1242 r = self.channel_db.add_channel_update(payload) 1243 short_channel_id = ShortChannelID(payload['short_channel_id']) 1244 if r == UpdateStatus.GOOD: 1245 self.logger.info(f"applied channel update to {short_channel_id}") 1246 # TODO: test this 1247 for chan in self.channels.values(): 1248 if chan.short_channel_id == short_channel_id: 1249 chan.set_remote_update(payload['raw']) 1250 update = True 1251 elif r == UpdateStatus.ORPHANED: 1252 # maybe it is a private channel (and data in invoice was outdated) 1253 self.logger.info(f"Could not find {short_channel_id}. maybe update is for private channel?") 1254 start_node_id = route[sender_idx].node_id 1255 update = self.channel_db.add_channel_update_for_private_channel(payload, start_node_id) 1256 blacklist = not update 1257 elif r == UpdateStatus.EXPIRED: 1258 blacklist = True 1259 elif r == UpdateStatus.DEPRECATED: 1260 self.logger.info(f'channel update is not more recent.') 1261 blacklist = True 1262 elif r == UpdateStatus.UNCHANGED: 1263 blacklist = True 1264 else: 1265 blacklist = True 1266 1267 if blacklist: 1268 # blacklist channel after reporter node 1269 # TODO this should depend on the error (even more granularity) 1270 # also, we need finer blacklisting (directed edges; nodes) 1271 if sender_idx is None: 1272 raise PaymentFailure(failure_msg.code_name()) 1273 try: 1274 short_chan_id = route[sender_idx + 1].short_channel_id 1275 except IndexError: 1276 raise PaymentFailure('payment destination reported error') 1277 # TODO: for MPP we need to save the amount for which 1278 # we saw temporary channel failure 1279 self.logger.info(f'blacklisting channel {short_chan_id}') 1280 self.network.channel_blacklist.add(short_chan_id) 1281 1282 # we should not continue if we did not blacklist or update anything 1283 if not (blacklist or update): 1284 raise PaymentFailure(failure_msg.code_name()) 1285 1286 @classmethod 1287 def _decode_channel_update_msg(cls, chan_upd_msg: bytes) -> Optional[Dict[str, Any]]: 1288 channel_update_as_received = chan_upd_msg 1289 channel_update_typed = (258).to_bytes(length=2, byteorder="big") + channel_update_as_received 1290 # note: some nodes put channel updates in error msgs with the leading msg_type already there. 1291 # we try decoding both ways here. 1292 try: 1293 message_type, payload = decode_msg(channel_update_typed) 1294 if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception() 1295 payload['raw'] = channel_update_typed 1296 return payload 1297 except: # FIXME: too broad 1298 try: 1299 message_type, payload = decode_msg(channel_update_as_received) 1300 if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception() 1301 payload['raw'] = channel_update_as_received 1302 return payload 1303 except: 1304 return None 1305 1306 @staticmethod 1307 def _check_invoice(invoice: str, *, amount_msat: int = None) -> LnAddr: 1308 addr = lndecode(invoice, expected_hrp=constants.net.SEGWIT_HRP) 1309 if addr.is_expired(): 1310 raise InvoiceError(_("This invoice has expired")) 1311 if amount_msat: # replace amt in invoice. main usecase is paying zero amt invoices 1312 existing_amt_msat = addr.get_amount_msat() 1313 if existing_amt_msat and amount_msat < existing_amt_msat: 1314 raise Exception("cannot pay lower amt than what is originally in LN invoice") 1315 addr.amount = Decimal(amount_msat) / COIN / 1000 1316 if addr.amount is None: 1317 raise InvoiceError(_("Missing amount")) 1318 if addr.get_min_final_cltv_expiry() > lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE: 1319 raise InvoiceError("{}\n{}".format( 1320 _("Invoice wants us to risk locking funds for unreasonably long."), 1321 f"min_final_cltv_expiry: {addr.get_min_final_cltv_expiry()}")) 1322 return addr 1323 1324 def is_trampoline_peer(self, node_id: bytes) -> bool: 1325 # until trampoline is advertised in lnfeatures, check against hardcoded list 1326 if is_hardcoded_trampoline(node_id): 1327 return True 1328 peer = self._peers.get(node_id) 1329 if peer and peer.their_features.supports(LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT): 1330 return True 1331 return False 1332 1333 def suggest_peer(self) -> Optional[bytes]: 1334 if self.channel_db: 1335 return self.lnrater.suggest_peer() 1336 else: 1337 return random.choice(list(hardcoded_trampoline_nodes().values())).pubkey 1338 1339 @profiler 1340 def create_routes_for_payment( 1341 self, *, 1342 amount_msat: int, # part of payment amount we want routes for now 1343 final_total_msat: int, # total payment amount final receiver will get 1344 invoice_pubkey, 1345 min_cltv_expiry, 1346 r_tags, 1347 invoice_features: int, 1348 payment_hash, 1349 payment_secret, 1350 trampoline_fee_level: int, 1351 use_two_trampolines: bool, 1352 fwd_trampoline_onion = None, 1353 full_path: LNPaymentPath = None) -> Sequence[Tuple[LNPaymentRoute, int]]: 1354 1355 """Creates multiple routes for splitting a payment over the available 1356 private channels. 1357 1358 We first try to conduct the payment over a single channel. If that fails 1359 and mpp is supported by the receiver, we will split the payment.""" 1360 # It could happen that the pathfinding uses a channel 1361 # in the graph multiple times, meaning we could exhaust 1362 # its capacity. This could be dealt with by temporarily 1363 # iteratively blacklisting channels for this mpp attempt. 1364 invoice_features = LnFeatures(invoice_features) 1365 trampoline_features = LnFeatures.VAR_ONION_OPT 1366 local_height = self.network.get_local_height() 1367 try: 1368 # try to send over a single channel 1369 if not self.channel_db: 1370 for chan in self.channels.values(): 1371 if not self.is_trampoline_peer(chan.node_id): 1372 continue 1373 if chan.is_frozen_for_sending(): 1374 continue 1375 trampoline_onion, amount_with_fees, cltv_delta = create_trampoline_route_and_onion( 1376 amount_msat=amount_msat, 1377 total_msat=final_total_msat, 1378 min_cltv_expiry=min_cltv_expiry, 1379 my_pubkey=self.node_keypair.pubkey, 1380 invoice_pubkey=invoice_pubkey, 1381 invoice_features=invoice_features, 1382 node_id=chan.node_id, 1383 r_tags=r_tags, 1384 payment_hash=payment_hash, 1385 payment_secret=payment_secret, 1386 local_height=local_height, 1387 trampoline_fee_level=trampoline_fee_level, 1388 use_two_trampolines=use_two_trampolines) 1389 trampoline_payment_secret = os.urandom(32) 1390 if chan.available_to_spend(LOCAL, strict=True) < amount_with_fees: 1391 continue 1392 route = [ 1393 RouteEdge( 1394 start_node=self.node_keypair.pubkey, 1395 end_node=chan.node_id, 1396 short_channel_id=chan.short_channel_id, 1397 fee_base_msat=0, 1398 fee_proportional_millionths=0, 1399 cltv_expiry_delta=0, 1400 node_features=trampoline_features) 1401 ] 1402 routes = [(route, amount_with_fees, amount_with_fees, amount_msat, cltv_delta, trampoline_payment_secret, trampoline_onion)] 1403 break 1404 else: 1405 raise NoPathFound() 1406 else: 1407 route = self.create_route_for_payment( 1408 amount_msat=amount_msat, 1409 invoice_pubkey=invoice_pubkey, 1410 min_cltv_expiry=min_cltv_expiry, 1411 r_tags=r_tags, 1412 invoice_features=invoice_features, 1413 outgoing_channel=None, full_path=full_path) 1414 routes = [(route, amount_msat, final_total_msat, amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion)] 1415 except NoPathFound: 1416 if not invoice_features.supports(LnFeatures.BASIC_MPP_OPT): 1417 raise 1418 channels_with_funds = dict([ 1419 (cid, int(chan.available_to_spend(HTLCOwner.LOCAL))) 1420 for cid, chan in self._channels.items() if not chan.is_frozen_for_sending()]) 1421 self.logger.info(f"channels_with_funds: {channels_with_funds}") 1422 # Create split configurations that are rated according to our 1423 # preference -funds = (low rating=high preference). 1424 split_configurations = suggest_splits(amount_msat, channels_with_funds) 1425 self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations') 1426 for s in split_configurations: 1427 self.logger.info(f"trying split configuration: {s[0].values()} rating: {s[1]}") 1428 routes = [] 1429 try: 1430 if not self.channel_db: 1431 buckets = defaultdict(list) 1432 for chan_id, part_amount_msat in s[0].items(): 1433 chan = self.channels[chan_id] 1434 if part_amount_msat: 1435 buckets[chan.node_id].append((chan_id, part_amount_msat)) 1436 for node_id, bucket in buckets.items(): 1437 bucket_amount_msat = sum([x[1] for x in bucket]) 1438 trampoline_onion, bucket_amount_with_fees, bucket_cltv_delta = create_trampoline_route_and_onion( 1439 amount_msat=bucket_amount_msat, 1440 total_msat=final_total_msat, 1441 min_cltv_expiry=min_cltv_expiry, 1442 my_pubkey=self.node_keypair.pubkey, 1443 invoice_pubkey=invoice_pubkey, 1444 invoice_features=invoice_features, 1445 node_id=node_id, 1446 r_tags=r_tags, 1447 payment_hash=payment_hash, 1448 payment_secret=payment_secret, 1449 local_height=local_height, 1450 trampoline_fee_level=trampoline_fee_level, 1451 use_two_trampolines=use_two_trampolines) 1452 # node_features is only used to determine is_tlv 1453 bucket_payment_secret = os.urandom(32) 1454 bucket_fees = bucket_amount_with_fees - bucket_amount_msat 1455 self.logger.info(f'bucket_fees {bucket_fees}') 1456 for chan_id, part_amount_msat in bucket: 1457 chan = self.channels[chan_id] 1458 margin = chan.available_to_spend(LOCAL, strict=True) - part_amount_msat 1459 delta_fee = min(bucket_fees, margin) 1460 part_amount_msat_with_fees = part_amount_msat + delta_fee 1461 bucket_fees -= delta_fee 1462 route = [ 1463 RouteEdge( 1464 start_node=self.node_keypair.pubkey, 1465 end_node=node_id, 1466 short_channel_id=chan.short_channel_id, 1467 fee_base_msat=0, 1468 fee_proportional_millionths=0, 1469 cltv_expiry_delta=0, 1470 node_features=trampoline_features) 1471 ] 1472 self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}') 1473 routes.append((route, part_amount_msat_with_fees, bucket_amount_with_fees, part_amount_msat, bucket_cltv_delta, bucket_payment_secret, trampoline_onion)) 1474 if bucket_fees != 0: 1475 self.logger.info('not enough margin to pay trampoline fee') 1476 raise NoPathFound() 1477 else: 1478 for chan_id, part_amount_msat in s[0].items(): 1479 if part_amount_msat: 1480 channel = self.channels[chan_id] 1481 route = self.create_route_for_payment( 1482 amount_msat=part_amount_msat, 1483 invoice_pubkey=invoice_pubkey, 1484 min_cltv_expiry=min_cltv_expiry, 1485 r_tags=r_tags, 1486 invoice_features=invoice_features, 1487 outgoing_channel=channel, full_path=None) 1488 routes.append((route, part_amount_msat, final_total_msat, part_amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion)) 1489 self.logger.info(f"found acceptable split configuration: {list(s[0].values())} rating: {s[1]}") 1490 break 1491 except NoPathFound: 1492 continue 1493 else: 1494 raise NoPathFound() 1495 return routes 1496 1497 def create_route_for_payment( 1498 self, *, 1499 amount_msat: int, 1500 invoice_pubkey: bytes, 1501 min_cltv_expiry: int, 1502 r_tags, 1503 invoice_features: int, 1504 outgoing_channel: Channel = None, 1505 full_path: Optional[LNPaymentPath]) -> Tuple[LNPaymentRoute, int]: 1506 1507 channels = [outgoing_channel] if outgoing_channel else list(self.channels.values()) 1508 scid_to_my_channels = { 1509 chan.short_channel_id: chan for chan in channels 1510 if chan.short_channel_id is not None 1511 } 1512 blacklist = self.network.channel_blacklist.get_current_list() 1513 # Collect all private edges from route hints. 1514 # Note: if some route hints are multiple edges long, and these paths cross each other, 1515 # we allow our path finding to cross the paths; i.e. the route hints are not isolated. 1516 private_route_edges = {} # type: Dict[ShortChannelID, RouteEdge] 1517 for private_path in r_tags: 1518 # we need to shift the node pubkey by one towards the destination: 1519 private_path_nodes = [edge[0] for edge in private_path][1:] + [invoice_pubkey] 1520 private_path_rest = [edge[1:] for edge in private_path] 1521 start_node = private_path[0][0] 1522 for end_node, edge_rest in zip(private_path_nodes, private_path_rest): 1523 short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta = edge_rest 1524 short_channel_id = ShortChannelID(short_channel_id) 1525 # if we have a routing policy for this edge in the db, that takes precedence, 1526 # as it is likely from a previous failure 1527 channel_policy = self.channel_db.get_policy_for_node( 1528 short_channel_id=short_channel_id, 1529 node_id=start_node, 1530 my_channels=scid_to_my_channels) 1531 if channel_policy: 1532 fee_base_msat = channel_policy.fee_base_msat 1533 fee_proportional_millionths = channel_policy.fee_proportional_millionths 1534 cltv_expiry_delta = channel_policy.cltv_expiry_delta 1535 node_info = self.channel_db.get_node_info_for_node_id(node_id=end_node) 1536 route_edge = RouteEdge( 1537 start_node=start_node, 1538 end_node=end_node, 1539 short_channel_id=short_channel_id, 1540 fee_base_msat=fee_base_msat, 1541 fee_proportional_millionths=fee_proportional_millionths, 1542 cltv_expiry_delta=cltv_expiry_delta, 1543 node_features=node_info.features if node_info else 0) 1544 if route_edge.short_channel_id not in blacklist: 1545 private_route_edges[route_edge.short_channel_id] = route_edge 1546 start_node = end_node 1547 # now find a route, end to end: between us and the recipient 1548 try: 1549 route = self.network.path_finder.find_route( 1550 nodeA=self.node_keypair.pubkey, 1551 nodeB=invoice_pubkey, 1552 invoice_amount_msat=amount_msat, 1553 path=full_path, 1554 my_channels=scid_to_my_channels, 1555 blacklist=blacklist, 1556 private_route_edges=private_route_edges) 1557 except NoChannelPolicy as e: 1558 raise NoPathFound() from e 1559 if not route: 1560 raise NoPathFound() 1561 # test sanity 1562 if not is_route_sane_to_use(route, amount_msat, min_cltv_expiry): 1563 self.logger.info(f"rejecting insane route {route}") 1564 raise NoPathFound() 1565 assert len(route) > 0 1566 if route[-1].end_node != invoice_pubkey: 1567 raise LNPathInconsistent("last node_id != invoice pubkey") 1568 # add features from invoice 1569 route[-1].node_features |= invoice_features 1570 return route 1571 1572 def add_request(self, amount_sat, message, expiry) -> str: 1573 coro = self._add_request_coro(amount_sat, message, expiry) 1574 fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) 1575 try: 1576 return fut.result(timeout=5) 1577 except concurrent.futures.TimeoutError: 1578 raise Exception(_("add invoice timed out")) 1579 1580 @log_exceptions 1581 async def create_invoice( 1582 self, *, 1583 amount_msat: Optional[int], 1584 message: str, 1585 expiry: int) -> Tuple[LnAddr, str]: 1586 1587 timestamp = int(time.time()) 1588 routing_hints = await self._calc_routing_hints_for_invoice(amount_msat) 1589 if not routing_hints: 1590 self.logger.info( 1591 "Warning. No routing hints added to invoice. " 1592 "Other clients will likely not be able to send to us.") 1593 # if not all hints are trampoline, do not create trampoline invoice 1594 invoice_features = self.features.for_invoice() 1595 trampoline_hints = [] 1596 for r in routing_hints: 1597 node_id, short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta = r[1][0] 1598 if len(r[1])== 1 and self.is_trampoline_peer(node_id): 1599 trampoline_hints.append(('t', (node_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta))) 1600 payment_preimage = os.urandom(32) 1601 payment_hash = sha256(payment_preimage) 1602 info = PaymentInfo(payment_hash, amount_msat, RECEIVED, PR_UNPAID) 1603 amount_btc = amount_msat/Decimal(COIN*1000) if amount_msat else None 1604 if expiry == 0: 1605 expiry = LN_EXPIRY_NEVER 1606 lnaddr = LnAddr( 1607 paymenthash=payment_hash, 1608 amount=amount_btc, 1609 tags=[ 1610 ('d', message), 1611 ('c', MIN_FINAL_CLTV_EXPIRY_FOR_INVOICE), 1612 ('x', expiry), 1613 ('9', invoice_features)] 1614 + routing_hints 1615 + trampoline_hints, 1616 date=timestamp, 1617 payment_secret=derive_payment_secret_from_payment_preimage(payment_preimage)) 1618 invoice = lnencode(lnaddr, self.node_keypair.privkey) 1619 self.save_preimage(payment_hash, payment_preimage) 1620 self.save_payment_info(info) 1621 return lnaddr, invoice 1622 1623 async def _add_request_coro(self, amount_sat: Optional[int], message, expiry: int) -> str: 1624 amount_msat = amount_sat * 1000 if amount_sat is not None else None 1625 lnaddr, invoice = await self.create_invoice( 1626 amount_msat=amount_msat, 1627 message=message, 1628 expiry=expiry) 1629 key = bh2u(lnaddr.paymenthash) 1630 req = LNInvoice.from_bech32(invoice) 1631 self.wallet.add_payment_request(req) 1632 self.wallet.set_label(key, message) 1633 return key 1634 1635 def save_preimage(self, payment_hash: bytes, preimage: bytes): 1636 assert sha256(preimage) == payment_hash 1637 self.preimages[bh2u(payment_hash)] = bh2u(preimage) 1638 self.wallet.save_db() 1639 1640 def get_preimage(self, payment_hash: bytes) -> Optional[bytes]: 1641 r = self.preimages.get(bh2u(payment_hash)) 1642 return bfh(r) if r else None 1643 1644 def get_payment_info(self, payment_hash: bytes) -> Optional[PaymentInfo]: 1645 """returns None if payment_hash is a payment we are forwarding""" 1646 key = payment_hash.hex() 1647 with self.lock: 1648 if key in self.payments: 1649 amount_msat, direction, status = self.payments[key] 1650 return PaymentInfo(payment_hash, amount_msat, direction, status) 1651 1652 def save_payment_info(self, info: PaymentInfo) -> None: 1653 key = info.payment_hash.hex() 1654 assert info.status in SAVED_PR_STATUS 1655 with self.lock: 1656 self.payments[key] = info.amount_msat, info.direction, info.status 1657 self.wallet.save_db() 1658 1659 def check_received_mpp_htlc(self, payment_secret, short_channel_id, htlc: UpdateAddHtlc, expected_msat: int) -> Optional[bool]: 1660 """ return MPP status: True (accepted), False (expired) or None """ 1661 payment_hash = htlc.payment_hash 1662 is_expired, is_accepted, htlc_set = self.received_mpp_htlcs.get(payment_secret, (False, False, set())) 1663 if self.get_payment_status(payment_hash) == PR_PAID: 1664 # payment_status is persisted 1665 is_accepted = True 1666 is_expired = False 1667 key = (short_channel_id, htlc) 1668 if key not in htlc_set: 1669 htlc_set.add(key) 1670 if not is_accepted and not is_expired: 1671 total = sum([_htlc.amount_msat for scid, _htlc in htlc_set]) 1672 first_timestamp = min([_htlc.timestamp for scid, _htlc in htlc_set]) 1673 if self.stopping_soon: 1674 is_expired = True # try to time out pending HTLCs before shutting down 1675 elif time.time() - first_timestamp > self.MPP_EXPIRY: 1676 is_expired = True 1677 elif total == expected_msat: 1678 is_accepted = True 1679 if is_accepted or is_expired: 1680 htlc_set.remove(key) 1681 if len(htlc_set) > 0: 1682 self.received_mpp_htlcs[payment_secret] = is_expired, is_accepted, htlc_set 1683 elif payment_secret in self.received_mpp_htlcs: 1684 self.received_mpp_htlcs.pop(payment_secret) 1685 return True if is_accepted else (False if is_expired else None) 1686 1687 def get_payment_status(self, payment_hash: bytes) -> int: 1688 info = self.get_payment_info(payment_hash) 1689 return info.status if info else PR_UNPAID 1690 1691 def get_invoice_status(self, invoice: LNInvoice) -> int: 1692 key = invoice.rhash 1693 log = self.logs[key] 1694 if key in self.inflight_payments: 1695 return PR_INFLIGHT 1696 # status may be PR_FAILED 1697 status = self.get_payment_status(bfh(key)) 1698 if status == PR_UNPAID and log: 1699 status = PR_FAILED 1700 return status 1701 1702 def set_invoice_status(self, key: str, status: int) -> None: 1703 if status == PR_INFLIGHT: 1704 self.inflight_payments.add(key) 1705 elif key in self.inflight_payments: 1706 self.inflight_payments.remove(key) 1707 if status in SAVED_PR_STATUS: 1708 self.set_payment_status(bfh(key), status) 1709 util.trigger_callback('invoice_status', self.wallet, key) 1710 1711 def set_request_status(self, payment_hash: bytes, status: int) -> None: 1712 if self.get_payment_status(payment_hash) != status: 1713 self.set_payment_status(payment_hash, status) 1714 util.trigger_callback('request_status', self.wallet, payment_hash.hex(), status) 1715 1716 def set_payment_status(self, payment_hash: bytes, status: int) -> None: 1717 info = self.get_payment_info(payment_hash) 1718 if info is None: 1719 # if we are forwarding 1720 return 1721 info = info._replace(status=status) 1722 self.save_payment_info(info) 1723 1724 def htlc_fulfilled(self, chan, payment_hash: bytes, htlc_id:int): 1725 util.trigger_callback('htlc_fulfilled', payment_hash, chan.channel_id) 1726 q = self.sent_htlcs.get(payment_hash) 1727 if q: 1728 route, payment_secret, amount_msat, bucket_msat, amount_receiver_msat = self.sent_htlcs_routes[(payment_hash, chan.short_channel_id, htlc_id)] 1729 htlc_log = HtlcLog( 1730 success=True, 1731 route=route, 1732 amount_msat=amount_receiver_msat) 1733 q.put_nowait(htlc_log) 1734 else: 1735 key = payment_hash.hex() 1736 self.set_invoice_status(key, PR_PAID) 1737 util.trigger_callback('payment_succeeded', self.wallet, key) 1738 1739 def htlc_failed( 1740 self, 1741 chan: Channel, 1742 payment_hash: bytes, 1743 htlc_id: int, 1744 error_bytes: Optional[bytes], 1745 failure_message: Optional['OnionRoutingFailure']): 1746 1747 util.trigger_callback('htlc_failed', payment_hash, chan.channel_id) 1748 q = self.sent_htlcs.get(payment_hash) 1749 if q: 1750 # detect if it is part of a bucket 1751 # if yes, wait until the bucket completely failed 1752 key = (payment_hash, chan.short_channel_id, htlc_id) 1753 route, payment_secret, amount_msat, bucket_msat, amount_receiver_msat = self.sent_htlcs_routes[key] 1754 if error_bytes: 1755 # TODO "decode_onion_error" might raise, catch and maybe blacklist/penalise someone? 1756 try: 1757 failure_message, sender_idx = chan.decode_onion_error(error_bytes, route, htlc_id) 1758 except Exception as e: 1759 sender_idx = None 1760 failure_message = OnionRoutingFailure(-1, str(e)) 1761 else: 1762 # probably got "update_fail_malformed_htlc". well... who to penalise now? 1763 assert failure_message is not None 1764 sender_idx = None 1765 self.logger.info(f"htlc_failed {failure_message}") 1766 1767 # check sent_buckets if we use trampoline 1768 if self.channel_db is None and payment_secret in self.sent_buckets: 1769 amount_sent, amount_failed = self.sent_buckets[payment_secret] 1770 amount_failed += amount_receiver_msat 1771 self.sent_buckets[payment_secret] = amount_sent, amount_failed 1772 if amount_sent != amount_failed: 1773 self.logger.info('bucket still active...') 1774 return 1775 self.logger.info('bucket failed') 1776 amount_receiver_msat = amount_sent 1777 1778 htlc_log = HtlcLog( 1779 success=False, 1780 route=route, 1781 amount_msat=amount_receiver_msat, 1782 error_bytes=error_bytes, 1783 failure_msg=failure_message, 1784 sender_idx=sender_idx) 1785 q.put_nowait(htlc_log) 1786 else: 1787 self.logger.info(f"received unknown htlc_failed, probably from previous session") 1788 key = payment_hash.hex() 1789 self.set_invoice_status(key, PR_UNPAID) 1790 util.trigger_callback('payment_failed', self.wallet, key, '') 1791 1792 async def _calc_routing_hints_for_invoice(self, amount_msat: Optional[int]): 1793 """calculate routing hints (BOLT-11 'r' field)""" 1794 routing_hints = [] 1795 channels = list(self.channels.values()) 1796 # do minimal filtering of channels. 1797 # we include channels that cannot *right now* receive (e.g. peer disconnected or balance insufficient) 1798 channels = [chan for chan in channels 1799 if (chan.is_open() and not chan.is_frozen_for_receiving())] 1800 # cap max channels to include to keep QR code reasonably scannable 1801 channels = sorted(channels, key=lambda chan: (not chan.is_active(), -chan.available_to_spend(REMOTE))) 1802 channels = channels[:15] 1803 random.shuffle(channels) # let's not leak channel order 1804 scid_to_my_channels = {chan.short_channel_id: chan for chan in channels 1805 if chan.short_channel_id is not None} 1806 for chan in channels: 1807 chan_id = chan.short_channel_id 1808 assert isinstance(chan_id, bytes), chan_id 1809 channel_info = get_mychannel_info(chan_id, scid_to_my_channels) 1810 # note: as a fallback, if we don't have a channel update for the 1811 # incoming direction of our private channel, we fill the invoice with garbage. 1812 # the sender should still be able to pay us, but will incur an extra round trip 1813 # (they will get the channel update from the onion error) 1814 # at least, that's the theory. https://github.com/lightningnetwork/lnd/issues/2066 1815 fee_base_msat = fee_proportional_millionths = 0 1816 cltv_expiry_delta = 1 # lnd won't even try with zero 1817 missing_info = True 1818 if channel_info: 1819 policy = get_mychannel_policy(channel_info.short_channel_id, chan.node_id, scid_to_my_channels) 1820 if policy: 1821 fee_base_msat = policy.fee_base_msat 1822 fee_proportional_millionths = policy.fee_proportional_millionths 1823 cltv_expiry_delta = policy.cltv_expiry_delta 1824 missing_info = False 1825 if missing_info: 1826 self.logger.info( 1827 f"Warning. Missing channel update for our channel {chan_id}; " 1828 f"filling invoice with incorrect data.") 1829 routing_hints.append(('r', [( 1830 chan.node_id, 1831 chan_id, 1832 fee_base_msat, 1833 fee_proportional_millionths, 1834 cltv_expiry_delta)])) 1835 return routing_hints 1836 1837 def delete_payment(self, payment_hash_hex: str): 1838 try: 1839 with self.lock: 1840 del self.payments[payment_hash_hex] 1841 except KeyError: 1842 return 1843 self.wallet.save_db() 1844 1845 def get_balance(self): 1846 with self.lock: 1847 return Decimal(sum( 1848 chan.balance(LOCAL) if not chan.is_closed() else 0 1849 for chan in self.channels.values())) / 1000 1850 1851 def num_sats_can_send(self) -> Decimal: 1852 can_send = Decimal(0) 1853 with self.lock: 1854 if self.channels: 1855 for c in self.channels.values(): 1856 if c.is_active() and not c.is_frozen_for_sending(): 1857 can_send += Decimal(c.available_to_spend(LOCAL)) / 1000 1858 return can_send 1859 1860 def num_sats_can_receive(self) -> Decimal: 1861 can_receive = Decimal(0) 1862 with self.lock: 1863 if self.channels: 1864 for c in self.channels.values(): 1865 if c.is_active() and not c.is_frozen_for_receiving(): 1866 can_receive += Decimal(c.available_to_spend(REMOTE)) / 1000 1867 return can_receive 1868 1869 def can_pay_invoice(self, invoice: LNInvoice) -> bool: 1870 return invoice.get_amount_sat() <= self.num_sats_can_send() 1871 1872 def can_receive_invoice(self, invoice: LNInvoice) -> bool: 1873 return invoice.get_amount_sat() <= self.num_sats_can_receive() 1874 1875 async def close_channel(self, chan_id): 1876 chan = self._channels[chan_id] 1877 peer = self._peers[chan.node_id] 1878 return await peer.close_channel(chan_id) 1879 1880 async def force_close_channel(self, chan_id): 1881 # returns txid or raises 1882 chan = self._channels[chan_id] 1883 tx = chan.force_close_tx() 1884 await self.network.broadcast_transaction(tx) 1885 chan.set_state(ChannelState.FORCE_CLOSING) 1886 return tx.txid() 1887 1888 async def try_force_closing(self, chan_id): 1889 # fails silently but sets the state, so that we will retry later 1890 chan = self._channels[chan_id] 1891 tx = chan.force_close_tx() 1892 chan.set_state(ChannelState.FORCE_CLOSING) 1893 await self.network.try_broadcasting(tx, 'force-close') 1894 1895 def remove_channel(self, chan_id): 1896 chan = self._channels[chan_id] 1897 assert chan.get_state() == ChannelState.REDEEMED 1898 with self.lock: 1899 self._channels.pop(chan_id) 1900 self.db.get('channels').pop(chan_id.hex()) 1901 for addr in chan.get_wallet_addresses_channel_might_want_reserved(): 1902 self.wallet.set_reserved_state_of_address(addr, reserved=False) 1903 1904 util.trigger_callback('channels_updated', self.wallet) 1905 util.trigger_callback('wallet_updated', self.wallet) 1906 1907 @ignore_exceptions 1908 @log_exceptions 1909 async def reestablish_peer_for_given_channel(self, chan: Channel) -> None: 1910 now = time.time() 1911 peer_addresses = [] 1912 if not self.channel_db: 1913 addr = trampolines_by_id().get(chan.node_id) 1914 if addr: 1915 peer_addresses.append(addr) 1916 else: 1917 # will try last good address first, from gossip 1918 last_good_addr = self.channel_db.get_last_good_address(chan.node_id) 1919 if last_good_addr: 1920 peer_addresses.append(last_good_addr) 1921 # will try addresses for node_id from gossip 1922 addrs_from_gossip = self.channel_db.get_node_addresses(chan.node_id) or [] 1923 for host, port, ts in addrs_from_gossip: 1924 peer_addresses.append(LNPeerAddr(host, port, chan.node_id)) 1925 # will try addresses stored in channel storage 1926 peer_addresses += list(chan.get_peer_addresses()) 1927 # Done gathering addresses. 1928 # Now select first one that has not failed recently. 1929 for peer in peer_addresses: 1930 if self._can_retry_addr(peer, urgent=True, now=now): 1931 await self._add_peer(peer.host, peer.port, peer.pubkey) 1932 return 1933 1934 async def reestablish_peers_and_channels(self): 1935 while True: 1936 await asyncio.sleep(1) 1937 if self.stopping_soon: 1938 return 1939 for chan in self.channels.values(): 1940 if chan.is_closed(): 1941 continue 1942 # reestablish 1943 if not chan.should_try_to_reestablish_peer(): 1944 continue 1945 peer = self._peers.get(chan.node_id, None) 1946 if peer: 1947 await peer.taskgroup.spawn(peer.reestablish_channel(chan)) 1948 else: 1949 await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan)) 1950 1951 def current_feerate_per_kw(self): 1952 from .simple_config import FEE_LN_ETA_TARGET, FEERATE_FALLBACK_STATIC_FEE, FEERATE_REGTEST_HARDCODED 1953 if constants.net is constants.BitcoinRegtest: 1954 return FEERATE_REGTEST_HARDCODED // 4 1955 feerate_per_kvbyte = self.network.config.eta_target_to_fee(FEE_LN_ETA_TARGET) 1956 if feerate_per_kvbyte is None: 1957 feerate_per_kvbyte = FEERATE_FALLBACK_STATIC_FEE 1958 return max(253, feerate_per_kvbyte // 4) 1959 1960 def create_channel_backup(self, channel_id): 1961 chan = self._channels[channel_id] 1962 # do not backup old-style channels 1963 assert chan.is_static_remotekey_enabled() 1964 peer_addresses = list(chan.get_peer_addresses()) 1965 peer_addr = peer_addresses[0] 1966 return ChannelBackupStorage( 1967 node_id = chan.node_id, 1968 privkey = self.node_keypair.privkey, 1969 funding_txid = chan.funding_outpoint.txid, 1970 funding_index = chan.funding_outpoint.output_index, 1971 funding_address = chan.get_funding_address(), 1972 host = peer_addr.host, 1973 port = peer_addr.port, 1974 is_initiator = chan.constraints.is_initiator, 1975 channel_seed = chan.config[LOCAL].channel_seed, 1976 local_delay = chan.config[LOCAL].to_self_delay, 1977 remote_delay = chan.config[REMOTE].to_self_delay, 1978 remote_revocation_pubkey = chan.config[REMOTE].revocation_basepoint.pubkey, 1979 remote_payment_pubkey = chan.config[REMOTE].payment_basepoint.pubkey) 1980 1981 def export_channel_backup(self, channel_id): 1982 xpub = self.wallet.get_fingerprint() 1983 backup_bytes = self.create_channel_backup(channel_id).to_bytes() 1984 assert backup_bytes == ChannelBackupStorage.from_bytes(backup_bytes).to_bytes(), "roundtrip failed" 1985 encrypted = pw_encode_with_version_and_mac(backup_bytes, xpub) 1986 assert backup_bytes == pw_decode_with_version_and_mac(encrypted, xpub), "encrypt failed" 1987 return 'channel_backup:' + encrypted 1988 1989 async def request_remote_force_close( 1990 self, *, funding_txid: str, funding_index: int, connect_str: str): 1991 """ 1992 Requests the remote to force close a channel. Can be used without 1993 having state or any backup for the channel. 1994 Assumes that channel was originally opened with the same local peer (node_keypair). 1995 Kept for console use. 1996 1997 Example: 1998 network.run_from_another_thread(wallet.lnworker.request_remote_force_close(funding_txid="11a3b391bc99dbca0b2be4fdd8f18ca641896c81ae4d9596b30cbf1eef17af71", funding_index=1, connect_str="023a8dfe081c6bbd0504e599f33d39d17687de63023a8b20afcb59147d9d77c19d")) 1999 """ 2000 channel_id = lnutil.channel_id_from_funding_tx(funding_txid, funding_index)[0] 2001 peer = await self.add_peer(connect_str) 2002 await peer.trigger_force_close(channel_id) 2003 2004 def import_channel_backup(self, data): 2005 assert data.startswith('channel_backup:') 2006 encrypted = data[15:] 2007 xpub = self.wallet.get_fingerprint() 2008 decrypted = pw_decode_with_version_and_mac(encrypted, xpub) 2009 cb_storage = ChannelBackupStorage.from_bytes(decrypted) 2010 channel_id = cb_storage.channel_id() 2011 if channel_id.hex() in self.db.get_dict("channels"): 2012 raise Exception('Channel already in wallet') 2013 self.logger.info(f'importing channel backup: {channel_id.hex()}') 2014 cb = ChannelBackup(cb_storage, sweep_address=self.sweep_address, lnworker=self) 2015 d = self.db.get_dict("channel_backups") 2016 d[channel_id.hex()] = cb_storage 2017 with self.lock: 2018 self._channel_backups[channel_id] = cb 2019 self.wallet.save_db() 2020 util.trigger_callback('channels_updated', self.wallet) 2021 self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address()) 2022 2023 def remove_channel_backup(self, channel_id): 2024 d = self.db.get_dict("channel_backups") 2025 if channel_id.hex() not in d: 2026 raise Exception('Channel not found') 2027 with self.lock: 2028 d.pop(channel_id.hex()) 2029 self._channel_backups.pop(channel_id) 2030 self.wallet.save_db() 2031 util.trigger_callback('channels_updated', self.wallet) 2032 2033 @log_exceptions 2034 async def request_force_close_from_backup(self, channel_id: bytes): 2035 cb = self.channel_backups.get(channel_id) 2036 if not cb: 2037 raise Exception(f'channel backup not found {self.channel_backups}') 2038 cb = cb.cb # storage 2039 self.logger.info(f'requesting channel force close: {channel_id.hex()}') 2040 # TODO also try network addresses from gossip db (as it might have changed) 2041 peer_addr = LNPeerAddr(cb.host, cb.port, cb.node_id) 2042 transport = LNTransport(cb.privkey, peer_addr, proxy=self.network.proxy) 2043 peer = Peer(self, cb.node_id, transport, is_channel_backup=True) 2044 async with TaskGroup(wait=any) as group: 2045 await group.spawn(peer._message_loop()) 2046 await group.spawn(peer.trigger_force_close(channel_id)) 2047 return True