lnrater.py (11416B)
# Copyright (C) 2020 The Electrum developers
# Distributed under the MIT software license, see the accompanying
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
"""
lnrater.py contains Lightning Network node rating functionality.
"""

import asyncio
from collections import defaultdict
from pprint import pformat
from random import choices
from statistics import mean, median, stdev
from typing import TYPE_CHECKING, Dict, NamedTuple, Tuple, List, Optional
import sys
import time

if sys.version_info[:2] >= (3, 7):
    from asyncio import get_running_loop
else:
    from asyncio import _get_running_loop as get_running_loop  # noqa: F401

from .logging import Logger
from .util import profiler
from .lnrouter import fee_for_edge_msat
from .lnutil import LnFeatures, ln_compare_features, IncompatibleLightningFeatures

if TYPE_CHECKING:
    from .network import Network
    from .channel_db import Policy, NodeInfo
    from .lnchannel import ShortChannelID
    from .lnworker import LNWallet


MONTH_IN_BLOCKS = 6 * 24 * 30
# the scores are only updated after this time interval
RATER_UPDATE_TIME_SEC = 10 * 60
# amount used for calculating an effective relative fee
FEE_AMOUNT_MSAT = 100_000_000

# define some numbers for minimal requirements of good nodes
# exclude nodes with less number of channels
EXCLUDE_NUM_CHANNELS = 15
# exclude nodes with less mean capacity
EXCLUDE_MEAN_CAPACITY_MSAT = 1_000_000_000
# exclude nodes which are young
EXCLUDE_NODE_AGE = 2 * MONTH_IN_BLOCKS
# exclude nodes which have young mean channel age
EXCLUDE_MEAN_CHANNEL_AGE = EXCLUDE_NODE_AGE
# exclude nodes which charge a high fee
EXCLUDE_EFFECTIVE_FEE_RATE = 0.001500
# backwards-compatible alias for the previously misspelled name
EXCLUCE_EFFECTIVE_FEE_RATE = EXCLUDE_EFFECTIVE_FEE_RATE
# exclude nodes whose last channel open was a long time ago
EXCLUDE_BLOCKS_LAST_CHANNEL = 3 * MONTH_IN_BLOCKS


class NodeStats(NamedTuple):
    """Per-node statistics derived from the node's channel policies."""
    number_channels: int
    # capacity related
    total_capacity_msat: int
    median_capacity_msat: float
    mean_capacity_msat: float
    # block height related
    node_age_block_height: int
    mean_channel_age_block_height: float
    blocks_since_last_channel: int
    # fees
    mean_fee_rate: float


def weighted_sum(numbers: List[float], weights: List[float]) -> float:
    """Return the weighted arithmetic mean of `numbers`.

    Pairs are formed with zip(), so extra items in the longer list are
    ignored for the numerator. The denominator is the sum of *all* weights.
    Raises ZeroDivisionError if the weights sum to zero.
    """
    running_sum = 0.0
    for n, w in zip(numbers, weights):
        running_sum += n * w
    return running_sum/sum(weights)


