electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum

lnworker.py (99688B)


      1 # Copyright (C) 2018 The Electrum developers
      2 # Distributed under the MIT software license, see the accompanying
      3 # file LICENCE or http://www.opensource.org/licenses/mit-license.php
      4 
      5 import asyncio
      6 import os
      7 from decimal import Decimal
      8 import random
      9 import time
     10 from typing import (Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING,
     11                     NamedTuple, Union, Mapping, Any, Iterable)
     12 import threading
     13 import socket
     14 import aiohttp
     15 import json
     16 from datetime import datetime, timezone
     17 from functools import partial
     18 from collections import defaultdict
     19 import concurrent
     20 from concurrent import futures
     21 import urllib.parse
     22 
     23 import dns.resolver
     24 import dns.exception
     25 from aiorpcx import run_in_thread, TaskGroup, NetAddress, ignore_after
     26 
     27 from . import constants, util
     28 from . import keystore
     29 from .util import profiler, chunks
     30 from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER
     31 from .util import NetworkRetryManager, JsonRPCClient
     32 from .lnutil import LN_MAX_FUNDING_SAT
     33 from .keystore import BIP32_KeyStore
     34 from .bitcoin import COIN
     35 from .transaction import Transaction
     36 from .crypto import sha256
     37 from .bip32 import BIP32Node
     38 from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions
     39 from .util import ignore_exceptions, make_aiohttp_session, SilentTaskGroup
     40 from .util import timestamp_to_datetime, random_shuffled_copy
     41 from .util import MyEncoder, is_private_netaddress
     42 from .logging import Logger
     43 from .lntransport import LNTransport, LNResponderTransport
     44 from .lnpeer import Peer, LN_P2P_NETWORK_TIMEOUT
     45 from .lnaddr import lnencode, LnAddr, lndecode
     46 from .ecc import der_sig_from_sig_string
     47 from .lnchannel import Channel
     48 from .lnchannel import ChannelState, PeerState
     49 from .lnrater import LNRater
     50 from . import lnutil
     51 from .lnutil import funding_output_script
     52 from .bitcoin import redeem_script_to_address
     53 from .lnutil import (Outpoint, LNPeerAddr,
     54                      get_compressed_pubkey_from_bech32, extract_nodeid,
     55                      PaymentFailure, split_host_port, ConnStringFormatError,
     56                      generate_keypair, LnKeyFamily, LOCAL, REMOTE,
     57                      MIN_FINAL_CLTV_EXPIRY_FOR_INVOICE,
     58                      NUM_MAX_EDGES_IN_PAYMENT_PATH, SENT, RECEIVED, HTLCOwner,
     59                      UpdateAddHtlc, Direction, LnFeatures, ShortChannelID,
     60                      HtlcLog, derive_payment_secret_from_payment_preimage,
     61                      NoPathFound)
     62 from .lnutil import ln_dummy_address, ln_compare_features, IncompatibleLightningFeatures
     63 from .lnrouter import TrampolineEdge
     64 from .transaction import PartialTxOutput, PartialTransaction, PartialTxInput
     65 from .lnonion import OnionFailureCode, process_onion_packet, OnionPacket, OnionRoutingFailure
     66 from .lnmsg import decode_msg
     67 from .i18n import _
     68 from .lnrouter import (RouteEdge, LNPaymentRoute, LNPaymentPath, is_route_sane_to_use,
     69                        NoChannelPolicy, LNPathInconsistent)
     70 from .address_synchronizer import TX_HEIGHT_LOCAL
     71 from . import lnsweep
     72 from .lnwatcher import LNWalletWatcher
     73 from .crypto import pw_encode_with_version_and_mac, pw_decode_with_version_and_mac
     74 from .lnutil import ChannelBackupStorage
     75 from .lnchannel import ChannelBackup
     76 from .channel_db import UpdateStatus
     77 from .channel_db import get_mychannel_info, get_mychannel_policy
     78 from .submarine_swaps import SwapManager
     79 from .channel_db import ChannelInfo, Policy
     80 from .mpp_split import suggest_splits
     81 from .trampoline import create_trampoline_route_and_onion
     82 
     83 if TYPE_CHECKING:
     84     from .network import Network
     85     from .wallet import Abstract_Wallet
     86     from .channel_db import ChannelDB
     87     from .simple_config import SimpleConfig
     88 
     89 
     90 SAVED_PR_STATUS = [PR_PAID, PR_UNPAID] # statuses that are persisted
     91 
     92 
     93 NUM_PEERS_TARGET = 4
     94 
     95 
     96 FALLBACK_NODE_LIST_TESTNET = (
     97     LNPeerAddr(host='203.132.95.10', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')),
     98     LNPeerAddr(host='2401:d002:4402:0:bf1d:986a:7598:6d49', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')),
     99     LNPeerAddr(host='50.116.3.223', port=9734, pubkey=bfh('03236a685d30096b26692dce0cf0fa7c8528bdf61dbf5363a3ef6d5c92733a3016')),
    100     LNPeerAddr(host='3.16.119.191', port=9735, pubkey=bfh('03d5e17a3c213fe490e1b0c389f8cfcfcea08a29717d50a9f453735e0ab2a7c003')),
    101     LNPeerAddr(host='34.250.234.192', port=9735, pubkey=bfh('03933884aaf1d6b108397e5efe5c86bcf2d8ca8d2f700eda99db9214fc2712b134')),
    102     LNPeerAddr(host='88.99.209.230', port=9735, pubkey=bfh('0260d9119979caedc570ada883ff614c6efb93f7f7382e25d73ecbeba0b62df2d7')),
    103     LNPeerAddr(host='160.16.233.215', port=9735, pubkey=bfh('023ea0a53af875580899da0ab0a21455d9c19160c4ea1b7774c9d4be6810b02d2c')),
    104     LNPeerAddr(host='197.155.6.173', port=9735, pubkey=bfh('0269a94e8b32c005e4336bfb743c08a6e9beb13d940d57c479d95c8e687ccbdb9f')),
    105     LNPeerAddr(host='2c0f:fb18:406::4', port=9735, pubkey=bfh('0269a94e8b32c005e4336bfb743c08a6e9beb13d940d57c479d95c8e687ccbdb9f')),
    106     LNPeerAddr(host='163.172.94.64', port=9735, pubkey=bfh('030f0bf260acdbd3edcad84d7588ec7c5df4711e87e6a23016f989b8d3a4147230')),
    107     LNPeerAddr(host='23.237.77.12', port=9735, pubkey=bfh('02312627fdf07fbdd7e5ddb136611bdde9b00d26821d14d94891395452f67af248')),
    108     LNPeerAddr(host='197.155.6.172', port=9735, pubkey=bfh('02ae2f22b02375e3e9b4b4a2db4f12e1b50752b4062dbefd6e01332acdaf680379')),
    109     LNPeerAddr(host='2c0f:fb18:406::3', port=9735, pubkey=bfh('02ae2f22b02375e3e9b4b4a2db4f12e1b50752b4062dbefd6e01332acdaf680379')),
    110     LNPeerAddr(host='23.239.23.44', port=9740, pubkey=bfh('034fe52e98a0e9d3c21b767e1b371881265d8c7578c21f5afd6d6438da10348b36')),
    111     LNPeerAddr(host='2600:3c01::f03c:91ff:fe05:349c', port=9740, pubkey=bfh('034fe52e98a0e9d3c21b767e1b371881265d8c7578c21f5afd6d6438da10348b36')),
    112 )
    113 
    114 FALLBACK_NODE_LIST_MAINNET = [
    115     LNPeerAddr(host='172.81.181.3', port=9735, pubkey=bfh('0214382bdce7750dfcb8126df8e2b12de38536902dc36abcebdaeefdeca1df8284')),
    116     LNPeerAddr(host='35.230.100.60', port=9735, pubkey=bfh('023f5e3582716bed96f6f26cfcd8037e07474d7b4743afdc8b07e692df63464d7e')),
    117     LNPeerAddr(host='40.69.71.114', port=9735, pubkey=bfh('028303182c9885da93b3b25c9621d22cf34475e63c123942e402ab530c0556e675')),
    118     LNPeerAddr(host='94.177.171.73', port=9735, pubkey=bfh('0276e09a267592e7451a939c932cf685f0754de382a3ca85d2fb3a864d4c365ad5')),
    119     LNPeerAddr(host='34.236.113.58', port=9735, pubkey=bfh('02fa50c72ee1e2eb5f1b6d9c3032080c4c864373c4201dfa2966aa34eee1051f97')),
    120     LNPeerAddr(host='52.50.244.44', port=9735, pubkey=bfh('030c3f19d742ca294a55c00376b3b355c3c90d61c6b6b39554dbc7ac19b141c14f')),
    121     LNPeerAddr(host='157.245.68.47', port=9735, pubkey=bfh('03c2abfa93eacec04721c019644584424aab2ba4dff3ac9bdab4e9c97007491dda')),
    122     LNPeerAddr(host='18.221.23.28', port=9735, pubkey=bfh('03abf6f44c355dec0d5aa155bdbdd6e0c8fefe318eff402de65c6eb2e1be55dc3e')),
    123     LNPeerAddr(host='52.224.178.244', port=9735, pubkey=bfh('026b105ac13212c48714c6be9b11577a9ce10f10e1c88a45ce217e6331209faf8b')),
    124     LNPeerAddr(host='34.239.230.56', port=9735, pubkey=bfh('03864ef025fde8fb587d989186ce6a4a186895ee44a926bfc370e2c366597a3f8f')),
    125     LNPeerAddr(host='46.229.165.136', port=9735, pubkey=bfh('0390b5d4492dc2f5318e5233ab2cebf6d48914881a33ef6a9c6bcdbb433ad986d0')),
    126     LNPeerAddr(host='157.230.28.160', port=9735, pubkey=bfh('0279c22ed7a068d10dc1a38ae66d2d6461e269226c60258c021b1ddcdfe4b00bc4')),
    127     LNPeerAddr(host='74.108.13.152', port=9735, pubkey=bfh('0331f80652fb840239df8dc99205792bba2e559a05469915804c08420230e23c7c')),
    128     LNPeerAddr(host='167.172.44.148', port=9735, pubkey=bfh('0395033b252c6f40e3756984162d68174e2bd8060a129c0d3462a9370471c6d28f')),
    129     LNPeerAddr(host='138.68.14.104', port=9735, pubkey=bfh('03bb88ccc444534da7b5b64b4f7b15e1eccb18e102db0e400d4b9cfe93763aa26d')),
    130     LNPeerAddr(host='3.124.63.44', port=9735, pubkey=bfh('0242a4ae0c5bef18048fbecf995094b74bfb0f7391418d71ed394784373f41e4f3')),
    131     LNPeerAddr(host='2001:470:8:2e1::43', port=9735, pubkey=bfh('03baa70886d9200af0ffbd3f9e18d96008331c858456b16e3a9b41e735c6208fef')),
    132     LNPeerAddr(host='2601:186:c100:6bcd:219:d1ff:fe75:dc2f', port=9735, pubkey=bfh('0298f6074a454a1f5345cb2a7c6f9fce206cd0bf675d177cdbf0ca7508dd28852f')),
    133     LNPeerAddr(host='2001:41d0:e:734::1', port=9735, pubkey=bfh('03a503d8e30f2ff407096d235b5db63b4fcf3f89a653acb6f43d3fc492a7674019')),
    134     LNPeerAddr(host='2a01:4f9:2b:2254::2', port=9735, pubkey=bfh('02f3069a342ae2883a6f29e275f06f28a56a6ea2e2d96f5888a3266444dcf542b6')),
    135     LNPeerAddr(host='2a02:8070:24c1:100:528c:2997:6dbc:a054', port=9735, pubkey=bfh('02a45def9ae014fdd2603dd7033d157faa3a55a72b06a63ae22ef46d9fafdc6e8d')),
    136     LNPeerAddr(host='2600:3c01::f03c:91ff:fe05:349c', port=9736, pubkey=bfh('02731b798b39a09f9f14e90ee601afb6ebb796d6e5797de14582a978770b33700f')),
    137     LNPeerAddr(host='2a00:8a60:e012:a00::21', port=9735, pubkey=bfh('027ce055380348d7812d2ae7745701c9f93e70c1adeb2657f053f91df4f2843c71')),
    138     LNPeerAddr(host='2604:a880:400:d1::8bd:1001', port=9735, pubkey=bfh('03649c72a4816f0cd546f84aafbd657e92a30ab474de7ab795e8b5650a427611f7')),
    139     LNPeerAddr(host='2a01:4f8:c0c:7b31::1', port=9735, pubkey=bfh('02c16cca44562b590dd279c942200bdccfd4f990c3a69fad620c10ef2f8228eaff')),
    140     LNPeerAddr(host='2001:41d0:1:b40d::1', port=9735, pubkey=bfh('026726a4b043d413b45b334876d17b8a98848129604429ec65532ba286a42efeac')),
    141 ]
    142 
    143 
    144 from .trampoline import trampolines_by_id, hardcoded_trampoline_nodes, is_hardcoded_trampoline
    145 
    146 
    147 class PaymentInfo(NamedTuple):
    148     payment_hash: bytes
    149     amount_msat: Optional[int]
    150     direction: int
    151     status: int
    152 
    153 
    154 class ErrorAddingPeer(Exception): pass
    155 
    156 
    157 # set some feature flags as baseline for both LNWallet and LNGossip
    158 # note that e.g. DATA_LOSS_PROTECT is needed for LNGossip as many peers require it
    159 BASE_FEATURES = LnFeatures(0)\
    160     | LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT\
    161     | LnFeatures.OPTION_STATIC_REMOTEKEY_OPT\
    162     | LnFeatures.VAR_ONION_OPT\
    163     | LnFeatures.PAYMENT_SECRET_OPT\
    164     | LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT
    165 
    166 # we do not want to receive unrequested gossip (see lnpeer.maybe_save_remote_update)
    167 LNWALLET_FEATURES = BASE_FEATURES\
    168     | LnFeatures.OPTION_DATA_LOSS_PROTECT_REQ\
    169     | LnFeatures.OPTION_STATIC_REMOTEKEY_REQ\
    170     | LnFeatures.GOSSIP_QUERIES_REQ\
    171     | LnFeatures.BASIC_MPP_OPT\
    172     | LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT
    173 
    174 LNGOSSIP_FEATURES = BASE_FEATURES\
    175     | LnFeatures.GOSSIP_QUERIES_OPT\
    176     | LnFeatures.GOSSIP_QUERIES_REQ
    177 
    178 
    179 class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
    180 
    181     def __init__(self, xprv, features: LnFeatures):
    182         Logger.__init__(self)
    183         NetworkRetryManager.__init__(
    184             self,
    185             max_retry_delay_normal=3600,
    186             init_retry_delay_normal=600,
    187             max_retry_delay_urgent=300,
    188             init_retry_delay_urgent=4,
    189         )
    190         self.lock = threading.RLock()
    191         self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
    192         self._peers = {}  # type: Dict[bytes, Peer]  # pubkey -> Peer  # needs self.lock
    193         self.taskgroup = SilentTaskGroup()
    194         self.listen_server = None  # type: Optional[asyncio.AbstractServer]
    195         self.features = features
    196         self.network = None  # type: Optional[Network]
    197         self.config = None  # type: Optional[SimpleConfig]
    198         self.stopping_soon = False  # whether we are being shut down
    199 
    200         util.register_callback(self.on_proxy_changed, ['proxy_set'])
    201 
    202     @property
    203     def channel_db(self):
    204         return self.network.channel_db if self.network else None
    205 
    206     @property
    207     def peers(self) -> Mapping[bytes, Peer]:
    208         """Returns a read-only copy of peers."""
    209         with self.lock:
    210             return self._peers.copy()
    211 
    212     def channels_for_peer(self, node_id: bytes) -> Dict[bytes, Channel]:
    213         return {}
    214 
    215     def get_node_alias(self, node_id: bytes) -> Optional[str]:
    216         """Returns the alias of the node, or None if unknown."""
    217         node_alias = None
    218         if self.channel_db:
    219             node_info = self.channel_db.get_node_info_for_node_id(node_id)
    220             if node_info:
    221                 node_alias = node_info.alias
    222         else:
    223             for k, v in hardcoded_trampoline_nodes().items():
    224                 if v.pubkey == node_id:
    225                     node_alias = k
    226                     break
    227         return node_alias
    228 
    229     async def maybe_listen(self):
    230         # FIXME: only one LNWorker can listen at a time (single port)
    231         listen_addr = self.config.get('lightning_listen')
    232         if listen_addr:
    233             self.logger.info(f'lightning_listen enabled. will try to bind: {listen_addr!r}')
    234             try:
    235                 netaddr = NetAddress.from_string(listen_addr)
    236             except Exception as e:
    237                 self.logger.error(f"failed to parse config key 'lightning_listen'. got: {e!r}")
    238                 return
    239             addr = str(netaddr.host)
    240             async def cb(reader, writer):
    241                 transport = LNResponderTransport(self.node_keypair.privkey, reader, writer)
    242                 try:
    243                     node_id = await transport.handshake()
    244                 except Exception as e:
    245                     self.logger.info(f'handshake failure from incoming connection: {e!r}')
    246                     return
    247                 peer = Peer(self, node_id, transport)
    248                 with self.lock:
    249                     self._peers[node_id] = peer
    250                 await self.taskgroup.spawn(peer.main_loop())
    251             try:
    252                 self.listen_server = await asyncio.start_server(cb, addr, netaddr.port)
    253             except OSError as e:
    254                 self.logger.error(f"cannot listen for lightning p2p. error: {e!r}")
    255 
    256     @ignore_exceptions  # don't kill outer taskgroup
    257     async def main_loop(self):
    258         self.logger.info("starting taskgroup.")
    259         try:
    260             async with self.taskgroup as group:
    261                 await group.spawn(self._maintain_connectivity())
    262         except asyncio.CancelledError:
    263             raise
    264         except Exception as e:
    265             self.logger.exception("taskgroup died.")
    266         finally:
    267             self.logger.info("taskgroup stopped.")
    268 
    269     async def _maintain_connectivity(self):
    270         while True:
    271             await asyncio.sleep(1)
    272             if self.stopping_soon:
    273                 return
    274             now = time.time()
    275             if len(self._peers) >= NUM_PEERS_TARGET:
    276                 continue
    277             peers = await self._get_next_peers_to_try()
    278             for peer in peers:
    279                 if self._can_retry_addr(peer, now=now):
    280                     try:
    281                         await self._add_peer(peer.host, peer.port, peer.pubkey)
    282                     except ErrorAddingPeer as e:
    283                         self.logger.info(f"failed to add peer: {peer}. exc: {e!r}")
    284 
    285     async def _add_peer(self, host: str, port: int, node_id: bytes) -> Peer:
    286         if node_id in self._peers:
    287             return self._peers[node_id]
    288         port = int(port)
    289         peer_addr = LNPeerAddr(host, port, node_id)
    290         self._trying_addr_now(peer_addr)
    291         self.logger.info(f"adding peer {peer_addr}")
    292         if node_id == self.node_keypair.pubkey:
    293             raise ErrorAddingPeer("cannot connect to self")
    294         transport = LNTransport(self.node_keypair.privkey, peer_addr,
    295                                 proxy=self.network.proxy)
    296         peer = Peer(self, node_id, transport)
    297         await self.taskgroup.spawn(peer.main_loop())
    298         with self.lock:
    299             self._peers[node_id] = peer
    300         return peer
    301 
    302     def peer_closed(self, peer: Peer) -> None:
    303         with self.lock:
    304             self._peers.pop(peer.pubkey, None)
    305 
    306     def num_peers(self) -> int:
    307         return sum([p.is_initialized() for p in self.peers.values()])
    308 
    309     def start_network(self, network: 'Network'):
    310         assert network
    311         self.network = network
    312         self.config = network.config
    313         self._add_peers_from_config()
    314         asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
    315 
    316     async def stop(self):
    317         if self.listen_server:
    318             self.listen_server.close()
    319         util.unregister_callback(self.on_proxy_changed)
    320         await self.taskgroup.cancel_remaining()
    321 
    322     def _add_peers_from_config(self):
    323         peer_list = self.config.get('lightning_peers', [])
    324         for host, port, pubkey in peer_list:
    325             asyncio.run_coroutine_threadsafe(
    326                 self._add_peer(host, int(port), bfh(pubkey)),
    327                 self.network.asyncio_loop)
    328 
    329     def is_good_peer(self, peer: LNPeerAddr) -> bool:
    330         # the purpose of this method is to filter peers that advertise the desired feature bits
    331         # it is disabled for now, because feature bits published in node announcements seem to be unreliable
    332         return True
    333         node_id = peer.pubkey
    334         node = self.channel_db._nodes.get(node_id)
    335         if not node:
    336             return False
    337         try:
    338             ln_compare_features(self.features, node.features)
    339         except IncompatibleLightningFeatures:
    340             return False
    341         #self.logger.info(f'is_good {peer.host}')
    342         return True
    343 
    344     def on_peer_successfully_established(self, peer: Peer) -> None:
    345         if isinstance(peer.transport, LNTransport):
    346             peer_addr = peer.transport.peer_addr
    347             # reset connection attempt count
    348             self._on_connection_successfully_established(peer_addr)
    349             # add into channel db
    350             if self.channel_db:
    351                 self.channel_db.add_recent_peer(peer_addr)
    352             # save network address into channels we might have with peer
    353             for chan in peer.channels.values():
    354                 chan.add_or_update_peer_addr(peer_addr)
    355 
    356     async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]:
    357         now = time.time()
    358         await self.channel_db.data_loaded.wait()
    359         # first try from recent peers
    360         recent_peers = self.channel_db.get_recent_peers()
    361         for peer in recent_peers:
    362             if not peer:
    363                 continue
    364             if peer.pubkey in self._peers:
    365                 continue
    366             if not self._can_retry_addr(peer, now=now):
    367                 continue
    368             if not self.is_good_peer(peer):
    369                 continue
    370             return [peer]
    371         # try random peer from graph
    372         unconnected_nodes = self.channel_db.get_200_randomly_sorted_nodes_not_in(self.peers.keys())
    373         if unconnected_nodes:
    374             for node_id in unconnected_nodes:
    375                 addrs = self.channel_db.get_node_addresses(node_id)
    376                 if not addrs:
    377                     continue
    378                 host, port, timestamp = self.choose_preferred_address(list(addrs))
    379                 try:
    380                     peer = LNPeerAddr(host, port, node_id)
    381                 except ValueError:
    382                     continue
    383                 if not self._can_retry_addr(peer, now=now):
    384                     continue
    385                 if not self.is_good_peer(peer):
    386                     continue
    387                 #self.logger.info('taking random ln peer from our channel db')
    388                 return [peer]
    389 
    390         # getting desperate... let's try hardcoded fallback list of peers
    391         if constants.net in (constants.BitcoinTestnet, ):
    392             fallback_list = FALLBACK_NODE_LIST_TESTNET
    393         elif constants.net in (constants.BitcoinMainnet, ):
    394             fallback_list = FALLBACK_NODE_LIST_MAINNET
    395         else:
    396             return []  # regtest??
    397 
    398         fallback_list = [peer for peer in fallback_list if self._can_retry_addr(peer, now=now)]
    399         if fallback_list:
    400             return [random.choice(fallback_list)]
    401 
    402         # last resort: try dns seeds (BOLT-10)
    403         return await run_in_thread(self._get_peers_from_dns_seeds)
    404 
    405     def _get_peers_from_dns_seeds(self) -> Sequence[LNPeerAddr]:
    406         # NOTE: potentially long blocking call, do not run directly on asyncio event loop.
    407         # Return several peers to reduce the number of dns queries.
    408         if not constants.net.LN_DNS_SEEDS:
    409             return []
    410         dns_seed = random.choice(constants.net.LN_DNS_SEEDS)
    411         self.logger.info('asking dns seed "{}" for ln peers'.format(dns_seed))
    412         try:
    413             # note: this might block for several seconds
    414             # this will include bech32-encoded-pubkeys and ports
    415             srv_answers = resolve_dns_srv('r{}.{}'.format(
    416                 constants.net.LN_REALM_BYTE, dns_seed))
    417         except dns.exception.DNSException as e:
    418             self.logger.info(f'failed querying (1) dns seed "{dns_seed}" for ln peers: {repr(e)}')
    419             return []
    420         random.shuffle(srv_answers)
    421         num_peers = 2 * NUM_PEERS_TARGET
    422         srv_answers = srv_answers[:num_peers]
    423         # we now have pubkeys and ports but host is still needed
    424         peers = []
    425         for srv_ans in srv_answers:
    426             try:
    427                 # note: this might block for several seconds
    428                 answers = dns.resolver.resolve(srv_ans['host'])
    429             except dns.exception.DNSException as e:
    430                 self.logger.info(f'failed querying (2) dns seed "{dns_seed}" for ln peers: {repr(e)}')
    431                 continue
    432             try:
    433                 ln_host = str(answers[0])
    434                 port = int(srv_ans['port'])
    435                 bech32_pubkey = srv_ans['host'].split('.')[0]
    436                 pubkey = get_compressed_pubkey_from_bech32(bech32_pubkey)
    437                 peers.append(LNPeerAddr(ln_host, port, pubkey))
    438             except Exception as e:
    439                 self.logger.info(f'error with parsing peer from dns seed: {repr(e)}')
    440                 continue
    441         self.logger.info(f'got {len(peers)} ln peers from dns seed')
    442         return peers
    443 
    444     @staticmethod
    445     def choose_preferred_address(addr_list: Sequence[Tuple[str, int, int]]) -> Tuple[str, int, int]:
    446         assert len(addr_list) >= 1
    447         # choose first one that is an IP
    448         for host, port, timestamp in addr_list:
    449             if is_ip_address(host):
    450                 return host, port, timestamp
    451         # otherwise choose one at random
    452         # TODO maybe filter out onion if not on tor?
    453         choice = random.choice(addr_list)
    454         return choice
    455 
    456     def on_proxy_changed(self, event, *args):
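                # drop existing connections so peers reconnect through the new proxy,
                # and clear the retry timers so reconnects are attempted right away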
    457         for peer in self.peers.values():
    458             peer.close_and_cleanup()
    459         self._clear_addr_retry_times()
    460 
    461     @log_exceptions
    462     async def add_peer(self, connect_str: str) -> Peer:
    463         node_id, rest = extract_nodeid(connect_str)
    464         peer = self._peers.get(node_id)
    465         if not peer:
    466             if rest is not None:
    467                 host, port = split_host_port(rest)
    468             else:
    469                 if not self.channel_db:
    470                     addr = trampolines_by_id().get(node_id)
    471                     if not addr:
    472                         raise ConnStringFormatError(_('Address unknown for node:') + ' ' + bh2u(node_id))
    473                     host, port = addr.host, addr.port
    474                 else:
    475                     addrs = self.channel_db.get_node_addresses(node_id)
    476                     if not addrs:
    477                         raise ConnStringFormatError(_('Don\'t know any addresses for node:') + ' ' + bh2u(node_id))
    478                     host, port, timestamp = self.choose_preferred_address(list(addrs))
    479             port = int(port)
    480             # Try DNS-resolving the host (if needed). This is simply so that
    481             # the caller gets a nice exception if it cannot be resolved.
    482             try:
    483                 await asyncio.get_event_loop().getaddrinfo(host, port)
    484             except socket.gaierror:
    485                 raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
    486             # add peer
    487             peer = await self._add_peer(host, port, node_id)
    488         return peer
    489 
    490 
    491 class LNGossip(LNWorker):
    492     max_age = 14*24*3600
    493     LOGGING_SHORTCUT = 'g'
    494 
    495     def __init__(self):
    496         seed = os.urandom(32)
    497         node = BIP32Node.from_rootseed(seed, xtype='standard')
    498         xprv = node.to_xprv()
    499         super().__init__(xprv, LNGOSSIP_FEATURES)
    500         self.unknown_ids = set()
    501 
    502     def start_network(self, network: 'Network'):
    503         assert network
    504         super().start_network(network)
    505         asyncio.run_coroutine_threadsafe(self.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
    506 
    507     async def maintain_db(self):
    508         await self.channel_db.data_loaded.wait()
    509         while True:
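                    # prune only when there is nothing left to query, presumably so that
                    # channels which simply have not been synced yet are not dropped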
    510             if len(self.unknown_ids) == 0:
    511                 self.channel_db.prune_old_policies(self.max_age)
    512                 self.channel_db.prune_orphaned_channels()
    513             await asyncio.sleep(120)
    514 
    515     async def add_new_ids(self, ids: Iterable[bytes]):
    516         known = self.channel_db.get_channel_ids()
    517         new = set(ids) - set(known)
    518         self.unknown_ids.update(new)
    519         util.trigger_callback('unknown_channels', len(self.unknown_ids))
    520         util.trigger_callback('gossip_peers', self.num_peers())
    521         util.trigger_callback('ln_gossip_sync_progress')
    522 
    523     def get_ids_to_query(self) -> Sequence[bytes]:
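                # hand out a batch of up to N unknown channel ids for peers to query,
                # keeping the remainder for later calls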
    524         N = 500
    525         l = list(self.unknown_ids)
    526         self.unknown_ids = set(l[N:])
    527         util.trigger_callback('unknown_channels', len(self.unknown_ids))
    528         util.trigger_callback('ln_gossip_sync_progress')
    529         return l[0:N]
    530 
    531     def get_sync_progress_estimate(self) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    532         """Estimates the progress of gossip synchronization and returns the
    533         number of synchronized channels, the estimated total number of channels
    534         in the network, and a rescaled completion percentage."""
    535         if self.num_peers() == 0:
    536             return None, None, None
    537         nchans_with_0p, nchans_with_1p, nchans_with_2p = self.channel_db.get_num_channels_partitioned_by_policy_count()
    538         num_db_channels = nchans_with_0p + nchans_with_1p + nchans_with_2p
    539         # some channels will never have two policies (only one is in gossip?...)
    540         # so if we have at least 1 policy for a channel, we consider that channel "complete" here
    541         current_est = num_db_channels - nchans_with_0p
    542         total_est = len(self.unknown_ids) + num_db_channels
    543 
    544         progress = current_est / total_est if total_est and current_est else 0
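                # rescale so that ~95% policy coverage already counts as fully synced, then cap at 100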
    545         progress_percent = (1.0 / 0.95 * progress) * 100
    546         progress_percent = min(progress_percent, 100)
    547         progress_percent = round(progress_percent)
    548         # take a minimal number of synchronized channels to get a more accurate
    549         # percentage estimate
    550         if current_est < 200:
    551             progress_percent = 0
    552         return current_est, total_est, progress_percent
    553 
    554     async def process_gossip(self, chan_anns, node_anns, chan_upds):
    555         await self.channel_db.data_loaded.wait()
    556         self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}')
    557         # note: data processed in chunks to avoid taking sql lock for too long
    558         # channel announcements
    559         for chan_anns_chunk in chunks(chan_anns, 300):
    560             self.channel_db.add_channel_announcement(chan_anns_chunk)
    561         # node announcements
    562         for node_anns_chunk in chunks(node_anns, 100):
    563             self.channel_db.add_node_announcement(node_anns_chunk)
    564         # channel updates
    565         for chan_upds_chunk in chunks(chan_upds, 1000):
    566             categorized_chan_upds = self.channel_db.add_channel_updates(
    567                 chan_upds_chunk, max_age=self.max_age)
    568             orphaned = categorized_chan_upds.orphaned
    569             if orphaned:
    570                 self.logger.info(f'adding {len(orphaned)} unknown channel ids')
    571                 orphaned_ids = [c['short_channel_id'] for c in orphaned]
    572                 await self.add_new_ids(orphaned_ids)
    573             if categorized_chan_upds.good:
    574                 self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}')
    575 
    576 
    577 class LNWallet(LNWorker):
    578 
    579     lnwatcher: Optional['LNWalletWatcher']
    580     MPP_EXPIRY = 120
    581     TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS = 3  # seconds
    582 
    583     def __init__(self, wallet: 'Abstract_Wallet', xprv):
    584         self.wallet = wallet
    585         self.db = wallet.db
    586         Logger.__init__(self)
    587         LNWorker.__init__(self, xprv, LNWALLET_FEATURES)
    588         self.config = wallet.config
    589         self.lnwatcher = None
    590         self.lnrater: LNRater = None
    591         self.payments = self.db.get_dict('lightning_payments')     # RHASH -> amount, direction, is_paid
    592         self.preimages = self.db.get_dict('lightning_preimages')   # RHASH -> preimage
    593         # note: this sweep_address is only used as a fallback, as it might result in address-reuse
    594         self.sweep_address = wallet.get_new_sweep_address_for_channel()
    595         self.logs = defaultdict(list)  # type: Dict[str, List[HtlcLog]]  # key is RHASH  # (not persisted)
    596         # used in tests
    597         self.enable_htlc_settle = asyncio.Event()
    598         self.enable_htlc_settle.set()
    599         self.enable_htlc_forwarding = asyncio.Event()
    600         self.enable_htlc_forwarding.set()
    601 
    602         # note: accessing channels (besides simple lookup) needs self.lock!
    603         self._channels = {}  # type: Dict[bytes, Channel]
    604         channels = self.db.get_dict("channels")
    605         for channel_id, c in random_shuffled_copy(channels.items()):
    606             self._channels[bfh(channel_id)] = Channel(c, sweep_address=self.sweep_address, lnworker=self)
    607 
    608         self._channel_backups = {}  # type: Dict[bytes, Channel]
    609         channel_backups = self.db.get_dict("channel_backups")
    610         for channel_id, cb in random_shuffled_copy(channel_backups.items()):
    611             self._channel_backups[bfh(channel_id)] = ChannelBackup(cb, sweep_address=self.sweep_address, lnworker=self)
    612 
    613         self.sent_htlcs = defaultdict(asyncio.Queue)  # type: Dict[bytes, asyncio.Queue[HtlcLog]]
    614         self.sent_htlcs_routes = dict()               # (RHASH, scid, htlc_id) -> route, payment_secret, amount_msat, bucket_msat
    615         self.sent_buckets = dict()                    # payment_secret -> (amount_sent, amount_failed)
    616         self.received_mpp_htlcs = dict()                  # RHASH -> mpp_status, htlc_set
    617 
    618         self.swap_manager = SwapManager(wallet=self.wallet, lnworker=self)
    619         # detect inflight payments
    620         self.inflight_payments = set()        # (not persisted) keys of invoices that are in PR_INFLIGHT state
    621         for payment_hash in self.get_payments(status='inflight').keys():
    622             self.set_invoice_status(payment_hash.hex(), PR_INFLIGHT)
    623 
    624         self.trampoline_forwarding_failures = {} # todo: should be persisted
    625 
    626     @property
    627     def channels(self) -> Mapping[bytes, Channel]:
    628         """Returns a read-only copy of channels."""
    629         with self.lock:
    630             return self._channels.copy()
    631 
    632     @property
    633     def channel_backups(self) -> Mapping[bytes, Channel]:
    634         """Returns a read-only copy of channel backups."""
    635         with self.lock:
    636             return self._channel_backups.copy()
    637 
    638     def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
    639         return self._channels.get(channel_id, None)
    640 
    641     def diagnostic_name(self):
    642         return self.wallet.diagnostic_name()
    643 
    644     @ignore_exceptions
    645     @log_exceptions
    646     async def sync_with_local_watchtower(self):
    647         watchtower = self.network.local_watchtower
    648         if watchtower:
    649             while True:
    650                 for chan in self.channels.values():
    651                     await self.sync_channel_with_watchtower(chan, watchtower.sweepstore)
    652                 await asyncio.sleep(5)
    653 
    654     @ignore_exceptions
    655     @log_exceptions
    656     async def sync_with_remote_watchtower(self):
    657         while True:
    658             # periodically poll if the user updated 'watchtower_url'
    659             await asyncio.sleep(5)
    660             watchtower_url = self.config.get('watchtower_url')
    661             if not watchtower_url:
    662                 continue
    663             parsed_url = urllib.parse.urlparse(watchtower_url)
    664             if not (parsed_url.scheme == 'https' or is_private_netaddress(parsed_url.hostname)):
    665                 self.logger.warning(f"got watchtower URL for remote tower but we won't use it! "
    666                                     f"can only use HTTPS (except if private IP): not using {watchtower_url!r}")
    667                 continue
    668             # try to sync with the remote watchtower
    669             try:
    670                 async with make_aiohttp_session(proxy=self.network.proxy) as session:
    671                     watchtower = JsonRPCClient(session, watchtower_url)
    672                     watchtower.add_method('get_ctn')
    673                     watchtower.add_method('add_sweep_tx')
    674                     for chan in self.channels.values():
    675                         await self.sync_channel_with_watchtower(chan, watchtower)
    676             except aiohttp.client_exceptions.ClientConnectorError:
    677                 self.logger.info(f'could not contact remote watchtower {watchtower_url}')
    678 
    679     async def sync_channel_with_watchtower(self, chan: Channel, watchtower):
    680         outpoint = chan.funding_outpoint.to_str()
    681         addr = chan.get_funding_address()
    682         current_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
    683         watchtower_ctn = await watchtower.get_ctn(outpoint, addr)
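                # upload sweep txs for each revoked commitment number the watchtower has not seen yet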
    684         for ctn in range(watchtower_ctn + 1, current_ctn):
    685             sweeptxs = chan.create_sweeptxs(ctn)
    686             for tx in sweeptxs:
    687                 await watchtower.add_sweep_tx(outpoint, ctn, tx.inputs()[0].prevout.to_str(), tx.serialize())
    688 
    689     def start_network(self, network: 'Network'):
    690         assert network
    691         self.network = network
    692         self.config = network.config
    693         self.lnwatcher = LNWalletWatcher(self, network)
    694         self.lnwatcher.start_network(network)
    695         self.swap_manager.start_network(network=network, lnwatcher=self.lnwatcher)
    696         self.lnrater = LNRater(self, network)
    697 
    698         for chan in self.channels.values():
    699             self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
    700         for cb in self.channel_backups.values():
    701             self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address())
    702 
    703         for coro in [
    704                 self.maybe_listen(),
    705                 self.lnwatcher.on_network_update('network_updated'), # shortcut (don't block) if funding tx locked and verified
    706                 self.reestablish_peers_and_channels(),
    707                 self.sync_with_local_watchtower(),
    708                 self.sync_with_remote_watchtower(),
    709         ]:
    710             tg_coro = self.taskgroup.spawn(coro)
    711             asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
    712 
    713     async def stop(self):
    714         self.stopping_soon = True
    715         if self.listen_server:  # stop accepting new peers
    716             self.listen_server.close()
    717         async with ignore_after(self.TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS):
    718             await self.wait_for_received_pending_htlcs_to_get_removed()
    719         await LNWorker.stop(self)
    720         if self.lnwatcher:
    721             await self.lnwatcher.stop()
    722             self.lnwatcher = None
    723 
    724     async def wait_for_received_pending_htlcs_to_get_removed(self):
    725         assert self.stopping_soon is True
    726         # We try to fail pending MPP HTLCs, and wait a bit for them to get removed.
    727         # Note: even without MPP, if we just failed/fulfilled an HTLC, it is good
    728         #       to wait a bit for it to become irrevocably removed.
    729         # Note: we don't wait for *all htlcs* to get removed, only for those
    730         #       that we can already fail/fulfill. e.g. forwarded htlcs cannot be removed
    731         async with TaskGroup() as group:
    732             for peer in self.peers.values():
    733                 await group.spawn(peer.wait_one_htlc_switch_iteration())
    734         while True:
    735             if all(not peer.received_htlcs_pending_removal for peer in self.peers.values()):
    736                 break
    737             async with TaskGroup(wait=any) as group:
    738                 for peer in self.peers.values():
    739                     await group.spawn(peer.received_htlc_removed_event.wait())
    740 
    741     def peer_closed(self, peer):
    742         for chan in self.channels_for_peer(peer.pubkey).values():
    743             chan.peer_state = PeerState.DISCONNECTED
    744             util.trigger_callback('channel', self.wallet, chan)
    745         super().peer_closed(peer)
    746 
    747     def get_payments(self, *, status=None):
    748         # return one item per payment_hash
    749         # note: with AMP we will have several channels per payment
    750         out = defaultdict(list)
    751         for chan in self.channels.values():
    752             d = chan.get_payments(status=status)
    753             for k, v in d.items():
    754                 out[k] += v
    755         return out
    756 
    757     def get_payment_value(self, info: Optional['PaymentInfo'], plist):
    758         amount_msat = 0
    759         fee_msat = None
    760         for chan_id, htlc, _direction, _status in plist:
    761             amount_msat += int(_direction) * htlc.amount_msat
    762             if _direction == SENT and info and info.amount_msat:
    763                 fee_msat = (fee_msat or 0) - info.amount_msat - amount_msat
    764         timestamp = min([htlc.timestamp for chan_id, htlc, _direction, _status in plist])
    765         return amount_msat, fee_msat, timestamp
    766 
    767     def get_lightning_history(self):
    768         out = {}
    769         for payment_hash, plist in self.get_payments(status='settled').items():
    770             if len(plist) == 0:
    771                 continue
    772             key = payment_hash.hex()
    773             info = self.get_payment_info(payment_hash)
    774             amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist)
    775             if info is not None:
    776                 label = self.wallet.get_label(key)
    777                 direction = ('sent' if info.direction == SENT else 'received') if len(plist)==1 else 'self-payment'
    778             else:
    779                 direction = 'forwarding'
    780                 label = _('Forwarding')
    781             preimage = self.get_preimage(payment_hash).hex()
    782             item = {
    783                 'type': 'payment',
    784                 'label': label,
    785                 'timestamp': timestamp or 0,
    786                 'date': timestamp_to_datetime(timestamp),
    787                 'direction': direction,
    788                 'amount_msat': amount_msat,
    789                 'fee_msat': fee_msat,
    790                 'payment_hash': key,
    791                 'preimage': preimage,
    792             }
    793             # add group_id to swap transactions
    794             swap = self.swap_manager.get_swap(payment_hash)
    795             if swap:
    796                 if swap.is_reverse:
    797                     item['group_id'] = swap.spending_txid
    798                     item['group_label'] = 'Reverse swap' + ' ' + self.config.format_amount_and_units(swap.lightning_amount)
    799                 else:
    800                     item['group_id'] = swap.funding_txid
    801                     item['group_label'] = 'Forward swap' + ' ' + self.config.format_amount_and_units(swap.onchain_amount)
    802             # done
    803             out[payment_hash] = item
    804         return out
    805 
    806     def get_onchain_history(self):
    807         current_height = self.wallet.get_local_height()
    808         out = {}
    809         # add funding events
    810         for chan in self.channels.values():
    811             item = chan.get_funding_height()
    812             if item is None:
    813                 continue
    814             funding_txid, funding_height, funding_timestamp = item
    815             tx_height = self.lnwatcher.get_tx_height(funding_txid)
    816             item = {
    817                 'channel_id': bh2u(chan.channel_id),
    818                 'type': 'channel_opening',
    819                 'label': self.wallet.get_label_for_txid(funding_txid) or (_('Open channel') + ' ' + chan.get_id_for_log()),
    820                 'txid': funding_txid,
    821                 'amount_msat': chan.balance(LOCAL, ctn=0),
    822                 'direction': 'received',
    823                 'timestamp': tx_height.timestamp,
    824                 'fee_sat': None,
    825                 'fee_msat': None,
    826                 'height': tx_height.height,
    827                 'confirmations': tx_height.conf,
    828             }
    829             out[funding_txid] = item
    830             item = chan.get_closing_height()
    831             if item is None:
    832                 continue
    833             closing_txid, closing_height, closing_timestamp = item
    834             tx_height = self.lnwatcher.get_tx_height(closing_txid)
    835             item = {
    836                 'channel_id': bh2u(chan.channel_id),
    837                 'txid': closing_txid,
    838                 'label': self.wallet.get_label_for_txid(closing_txid) or (_('Close channel') + ' ' + chan.get_id_for_log()),
    839                 'type': 'channel_closure',
    840                 'amount_msat': -chan.balance_minus_outgoing_htlcs(LOCAL),
    841                 'direction': 'sent',
    842                 'timestamp': tx_height.timestamp,
    843                 'fee_sat': None,
    844                 'fee_msat': None,
    845                 'height': tx_height.height,
    846                 'confirmations': tx_height.conf,
    847             }
    848             out[closing_txid] = item
    849         # add info about submarine swaps
    850         settled_payments = self.get_payments(status='settled')
    851         for payment_hash_hex, swap in self.swap_manager.swaps.items():
    852             txid = swap.spending_txid if swap.is_reverse else swap.funding_txid
    853             if txid is None:
    854                 continue
    855             payment_hash = bytes.fromhex(payment_hash_hex)
    856             if payment_hash in settled_payments:
    857                 plist = settled_payments[payment_hash]
    858                 info = self.get_payment_info(payment_hash)
    859                 amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist)
    860             else:
    861                 amount_msat = 0
    862             label = 'Reverse swap' if swap.is_reverse else 'Forward swap'
    863             delta = current_height - swap.locktime
    864             if not swap.is_redeemed and swap.spending_txid is None and delta < 0:
    865                 label += f' (refundable in {-delta} blocks)' # fixme: only if unspent
    866             out[txid] = {
    867                 'txid': txid,
    868                 'group_id': txid,
    869                 'amount_msat': 0,
    870                 #'amount_msat': amount_msat, # must not be added
    871                 'type': 'swap',
    872                 'label': self.wallet.get_label_for_txid(txid) or label,
    873             }
    874         return out
    875 
    876     def get_history(self):
    877         out = list(self.get_lightning_history().values()) + list(self.get_onchain_history().values())
    878         # sort by timestamp
    879         out.sort(key=lambda x: (x.get('timestamp') or float("inf")))
    880         balance_msat = 0
    881         for item in out:
    882             balance_msat += item['amount_msat']
    883             item['balance_msat'] = balance_msat
    884         return out
    885 
    886     def channel_peers(self) -> List[bytes]:
    887         node_ids = [chan.node_id for chan in self.channels.values() if not chan.is_closed()]
    888         return node_ids
    889 
    890     def channels_for_peer(self, node_id):
    891         assert type(node_id) is bytes
    892         return {chan_id: chan for (chan_id, chan) in self.channels.items()
    893                 if chan.node_id == node_id}
    894 
    895     def channel_state_changed(self, chan: Channel):
    896         if type(chan) is Channel:
    897             self.save_channel(chan)
    898         util.trigger_callback('channel', self.wallet, chan)
    899 
    900     def save_channel(self, chan: Channel):
    901         assert type(chan) is Channel
    902         if chan.config[REMOTE].next_per_commitment_point == chan.config[REMOTE].current_per_commitment_point:
    903             raise Exception("Tried to save channel with next_point == current_point, this should not happen")
    904         self.wallet.save_db()
    905         util.trigger_callback('channel', self.wallet, chan)
    906 
    907     def channel_by_txo(self, txo: str) -> Optional[Channel]:
    908         for chan in self.channels.values():
    909             if chan.funding_outpoint.to_str() == txo:
    910                 return chan
    911         for chan in self.channel_backups.values():
    912             if chan.funding_outpoint.to_str() == txo:
    913                 return chan
    914 
    915     async def on_channel_update(self, chan: Channel):
    916         if type(chan) is ChannelBackup:
    917             util.trigger_callback('channel', self.wallet, chan)
    918             return
    919 
    920         if chan.get_state() == ChannelState.OPEN and chan.should_be_closed_due_to_expiring_htlcs(self.network.get_local_height()):
    921             self.logger.info(f"force-closing due to expiring htlcs")
    922             await self.try_force_closing(chan.channel_id)
    923 
    924         elif chan.get_state() == ChannelState.FUNDED:
    925             peer = self._peers.get(chan.node_id)
    926             if peer and peer.is_initialized():
    927                 peer.send_funding_locked(chan)
    928 
    929         elif chan.get_state() == ChannelState.OPEN:
    930             peer = self._peers.get(chan.node_id)
    931             if peer:
    932                 await peer.maybe_update_fee(chan)
    933                 conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
    934                 peer.on_network_update(chan, conf)
    935 
    936         elif chan.get_state() == ChannelState.FORCE_CLOSING:
    937             force_close_tx = chan.force_close_tx()
    938             txid = force_close_tx.txid()
    939             height = self.lnwatcher.get_tx_height(txid).height
    940             if height == TX_HEIGHT_LOCAL:
    941                 self.logger.info('REBROADCASTING CLOSING TX')
    942                 await self.network.try_broadcasting(force_close_tx, 'force-close')
    943 
    944     @log_exceptions
    945     async def _open_channel_coroutine(
    946             self, *,
    947             connect_str: str,
    948             funding_tx: PartialTransaction,
    949             funding_sat: int,
    950             push_sat: int,
    951             password: Optional[str]) -> Tuple[Channel, PartialTransaction]:
    952 
    953         peer = await self.add_peer(connect_str)
    954         coro = peer.channel_establishment_flow(
    955             funding_tx=funding_tx,
    956             funding_sat=funding_sat,
    957             push_msat=push_sat * 1000,
    958             temp_channel_id=os.urandom(32))
    959         chan, funding_tx = await asyncio.wait_for(coro, LN_P2P_NETWORK_TIMEOUT)
    960         util.trigger_callback('channels_updated', self.wallet)
    961         self.wallet.add_transaction(funding_tx)  # save tx as local into the wallet
    962         self.wallet.sign_transaction(funding_tx, password)
    963         self.wallet.set_label(funding_tx.txid(), _('Open channel'))
    964         if funding_tx.is_complete():
    965             await self.network.try_broadcasting(funding_tx, 'open_channel')
    966         return chan, funding_tx
    967 
    968     def add_channel(self, chan: Channel):
    969         with self.lock:
    970             self._channels[chan.channel_id] = chan
    971         self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
    972 
    973     def add_new_channel(self, chan: Channel):
    974         self.add_channel(chan)
    975         channels_db = self.db.get_dict('channels')
    976         channels_db[chan.channel_id.hex()] = chan.storage
    977         for addr in chan.get_wallet_addresses_channel_might_want_reserved():
    978             self.wallet.set_reserved_state_of_address(addr, reserved=True)
    979         try:
    980             self.save_channel(chan)
    981             self.wallet.save_backup()
    982         except:
    983             chan.set_state(ChannelState.REDEEMED)
    984             self.remove_channel(chan.channel_id)
    985             raise
    986 
    987     def mktx_for_open_channel(self, *, coins: Sequence[PartialTxInput], funding_sat: int,
    988                               fee_est=None) -> PartialTransaction:
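                # the real funding output script is not known yet (it depends on the remote peer),
                # so use a placeholder output with the correct value for fee estimation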
    989         dummy_address = ln_dummy_address()
    990         outputs = [PartialTxOutput.from_address_and_value(dummy_address, funding_sat)]
    991         tx = self.wallet.make_unsigned_transaction(
    992             coins=coins,
    993             outputs=outputs,
    994             fee=fee_est)
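                # the channel depends on a fixed funding txid, so the funding tx must not be replaceable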
    995         tx.set_rbf(False)
    996         return tx
    997 
    998     def open_channel(self, *, connect_str: str, funding_tx: PartialTransaction,
    999                      funding_sat: int, push_amt_sat: int, password: str = None) -> Tuple[Channel, PartialTransaction]:
   1000         if funding_sat > LN_MAX_FUNDING_SAT:
   1001             raise Exception(_("Requested channel capacity is over protocol allowed maximum."))
   1002         coro = self._open_channel_coroutine(
   1003             connect_str=connect_str, funding_tx=funding_tx, funding_sat=funding_sat,
   1004             push_sat=push_amt_sat, password=password)
   1005         fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
   1006         try:
   1007             chan, funding_tx = fut.result()
   1008         except concurrent.futures.TimeoutError:
   1009             raise Exception(_("open_channel timed out"))
   1010         return chan, funding_tx
   1011 
   1012     def get_channel_by_short_id(self, short_channel_id: bytes) -> Optional[Channel]:
   1013         for chan in self.channels.values():
   1014             if chan.short_channel_id == short_channel_id:
   1015                 return chan
   1016 
   1017     def create_routes_from_invoice(self, amount_msat: int, decoded_invoice: LnAddr, *, full_path=None):
   1018         return self.create_routes_for_payment(
   1019             amount_msat=amount_msat,
   1020             final_total_msat=amount_msat,
   1021             invoice_pubkey=decoded_invoice.pubkey.serialize(),
   1022             min_cltv_expiry=decoded_invoice.get_min_final_cltv_expiry(),
   1023             r_tags=decoded_invoice.get_routing_info('r'),
   1024             invoice_features=decoded_invoice.get_features(),
   1025             trampoline_fee_level=0,
   1026             use_two_trampolines=False,
   1027             payment_hash=decoded_invoice.paymenthash,
   1028             payment_secret=decoded_invoice.payment_secret,
   1029             full_path=full_path)
   1030 
   1031     @log_exceptions
   1032     async def pay_invoice(
   1033             self, invoice: str, *,
   1034             amount_msat: int = None,
   1035             attempts: int = 1,
   1036             full_path: LNPaymentPath = None) -> Tuple[bool, List[HtlcLog]]:
   1037 
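                # Usage sketch (from a coroutine; the bolt11 string is hypothetical):
                #   success, log = await lnworker.pay_invoice('lnbc1...', attempts=5)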
   1038         lnaddr = self._check_invoice(invoice, amount_msat=amount_msat)
   1039         min_cltv_expiry = lnaddr.get_min_final_cltv_expiry()
   1040         payment_hash = lnaddr.paymenthash
   1041         key = payment_hash.hex()
   1042         payment_secret = lnaddr.payment_secret
   1043         invoice_pubkey = lnaddr.pubkey.serialize()
   1044         invoice_features = lnaddr.get_features()
   1045         r_tags = lnaddr.get_routing_info('r')
   1046         amount_to_pay = lnaddr.get_amount_msat()
   1047         status = self.get_payment_status(payment_hash)
   1048         if status == PR_PAID:
   1049             raise PaymentFailure(_("This invoice has been paid already"))
   1050         if status == PR_INFLIGHT:
   1051             raise PaymentFailure(_("A payment was already initiated for this invoice"))
   1052         if payment_hash in self.get_payments(status='inflight'):
   1053             raise PaymentFailure(_("A previous attempt to pay this invoice did not clear"))
   1054         info = PaymentInfo(payment_hash, amount_to_pay, SENT, PR_UNPAID)
   1055         self.save_payment_info(info)
   1056         self.wallet.set_label(key, lnaddr.get_description())
   1057 
   1058         self.set_invoice_status(key, PR_INFLIGHT)
   1059         try:
   1060             await self.pay_to_node(
   1061                 node_pubkey=invoice_pubkey,
   1062                 payment_hash=payment_hash,
   1063                 payment_secret=payment_secret,
   1064                 amount_to_pay=amount_to_pay,
   1065                 min_cltv_expiry=min_cltv_expiry,
   1066                 r_tags=r_tags,
   1067                 invoice_features=invoice_features,
   1068                 attempts=attempts,
   1069                 full_path=full_path)
   1070             success = True
   1071         except PaymentFailure as e:
   1072             self.logger.exception('')
   1073             success = False
   1074             reason = str(e)
   1075         if success:
   1076             self.set_invoice_status(key, PR_PAID)
   1077             util.trigger_callback('payment_succeeded', self.wallet, key)
   1078         else:
   1079             self.set_invoice_status(key, PR_UNPAID)
   1080             util.trigger_callback('payment_failed', self.wallet, key, reason)
   1081         log = self.logs[key]
   1082         return success, log
   1083 
   1084     async def pay_to_node(
   1085             self, *,
   1086             node_pubkey: bytes,
   1087             payment_hash: bytes,
   1088             payment_secret: Optional[bytes],
   1089             amount_to_pay: int,  # in msat
   1090             min_cltv_expiry: int,
   1091             r_tags,
   1092             invoice_features: int,
   1093             attempts: int = 1,
   1094             full_path: LNPaymentPath = None,
   1095             fwd_trampoline_onion=None,
   1096             fwd_trampoline_fee=None,
   1097             fwd_trampoline_cltv_delta=None) -> None:
   1098 
   1099         if fwd_trampoline_onion:
   1100             # todo: compare to the fee of the actual route we found
   1101             if fwd_trampoline_fee < 1000:
   1102                 raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, data=b'')
   1103             if fwd_trampoline_cltv_delta < 576:
   1104                 raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'')
   1105 
   1106         self.logs[payment_hash.hex()] = log = []
   1107         trampoline_fee_level = 0    # only used for trampoline payments
   1108         use_two_trampolines = True  # only used when paying to a legacy (non-trampoline) receiver
   1109 
   1110         amount_inflight = 0  # what we sent in htlcs (that receiver gets, without fees)
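                # Loop sketch: each iteration first sends HTLCs for any still-missing
                # amount, then waits for exactly one HtlcLog from the queue; we return
                # on success and give up once `attempts` HTLCs have failed.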
   1111         while True:
   1112             amount_to_send = amount_to_pay - amount_inflight
   1113             if amount_to_send > 0:
   1114                 # 1. create a set of routes for remaining amount.
   1115                 # note: path-finding runs in a separate thread so that we don't block the asyncio loop;
   1116                 #       graph updates might occur during the computation
   1117                 routes = await run_in_thread(partial(
   1118                     self.create_routes_for_payment,
   1119                     amount_msat=amount_to_send,
   1120                     final_total_msat=amount_to_pay,
   1121                     invoice_pubkey=node_pubkey,
   1122                     min_cltv_expiry=min_cltv_expiry,
   1123                     r_tags=r_tags,
   1124                     invoice_features=invoice_features,
   1125                     full_path=full_path,
   1126                     payment_hash=payment_hash,
   1127                     payment_secret=payment_secret,
   1128                     trampoline_fee_level=trampoline_fee_level,
   1129                     use_two_trampolines=use_two_trampolines,
   1130                     fwd_trampoline_onion=fwd_trampoline_onion))
   1131                 # 2. send htlcs
   1132                 for route, amount_msat, total_msat, amount_receiver_msat, cltv_delta, bucket_payment_secret, trampoline_onion in routes:
   1133                     amount_inflight += amount_receiver_msat
   1134                     if amount_inflight > amount_to_pay:  # safety belts
   1135                         raise Exception(f"amount_inflight={amount_inflight} > amount_to_pay={amount_to_pay}")
   1136                     await self.pay_to_route(
   1137                         route=route,
   1138                         amount_msat=amount_msat,
   1139                         total_msat=total_msat,
   1140                         amount_receiver_msat=amount_receiver_msat,
   1141                         payment_hash=payment_hash,
   1142                         payment_secret=bucket_payment_secret,
   1143                         min_cltv_expiry=cltv_delta,
   1144                         trampoline_onion=trampoline_onion)
   1145                 util.trigger_callback('invoice_status', self.wallet, payment_hash.hex())
   1146             # 3. wait for the next HtlcLog from the queue
   1147             self.logger.info(f"amount inflight {amount_inflight}")
   1148             htlc_log = await self.sent_htlcs[payment_hash].get()
   1149             amount_inflight -= htlc_log.amount_msat
   1150             if amount_inflight < 0:
   1151                 raise Exception(f"amount_inflight={amount_inflight} < 0")
   1152             log.append(htlc_log)
   1153             if htlc_log.success:
   1154                 return
   1155             # htlc failed
   1156             if len(log) >= attempts:
   1157                 raise PaymentFailure('Giving up after %d attempts' % len(log))
   1158             # if we get a tmp channel failure, it might work to split the amount and try more routes
   1159             # if we get a channel update, we might retry the same route and amount
   1160             route = htlc_log.route
   1161             sender_idx = htlc_log.sender_idx
   1162             failure_msg = htlc_log.failure_msg
   1163             code, data = failure_msg.code, failure_msg.data
   1164             self.logger.info(f"UPDATE_FAIL_HTLC {repr(code)} {data}")
   1165             self.logger.info(f"error reported by {bh2u(route[sender_idx].node_id)}")
   1166             if code == OnionFailureCode.MPP_TIMEOUT:
   1167                 raise PaymentFailure(failure_msg.code_name())
   1168             # trampoline
   1169             if self.channel_db is None:
   1170                 if code == OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT:
   1171                     # todo: parse the node parameters here (not returned by eclair yet)
   1172                     trampoline_fee_level += 1
   1173                     continue
   1174                 elif use_two_trampolines:
   1175                     use_two_trampolines = False
   1176                 else:
   1177                     raise PaymentFailure(failure_msg.code_name())
   1178             else:
   1179                 self.handle_error_code_from_failed_htlc(route, sender_idx, failure_msg, code, data)
   1180 
   1181     async def pay_to_route(
   1182             self, *,
   1183             route: LNPaymentRoute,
   1184             amount_msat: int,
   1185             total_msat: int,
   1186             amount_receiver_msat: int,
   1187             payment_hash: bytes,
   1188             payment_secret: Optional[bytes],
   1189             min_cltv_expiry: int,
   1190             trampoline_onion: bytes = None) -> None:
   1191 
   1192         # send a single htlc
   1193         short_channel_id = route[0].short_channel_id
   1194         chan = self.get_channel_by_short_id(short_channel_id)
   1195         peer = self._peers.get(route[0].node_id)
   1196         if not peer:
   1197             raise Exception('Dropped peer')
   1198         await peer.initialized
   1199         htlc = peer.pay(
   1200             route=route,
   1201             chan=chan,
   1202             amount_msat=amount_msat,
   1203             total_msat=total_msat,
   1204             payment_hash=payment_hash,
   1205             min_final_cltv_expiry=min_cltv_expiry,
   1206             payment_secret=payment_secret,
   1207             trampoline_onion=trampoline_onion)
   1208 
   1209         key = (payment_hash, short_channel_id, htlc.htlc_id)
   1210         self.sent_htlcs_routes[key] = route, payment_secret, amount_msat, total_msat, amount_receiver_msat
   1211         # if we sent MPP to a trampoline, add item to sent_buckets
   1212         if not self.channel_db and amount_msat != total_msat:
   1213             if payment_secret not in self.sent_buckets:
   1214                 self.sent_buckets[payment_secret] = (0, 0)
   1215             amount_sent, amount_failed = self.sent_buckets[payment_secret]
   1216             amount_sent += amount_receiver_msat
   1217             self.sent_buckets[payment_secret] = amount_sent, amount_failed
   1218         util.trigger_callback('htlc_added', chan, htlc, SENT)
   1219 
   1220 
   1221     def handle_error_code_from_failed_htlc(self, route, sender_idx, failure_msg, code, data):
   1222         # handle some specific error codes
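                # The values are the byte offsets of the u16 length prefix of the
                # embedded channel_update within each error's failure data (BOLT-4).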
   1223         failure_codes = {
   1224             OnionFailureCode.TEMPORARY_CHANNEL_FAILURE: 0,
   1225             OnionFailureCode.AMOUNT_BELOW_MINIMUM: 8,
   1226             OnionFailureCode.FEE_INSUFFICIENT: 8,
   1227             OnionFailureCode.INCORRECT_CLTV_EXPIRY: 4,
   1228             OnionFailureCode.EXPIRY_TOO_SOON: 0,
   1229             OnionFailureCode.CHANNEL_DISABLED: 2,
   1230         }
   1231         blacklist = False
   1232         update = False
   1233         if code in failure_codes:
   1234             offset = failure_codes[code]
   1235             channel_update_len = int.from_bytes(data[offset:offset+2], byteorder="big")
   1236             channel_update_as_received = data[offset+2: offset+2+channel_update_len]
   1237             payload = self._decode_channel_update_msg(channel_update_as_received)
   1238             if payload is None:
   1239                 self.logger.info(f'could not decode channel_update for failed htlc: {channel_update_as_received.hex()}')
   1240                 blacklist = True
   1241             else:
   1242                 r = self.channel_db.add_channel_update(payload)
   1243                 short_channel_id = ShortChannelID(payload['short_channel_id'])
   1244                 if r == UpdateStatus.GOOD:
   1245                     self.logger.info(f"applied channel update to {short_channel_id}")
   1246                     # TODO: test this
   1247                     for chan in self.channels.values():
   1248                         if chan.short_channel_id == short_channel_id:
   1249                             chan.set_remote_update(payload['raw'])
   1250                     update = True
   1251                 elif r == UpdateStatus.ORPHANED:
   1252                     # maybe it is a private channel (and data in invoice was outdated)
   1253                     self.logger.info(f"Could not find {short_channel_id}. Maybe the update is for a private channel?")
   1254                     start_node_id = route[sender_idx].node_id
   1255                     update = self.channel_db.add_channel_update_for_private_channel(payload, start_node_id)
   1256                     blacklist = not update
   1257                 elif r == UpdateStatus.EXPIRED:
   1258                     blacklist = True
   1259                 elif r == UpdateStatus.DEPRECATED:
   1260                     self.logger.info('channel update is not newer than the one we already have')
   1261                     blacklist = True
   1262                 elif r == UpdateStatus.UNCHANGED:
   1263                     blacklist = True
   1264         else:
   1265             blacklist = True
   1266 
   1267         if blacklist:
   1268             # blacklist channel after reporter node
   1269             # TODO this should depend on the error (even more granularity)
   1270             # also, we need finer blacklisting (directed edges; nodes)
   1271             if sender_idx is None:
   1272                 raise PaymentFailure(failure_msg.code_name())
   1273             try:
   1274                 short_chan_id = route[sender_idx + 1].short_channel_id
   1275             except IndexError:
   1276                 raise PaymentFailure('payment destination reported error')
   1277             # TODO: for MPP we need to save the amount for which
   1278             # we saw temporary channel failure
   1279             self.logger.info(f'blacklisting channel {short_chan_id}')
   1280             self.network.channel_blacklist.add(short_chan_id)
   1281 
   1282         # we should not continue if we did not blacklist or update anything
   1283         if not (blacklist or update):
   1284             raise PaymentFailure(failure_msg.code_name())
   1285 
   1286     @classmethod
   1287     def _decode_channel_update_msg(cls, chan_upd_msg: bytes) -> Optional[Dict[str, Any]]:
   1288         channel_update_as_received = chan_upd_msg
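                # 258 is the wire message type of `channel_update` (BOLT-7).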
   1289         channel_update_typed = (258).to_bytes(length=2, byteorder="big") + channel_update_as_received
   1290         # note: some nodes put channel updates in error msgs with the leading msg_type already there.
   1291         #       we try decoding both ways here.
   1292         try:
   1293             message_type, payload = decode_msg(channel_update_typed)
   1294             if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception()
   1295             payload['raw'] = channel_update_typed
   1296             return payload
   1297         except:  # FIXME: too broad
   1298             try:
   1299                 message_type, payload = decode_msg(channel_update_as_received)
   1300                 if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception()
   1301                 payload['raw'] = channel_update_as_received
   1302                 return payload
   1303             except:
   1304                 return None
   1305 
   1306     @staticmethod
   1307     def _check_invoice(invoice: str, *, amount_msat: int = None) -> LnAddr:
   1308         addr = lndecode(invoice, expected_hrp=constants.net.SEGWIT_HRP)
   1309         if addr.is_expired():
   1310             raise InvoiceError(_("This invoice has expired"))
   1311         if amount_msat:  # replace amount in invoice; main use case: paying zero-amount invoices
   1312             existing_amt_msat = addr.get_amount_msat()
   1313             if existing_amt_msat and amount_msat < existing_amt_msat:
   1314                 raise Exception("cannot pay a lower amount than the one specified in the LN invoice")
   1315             addr.amount = Decimal(amount_msat) / COIN / 1000
   1316         if addr.amount is None:
   1317             raise InvoiceError(_("Missing amount"))
   1318         if addr.get_min_final_cltv_expiry() > lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
   1319             raise InvoiceError("{}\n{}".format(
   1320                 _("Invoice wants us to risk locking funds for unreasonably long."),
   1321                 f"min_final_cltv_expiry: {addr.get_min_final_cltv_expiry()}"))
   1322         return addr
   1323 
   1324     def is_trampoline_peer(self, node_id: bytes) -> bool:
   1325         # until trampoline is advertised in lnfeatures, check against hardcoded list
   1326         if is_hardcoded_trampoline(node_id):
   1327             return True
   1328         peer = self._peers.get(node_id)
   1329         if peer and peer.their_features.supports(LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT):
   1330             return True
   1331         return False
   1332 
   1333     def suggest_peer(self) -> Optional[bytes]:
   1334         if self.channel_db:
   1335             return self.lnrater.suggest_peer()
   1336         else:
   1337             return random.choice(list(hardcoded_trampoline_nodes().values())).pubkey
   1338 
   1339     @profiler
   1340     def create_routes_for_payment(
   1341             self, *,
   1342             amount_msat: int,        # part of payment amount we want routes for now
   1343             final_total_msat: int,   # total payment amount final receiver will get
   1344             invoice_pubkey,
   1345             min_cltv_expiry,
   1346             r_tags,
   1347             invoice_features: int,
   1348             payment_hash,
   1349             payment_secret,
   1350             trampoline_fee_level: int,
   1351             use_two_trampolines: bool,
   1352             fwd_trampoline_onion = None,
   1353             full_path: LNPaymentPath = None) -> Sequence[Tuple[LNPaymentRoute, int, int, int, int, Optional[bytes], Optional[bytes]]]:
   1354 
   1355         """Creates multiple routes for splitting a payment over the available
   1356         private channels.
   1357 
   1358         We first try to conduct the payment over a single channel. If that fails
   1359         and MPP is supported by the receiver, we will split the payment."""
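                # Each element of the returned list is a tuple:
                #   (route, amount_msat, total_msat, amount_receiver_msat,
                #    cltv_delta, payment_secret, trampoline_onion)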
   1360         # It could happen that the pathfinding uses a channel
   1361         # in the graph multiple times, meaning we could exhaust
   1362         # its capacity. This could be dealt with by temporarily
   1363         # iteratively blacklisting channels for this mpp attempt.
   1364         invoice_features = LnFeatures(invoice_features)
   1365         trampoline_features = LnFeatures.VAR_ONION_OPT
   1366         local_height = self.network.get_local_height()
   1367         try:
   1368             # try to send over a single channel
   1369             if not self.channel_db:
   1370                 for chan in self.channels.values():
   1371                     if not self.is_trampoline_peer(chan.node_id):
   1372                         continue
   1373                     if chan.is_frozen_for_sending():
   1374                         continue
   1375                     trampoline_onion, amount_with_fees, cltv_delta = create_trampoline_route_and_onion(
   1376                         amount_msat=amount_msat,
   1377                         total_msat=final_total_msat,
   1378                         min_cltv_expiry=min_cltv_expiry,
   1379                         my_pubkey=self.node_keypair.pubkey,
   1380                         invoice_pubkey=invoice_pubkey,
   1381                         invoice_features=invoice_features,
   1382                         node_id=chan.node_id,
   1383                         r_tags=r_tags,
   1384                         payment_hash=payment_hash,
   1385                         payment_secret=payment_secret,
   1386                         local_height=local_height,
   1387                         trampoline_fee_level=trampoline_fee_level,
   1388                         use_two_trampolines=use_two_trampolines)
   1389                     trampoline_payment_secret = os.urandom(32)
   1390                     if chan.available_to_spend(LOCAL, strict=True) < amount_with_fees:
   1391                         continue
   1392                     route = [
   1393                         RouteEdge(
   1394                             start_node=self.node_keypair.pubkey,
   1395                             end_node=chan.node_id,
   1396                             short_channel_id=chan.short_channel_id,
   1397                             fee_base_msat=0,
   1398                             fee_proportional_millionths=0,
   1399                             cltv_expiry_delta=0,
   1400                             node_features=trampoline_features)
   1401                     ]
   1402                     routes = [(route, amount_with_fees, amount_with_fees, amount_msat, cltv_delta, trampoline_payment_secret, trampoline_onion)]
   1403                     break
   1404                 else:
   1405                     raise NoPathFound()
   1406             else:
   1407                 route = self.create_route_for_payment(
   1408                     amount_msat=amount_msat,
   1409                     invoice_pubkey=invoice_pubkey,
   1410                     min_cltv_expiry=min_cltv_expiry,
   1411                     r_tags=r_tags,
   1412                     invoice_features=invoice_features,
   1413                     outgoing_channel=None, full_path=full_path)
   1414                 routes = [(route, amount_msat, final_total_msat, amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion)]
   1415         except NoPathFound:
   1416             if not invoice_features.supports(LnFeatures.BASIC_MPP_OPT):
   1417                 raise
   1418             channels_with_funds = dict([
   1419                 (cid, int(chan.available_to_spend(HTLCOwner.LOCAL)))
   1420                 for cid, chan in self._channels.items() if not chan.is_frozen_for_sending()])
   1421             self.logger.info(f"channels_with_funds: {channels_with_funds}")
   1422             # Create split configurations, rated according to our
   1423             # preference (a lower rating means a higher preference).
   1424             split_configurations = suggest_splits(amount_msat, channels_with_funds)
   1425             self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations')
   1426             for s in split_configurations:
   1427                 self.logger.info(f"trying split configuration: {s[0].values()} rating: {s[1]}")
   1428                 routes = []
   1429                 try:
   1430                     if not self.channel_db:
   1431                         buckets = defaultdict(list)
   1432                         for chan_id, part_amount_msat in s[0].items():
   1433                             chan = self.channels[chan_id]
   1434                             if part_amount_msat:
   1435                                 buckets[chan.node_id].append((chan_id, part_amount_msat))
   1436                         for node_id, bucket in buckets.items():
   1437                             bucket_amount_msat = sum([x[1] for x in bucket])
   1438                             trampoline_onion, bucket_amount_with_fees, bucket_cltv_delta = create_trampoline_route_and_onion(
   1439                                 amount_msat=bucket_amount_msat,
   1440                                 total_msat=final_total_msat,
   1441                                 min_cltv_expiry=min_cltv_expiry,
   1442                                 my_pubkey=self.node_keypair.pubkey,
   1443                                 invoice_pubkey=invoice_pubkey,
   1444                                 invoice_features=invoice_features,
   1445                                 node_id=node_id,
   1446                                 r_tags=r_tags,
   1447                                 payment_hash=payment_hash,
   1448                                 payment_secret=payment_secret,
   1449                                 local_height=local_height,
   1450                                 trampoline_fee_level=trampoline_fee_level,
   1451                                 use_two_trampolines=use_two_trampolines)
   1452                             # node_features is only used to determine is_tlv
   1453                             bucket_payment_secret = os.urandom(32)
   1454                             bucket_fees = bucket_amount_with_fees - bucket_amount_msat
   1455                             self.logger.info(f'bucket_fees {bucket_fees}')
   1456                             for chan_id, part_amount_msat in bucket:
   1457                                 chan = self.channels[chan_id]
   1458                                 margin = chan.available_to_spend(LOCAL, strict=True) - part_amount_msat
   1459                                 delta_fee = min(bucket_fees, margin)
   1460                                 part_amount_msat_with_fees = part_amount_msat + delta_fee
   1461                                 bucket_fees -= delta_fee
   1462                                 route = [
   1463                                     RouteEdge(
   1464                                         start_node=self.node_keypair.pubkey,
   1465                                         end_node=node_id,
   1466                                         short_channel_id=chan.short_channel_id,
   1467                                         fee_base_msat=0,
   1468                                         fee_proportional_millionths=0,
   1469                                         cltv_expiry_delta=0,
   1470                                         node_features=trampoline_features)
   1471                                 ]
   1472                                 self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}')
   1473                                 routes.append((route, part_amount_msat_with_fees, bucket_amount_with_fees, part_amount_msat, bucket_cltv_delta, bucket_payment_secret, trampoline_onion))
   1474                             if bucket_fees != 0:
   1475                                 self.logger.info('not enough margin to pay trampoline fee')
   1476                                 raise NoPathFound()
   1477                     else:
   1478                         for chan_id, part_amount_msat in s[0].items():
   1479                             if part_amount_msat:
   1480                                 channel = self.channels[chan_id]
   1481                                 route = self.create_route_for_payment(
   1482                                     amount_msat=part_amount_msat,
   1483                                     invoice_pubkey=invoice_pubkey,
   1484                                     min_cltv_expiry=min_cltv_expiry,
   1485                                     r_tags=r_tags,
   1486                                     invoice_features=invoice_features,
   1487                                     outgoing_channel=channel, full_path=None)
   1488                                 routes.append((route, part_amount_msat, final_total_msat, part_amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion))
   1489                     self.logger.info(f"found acceptable split configuration: {list(s[0].values())} rating: {s[1]}")
   1490                     break
   1491                 except NoPathFound:
   1492                     continue
   1493             else:
   1494                 raise NoPathFound()
   1495         return routes
   1496 
   1497     def create_route_for_payment(
   1498             self, *,
   1499             amount_msat: int,
   1500             invoice_pubkey: bytes,
   1501             min_cltv_expiry: int,
   1502             r_tags,
   1503             invoice_features: int,
   1504             outgoing_channel: Channel = None,
   1505             full_path: Optional[LNPaymentPath]) -> Tuple[LNPaymentRoute, int]:
   1506 
   1507         channels = [outgoing_channel] if outgoing_channel else list(self.channels.values())
   1508         scid_to_my_channels = {
   1509             chan.short_channel_id: chan for chan in channels
   1510             if chan.short_channel_id is not None
   1511         }
   1512         blacklist = self.network.channel_blacklist.get_current_list()
   1513         # Collect all private edges from route hints.
   1514         # Note: if some route hints are multiple edges long, and these paths cross each other,
   1515         #       we allow our path finding to cross the paths; i.e. the route hints are not isolated.
   1516         private_route_edges = {}  # type: Dict[ShortChannelID, RouteEdge]
   1517         for private_path in r_tags:
   1518             # we need to shift the node pubkey by one towards the destination:
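                    # e.g. a hint whose edges start at nodes [A, B] yields the edges
                    # A->B and B->invoice_pubkey.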
   1519             private_path_nodes = [edge[0] for edge in private_path][1:] + [invoice_pubkey]
   1520             private_path_rest = [edge[1:] for edge in private_path]
   1521             start_node = private_path[0][0]
   1522             for end_node, edge_rest in zip(private_path_nodes, private_path_rest):
   1523                 short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta = edge_rest
   1524                 short_channel_id = ShortChannelID(short_channel_id)
   1525                 # if we have a routing policy for this edge in the db, that takes precedence,
   1526                 # as it is likely from a previous failure
   1527                 channel_policy = self.channel_db.get_policy_for_node(
   1528                     short_channel_id=short_channel_id,
   1529                     node_id=start_node,
   1530                     my_channels=scid_to_my_channels)
   1531                 if channel_policy:
   1532                     fee_base_msat = channel_policy.fee_base_msat
   1533                     fee_proportional_millionths = channel_policy.fee_proportional_millionths
   1534                     cltv_expiry_delta = channel_policy.cltv_expiry_delta
   1535                 node_info = self.channel_db.get_node_info_for_node_id(node_id=end_node)
   1536                 route_edge = RouteEdge(
   1537                         start_node=start_node,
   1538                         end_node=end_node,
   1539                         short_channel_id=short_channel_id,
   1540                         fee_base_msat=fee_base_msat,
   1541                         fee_proportional_millionths=fee_proportional_millionths,
   1542                         cltv_expiry_delta=cltv_expiry_delta,
   1543                         node_features=node_info.features if node_info else 0)
   1544                 if route_edge.short_channel_id not in blacklist:
   1545                     private_route_edges[route_edge.short_channel_id] = route_edge
   1546                 start_node = end_node
   1547         # now find a route, end to end: between us and the recipient
   1548         try:
   1549             route = self.network.path_finder.find_route(
   1550                 nodeA=self.node_keypair.pubkey,
   1551                 nodeB=invoice_pubkey,
   1552                 invoice_amount_msat=amount_msat,
   1553                 path=full_path,
   1554                 my_channels=scid_to_my_channels,
   1555                 blacklist=blacklist,
   1556                 private_route_edges=private_route_edges)
   1557         except NoChannelPolicy as e:
   1558             raise NoPathFound() from e
   1559         if not route:
   1560             raise NoPathFound()
   1561         # test sanity
   1562         if not is_route_sane_to_use(route, amount_msat, min_cltv_expiry):
   1563             self.logger.info(f"rejecting insane route {route}")
   1564             raise NoPathFound()
   1565         assert len(route) > 0
   1566         if route[-1].end_node != invoice_pubkey:
   1567             raise LNPathInconsistent("last node_id != invoice pubkey")
   1568         # add features from invoice
   1569         route[-1].node_features |= invoice_features
   1570         return route
   1571 
   1572     def add_request(self, amount_sat, message, expiry) -> str:
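                # Usage sketch (hypothetical values): returns the payment hash (hex)
                # of the new request, e.g.
                #   key = lnworker.add_request(amount_sat=10_000, message='coffee', expiry=3600)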
   1573         coro = self._add_request_coro(amount_sat, message, expiry)
   1574         fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
   1575         try:
   1576             return fut.result(timeout=5)
   1577         except concurrent.futures.TimeoutError:
   1578             raise Exception(_("add invoice timed out"))
   1579 
   1580     @log_exceptions
   1581     async def create_invoice(
   1582             self, *,
   1583             amount_msat: Optional[int],
   1584             message: str,
   1585             expiry: int) -> Tuple[LnAddr, str]:
   1586 
   1587         timestamp = int(time.time())
   1588         routing_hints = await self._calc_routing_hints_for_invoice(amount_msat)
   1589         if not routing_hints:
   1590             self.logger.info(
   1591                 "Warning. No routing hints added to invoice. "
   1592                 "Other clients will likely not be able to send to us.")
   1593         # add trampoline hints ('t' tags) for hints that are a single hop to a trampoline peer
   1594         invoice_features = self.features.for_invoice()
   1595         trampoline_hints = []
   1596         for r in routing_hints:
   1597             node_id, short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta = r[1][0]
   1598             if len(r[1]) == 1 and self.is_trampoline_peer(node_id):
   1599                 trampoline_hints.append(('t', (node_id, fee_base_msat, fee_proportional_millionths, cltv_expiry_delta)))
   1600         payment_preimage = os.urandom(32)
   1601         payment_hash = sha256(payment_preimage)
   1602         info = PaymentInfo(payment_hash, amount_msat, RECEIVED, PR_UNPAID)
   1603         amount_btc = amount_msat/Decimal(COIN*1000) if amount_msat else None
   1604         if expiry == 0:
   1605             expiry = LN_EXPIRY_NEVER
   1606         lnaddr = LnAddr(
   1607             paymenthash=payment_hash,
   1608             amount=amount_btc,
   1609             tags=[
   1610                 ('d', message),
   1611                 ('c', MIN_FINAL_CLTV_EXPIRY_FOR_INVOICE),
   1612                 ('x', expiry),
   1613                 ('9', invoice_features)]
   1614             + routing_hints
   1615             + trampoline_hints,
   1616             date=timestamp,
   1617             payment_secret=derive_payment_secret_from_payment_preimage(payment_preimage))
   1618         invoice = lnencode(lnaddr, self.node_keypair.privkey)
   1619         self.save_preimage(payment_hash, payment_preimage)
   1620         self.save_payment_info(info)
   1621         return lnaddr, invoice
   1622 
   1623     async def _add_request_coro(self, amount_sat: Optional[int], message, expiry: int) -> str:
   1624         amount_msat = amount_sat * 1000 if amount_sat is not None else None
   1625         lnaddr, invoice = await self.create_invoice(
   1626             amount_msat=amount_msat,
   1627             message=message,
   1628             expiry=expiry)
   1629         key = bh2u(lnaddr.paymenthash)
   1630         req = LNInvoice.from_bech32(invoice)
   1631         self.wallet.add_payment_request(req)
   1632         self.wallet.set_label(key, message)
   1633         return key
   1634 
   1635     def save_preimage(self, payment_hash: bytes, preimage: bytes):
   1636         assert sha256(preimage) == payment_hash
   1637         self.preimages[bh2u(payment_hash)] = bh2u(preimage)
   1638         self.wallet.save_db()
   1639 
   1640     def get_preimage(self, payment_hash: bytes) -> Optional[bytes]:
   1641         r = self.preimages.get(bh2u(payment_hash))
   1642         return bfh(r) if r else None
   1643 
   1644     def get_payment_info(self, payment_hash: bytes) -> Optional[PaymentInfo]:
   1645         """returns None if payment_hash is a payment we are forwarding"""
   1646         key = payment_hash.hex()
   1647         with self.lock:
   1648             if key in self.payments:
   1649                 amount_msat, direction, status = self.payments[key]
   1650                 return PaymentInfo(payment_hash, amount_msat, direction, status)
   1651 
   1652     def save_payment_info(self, info: PaymentInfo) -> None:
   1653         key = info.payment_hash.hex()
   1654         assert info.status in SAVED_PR_STATUS
   1655         with self.lock:
   1656             self.payments[key] = info.amount_msat, info.direction, info.status
   1657         self.wallet.save_db()
   1658 
   1659     def check_received_mpp_htlc(self, payment_secret, short_channel_id, htlc: UpdateAddHtlc, expected_msat: int) -> Optional[bool]:
   1660         """ return MPP status: True (accepted), False (expired) or None """
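                # received_mpp_htlcs maps payment_secret ->
                # (is_expired, is_accepted, set of (short_channel_id, htlc)),
                # i.e. the partial HTLCs received so far for one MPP payment.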
   1661         payment_hash = htlc.payment_hash
   1662         is_expired, is_accepted, htlc_set = self.received_mpp_htlcs.get(payment_secret, (False, False, set()))
   1663         if self.get_payment_status(payment_hash) == PR_PAID:
   1664             # payment_status is persisted
   1665             is_accepted = True
   1666             is_expired = False
   1667         key = (short_channel_id, htlc)
   1668         if key not in htlc_set:
   1669             htlc_set.add(key)
   1670         if not is_accepted and not is_expired:
   1671             total = sum([_htlc.amount_msat for scid, _htlc in htlc_set])
   1672             first_timestamp = min([_htlc.timestamp for scid, _htlc in htlc_set])
   1673             if self.stopping_soon:
   1674                 is_expired = True  # try to time out pending HTLCs before shutting down
   1675             elif time.time() - first_timestamp > self.MPP_EXPIRY:
   1676                 is_expired = True
   1677             elif total == expected_msat:
   1678                 is_accepted = True
   1679         if is_accepted or is_expired:
   1680             htlc_set.remove(key)
   1681         if len(htlc_set) > 0:
   1682             self.received_mpp_htlcs[payment_secret] = is_expired, is_accepted, htlc_set
   1683         elif payment_secret in self.received_mpp_htlcs:
   1684             self.received_mpp_htlcs.pop(payment_secret)
   1685         return True if is_accepted else (False if is_expired else None)
   1686 
   1687     def get_payment_status(self, payment_hash: bytes) -> int:
   1688         info = self.get_payment_info(payment_hash)
   1689         return info.status if info else PR_UNPAID
   1690 
   1691     def get_invoice_status(self, invoice: LNInvoice) -> int:
   1692         key = invoice.rhash
   1693         log = self.logs[key]
   1694         if key in self.inflight_payments:
   1695             return PR_INFLIGHT
   1696         # status may be PR_FAILED
   1697         status = self.get_payment_status(bfh(key))
   1698         if status == PR_UNPAID and log:
   1699             status = PR_FAILED
   1700         return status
   1701 
   1702     def set_invoice_status(self, key: str, status: int) -> None:
   1703         if status == PR_INFLIGHT:
   1704             self.inflight_payments.add(key)
   1705         elif key in self.inflight_payments:
   1706             self.inflight_payments.remove(key)
   1707         if status in SAVED_PR_STATUS:
   1708             self.set_payment_status(bfh(key), status)
   1709         util.trigger_callback('invoice_status', self.wallet, key)
   1710 
   1711     def set_request_status(self, payment_hash: bytes, status: int) -> None:
   1712         if self.get_payment_status(payment_hash) != status:
   1713             self.set_payment_status(payment_hash, status)
   1714             util.trigger_callback('request_status', self.wallet, payment_hash.hex(), status)
   1715 
   1716     def set_payment_status(self, payment_hash: bytes, status: int) -> None:
   1717         info = self.get_payment_info(payment_hash)
   1718         if info is None:
   1719             # if we are forwarding
   1720             return
   1721         info = info._replace(status=status)
   1722         self.save_payment_info(info)
   1723 
   1724     def htlc_fulfilled(self, chan, payment_hash: bytes, htlc_id:int):
   1725         util.trigger_callback('htlc_fulfilled', payment_hash, chan.channel_id)
   1726         q = self.sent_htlcs.get(payment_hash)
   1727         if q:
   1728             route, payment_secret, amount_msat, bucket_msat, amount_receiver_msat = self.sent_htlcs_routes[(payment_hash, chan.short_channel_id, htlc_id)]
   1729             htlc_log = HtlcLog(
   1730                 success=True,
   1731                 route=route,
   1732                 amount_msat=amount_receiver_msat)
   1733             q.put_nowait(htlc_log)
   1734         else:
   1735             key = payment_hash.hex()
   1736             self.set_invoice_status(key, PR_PAID)
   1737             util.trigger_callback('payment_succeeded', self.wallet, key)
   1738 
   1739     def htlc_failed(
   1740             self,
   1741             chan: Channel,
   1742             payment_hash: bytes,
   1743             htlc_id: int,
   1744             error_bytes: Optional[bytes],
   1745             failure_message: Optional['OnionRoutingFailure']):
   1746 
   1747         util.trigger_callback('htlc_failed', payment_hash, chan.channel_id)
   1748         q = self.sent_htlcs.get(payment_hash)
   1749         if q:
   1750             # detect whether this HTLC is part of an MPP bucket;
   1751             # if so, wait until the whole bucket has failed
   1752             key = (payment_hash, chan.short_channel_id, htlc_id)
   1753             route, payment_secret, amount_msat, bucket_msat, amount_receiver_msat = self.sent_htlcs_routes[key]
   1754             if error_bytes:
   1755                 # TODO "decode_onion_error" might raise, catch and maybe blacklist/penalise someone?
   1756                 try:
   1757                     failure_message, sender_idx = chan.decode_onion_error(error_bytes, route, htlc_id)
   1758                 except Exception as e:
   1759                     sender_idx = None
   1760                     failure_message = OnionRoutingFailure(-1, str(e))
   1761             else:
   1762                 # probably got "update_fail_malformed_htlc". well... who to penalise now?
   1763                 assert failure_message is not None
   1764                 sender_idx = None
   1765             self.logger.info(f"htlc_failed {failure_message}")
   1766 
   1767             # check sent_buckets if we use trampoline
   1768             if self.channel_db is None and payment_secret in self.sent_buckets:
   1769                 amount_sent, amount_failed = self.sent_buckets[payment_secret]
   1770                 amount_failed += amount_receiver_msat
   1771                 self.sent_buckets[payment_secret] = amount_sent, amount_failed
   1772                 if amount_sent != amount_failed:
   1773                     self.logger.info('bucket still active...')
   1774                     return
   1775                 self.logger.info('bucket failed')
   1776                 amount_receiver_msat = amount_sent
   1777 
   1778             htlc_log = HtlcLog(
   1779                 success=False,
   1780                 route=route,
   1781                 amount_msat=amount_receiver_msat,
   1782                 error_bytes=error_bytes,
   1783                 failure_msg=failure_message,
   1784                 sender_idx=sender_idx)
   1785             q.put_nowait(htlc_log)
   1786         else:
   1787             self.logger.info("received unknown htlc_failed, probably from a previous session")
   1788             key = payment_hash.hex()
   1789             self.set_invoice_status(key, PR_UNPAID)
   1790             util.trigger_callback('payment_failed', self.wallet, key, '')
   1791 
   1792     async def _calc_routing_hints_for_invoice(self, amount_msat: Optional[int]):
   1793         """calculate routing hints (BOLT-11 'r' field)"""
   1794         routing_hints = []
   1795         channels = list(self.channels.values())
   1796         # do minimal filtering of channels.
   1797         # we include channels that cannot *right now* receive (e.g. peer disconnected or balance insufficient)
   1798         channels = [chan for chan in channels
   1799                     if (chan.is_open() and not chan.is_frozen_for_receiving())]
   1800         # cap max channels to include to keep QR code reasonably scannable
   1801         channels = sorted(channels, key=lambda chan: (not chan.is_active(), -chan.available_to_spend(REMOTE)))
   1802         channels = channels[:15]
   1803         random.shuffle(channels)  # let's not leak channel order
   1804         scid_to_my_channels = {chan.short_channel_id: chan for chan in channels
   1805                                if chan.short_channel_id is not None}
   1806         for chan in channels:
   1807             chan_id = chan.short_channel_id
   1808             assert isinstance(chan_id, bytes), chan_id
   1809             channel_info = get_mychannel_info(chan_id, scid_to_my_channels)
   1810             # note: as a fallback, if we don't have a channel update for the
   1811             # incoming direction of our private channel, we fill the invoice with garbage.
   1812             # the sender should still be able to pay us, but will incur an extra round trip
   1813             # (they will get the channel update from the onion error)
   1814             # at least, that's the theory. https://github.com/lightningnetwork/lnd/issues/2066
   1815             fee_base_msat = fee_proportional_millionths = 0
   1816             cltv_expiry_delta = 1  # lnd won't even try with zero
   1817             missing_info = True
   1818             if channel_info:
   1819                 policy = get_mychannel_policy(channel_info.short_channel_id, chan.node_id, scid_to_my_channels)
   1820                 if policy:
   1821                     fee_base_msat = policy.fee_base_msat
   1822                     fee_proportional_millionths = policy.fee_proportional_millionths
   1823                     cltv_expiry_delta = policy.cltv_expiry_delta
   1824                     missing_info = False
   1825             if missing_info:
   1826                 self.logger.info(
   1827                     f"Warning. Missing channel update for our channel {chan_id}; "
   1828                     f"filling invoice with incorrect data.")
   1829             routing_hints.append(('r', [(
   1830                 chan.node_id,
   1831                 chan_id,
   1832                 fee_base_msat,
   1833                 fee_proportional_millionths,
   1834                 cltv_expiry_delta)]))
   1835         return routing_hints
   1836 
   1837     def delete_payment(self, payment_hash_hex: str):
   1838         try:
   1839             with self.lock:
   1840                 del self.payments[payment_hash_hex]
   1841         except KeyError:
   1842             return
   1843         self.wallet.save_db()
   1844 
   1845     def get_balance(self):
   1846         with self.lock:
   1847             return Decimal(sum(
   1848                 chan.balance(LOCAL) if not chan.is_closed() else 0
   1849                 for chan in self.channels.values())) / 1000
   1850 
   1851     def num_sats_can_send(self) -> Decimal:
   1852         can_send = Decimal(0)
   1853         with self.lock:
   1854             if self.channels:
   1855                 for c in self.channels.values():
   1856                     if c.is_active() and not c.is_frozen_for_sending():
   1857                         can_send += Decimal(c.available_to_spend(LOCAL)) / 1000
   1858         return can_send
   1859 
   1860     def num_sats_can_receive(self) -> Decimal:
   1861         can_receive = Decimal(0)
   1862         with self.lock:
   1863             if self.channels:
   1864                 for c in self.channels.values():
   1865                     if c.is_active() and not c.is_frozen_for_receiving():
   1866                         can_receive += Decimal(c.available_to_spend(REMOTE)) / 1000
   1867         return can_receive
   1868 
   1869     def can_pay_invoice(self, invoice: LNInvoice) -> bool:
   1870         return invoice.get_amount_sat() <= self.num_sats_can_send()
   1871 
   1872     def can_receive_invoice(self, invoice: LNInvoice) -> bool:
   1873         return invoice.get_amount_sat() <= self.num_sats_can_receive()
   1874 
   1875     async def close_channel(self, chan_id):
   1876         chan = self._channels[chan_id]
   1877         peer = self._peers[chan.node_id]
   1878         return await peer.close_channel(chan_id)
   1879 
   1880     async def force_close_channel(self, chan_id):
   1881         # returns txid or raises
   1882         chan = self._channels[chan_id]
   1883         tx = chan.force_close_tx()
   1884         await self.network.broadcast_transaction(tx)
   1885         chan.set_state(ChannelState.FORCE_CLOSING)
   1886         return tx.txid()
   1887 
   1888     async def try_force_closing(self, chan_id):
   1889         # fails silently but sets the state, so that we will retry later
   1890         chan = self._channels[chan_id]
   1891         tx = chan.force_close_tx()
   1892         chan.set_state(ChannelState.FORCE_CLOSING)
   1893         await self.network.try_broadcasting(tx, 'force-close')
   1894 
   1895     def remove_channel(self, chan_id):
   1896         chan = self._channels[chan_id]
   1897         assert chan.get_state() == ChannelState.REDEEMED
   1898         with self.lock:
   1899             self._channels.pop(chan_id)
   1900             self.db.get('channels').pop(chan_id.hex())
   1901         for addr in chan.get_wallet_addresses_channel_might_want_reserved():
   1902             self.wallet.set_reserved_state_of_address(addr, reserved=False)
   1903 
   1904         util.trigger_callback('channels_updated', self.wallet)
   1905         util.trigger_callback('wallet_updated', self.wallet)
   1906 
   1907     @ignore_exceptions
   1908     @log_exceptions
   1909     async def reestablish_peer_for_given_channel(self, chan: Channel) -> None:
   1910         now = time.time()
   1911         peer_addresses = []
   1912         if not self.channel_db:
   1913             addr = trampolines_by_id().get(chan.node_id)
   1914             if addr:
   1915                 peer_addresses.append(addr)
   1916         else:
   1917             # will try last good address first, from gossip
   1918             last_good_addr = self.channel_db.get_last_good_address(chan.node_id)
   1919             if last_good_addr:
   1920                 peer_addresses.append(last_good_addr)
   1921             # will try addresses for node_id from gossip
   1922             addrs_from_gossip = self.channel_db.get_node_addresses(chan.node_id) or []
   1923             for host, port, ts in addrs_from_gossip:
   1924                 peer_addresses.append(LNPeerAddr(host, port, chan.node_id))
   1925         # will try addresses stored in channel storage
   1926         peer_addresses += list(chan.get_peer_addresses())
   1927         # Done gathering addresses.
   1928         # Now select first one that has not failed recently.
   1929         for peer in peer_addresses:
   1930             if self._can_retry_addr(peer, urgent=True, now=now):
   1931                 await self._add_peer(peer.host, peer.port, peer.pubkey)
   1932                 return
   1933 
   1934     async def reestablish_peers_and_channels(self):
   1935         while True:
   1936             await asyncio.sleep(1)
   1937             if self.stopping_soon:
   1938                 return
   1939             for chan in self.channels.values():
   1940                 if chan.is_closed():
   1941                     continue
   1942                 # reestablish
   1943                 if not chan.should_try_to_reestablish_peer():
   1944                     continue
   1945                 peer = self._peers.get(chan.node_id, None)
   1946                 if peer:
   1947                     await peer.taskgroup.spawn(peer.reestablish_channel(chan))
   1948                 else:
   1949                     await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan))
   1950 
   1951     def current_feerate_per_kw(self):
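                # Config feerates are in sat/kvbyte, while commitment transactions use
                # sat/kiloweight, hence the `// 4`. 253 sat/kw is the commonly used
                # floor (just above 1 sat/vbyte).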
   1952         from .simple_config import FEE_LN_ETA_TARGET, FEERATE_FALLBACK_STATIC_FEE, FEERATE_REGTEST_HARDCODED
   1953         if constants.net is constants.BitcoinRegtest:
   1954             return FEERATE_REGTEST_HARDCODED // 4
   1955         feerate_per_kvbyte = self.network.config.eta_target_to_fee(FEE_LN_ETA_TARGET)
   1956         if feerate_per_kvbyte is None:
   1957             feerate_per_kvbyte = FEERATE_FALLBACK_STATIC_FEE
   1958         return max(253, feerate_per_kvbyte // 4)
   1959 
   1960     def create_channel_backup(self, channel_id):
   1961         chan = self._channels[channel_id]
   1962         # do not back up old-style channels
   1963         assert chan.is_static_remotekey_enabled()
   1964         peer_addresses = list(chan.get_peer_addresses())
   1965         peer_addr = peer_addresses[0]
   1966         return ChannelBackupStorage(
   1967             node_id = chan.node_id,
   1968             privkey = self.node_keypair.privkey,
   1969             funding_txid = chan.funding_outpoint.txid,
   1970             funding_index = chan.funding_outpoint.output_index,
   1971             funding_address = chan.get_funding_address(),
   1972             host = peer_addr.host,
   1973             port = peer_addr.port,
   1974             is_initiator = chan.constraints.is_initiator,
   1975             channel_seed = chan.config[LOCAL].channel_seed,
   1976             local_delay = chan.config[LOCAL].to_self_delay,
   1977             remote_delay = chan.config[REMOTE].to_self_delay,
   1978             remote_revocation_pubkey = chan.config[REMOTE].revocation_basepoint.pubkey,
   1979             remote_payment_pubkey = chan.config[REMOTE].payment_basepoint.pubkey)
   1980 
   1981     def export_channel_backup(self, channel_id):
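                # The serialized backup is encrypted (keyed by the wallet fingerprint)
                # and prefixed with 'channel_backup:'; import_channel_backup reverses this.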
   1982         xpub = self.wallet.get_fingerprint()
   1983         backup_bytes = self.create_channel_backup(channel_id).to_bytes()
   1984         assert backup_bytes == ChannelBackupStorage.from_bytes(backup_bytes).to_bytes(), "roundtrip failed"
   1985         encrypted = pw_encode_with_version_and_mac(backup_bytes, xpub)
   1986         assert backup_bytes == pw_decode_with_version_and_mac(encrypted, xpub), "encrypt failed"
   1987         return 'channel_backup:' + encrypted
   1988 
   1989     async def request_remote_force_close(
   1990             self, *, funding_txid: str, funding_index: int, connect_str: str):
   1991         """
   1992         Requests the remote to force close a channel. Can be used without
   1993         having state or any backup for the channel.
   1994         Assumes that the channel was originally opened with the same local node key (node_keypair).
   1995         Kept for console use.
   1996 
   1997         Example:
   1998         network.run_from_another_thread(wallet.lnworker.request_remote_force_close(funding_txid="11a3b391bc99dbca0b2be4fdd8f18ca641896c81ae4d9596b30cbf1eef17af71", funding_index=1, connect_str="023a8dfe081c6bbd0504e599f33d39d17687de63023a8b20afcb59147d9d77c19d"))
   1999         """
   2000         channel_id = lnutil.channel_id_from_funding_tx(funding_txid, funding_index)[0]
   2001         peer = await self.add_peer(connect_str)
   2002         await peer.trigger_force_close(channel_id)
   2003 
   2004     def import_channel_backup(self, data):
   2005         assert data.startswith('channel_backup:')
   2006         encrypted = data[15:]
   2007         xpub = self.wallet.get_fingerprint()
   2008         decrypted = pw_decode_with_version_and_mac(encrypted, xpub)
   2009         cb_storage = ChannelBackupStorage.from_bytes(decrypted)
   2010         channel_id = cb_storage.channel_id()
   2011         if channel_id.hex() in self.db.get_dict("channels"):
   2012             raise Exception('Channel already in wallet')
   2013         self.logger.info(f'importing channel backup: {channel_id.hex()}')
   2014         cb = ChannelBackup(cb_storage, sweep_address=self.sweep_address, lnworker=self)
   2015         d = self.db.get_dict("channel_backups")
   2016         d[channel_id.hex()] = cb_storage
   2017         with self.lock:
   2018             self._channel_backups[channel_id] = cb
   2019         self.wallet.save_db()
   2020         util.trigger_callback('channels_updated', self.wallet)
   2021         self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address())
   2022 
   2023     def remove_channel_backup(self, channel_id):
   2024         d = self.db.get_dict("channel_backups")
   2025         if channel_id.hex() not in d:
   2026             raise Exception('Channel not found')
   2027         with self.lock:
   2028             d.pop(channel_id.hex())
   2029             self._channel_backups.pop(channel_id)
   2030         self.wallet.save_db()
   2031         util.trigger_callback('channels_updated', self.wallet)
   2032 
   2033     @log_exceptions
   2034     async def request_force_close_from_backup(self, channel_id: bytes):
   2035         cb = self.channel_backups.get(channel_id)
   2036         if not cb:
   2037             raise Exception(f'channel backup not found {self.channel_backups}')
   2038         cb = cb.cb # storage
   2039         self.logger.info(f'requesting channel force close: {channel_id.hex()}')
   2040         # TODO also try network addresses from gossip db (as it might have changed)
   2041         peer_addr = LNPeerAddr(cb.host, cb.port, cb.node_id)
   2042         transport = LNTransport(cb.privkey, peer_addr, proxy=self.network.proxy)
   2043         peer = Peer(self, cb.node_id, transport, is_channel_backup=True)
   2044         async with TaskGroup(wait=any) as group:
   2045             await group.spawn(peer._message_loop())
   2046             await group.spawn(peer.trigger_force_close(channel_id))
   2047         return True