simple_config.py (25073B)
1 import json 2 import threading 3 import time 4 import os 5 import stat 6 import ssl 7 from decimal import Decimal 8 from typing import Union, Optional, Dict, Sequence, Tuple 9 from numbers import Real 10 11 from copy import deepcopy 12 from aiorpcx import NetAddress 13 14 from . import util 15 from . import constants 16 from .util import base_units, base_unit_name_to_decimal_point, decimal_point_to_base_unit_name, UnknownBaseUnit, DECIMAL_POINT_DEFAULT 17 from .util import format_satoshis, format_fee_satoshis 18 from .util import user_dir, make_dir, NoDynamicFeeEstimates, quantize_feerate 19 from .i18n import _ 20 from .logging import get_logger, Logger 21 22 23 FEE_ETA_TARGETS = [25, 10, 5, 2] 24 FEE_DEPTH_TARGETS = [10000000, 5000000, 2000000, 1000000, 500000, 200000, 100000] 25 FEE_LN_ETA_TARGET = 2 # note: make sure the network is asking for estimates for this target 26 27 # satoshi per kbyte 28 FEERATE_MAX_DYNAMIC = 1500000 29 FEERATE_WARNING_HIGH_FEE = 600000 30 FEERATE_FALLBACK_STATIC_FEE = 150000 31 FEERATE_DEFAULT_RELAY = 1000 32 FEERATE_MAX_RELAY = 50000 33 FEERATE_STATIC_VALUES = [1000, 2000, 5000, 10000, 20000, 30000, 34 50000, 70000, 100000, 150000, 200000, 300000] 35 FEERATE_REGTEST_HARDCODED = 180000 # for eclair compat 36 37 FEE_RATIO_HIGH_WARNING = 0.05 # warn user if fee/amount for on-chain tx is higher than this 38 39 40 _logger = get_logger(__name__) 41 42 43 FINAL_CONFIG_VERSION = 3 44 45 46 class SimpleConfig(Logger): 47 """ 48 The SimpleConfig class is responsible for handling operations involving 49 configuration files. 50 51 There are two different sources of possible configuration values: 52 1. Command line options. 53 2. User configuration (in the user's config directory) 54 They are taken in order (1. overrides config options set in 2.) 55 """ 56 57 def __init__(self, options=None, read_user_config_function=None, 58 read_user_dir_function=None): 59 if options is None: 60 options = {} 61 62 Logger.__init__(self) 63 64 # This lock needs to be acquired for updating and reading the config in 65 # a thread-safe way. 66 self.lock = threading.RLock() 67 68 self.mempool_fees = None # type: Optional[Sequence[Tuple[Union[float, int], int]]] 69 self.fee_estimates = {} 70 self.fee_estimates_last_updated = {} 71 self.last_time_fee_estimates_requested = 0 # zero ensures immediate fees 72 73 # The following two functions are there for dependency injection when 74 # testing. 75 if read_user_config_function is None: 76 read_user_config_function = read_user_config 77 if read_user_dir_function is None: 78 self.user_dir = user_dir 79 else: 80 self.user_dir = read_user_dir_function 81 82 # The command line options 83 self.cmdline_options = deepcopy(options) 84 # don't allow to be set on CLI: 85 self.cmdline_options.pop('config_version', None) 86 87 # Set self.path and read the user config 88 self.user_config = {} # for self.get in electrum_path() 89 self.path = self.electrum_path() 90 self.user_config = read_user_config_function(self.path) 91 if not self.user_config: 92 # avoid new config getting upgraded 93 self.user_config = {'config_version': FINAL_CONFIG_VERSION} 94 95 self._not_modifiable_keys = set() 96 97 # config "upgrade" - CLI options 98 self.rename_config_keys( 99 self.cmdline_options, {'auto_cycle': 'auto_connect'}, True) 100 101 # config upgrade - user config 102 if self.requires_upgrade(): 103 self.upgrade() 104 105 self._check_dependent_keys() 106 107 # units and formatting 108 self.decimal_point = self.get('decimal_point', DECIMAL_POINT_DEFAULT) 109 try: 110 decimal_point_to_base_unit_name(self.decimal_point) 111 except UnknownBaseUnit: 112 self.decimal_point = DECIMAL_POINT_DEFAULT 113 self.num_zeros = int(self.get('num_zeros', 0)) 114 115 def electrum_path(self): 116 # Read electrum_path from command line 117 # Otherwise use the user's default data directory. 118 path = self.get('electrum_path') 119 if path is None: 120 path = self.user_dir() 121 122 make_dir(path, allow_symlink=False) 123 if self.get('testnet'): 124 path = os.path.join(path, 'testnet') 125 make_dir(path, allow_symlink=False) 126 elif self.get('regtest'): 127 path = os.path.join(path, 'regtest') 128 make_dir(path, allow_symlink=False) 129 elif self.get('simnet'): 130 path = os.path.join(path, 'simnet') 131 make_dir(path, allow_symlink=False) 132 133 self.logger.info(f"electrum directory {path}") 134 return path 135 136 def rename_config_keys(self, config, keypairs, deprecation_warning=False): 137 """Migrate old key names to new ones""" 138 updated = False 139 for old_key, new_key in keypairs.items(): 140 if old_key in config: 141 if new_key not in config: 142 config[new_key] = config[old_key] 143 if deprecation_warning: 144 self.logger.warning('Note that the {} variable has been deprecated. ' 145 'You should use {} instead.'.format(old_key, new_key)) 146 del config[old_key] 147 updated = True 148 return updated 149 150 def set_key(self, key, value, save=True): 151 if not self.is_modifiable(key): 152 self.logger.warning(f"not changing config key '{key}' set on the command line") 153 return 154 try: 155 json.dumps(key) 156 json.dumps(value) 157 except: 158 self.logger.info(f"json error: cannot save {repr(key)} ({repr(value)})") 159 return 160 self._set_key_in_user_config(key, value, save) 161 162 def _set_key_in_user_config(self, key, value, save=True): 163 with self.lock: 164 if value is not None: 165 self.user_config[key] = value 166 else: 167 self.user_config.pop(key, None) 168 if save: 169 self.save_user_config() 170 171 def get(self, key, default=None): 172 with self.lock: 173 out = self.cmdline_options.get(key) 174 if out is None: 175 out = self.user_config.get(key, default) 176 return out 177 178 def _check_dependent_keys(self) -> None: 179 if self.get('serverfingerprint'): 180 if not self.get('server'): 181 raise Exception("config key 'serverfingerprint' requires 'server' to also be set") 182 self.make_key_not_modifiable('server') 183 184 def requires_upgrade(self): 185 return self.get_config_version() < FINAL_CONFIG_VERSION 186 187 def upgrade(self): 188 with self.lock: 189 self.logger.info('upgrading config') 190 191 self.convert_version_2() 192 self.convert_version_3() 193 194 self.set_key('config_version', FINAL_CONFIG_VERSION, save=True) 195 196 def convert_version_2(self): 197 if not self._is_upgrade_method_needed(1, 1): 198 return 199 200 self.rename_config_keys(self.user_config, {'auto_cycle': 'auto_connect'}) 201 202 try: 203 # change server string FROM host:port:proto TO host:port:s 204 server_str = self.user_config.get('server') 205 host, port, protocol = str(server_str).rsplit(':', 2) 206 assert protocol in ('s', 't') 207 int(port) # Throw if cannot be converted to int 208 server_str = '{}:{}:s'.format(host, port) 209 self._set_key_in_user_config('server', server_str) 210 except BaseException: 211 self._set_key_in_user_config('server', None) 212 213 self.set_key('config_version', 2) 214 215 def convert_version_3(self): 216 if not self._is_upgrade_method_needed(2, 2): 217 return 218 219 base_unit = self.user_config.get('base_unit') 220 if isinstance(base_unit, str): 221 self._set_key_in_user_config('base_unit', None) 222 map_ = {'btc':8, 'mbtc':5, 'ubtc':2, 'bits':2, 'sat':0} 223 decimal_point = map_.get(base_unit.lower()) 224 self._set_key_in_user_config('decimal_point', decimal_point) 225 226 self.set_key('config_version', 3) 227 228 def _is_upgrade_method_needed(self, min_version, max_version): 229 cur_version = self.get_config_version() 230 if cur_version > max_version: 231 return False 232 elif cur_version < min_version: 233 raise Exception( 234 ('config upgrade: unexpected version %d (should be %d-%d)' 235 % (cur_version, min_version, max_version))) 236 else: 237 return True 238 239 def get_config_version(self): 240 config_version = self.get('config_version', 1) 241 if config_version > FINAL_CONFIG_VERSION: 242 self.logger.warning('config version ({}) is higher than latest ({})' 243 .format(config_version, FINAL_CONFIG_VERSION)) 244 return config_version 245 246 def is_modifiable(self, key) -> bool: 247 return (key not in self.cmdline_options 248 and key not in self._not_modifiable_keys) 249 250 def make_key_not_modifiable(self, key) -> None: 251 self._not_modifiable_keys.add(key) 252 253 def save_user_config(self): 254 if self.get('forget_config'): 255 return 256 if not self.path: 257 return 258 path = os.path.join(self.path, "config") 259 s = json.dumps(self.user_config, indent=4, sort_keys=True) 260 try: 261 with open(path, "w", encoding='utf-8') as f: 262 f.write(s) 263 os.chmod(path, stat.S_IREAD | stat.S_IWRITE) 264 except FileNotFoundError: 265 # datadir probably deleted while running... 266 if os.path.exists(self.path): # or maybe not? 267 raise 268 269 def get_wallet_path(self, *, use_gui_last_wallet=False): 270 """Set the path of the wallet.""" 271 272 # command line -w option 273 if self.get('wallet_path'): 274 return os.path.join(self.get('cwd', ''), self.get('wallet_path')) 275 276 if use_gui_last_wallet: 277 path = self.get('gui_last_wallet') 278 if path and os.path.exists(path): 279 return path 280 281 # default path 282 util.assert_datadir_available(self.path) 283 dirpath = os.path.join(self.path, "wallets") 284 make_dir(dirpath, allow_symlink=False) 285 286 new_path = os.path.join(self.path, "wallets", "default_wallet") 287 288 # default path in pre 1.9 versions 289 old_path = os.path.join(self.path, "electrum.dat") 290 if os.path.exists(old_path) and not os.path.exists(new_path): 291 os.rename(old_path, new_path) 292 293 return new_path 294 295 def remove_from_recently_open(self, filename): 296 recent = self.get('recently_open', []) 297 if filename in recent: 298 recent.remove(filename) 299 self.set_key('recently_open', recent) 300 301 def set_session_timeout(self, seconds): 302 self.logger.info(f"session timeout -> {seconds} seconds") 303 self.set_key('session_timeout', seconds) 304 305 def get_session_timeout(self): 306 return self.get('session_timeout', 300) 307 308 def save_last_wallet(self, wallet): 309 if self.get('wallet_path') is None: 310 path = wallet.storage.path 311 self.set_key('gui_last_wallet', path) 312 313 def impose_hard_limits_on_fee(func): 314 def get_fee_within_limits(self, *args, **kwargs): 315 fee = func(self, *args, **kwargs) 316 if fee is None: 317 return fee 318 fee = min(FEERATE_MAX_DYNAMIC, fee) 319 fee = max(FEERATE_DEFAULT_RELAY, fee) 320 return fee 321 return get_fee_within_limits 322 323 def eta_to_fee(self, slider_pos) -> Optional[int]: 324 """Returns fee in sat/kbyte.""" 325 slider_pos = max(slider_pos, 0) 326 slider_pos = min(slider_pos, len(FEE_ETA_TARGETS)) 327 if slider_pos < len(FEE_ETA_TARGETS): 328 num_blocks = FEE_ETA_TARGETS[int(slider_pos)] 329 fee = self.eta_target_to_fee(num_blocks) 330 else: 331 fee = self.eta_target_to_fee(1) 332 return fee 333 334 @impose_hard_limits_on_fee 335 def eta_target_to_fee(self, num_blocks: int) -> Optional[int]: 336 """Returns fee in sat/kbyte.""" 337 if num_blocks == 1: 338 fee = self.fee_estimates.get(2) 339 if fee is not None: 340 fee += fee / 2 341 fee = int(fee) 342 else: 343 fee = self.fee_estimates.get(num_blocks) 344 if fee is not None: 345 fee = int(fee) 346 return fee 347 348 def fee_to_depth(self, target_fee: Real) -> Optional[int]: 349 """For a given sat/vbyte fee, returns an estimate of how deep 350 it would be in the current mempool in vbytes. 351 Pessimistic == overestimates the depth. 352 """ 353 if self.mempool_fees is None: 354 return None 355 depth = 0 356 for fee, s in self.mempool_fees: 357 depth += s 358 if fee <= target_fee: 359 break 360 return depth 361 362 def depth_to_fee(self, slider_pos) -> Optional[int]: 363 """Returns fee in sat/kbyte.""" 364 target = self.depth_target(slider_pos) 365 return self.depth_target_to_fee(target) 366 367 @impose_hard_limits_on_fee 368 def depth_target_to_fee(self, target: int) -> Optional[int]: 369 """Returns fee in sat/kbyte. 370 target: desired mempool depth in vbytes 371 """ 372 if self.mempool_fees is None: 373 return None 374 depth = 0 375 for fee, s in self.mempool_fees: 376 depth += s 377 if depth > target: 378 break 379 else: 380 return 0 381 # add one sat/byte as currently that is 382 # the max precision of the histogram 383 # (well, in case of ElectrumX at least. not for electrs) 384 fee += 1 385 # convert to sat/kbyte 386 return int(fee * 1000) 387 388 def depth_target(self, slider_pos: int) -> int: 389 """Returns mempool depth target in bytes for a fee slider position.""" 390 slider_pos = max(slider_pos, 0) 391 slider_pos = min(slider_pos, len(FEE_DEPTH_TARGETS)-1) 392 return FEE_DEPTH_TARGETS[slider_pos] 393 394 def eta_target(self, slider_pos: int) -> int: 395 """Returns 'num blocks' ETA target for a fee slider position.""" 396 if slider_pos == len(FEE_ETA_TARGETS): 397 return 1 398 return FEE_ETA_TARGETS[slider_pos] 399 400 def fee_to_eta(self, fee_per_kb: int) -> int: 401 """Returns 'num blocks' ETA estimate for given fee rate, 402 or -1 for low fee. 403 """ 404 import operator 405 lst = list(self.fee_estimates.items()) + [(1, self.eta_to_fee(len(FEE_ETA_TARGETS)))] 406 dist = map(lambda x: (x[0], abs(x[1] - fee_per_kb)), lst) 407 min_target, min_value = min(dist, key=operator.itemgetter(1)) 408 if fee_per_kb < self.fee_estimates.get(FEE_ETA_TARGETS[0])/2: 409 min_target = -1 410 return min_target 411 412 def depth_tooltip(self, depth: Optional[int]) -> str: 413 """Returns text tooltip for given mempool depth (in vbytes).""" 414 if depth is None: 415 return "unknown from tip" 416 return "%.1f MB from tip" % (depth/1_000_000) 417 418 def eta_tooltip(self, x): 419 if x < 0: 420 return _('Low fee') 421 elif x == 1: 422 return _('In the next block') 423 else: 424 return _('Within {} blocks').format(x) 425 426 def get_fee_target(self): 427 dyn = self.is_dynfee() 428 mempool = self.use_mempool_fees() 429 pos = self.get_depth_level() if mempool else self.get_fee_level() 430 fee_rate = self.fee_per_kb() 431 target, tooltip = self.get_fee_text(pos, dyn, mempool, fee_rate) 432 return target, tooltip, dyn 433 434 def get_fee_status(self): 435 target, tooltip, dyn = self.get_fee_target() 436 return tooltip + ' [%s]'%target if dyn else target + ' [Static]' 437 438 def get_fee_text( 439 self, 440 slider_pos: int, 441 dyn: bool, 442 mempool: bool, 443 fee_per_kb: Optional[int], 444 ): 445 """Returns (text, tooltip) where 446 text is what we target: static fee / num blocks to confirm in / mempool depth 447 tooltip is the corresponding estimate (e.g. num blocks for a static fee) 448 449 fee_rate is in sat/kbyte 450 """ 451 if fee_per_kb is None: 452 rate_str = 'unknown' 453 fee_per_byte = None 454 else: 455 fee_per_byte = fee_per_kb/1000 456 rate_str = format_fee_satoshis(fee_per_byte) + ' sat/byte' 457 458 if dyn: 459 if mempool: 460 depth = self.depth_target(slider_pos) 461 text = self.depth_tooltip(depth) 462 else: 463 eta = self.eta_target(slider_pos) 464 text = self.eta_tooltip(eta) 465 tooltip = rate_str 466 else: # using static fees 467 assert fee_per_kb is not None 468 assert fee_per_byte is not None 469 text = rate_str 470 if mempool and self.has_fee_mempool(): 471 depth = self.fee_to_depth(fee_per_byte) 472 tooltip = self.depth_tooltip(depth) 473 elif not mempool and self.has_fee_etas(): 474 eta = self.fee_to_eta(fee_per_kb) 475 tooltip = self.eta_tooltip(eta) 476 else: 477 tooltip = '' 478 return text, tooltip 479 480 def get_depth_level(self): 481 maxp = len(FEE_DEPTH_TARGETS) - 1 482 return min(maxp, self.get('depth_level', 2)) 483 484 def get_fee_level(self): 485 maxp = len(FEE_ETA_TARGETS) # not (-1) to have "next block" 486 return min(maxp, self.get('fee_level', 2)) 487 488 def get_fee_slider(self, dyn, mempool) -> Tuple[int, int, Optional[int]]: 489 if dyn: 490 if mempool: 491 pos = self.get_depth_level() 492 maxp = len(FEE_DEPTH_TARGETS) - 1 493 fee_rate = self.depth_to_fee(pos) 494 else: 495 pos = self.get_fee_level() 496 maxp = len(FEE_ETA_TARGETS) # not (-1) to have "next block" 497 fee_rate = self.eta_to_fee(pos) 498 else: 499 fee_rate = self.fee_per_kb(dyn=False) 500 pos = self.static_fee_index(fee_rate) 501 maxp = len(FEERATE_STATIC_VALUES) - 1 502 return maxp, pos, fee_rate 503 504 def static_fee(self, i): 505 return FEERATE_STATIC_VALUES[i] 506 507 def static_fee_index(self, value) -> int: 508 if value is None: 509 raise TypeError('static fee cannot be None') 510 dist = list(map(lambda x: abs(x - value), FEERATE_STATIC_VALUES)) 511 return min(range(len(dist)), key=dist.__getitem__) 512 513 def has_fee_etas(self): 514 return len(self.fee_estimates) == 4 515 516 def has_fee_mempool(self) -> bool: 517 return self.mempool_fees is not None 518 519 def has_dynamic_fees_ready(self): 520 if self.use_mempool_fees(): 521 return self.has_fee_mempool() 522 else: 523 return self.has_fee_etas() 524 525 def is_dynfee(self): 526 return bool(self.get('dynamic_fees', True)) 527 528 def use_mempool_fees(self): 529 return bool(self.get('mempool_fees', False)) 530 531 def _feerate_from_fractional_slider_position(self, fee_level: float, dyn: bool, 532 mempool: bool) -> Union[int, None]: 533 fee_level = max(fee_level, 0) 534 fee_level = min(fee_level, 1) 535 if dyn: 536 max_pos = (len(FEE_DEPTH_TARGETS) - 1) if mempool else len(FEE_ETA_TARGETS) 537 slider_pos = round(fee_level * max_pos) 538 fee_rate = self.depth_to_fee(slider_pos) if mempool else self.eta_to_fee(slider_pos) 539 else: 540 max_pos = len(FEERATE_STATIC_VALUES) - 1 541 slider_pos = round(fee_level * max_pos) 542 fee_rate = FEERATE_STATIC_VALUES[slider_pos] 543 return fee_rate 544 545 def fee_per_kb(self, dyn: bool=None, mempool: bool=None, fee_level: float=None) -> Optional[int]: 546 """Returns sat/kvB fee to pay for a txn. 547 Note: might return None. 548 549 fee_level: float between 0.0 and 1.0, representing fee slider position 550 """ 551 if constants.net is constants.BitcoinRegtest: 552 return FEERATE_REGTEST_HARDCODED 553 if dyn is None: 554 dyn = self.is_dynfee() 555 if mempool is None: 556 mempool = self.use_mempool_fees() 557 if fee_level is not None: 558 return self._feerate_from_fractional_slider_position(fee_level, dyn, mempool) 559 # there is no fee_level specified; will use config. 560 # note: 'depth_level' and 'fee_level' in config are integer slider positions, 561 # unlike fee_level here, which (when given) is a float in [0.0, 1.0] 562 if dyn: 563 if mempool: 564 fee_rate = self.depth_to_fee(self.get_depth_level()) 565 else: 566 fee_rate = self.eta_to_fee(self.get_fee_level()) 567 else: 568 fee_rate = self.get('fee_per_kb', FEERATE_FALLBACK_STATIC_FEE) 569 if fee_rate is not None: 570 fee_rate = int(fee_rate) 571 return fee_rate 572 573 def fee_per_byte(self): 574 """Returns sat/vB fee to pay for a txn. 575 Note: might return None. 576 """ 577 fee_per_kb = self.fee_per_kb() 578 return fee_per_kb / 1000 if fee_per_kb is not None else None 579 580 def estimate_fee(self, size: Union[int, float, Decimal], *, 581 allow_fallback_to_static_rates: bool = False) -> int: 582 fee_per_kb = self.fee_per_kb() 583 if fee_per_kb is None: 584 if allow_fallback_to_static_rates: 585 fee_per_kb = FEERATE_FALLBACK_STATIC_FEE 586 else: 587 raise NoDynamicFeeEstimates() 588 return self.estimate_fee_for_feerate(fee_per_kb, size) 589 590 @classmethod 591 def estimate_fee_for_feerate(cls, fee_per_kb: Union[int, float, Decimal], 592 size: Union[int, float, Decimal]) -> int: 593 size = Decimal(size) 594 fee_per_kb = Decimal(fee_per_kb) 595 fee_per_byte = fee_per_kb / 1000 596 # to be consistent with what is displayed in the GUI, 597 # the calculation needs to use the same precision: 598 fee_per_byte = quantize_feerate(fee_per_byte) 599 return round(fee_per_byte * size) 600 601 def update_fee_estimates(self, key, value): 602 self.fee_estimates[key] = value 603 self.fee_estimates_last_updated[key] = time.time() 604 605 def is_fee_estimates_update_required(self): 606 """Checks time since last requested and updated fee estimates. 607 Returns True if an update should be requested. 608 """ 609 now = time.time() 610 return now - self.last_time_fee_estimates_requested > 60 611 612 def requested_fee_estimates(self): 613 self.last_time_fee_estimates_requested = time.time() 614 615 def get_video_device(self): 616 device = self.get("video_device", "default") 617 if device == 'default': 618 device = '' 619 return device 620 621 def get_ssl_context(self): 622 ssl_keyfile = self.get('ssl_keyfile') 623 ssl_certfile = self.get('ssl_certfile') 624 if ssl_keyfile and ssl_certfile: 625 ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) 626 ssl_context.load_cert_chain(ssl_certfile, ssl_keyfile) 627 return ssl_context 628 629 def get_ssl_domain(self): 630 from .paymentrequest import check_ssl_config 631 if self.get('ssl_keyfile') and self.get('ssl_certfile'): 632 SSL_identity = check_ssl_config(self) 633 else: 634 SSL_identity = None 635 return SSL_identity 636 637 def get_netaddress(self, key: str) -> Optional[NetAddress]: 638 text = self.get(key) 639 if text: 640 try: 641 return NetAddress.from_string(text) 642 except: 643 pass 644 645 def format_amount(self, x, is_diff=False, whitespaces=False): 646 return format_satoshis( 647 x, 648 num_zeros=self.num_zeros, 649 decimal_point=self.decimal_point, 650 is_diff=is_diff, 651 whitespaces=whitespaces, 652 ) 653 654 def format_amount_and_units(self, amount): 655 return self.format_amount(amount) + ' '+ self.get_base_unit() 656 657 def format_fee_rate(self, fee_rate): 658 return format_fee_satoshis(fee_rate/1000, num_zeros=self.num_zeros) + ' sat/byte' 659 660 def get_base_unit(self): 661 return decimal_point_to_base_unit_name(self.decimal_point) 662 663 def set_base_unit(self, unit): 664 assert unit in base_units.keys() 665 self.decimal_point = base_unit_name_to_decimal_point(unit) 666 self.set_key('decimal_point', self.decimal_point, True) 667 668 def get_decimal_point(self): 669 return self.decimal_point 670 671 672 def read_user_config(path): 673 """Parse and store the user config settings in electrum.conf into user_config[].""" 674 if not path: 675 return {} 676 config_path = os.path.join(path, "config") 677 if not os.path.exists(config_path): 678 return {} 679 try: 680 with open(config_path, "r", encoding='utf-8') as f: 681 data = f.read() 682 result = json.loads(data) 683 except: 684 _logger.warning(f"Cannot read config file. {config_path}") 685 return {} 686 if not type(result) is dict: 687 return {} 688 return result