class LNRater(Logger):
    def __init__(self, lnworker: 'LNWallet', network: 'Network'):
        """LNRater can be used to suggest nodes to open up channels with.

        The graph is analyzed and some heuristics are applied to sort out nodes
        that are deemed to be bad routers or unmaintained.
        """
        Logger.__init__(self)
        self.lnworker = lnworker
        self.network = network

        # The three dicts below are rebuilt from scratch on every analysis and
        # then swapped in. They are never mutated in place, so readers on other
        # threads always see a complete snapshot.
        self._node_stats: Dict[bytes, NodeStats] = {}  # node_id -> NodeStats
        self._node_ratings: Dict[bytes, float] = {}  # node_id -> float
        self._policies_by_nodes: Dict[bytes, List[Tuple['ShortChannelID', 'Policy']]] = defaultdict(list)  # node_id -> (short_channel_id, policy)
        self._last_analyzed = 0  # timestamp
        self._last_progress_percent = 0

    def maybe_analyze_graph(self):
        """Synchronously run `_maybe_analyze_graph` on the event loop.

        This blocks the calling thread until the coroutine finishes. It must
        not be called from the thread that runs the event loop, because the
        loop could then never run the coroutine and .result() would deadlock.
        NOTE(review): assumes asyncio.get_event_loop() returns the loop that
        is running in another thread -- confirm for the calling context.
        """
        loop = asyncio.get_event_loop()
        fut = asyncio.run_coroutine_threadsafe(self._maybe_analyze_graph(), loop)
        fut.result()

    def analyze_graph(self):
        """Forces a graph analysis, e.g., due to external triggers like
        the graph info reaching 50%.

        Blocks the calling thread until done. The same threading caveat as
        for `maybe_analyze_graph` applies.
        """
        loop = asyncio.get_event_loop()
        fut = asyncio.run_coroutine_threadsafe(self._analyze_graph(), loop)
        fut.result()

    async def _maybe_analyze_graph(self):
        """Analyzes the graph when in early sync stage (>30%) or when caching
        time expires."""
        # gather information about graph sync status
        current_channels, total, progress_percent = self.network.lngossip.get_sync_progress_estimate()

        # gossip sync progress state could be None when not started, but channel
        # db already knows something about the graph, which is why we allow to
        # evaluate the graph early
        if progress_percent is not None or self.network.channel_db.num_nodes > 500:
            progress_percent = progress_percent or 0  # convert None to 0
            now = time.time()
            # graph should have changed significantly during the sync progress
            # or last analysis was a long time ago
            if ((30 <= progress_percent
                 and progress_percent - self._last_progress_percent >= 10)
                    or self._last_analyzed + RATER_UPDATE_TIME_SEC < now):
                await self._analyze_graph()
                self._last_progress_percent = progress_percent
                self._last_analyzed = now

    async def _analyze_graph(self):
        """Collect policies, compute node statistics and ratings.

        Waits for the channel db to be loaded first. The costly statistics
        pass runs in the default executor so it does not block the event loop.
        """
        await self.network.channel_db.data_loaded.wait()
        self._collect_policies_by_node()
        loop = get_running_loop()
        # the analysis is run in an executor because it's costly
        await loop.run_in_executor(None, self._collect_purged_stats)
        self._rate_nodes()
        now = time.time()
        self._last_analyzed = now

    def _collect_policies_by_node(self):
        """Group the channel db's policies by node id.

        A fresh mapping is built on every call. Appending into the previous
        mapping would add every channel again on each re-analysis and inflate
        the per-node statistics.
        """
        policies_by_nodes: Dict[bytes, List[Tuple['ShortChannelID', 'Policy']]] = defaultdict(list)
        policies = self.network.channel_db.get_node_policies()
        for pv, p in policies.items():
            # pv is indexed as (node_id, short_channel_id) below.
            # Append tuples of ShortChannelID and Policy.
            policies_by_nodes[pv[0]].append((pv[1], p))
        self._policies_by_nodes = policies_by_nodes

    @profiler
    def _collect_purged_stats(self):
        """Traverses through the graph and sorts out nodes.

        Computes NodeStats for every node in `self._policies_by_nodes` and
        keeps only nodes that pass all EXCLUDE_* thresholds. The result
        replaces `self._node_stats`, so nodes that no longer qualify are
        dropped instead of lingering from an earlier analysis.
        """
        current_height = self.network.get_local_height()
        node_stats: Dict[bytes, NodeStats] = {}

        for n, channel_policies in self._policies_by_nodes.items():
            try:
                # use policies synonymously to channels
                num_channels = len(channel_policies)

                # save some time for nodes we are not interested in:
                if num_channels < EXCLUDE_NUM_CHANNELS:
                    continue

                # analyze block heights
                block_heights = [p[0].block_height for p in channel_policies]
                node_age_bh = current_height - min(block_heights)
                if node_age_bh < EXCLUDE_NODE_AGE:
                    continue
                mean_channel_age_bh = current_height - mean(block_heights)
                if mean_channel_age_bh < EXCLUDE_MEAN_CHANNEL_AGE:
                    continue
                blocks_since_last_channel = current_height - max(block_heights)
                if blocks_since_last_channel > EXCLUDE_BLOCKS_LAST_CHANNEL:
                    continue

                # analyze capacities (htlc_maximum_msat is used as a proxy)
                capacities = [p[1].htlc_maximum_msat for p in channel_policies]
                if None in capacities:
                    continue
                total_capacity = sum(capacities)

                mean_capacity = total_capacity / num_channels if num_channels else 0
                if mean_capacity < EXCLUDE_MEAN_CAPACITY_MSAT:
                    continue
                median_capacity = median(capacities)

                # analyze fees: effective relative fee for forwarding FEE_AMOUNT_MSAT
                effective_fee_rates = [fee_for_edge_msat(
                    FEE_AMOUNT_MSAT,
                    p[1].fee_base_msat,
                    p[1].fee_proportional_millionths) / FEE_AMOUNT_MSAT for p in channel_policies]
                mean_fees_rate = mean(effective_fee_rates)
                if mean_fees_rate > EXCLUDE_EFFECTIVE_FEE_RATE:
                    continue

                node_stats[n] = NodeStats(
                    number_channels=num_channels,
                    total_capacity_msat=total_capacity,
                    median_capacity_msat=median_capacity,
                    mean_capacity_msat=mean_capacity,
                    node_age_block_height=node_age_bh,
                    mean_channel_age_block_height=mean_channel_age_bh,
                    blocks_since_last_channel=blocks_since_last_channel,
                    mean_fee_rate=mean_fees_rate
                )

            except Exception:
                # best effort: one malformed node must not abort the analysis
                self.logger.exception("Could not use channel policies for "
                                      "calculating statistics.")
                self.logger.debug(pformat(channel_policies))

        self._node_stats = node_stats
        self.logger.info(f"node statistics done, calculated statistics "
                         f"for {len(node_stats)} nodes")

    def _rate_nodes(self):
        """Rate nodes by collected statistics.

        Builds a new node_id -> score mapping from `self._node_stats` and
        swaps it into `self._node_ratings`.
        """
        node_stats = self._node_stats
        node_ratings: Dict[bytes, float] = {}

        max_capacity = 0
        max_num_chan = 0
        min_fee_rate = float('inf')
        for stats in node_stats.values():
            max_capacity = max(max_capacity, stats.total_capacity_msat)
            max_num_chan = max(max_num_chan, stats.number_channels)
            min_fee_rate = min(min_fee_rate, stats.mean_fee_rate)

        for n, stats in node_stats.items():
            heuristics = []
            heuristics_weights = []

            # Construct an average score which leads to recommendation of nodes
            # with low fees, large capacity and reasonable number of channels.
            # This is somewhat akin to preferential attachment, but low fee
            # nodes are more favored. Here we make a compromise between user
            # comfort and decentralization, tending towards user comfort.

            # number of channels
            heuristics.append(stats.number_channels / max_num_chan)
            heuristics_weights.append(0.2)
            # total capacity
            heuristics.append(stats.total_capacity_msat / max_capacity)
            heuristics_weights.append(0.8)
            # inverse fees
            # NOTE(review): if the cheapest node's mean fee rate is 0, the
            # numerator is 0 and this term is 0 for every node.
            fees = min(1E-6, min_fee_rate) / max(1E-10, stats.mean_fee_rate)
            heuristics.append(fees)
            heuristics_weights.append(1.0)

            node_ratings[n] = weighted_sum(heuristics, heuristics_weights)

        self._node_ratings = node_ratings

    def suggest_node_channel_open(self) -> Optional[Tuple[bytes, NodeStats]]:
        """Randomly pick a rated node to open a channel with.

        Nodes are drawn with probability proportional to their rating. Only
        nodes we are not already connected to (per lnworker.channel_peers())
        are considered. A drawn node is discarded, and another one drawn, if
        the channel db has no node info for it or its features are
        incompatible with ours.

        Returns (node_id, NodeStats), or None if no suitable candidate is
        left. Previously this looped forever in that case.
        """
        node_stats = self._node_stats
        channel_peers = self.lnworker.channel_peers()
        # Pair every node id with its own rating instead of relying on two
        # dicts sharing the same iteration order. Existing peers are excluded.
        candidates = {
            pk: rating for pk, rating in self._node_ratings.items()
            if pk in node_stats and pk not in channel_peers
        }
        node_infos = self.network.channel_db.get_node_infos()

        while candidates:
            # randomly pick nodes weighted by node_rating
            pk = choices(list(candidates.keys()), weights=list(candidates.values()), k=1)[0]
            node_info: Optional['NodeInfo'] = node_infos.get(pk, None)
            if node_info is None:
                # no feature information available; cannot check compatibility
                del candidates[pk]
                continue
            # node should have compatible features
            peer_features = LnFeatures(node_info.features)
            try:
                ln_compare_features(self.lnworker.features, peer_features)
            except IncompatibleLightningFeatures:
                self.logger.info("suggested node is incompatible")
                del candidates[pk]
                continue

            alias = node_info.alias
            self.logger.info(
                f"node rating for {alias}:\n"
                f"{pformat(node_stats[pk])} (score {candidates[pk]})")
            return pk, node_stats[pk]

        return None

    def suggest_peer(self) -> Optional[bytes]:
        """Suggests a LN node to open a channel with.

        Returns a node ID (pubkey), or None if no suitable node is known.
        """
        self.maybe_analyze_graph()
        if not self._node_ratings:
            return None
        suggestion = self.suggest_node_channel_open()
        return suggestion[0] if suggestion is not None else None