util.py (52263B)
1 # Electrum - lightweight Bitcoin client 2 # Copyright (C) 2011 Thomas Voegtlin 3 # 4 # Permission is hereby granted, free of charge, to any person 5 # obtaining a copy of this software and associated documentation files 6 # (the "Software"), to deal in the Software without restriction, 7 # including without limitation the rights to use, copy, modify, merge, 8 # publish, distribute, sublicense, and/or sell copies of the Software, 9 # and to permit persons to whom the Software is furnished to do so, 10 # subject to the following conditions: 11 # 12 # The above copyright notice and this permission notice shall be 13 # included in all copies or substantial portions of the Software. 14 # 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 22 # SOFTWARE. 23 import binascii 24 import os, sys, re, json 25 from collections import defaultdict, OrderedDict 26 from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any, 27 Sequence, Dict, Generic, TypeVar, List, Iterable) 28 from datetime import datetime 29 import decimal 30 from decimal import Decimal 31 import traceback 32 import urllib 33 import threading 34 import hmac 35 import stat 36 from locale import localeconv 37 import asyncio 38 import urllib.request, urllib.parse, urllib.error 39 import builtins 40 import json 41 import time 42 from typing import NamedTuple, Optional 43 import ssl 44 import ipaddress 45 from ipaddress import IPv4Address, IPv6Address 46 import random 47 import secrets 48 import functools 49 from abc import abstractmethod, ABC 50 51 import attr 52 import aiohttp 53 from aiohttp_socks import ProxyConnector, ProxyType 54 import aiorpcx 55 from aiorpcx import TaskGroup 56 import certifi 57 import dns.resolver 58 59 from .i18n import _ 60 from .logging import get_logger, Logger 61 62 if TYPE_CHECKING: 63 from .network import Network 64 from .interface import Interface 65 from .simple_config import SimpleConfig 66 67 68 _logger = get_logger(__name__) 69 70 71 def inv_dict(d): 72 return {v: k for k, v in d.items()} 73 74 75 ca_path = certifi.where() 76 77 78 base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0} 79 base_units_inverse = inv_dict(base_units) 80 base_units_list = ['BTC', 'mBTC', 'bits', 'sat'] # list(dict) does not guarantee order 81 82 DECIMAL_POINT_DEFAULT = 5 # mBTC 83 84 85 class UnknownBaseUnit(Exception): pass 86 87 88 def decimal_point_to_base_unit_name(dp: int) -> str: 89 # e.g. 8 -> "BTC" 90 try: 91 return base_units_inverse[dp] 92 except KeyError: 93 raise UnknownBaseUnit(dp) from None 94 95 96 def base_unit_name_to_decimal_point(unit_name: str) -> int: 97 # e.g. "BTC" -> 8 98 try: 99 return base_units[unit_name] 100 except KeyError: 101 raise UnknownBaseUnit(unit_name) from None 102 103 104 class NotEnoughFunds(Exception): 105 def __str__(self): 106 return _("Insufficient funds") 107 108 109 class NoDynamicFeeEstimates(Exception): 110 def __str__(self): 111 return _('Dynamic fee estimates not available') 112 113 114 class MultipleSpendMaxTxOutputs(Exception): 115 def __str__(self): 116 return _('At most one output can be set to spend max') 117 118 119 class InvalidPassword(Exception): 120 def __str__(self): 121 return _("Incorrect password") 122 123 124 class AddTransactionException(Exception): 125 pass 126 127 128 class UnrelatedTransactionException(AddTransactionException): 129 def __str__(self): 130 return _("Transaction is unrelated to this wallet.") 131 132 133 class FileImportFailed(Exception): 134 def __init__(self, message=''): 135 self.message = str(message) 136 137 def __str__(self): 138 return _("Failed to import from file.") + "\n" + self.message 139 140 141 class FileExportFailed(Exception): 142 def __init__(self, message=''): 143 self.message = str(message) 144 145 def __str__(self): 146 return _("Failed to export to file.") + "\n" + self.message 147 148 149 class WalletFileException(Exception): pass 150 151 152 class BitcoinException(Exception): pass 153 154 155 class UserFacingException(Exception): 156 """Exception that contains information intended to be shown to the user.""" 157 158 159 class InvoiceError(UserFacingException): pass 160 161 162 # Throw this exception to unwind the stack like when an error occurs. 163 # However unlike other exceptions the user won't be informed. 164 class UserCancelled(Exception): 165 '''An exception that is suppressed from the user''' 166 pass 167 168 169 # note: this is not a NamedTuple as then its json encoding cannot be customized 170 class Satoshis(object): 171 __slots__ = ('value',) 172 173 def __new__(cls, value): 174 self = super(Satoshis, cls).__new__(cls) 175 # note: 'value' sometimes has msat precision 176 self.value = value 177 return self 178 179 def __repr__(self): 180 return f'Satoshis({self.value})' 181 182 def __str__(self): 183 # note: precision is truncated to satoshis here 184 return format_satoshis(self.value) 185 186 def __eq__(self, other): 187 return self.value == other.value 188 189 def __ne__(self, other): 190 return not (self == other) 191 192 def __add__(self, other): 193 return Satoshis(self.value + other.value) 194 195 196 # note: this is not a NamedTuple as then its json encoding cannot be customized 197 class Fiat(object): 198 __slots__ = ('value', 'ccy') 199 200 def __new__(cls, value: Optional[Decimal], ccy: str): 201 self = super(Fiat, cls).__new__(cls) 202 self.ccy = ccy 203 if not isinstance(value, (Decimal, type(None))): 204 raise TypeError(f"value should be Decimal or None, not {type(value)}") 205 self.value = value 206 return self 207 208 def __repr__(self): 209 return 'Fiat(%s)'% self.__str__() 210 211 def __str__(self): 212 if self.value is None or self.value.is_nan(): 213 return _('No Data') 214 else: 215 return "{:.2f}".format(self.value) 216 217 def to_ui_string(self): 218 if self.value is None or self.value.is_nan(): 219 return _('No Data') 220 else: 221 return "{:.2f}".format(self.value) + ' ' + self.ccy 222 223 def __eq__(self, other): 224 if not isinstance(other, Fiat): 225 return False 226 if self.ccy != other.ccy: 227 return False 228 if isinstance(self.value, Decimal) and isinstance(other.value, Decimal) \ 229 and self.value.is_nan() and other.value.is_nan(): 230 return True 231 return self.value == other.value 232 233 def __ne__(self, other): 234 return not (self == other) 235 236 def __add__(self, other): 237 assert self.ccy == other.ccy 238 return Fiat(self.value + other.value, self.ccy) 239 240 241 class MyEncoder(json.JSONEncoder): 242 def default(self, obj): 243 # note: this does not get called for namedtuples :( https://bugs.python.org/issue30343 244 from .transaction import Transaction, TxOutput 245 from .lnutil import UpdateAddHtlc 246 if isinstance(obj, UpdateAddHtlc): 247 return obj.to_tuple() 248 if isinstance(obj, Transaction): 249 return obj.serialize() 250 if isinstance(obj, TxOutput): 251 return obj.to_legacy_tuple() 252 if isinstance(obj, Satoshis): 253 return str(obj) 254 if isinstance(obj, Fiat): 255 return str(obj) 256 if isinstance(obj, Decimal): 257 return str(obj) 258 if isinstance(obj, datetime): 259 return obj.isoformat(' ')[:-3] 260 if isinstance(obj, set): 261 return list(obj) 262 if isinstance(obj, bytes): # for nametuples in lnchannel 263 return obj.hex() 264 if hasattr(obj, 'to_json') and callable(obj.to_json): 265 return obj.to_json() 266 return super(MyEncoder, self).default(obj) 267 268 269 class ThreadJob(Logger): 270 """A job that is run periodically from a thread's main loop. run() is 271 called from that thread's context. 272 """ 273 274 def __init__(self): 275 Logger.__init__(self) 276 277 def run(self): 278 """Called periodically from the thread""" 279 pass 280 281 class DebugMem(ThreadJob): 282 '''A handy class for debugging GC memory leaks''' 283 def __init__(self, classes, interval=30): 284 ThreadJob.__init__(self) 285 self.next_time = 0 286 self.classes = classes 287 self.interval = interval 288 289 def mem_stats(self): 290 import gc 291 self.logger.info("Start memscan") 292 gc.collect() 293 objmap = defaultdict(list) 294 for obj in gc.get_objects(): 295 for class_ in self.classes: 296 if isinstance(obj, class_): 297 objmap[class_].append(obj) 298 for class_, objs in objmap.items(): 299 self.logger.info(f"{class_.__name__}: {len(objs)}") 300 self.logger.info("Finish memscan") 301 302 def run(self): 303 if time.time() > self.next_time: 304 self.mem_stats() 305 self.next_time = time.time() + self.interval 306 307 class DaemonThread(threading.Thread, Logger): 308 """ daemon thread that terminates cleanly """ 309 310 LOGGING_SHORTCUT = 'd' 311 312 def __init__(self): 313 threading.Thread.__init__(self) 314 Logger.__init__(self) 315 self.parent_thread = threading.currentThread() 316 self.running = False 317 self.running_lock = threading.Lock() 318 self.job_lock = threading.Lock() 319 self.jobs = [] 320 self.stopped_event = threading.Event() # set when fully stopped 321 322 def add_jobs(self, jobs): 323 with self.job_lock: 324 self.jobs.extend(jobs) 325 326 def run_jobs(self): 327 # Don't let a throwing job disrupt the thread, future runs of 328 # itself, or other jobs. This is useful protection against 329 # malformed or malicious server responses 330 with self.job_lock: 331 for job in self.jobs: 332 try: 333 job.run() 334 except Exception as e: 335 self.logger.exception('') 336 337 def remove_jobs(self, jobs): 338 with self.job_lock: 339 for job in jobs: 340 self.jobs.remove(job) 341 342 def start(self): 343 with self.running_lock: 344 self.running = True 345 return threading.Thread.start(self) 346 347 def is_running(self): 348 with self.running_lock: 349 return self.running and self.parent_thread.is_alive() 350 351 def stop(self): 352 with self.running_lock: 353 self.running = False 354 355 def on_stop(self): 356 if 'ANDROID_DATA' in os.environ: 357 import jnius 358 jnius.detach() 359 self.logger.info("jnius detach") 360 self.logger.info("stopped") 361 self.stopped_event.set() 362 363 364 def print_stderr(*args): 365 args = [str(item) for item in args] 366 sys.stderr.write(" ".join(args) + "\n") 367 sys.stderr.flush() 368 369 def print_msg(*args): 370 # Stringify args 371 args = [str(item) for item in args] 372 sys.stdout.write(" ".join(args) + "\n") 373 sys.stdout.flush() 374 375 def json_encode(obj): 376 try: 377 s = json.dumps(obj, sort_keys = True, indent = 4, cls=MyEncoder) 378 except TypeError: 379 s = repr(obj) 380 return s 381 382 def json_decode(x): 383 try: 384 return json.loads(x, parse_float=Decimal) 385 except: 386 return x 387 388 def json_normalize(x): 389 # note: The return value of commands, when going through the JSON-RPC interface, 390 # is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis. 391 # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded. 392 # see #5868 393 return json_decode(json_encode(x)) 394 395 396 # taken from Django Source Code 397 def constant_time_compare(val1, val2): 398 """Return True if the two strings are equal, False otherwise.""" 399 return hmac.compare_digest(to_bytes(val1, 'utf8'), to_bytes(val2, 'utf8')) 400 401 402 # decorator that prints execution time 403 _profiler_logger = _logger.getChild('profiler') 404 def profiler(func): 405 def do_profile(args, kw_args): 406 name = func.__qualname__ 407 t0 = time.time() 408 o = func(*args, **kw_args) 409 t = time.time() - t0 410 _profiler_logger.debug(f"{name} {t:,.4f}") 411 return o 412 return lambda *args, **kw_args: do_profile(args, kw_args) 413 414 415 def android_ext_dir(): 416 from android.storage import primary_external_storage_path 417 return primary_external_storage_path() 418 419 def android_backup_dir(): 420 d = os.path.join(android_ext_dir(), 'org.electrum.electrum') 421 if not os.path.exists(d): 422 os.mkdir(d) 423 return d 424 425 def android_data_dir(): 426 import jnius 427 PythonActivity = jnius.autoclass('org.kivy.android.PythonActivity') 428 return PythonActivity.mActivity.getFilesDir().getPath() + '/data' 429 430 def get_backup_dir(config): 431 if 'ANDROID_DATA' in os.environ: 432 return android_backup_dir() if config.get('android_backups') else None 433 else: 434 return config.get('backup_dir') 435 436 def ensure_sparse_file(filename): 437 # On modern Linux, no need to do anything. 438 # On Windows, need to explicitly mark file. 439 if os.name == "nt": 440 try: 441 os.system('fsutil sparse setflag "{}" 1'.format(filename)) 442 except Exception as e: 443 _logger.info(f'error marking file {filename} as sparse: {e}') 444 445 446 def get_headers_dir(config): 447 return config.path 448 449 450 def assert_datadir_available(config_path): 451 path = config_path 452 if os.path.exists(path): 453 return 454 else: 455 raise FileNotFoundError( 456 'Electrum datadir does not exist. Was it deleted while running?' + '\n' + 457 'Should be at {}'.format(path)) 458 459 460 def assert_file_in_datadir_available(path, config_path): 461 if os.path.exists(path): 462 return 463 else: 464 assert_datadir_available(config_path) 465 raise FileNotFoundError( 466 'Cannot find file but datadir is there.' + '\n' + 467 'Should be at {}'.format(path)) 468 469 470 def standardize_path(path): 471 return os.path.normcase( 472 os.path.realpath( 473 os.path.abspath( 474 os.path.expanduser( 475 path 476 )))) 477 478 479 def get_new_wallet_name(wallet_folder: str) -> str: 480 i = 1 481 while True: 482 filename = "wallet_%d" % i 483 if filename in os.listdir(wallet_folder): 484 i += 1 485 else: 486 break 487 return filename 488 489 490 def assert_bytes(*args): 491 """ 492 porting helper, assert args type 493 """ 494 try: 495 for x in args: 496 assert isinstance(x, (bytes, bytearray)) 497 except: 498 print('assert bytes failed', list(map(type, args))) 499 raise 500 501 502 def assert_str(*args): 503 """ 504 porting helper, assert args type 505 """ 506 for x in args: 507 assert isinstance(x, str) 508 509 510 def to_string(x, enc) -> str: 511 if isinstance(x, (bytes, bytearray)): 512 return x.decode(enc) 513 if isinstance(x, str): 514 return x 515 else: 516 raise TypeError("Not a string or bytes like object") 517 518 519 def to_bytes(something, encoding='utf8') -> bytes: 520 """ 521 cast string to bytes() like object, but for python2 support it's bytearray copy 522 """ 523 if isinstance(something, bytes): 524 return something 525 if isinstance(something, str): 526 return something.encode(encoding) 527 elif isinstance(something, bytearray): 528 return bytes(something) 529 else: 530 raise TypeError("Not a string or bytes like object") 531 532 533 bfh = bytes.fromhex 534 535 536 def bh2u(x: bytes) -> str: 537 """ 538 str with hex representation of a bytes-like object 539 540 >>> x = bytes((1, 2, 10)) 541 >>> bh2u(x) 542 '01020A' 543 """ 544 return x.hex() 545 546 547 def xor_bytes(a: bytes, b: bytes) -> bytes: 548 size = min(len(a), len(b)) 549 return ((int.from_bytes(a[:size], "big") ^ int.from_bytes(b[:size], "big")) 550 .to_bytes(size, "big")) 551 552 553 def user_dir(): 554 if "ELECTRUMDIR" in os.environ: 555 return os.environ["ELECTRUMDIR"] 556 elif 'ANDROID_DATA' in os.environ: 557 return android_data_dir() 558 elif os.name == 'posix': 559 return os.path.join(os.environ["HOME"], ".electrum") 560 elif "APPDATA" in os.environ: 561 return os.path.join(os.environ["APPDATA"], "Electrum") 562 elif "LOCALAPPDATA" in os.environ: 563 return os.path.join(os.environ["LOCALAPPDATA"], "Electrum") 564 else: 565 #raise Exception("No home directory found in environment variables.") 566 return 567 568 569 def resource_path(*parts): 570 return os.path.join(pkg_dir, *parts) 571 572 573 # absolute path to python package folder of electrum ("lib") 574 pkg_dir = os.path.split(os.path.realpath(__file__))[0] 575 576 577 def is_valid_email(s): 578 regexp = r"[^@]+@[^@]+\.[^@]+" 579 return re.match(regexp, s) is not None 580 581 582 def is_hash256_str(text: Any) -> bool: 583 if not isinstance(text, str): return False 584 if len(text) != 64: return False 585 return is_hex_str(text) 586 587 588 def is_hex_str(text: Any) -> bool: 589 if not isinstance(text, str): return False 590 try: 591 b = bytes.fromhex(text) 592 except: 593 return False 594 # forbid whitespaces in text: 595 if len(text) != 2 * len(b): 596 return False 597 return True 598 599 600 def is_integer(val: Any) -> bool: 601 return isinstance(val, int) 602 603 604 def is_non_negative_integer(val: Any) -> bool: 605 if is_integer(val): 606 return val >= 0 607 return False 608 609 610 def is_int_or_float(val: Any) -> bool: 611 return isinstance(val, (int, float)) 612 613 614 def is_non_negative_int_or_float(val: Any) -> bool: 615 if is_int_or_float(val): 616 return val >= 0 617 return False 618 619 620 def chunks(items, size: int): 621 """Break up items, an iterable, into chunks of length size.""" 622 if size < 1: 623 raise ValueError(f"size must be positive, not {repr(size)}") 624 for i in range(0, len(items), size): 625 yield items[i: i + size] 626 627 628 def format_satoshis_plain(x, *, decimal_point=8) -> str: 629 """Display a satoshi amount scaled. Always uses a '.' as a decimal 630 point and has no thousands separator""" 631 if x == '!': 632 return 'max' 633 scale_factor = pow(10, decimal_point) 634 return "{:.8f}".format(Decimal(x) / scale_factor).rstrip('0').rstrip('.') 635 636 637 # Check that Decimal precision is sufficient. 638 # We need at the very least ~20, as we deal with msat amounts, and 639 # log10(21_000_000 * 10**8 * 1000) ~= 18.3 640 # decimal.DefaultContext.prec == 28 by default, but it is mutable. 641 # We enforce that we have at least that available. 642 assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}" 643 644 DECIMAL_POINT = localeconv()['decimal_point'] # type: str 645 646 647 def format_satoshis( 648 x, # in satoshis 649 *, 650 num_zeros=0, 651 decimal_point=8, 652 precision=None, 653 is_diff=False, 654 whitespaces=False, 655 ) -> str: 656 if x is None: 657 return 'unknown' 658 if x == '!': 659 return 'max' 660 if precision is None: 661 precision = decimal_point 662 # format string 663 decimal_format = "." + str(precision) if precision > 0 else "" 664 if is_diff: 665 decimal_format = '+' + decimal_format 666 # initial result 667 scale_factor = pow(10, decimal_point) 668 if not isinstance(x, Decimal): 669 x = Decimal(x).quantize(Decimal('1E-8')) 670 result = ("{:" + decimal_format + "f}").format(x / scale_factor) 671 if "." not in result: result += "." 672 result = result.rstrip('0') 673 # extra decimal places 674 integer_part, fract_part = result.split(".") 675 if len(fract_part) < num_zeros: 676 fract_part += "0" * (num_zeros - len(fract_part)) 677 result = integer_part + DECIMAL_POINT + fract_part 678 # leading/trailing whitespaces 679 if whitespaces: 680 result += " " * (decimal_point - len(fract_part)) 681 result = " " * (15 - len(result)) + result 682 return result 683 684 685 FEERATE_PRECISION = 1 # num fractional decimal places for sat/byte fee rates 686 _feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION) 687 688 689 def format_fee_satoshis(fee, *, num_zeros=0, precision=None): 690 if precision is None: 691 precision = FEERATE_PRECISION 692 num_zeros = min(num_zeros, FEERATE_PRECISION) # no more zeroes than available prec 693 return format_satoshis(fee, num_zeros=num_zeros, decimal_point=0, precision=precision) 694 695 696 def quantize_feerate(fee) -> Union[None, Decimal, int]: 697 """Strip sat/byte fee rate of excess precision.""" 698 if fee is None: 699 return None 700 return Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN) 701 702 703 def timestamp_to_datetime(timestamp): 704 if timestamp is None: 705 return None 706 return datetime.fromtimestamp(timestamp) 707 708 def format_time(timestamp): 709 date = timestamp_to_datetime(timestamp) 710 return date.isoformat(' ')[:-3] if date else _("Unknown") 711 712 713 # Takes a timestamp and returns a string with the approximation of the age 714 def age(from_date, since_date = None, target_tz=None, include_seconds=False): 715 if from_date is None: 716 return "Unknown" 717 718 from_date = datetime.fromtimestamp(from_date) 719 if since_date is None: 720 since_date = datetime.now(target_tz) 721 722 td = time_difference(from_date - since_date, include_seconds) 723 return td + " ago" if from_date < since_date else "in " + td 724 725 726 def time_difference(distance_in_time, include_seconds): 727 #distance_in_time = since_date - from_date 728 distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds))) 729 distance_in_minutes = int(round(distance_in_seconds/60)) 730 731 if distance_in_minutes == 0: 732 if include_seconds: 733 return "%s seconds" % distance_in_seconds 734 else: 735 return "less than a minute" 736 elif distance_in_minutes < 45: 737 return "%s minutes" % distance_in_minutes 738 elif distance_in_minutes < 90: 739 return "about 1 hour" 740 elif distance_in_minutes < 1440: 741 return "about %d hours" % (round(distance_in_minutes / 60.0)) 742 elif distance_in_minutes < 2880: 743 return "1 day" 744 elif distance_in_minutes < 43220: 745 return "%d days" % (round(distance_in_minutes / 1440)) 746 elif distance_in_minutes < 86400: 747 return "about 1 month" 748 elif distance_in_minutes < 525600: 749 return "%d months" % (round(distance_in_minutes / 43200)) 750 elif distance_in_minutes < 1051200: 751 return "about 1 year" 752 else: 753 return "over %d years" % (round(distance_in_minutes / 525600)) 754 755 mainnet_block_explorers = { 756 'Bitupper Explorer': ('https://bitupper.com/en/explorer/bitcoin/', 757 {'tx': 'transactions/', 'addr': 'addresses/'}), 758 'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/', 759 {'tx': 'Transaction/', 'addr': 'Address/'}), 760 'Blockchain.info': ('https://blockchain.com/btc/', 761 {'tx': 'tx/', 'addr': 'address/'}), 762 'blockchainbdgpzk.onion': ('https://blockchainbdgpzk.onion/', 763 {'tx': 'tx/', 'addr': 'address/'}), 764 'Blockstream.info': ('https://blockstream.info/', 765 {'tx': 'tx/', 'addr': 'address/'}), 766 'Bitaps.com': ('https://btc.bitaps.com/', 767 {'tx': '', 'addr': ''}), 768 'BTC.com': ('https://btc.com/', 769 {'tx': '', 'addr': ''}), 770 'Chain.so': ('https://www.chain.so/', 771 {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}), 772 'Insight.is': ('https://insight.bitpay.com/', 773 {'tx': 'tx/', 'addr': 'address/'}), 774 'TradeBlock.com': ('https://tradeblock.com/blockchain/', 775 {'tx': 'tx/', 'addr': 'address/'}), 776 'BlockCypher.com': ('https://live.blockcypher.com/btc/', 777 {'tx': 'tx/', 'addr': 'address/'}), 778 'Blockchair.com': ('https://blockchair.com/bitcoin/', 779 {'tx': 'transaction/', 'addr': 'address/'}), 780 'blockonomics.co': ('https://www.blockonomics.co/', 781 {'tx': 'api/tx?txid=', 'addr': '#/search?q='}), 782 'mempool.space': ('https://mempool.space/', 783 {'tx': 'tx/', 'addr': 'address/'}), 784 'mempool.emzy.de': ('https://mempool.emzy.de/', 785 {'tx': 'tx/', 'addr': 'address/'}), 786 'OXT.me': ('https://oxt.me/', 787 {'tx': 'transaction/', 'addr': 'address/'}), 788 'smartbit.com.au': ('https://www.smartbit.com.au/', 789 {'tx': 'tx/', 'addr': 'address/'}), 790 'mynode.local': ('http://mynode.local:3002/', 791 {'tx': 'tx/', 'addr': 'address/'}), 792 'system default': ('blockchain:/', 793 {'tx': 'tx/', 'addr': 'address/'}), 794 } 795 796 testnet_block_explorers = { 797 'Bitaps.com': ('https://tbtc.bitaps.com/', 798 {'tx': '', 'addr': ''}), 799 'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/', 800 {'tx': 'tx/', 'addr': 'address/'}), 801 'Blockchain.info': ('https://www.blockchain.com/btc-testnet/', 802 {'tx': 'tx/', 'addr': 'address/'}), 803 'Blockstream.info': ('https://blockstream.info/testnet/', 804 {'tx': 'tx/', 'addr': 'address/'}), 805 'mempool.space': ('https://mempool.space/testnet/', 806 {'tx': 'tx/', 'addr': 'address/'}), 807 'smartbit.com.au': ('https://testnet.smartbit.com.au/', 808 {'tx': 'tx/', 'addr': 'address/'}), 809 'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/', 810 {'tx': 'tx/', 'addr': 'address/'}), 811 } 812 813 _block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'} 814 815 816 def block_explorer_info(): 817 from . import constants 818 return mainnet_block_explorers if not constants.net.TESTNET else testnet_block_explorers 819 820 821 def block_explorer(config: 'SimpleConfig') -> Optional[str]: 822 """Returns name of selected block explorer, 823 or None if a custom one (not among hardcoded ones) is configured. 824 """ 825 if config.get('block_explorer_custom') is not None: 826 return None 827 default_ = 'Blockstream.info' 828 be_key = config.get('block_explorer', default_) 829 be_tuple = block_explorer_info().get(be_key) 830 if be_tuple is None: 831 be_key = default_ 832 assert isinstance(be_key, str), f"{be_key!r} should be str" 833 return be_key 834 835 836 def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]: 837 custom_be = config.get('block_explorer_custom') 838 if custom_be: 839 if isinstance(custom_be, str): 840 return custom_be, _block_explorer_default_api_loc 841 if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2: 842 return tuple(custom_be) 843 _logger.warning(f"not using 'block_explorer_custom' from config. " 844 f"expected a str or a pair but got {custom_be!r}") 845 return None 846 else: 847 # using one of the hardcoded block explorers 848 return block_explorer_info().get(block_explorer(config)) 849 850 851 def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]: 852 be_tuple = block_explorer_tuple(config) 853 if not be_tuple: 854 return 855 explorer_url, explorer_dict = be_tuple 856 kind_str = explorer_dict.get(kind) 857 if kind_str is None: 858 return 859 if explorer_url[-1] != "/": 860 explorer_url += "/" 861 url_parts = [explorer_url, kind_str, item] 862 return ''.join(url_parts) 863 864 # URL decode 865 #_ud = re.compile('%([0-9a-hA-H]{2})', re.MULTILINE) 866 #urldecode = lambda x: _ud.sub(lambda m: chr(int(m.group(1), 16)), x) 867 868 869 # note: when checking against these, use .lower() to support case-insensitivity 870 BITCOIN_BIP21_URI_SCHEME = 'bitcoin' 871 LIGHTNING_URI_SCHEME = 'lightning' 872 873 874 class InvalidBitcoinURI(Exception): pass 875 876 877 # TODO rename to parse_bip21_uri or similar 878 def parse_URI(uri: str, on_pr: Callable = None, *, loop=None) -> dict: 879 """Raises InvalidBitcoinURI on malformed URI.""" 880 from . import bitcoin 881 from .bitcoin import COIN 882 883 if not isinstance(uri, str): 884 raise InvalidBitcoinURI(f"expected string, not {repr(uri)}") 885 886 if ':' not in uri: 887 if not bitcoin.is_address(uri): 888 raise InvalidBitcoinURI("Not a bitcoin address") 889 return {'address': uri} 890 891 u = urllib.parse.urlparse(uri) 892 if u.scheme.lower() != BITCOIN_BIP21_URI_SCHEME: 893 raise InvalidBitcoinURI("Not a bitcoin URI") 894 address = u.path 895 896 # python for android fails to parse query 897 if address.find('?') > 0: 898 address, query = u.path.split('?') 899 pq = urllib.parse.parse_qs(query) 900 else: 901 pq = urllib.parse.parse_qs(u.query) 902 903 for k, v in pq.items(): 904 if len(v) != 1: 905 raise InvalidBitcoinURI(f'Duplicate Key: {repr(k)}') 906 907 out = {k: v[0] for k, v in pq.items()} 908 if address: 909 if not bitcoin.is_address(address): 910 raise InvalidBitcoinURI(f"Invalid bitcoin address: {address}") 911 out['address'] = address 912 if 'amount' in out: 913 am = out['amount'] 914 try: 915 m = re.match(r'([0-9.]+)X([0-9])', am) 916 if m: 917 k = int(m.group(2)) - 8 918 amount = Decimal(m.group(1)) * pow( Decimal(10) , k) 919 else: 920 amount = Decimal(am) * COIN 921 out['amount'] = int(amount) 922 except Exception as e: 923 raise InvalidBitcoinURI(f"failed to parse 'amount' field: {repr(e)}") from e 924 if 'message' in out: 925 out['message'] = out['message'] 926 out['memo'] = out['message'] 927 if 'time' in out: 928 try: 929 out['time'] = int(out['time']) 930 except Exception as e: 931 raise InvalidBitcoinURI(f"failed to parse 'time' field: {repr(e)}") from e 932 if 'exp' in out: 933 try: 934 out['exp'] = int(out['exp']) 935 except Exception as e: 936 raise InvalidBitcoinURI(f"failed to parse 'exp' field: {repr(e)}") from e 937 if 'sig' in out: 938 try: 939 out['sig'] = bh2u(bitcoin.base_decode(out['sig'], base=58)) 940 except Exception as e: 941 raise InvalidBitcoinURI(f"failed to parse 'sig' field: {repr(e)}") from e 942 943 r = out.get('r') 944 sig = out.get('sig') 945 name = out.get('name') 946 if on_pr and (r or (name and sig)): 947 @log_exceptions 948 async def get_payment_request(): 949 from . import paymentrequest as pr 950 if name and sig: 951 s = pr.serialize_request(out).SerializeToString() 952 request = pr.PaymentRequest(s) 953 else: 954 request = await pr.get_payment_request(r) 955 if on_pr: 956 on_pr(request) 957 loop = loop or asyncio.get_event_loop() 958 asyncio.run_coroutine_threadsafe(get_payment_request(), loop) 959 960 return out 961 962 963 def create_bip21_uri(addr, amount_sat: Optional[int], message: Optional[str], 964 *, extra_query_params: Optional[dict] = None) -> str: 965 from . import bitcoin 966 if not bitcoin.is_address(addr): 967 return "" 968 if extra_query_params is None: 969 extra_query_params = {} 970 query = [] 971 if amount_sat: 972 query.append('amount=%s'%format_satoshis_plain(amount_sat)) 973 if message: 974 query.append('message=%s'%urllib.parse.quote(message)) 975 for k, v in extra_query_params.items(): 976 if not isinstance(k, str) or k != urllib.parse.quote(k): 977 raise Exception(f"illegal key for URI: {repr(k)}") 978 v = urllib.parse.quote(v) 979 query.append(f"{k}={v}") 980 p = urllib.parse.ParseResult( 981 scheme=BITCOIN_BIP21_URI_SCHEME, 982 netloc='', 983 path=addr, 984 params='', 985 query='&'.join(query), 986 fragment='', 987 ) 988 return str(urllib.parse.urlunparse(p)) 989 990 991 def maybe_extract_bolt11_invoice(data: str) -> Optional[str]: 992 data = data.strip() # whitespaces 993 data = data.lower() 994 if data.startswith(LIGHTNING_URI_SCHEME + ':ln'): 995 data = data[10:] 996 if data.startswith('ln'): 997 return data 998 return None 999 1000 1001 # Python bug (http://bugs.python.org/issue1927) causes raw_input 1002 # to be redirected improperly between stdin/stderr on Unix systems 1003 #TODO: py3 1004 def raw_input(prompt=None): 1005 if prompt: 1006 sys.stdout.write(prompt) 1007 return builtin_raw_input() 1008 1009 builtin_raw_input = builtins.input 1010 builtins.input = raw_input 1011 1012 1013 def parse_json(message): 1014 # TODO: check \r\n pattern 1015 n = message.find(b'\n') 1016 if n==-1: 1017 return None, message 1018 try: 1019 j = json.loads(message[0:n].decode('utf8')) 1020 except: 1021 j = None 1022 return j, message[n+1:] 1023 1024 1025 def setup_thread_excepthook(): 1026 """ 1027 Workaround for `sys.excepthook` thread bug from: 1028 http://bugs.python.org/issue1230540 1029 1030 Call once from the main thread before creating any threads. 1031 """ 1032 1033 init_original = threading.Thread.__init__ 1034 1035 def init(self, *args, **kwargs): 1036 1037 init_original(self, *args, **kwargs) 1038 run_original = self.run 1039 1040 def run_with_except_hook(*args2, **kwargs2): 1041 try: 1042 run_original(*args2, **kwargs2) 1043 except Exception: 1044 sys.excepthook(*sys.exc_info()) 1045 1046 self.run = run_with_except_hook 1047 1048 threading.Thread.__init__ = init 1049 1050 1051 def send_exception_to_crash_reporter(e: BaseException): 1052 sys.excepthook(type(e), e, e.__traceback__) 1053 1054 1055 def versiontuple(v): 1056 return tuple(map(int, (v.split(".")))) 1057 1058 1059 def read_json_file(path): 1060 try: 1061 with open(path, 'r', encoding='utf-8') as f: 1062 data = json.loads(f.read()) 1063 #backwards compatibility for JSONDecodeError 1064 except ValueError: 1065 _logger.exception('') 1066 raise FileImportFailed(_("Invalid JSON code.")) 1067 except BaseException as e: 1068 _logger.exception('') 1069 raise FileImportFailed(e) 1070 return data 1071 1072 def write_json_file(path, data): 1073 try: 1074 with open(path, 'w+', encoding='utf-8') as f: 1075 json.dump(data, f, indent=4, sort_keys=True, cls=MyEncoder) 1076 except (IOError, os.error) as e: 1077 _logger.exception('') 1078 raise FileExportFailed(e) 1079 1080 1081 def make_dir(path, allow_symlink=True): 1082 """Make directory if it does not yet exist.""" 1083 if not os.path.exists(path): 1084 if not allow_symlink and os.path.islink(path): 1085 raise Exception('Dangling link: ' + path) 1086 os.mkdir(path) 1087 os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) 1088 1089 1090 def log_exceptions(func): 1091 """Decorator to log AND re-raise exceptions.""" 1092 assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine' 1093 @functools.wraps(func) 1094 async def wrapper(*args, **kwargs): 1095 self = args[0] if len(args) > 0 else None 1096 try: 1097 return await func(*args, **kwargs) 1098 except asyncio.CancelledError as e: 1099 raise 1100 except BaseException as e: 1101 mylogger = self.logger if hasattr(self, 'logger') else _logger 1102 try: 1103 mylogger.exception(f"Exception in {func.__name__}: {repr(e)}") 1104 except BaseException as e2: 1105 print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}") 1106 raise 1107 return wrapper 1108 1109 1110 def ignore_exceptions(func): 1111 """Decorator to silently swallow all exceptions.""" 1112 assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine' 1113 @functools.wraps(func) 1114 async def wrapper(*args, **kwargs): 1115 try: 1116 return await func(*args, **kwargs) 1117 except asyncio.CancelledError: 1118 # note: with python 3.8, CancelledError no longer inherits Exception, so this catch is redundant 1119 raise 1120 except Exception as e: 1121 pass 1122 return wrapper 1123 1124 1125 class TxMinedInfo(NamedTuple): 1126 height: int # height of block that mined tx 1127 conf: Optional[int] = None # number of confirmations, SPV verified (None means unknown) 1128 timestamp: Optional[int] = None # timestamp of block that mined tx 1129 txpos: Optional[int] = None # position of tx in serialized block 1130 header_hash: Optional[str] = None # hash of block that mined tx 1131 1132 1133 def make_aiohttp_session(proxy: Optional[dict], headers=None, timeout=None): 1134 if headers is None: 1135 headers = {'User-Agent': 'Electrum'} 1136 if timeout is None: 1137 # The default timeout is high intentionally. 1138 # DNS on some systems can be really slow, see e.g. #5337 1139 timeout = aiohttp.ClientTimeout(total=45) 1140 elif isinstance(timeout, (int, float)): 1141 timeout = aiohttp.ClientTimeout(total=timeout) 1142 ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path) 1143 1144 if proxy: 1145 connector = ProxyConnector( 1146 proxy_type=ProxyType.SOCKS5 if proxy['mode'] == 'socks5' else ProxyType.SOCKS4, 1147 host=proxy['host'], 1148 port=int(proxy['port']), 1149 username=proxy.get('user', None), 1150 password=proxy.get('password', None), 1151 rdns=True, 1152 ssl=ssl_context, 1153 ) 1154 else: 1155 connector = aiohttp.TCPConnector(ssl=ssl_context) 1156 1157 return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector) 1158 1159 1160 class SilentTaskGroup(TaskGroup): 1161 1162 def spawn(self, *args, **kwargs): 1163 # don't complain if group is already closed. 1164 if self._closed: 1165 raise asyncio.CancelledError() 1166 return super().spawn(*args, **kwargs) 1167 1168 1169 class NetworkJobOnDefaultServer(Logger, ABC): 1170 """An abstract base class for a job that runs on the main network 1171 interface. Every time the main interface changes, the job is 1172 restarted, and some of its internals are reset. 1173 """ 1174 def __init__(self, network: 'Network'): 1175 Logger.__init__(self) 1176 asyncio.set_event_loop(network.asyncio_loop) 1177 self.network = network 1178 self.interface = None # type: Interface 1179 self._restart_lock = asyncio.Lock() 1180 # Ensure fairness between NetworkJobs. e.g. if multiple wallets 1181 # are open, a large wallet's Synchronizer should not starve the small wallets: 1182 self._network_request_semaphore = asyncio.Semaphore(100) 1183 1184 self._reset() 1185 # every time the main interface changes, restart: 1186 register_callback(self._restart, ['default_server_changed']) 1187 # also schedule a one-off restart now, as there might already be a main interface: 1188 asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop) 1189 1190 def _reset(self): 1191 """Initialise fields. Called every time the underlying 1192 server connection changes. 1193 """ 1194 self.taskgroup = SilentTaskGroup() 1195 1196 async def _start(self, interface: 'Interface'): 1197 self.interface = interface 1198 await interface.taskgroup.spawn(self._run_tasks(taskgroup=self.taskgroup)) 1199 1200 @abstractmethod 1201 async def _run_tasks(self, *, taskgroup: TaskGroup) -> None: 1202 """Start tasks in taskgroup. Called every time the underlying 1203 server connection changes. 1204 """ 1205 # If self.taskgroup changed, don't start tasks. This can happen if we have 1206 # been restarted *just now*, i.e. after the _run_tasks coroutine object was created. 1207 if taskgroup != self.taskgroup: 1208 raise asyncio.CancelledError() 1209 1210 async def stop(self, *, full_shutdown: bool = True): 1211 if full_shutdown: 1212 unregister_callback(self._restart) 1213 await self.taskgroup.cancel_remaining() 1214 1215 @log_exceptions 1216 async def _restart(self, *args): 1217 interface = self.network.interface 1218 if interface is None: 1219 return # we should get called again soon 1220 1221 async with self._restart_lock: 1222 await self.stop(full_shutdown=False) 1223 self._reset() 1224 await self._start(interface) 1225 1226 @property 1227 def session(self): 1228 # ORIG: s = self.interface.session 1229 # TODO: libbitcoin 1230 s = self.interface.client 1231 assert s is not None 1232 return s 1233 1234 1235 def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop, 1236 asyncio.Future, 1237 threading.Thread]: 1238 def on_exception(loop, context): 1239 """Suppress spurious messages it appears we cannot control.""" 1240 SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|' 1241 'SSL error in data received') 1242 message = context.get('message') 1243 if message and SUPPRESS_MESSAGE_REGEX.match(message): 1244 return 1245 loop.default_exception_handler(context) 1246 1247 loop = asyncio.get_event_loop() 1248 loop.set_exception_handler(on_exception) 1249 # loop.set_debug(1) 1250 stopping_fut = asyncio.Future() 1251 loop_thread = threading.Thread(target=loop.run_until_complete, 1252 args=(stopping_fut,), 1253 name='EventLoop') 1254 loop_thread.start() 1255 loop._mythread = loop_thread 1256 return loop, stopping_fut, loop_thread 1257 1258 1259 class OrderedDictWithIndex(OrderedDict): 1260 """An OrderedDict that keeps track of the positions of keys. 1261 1262 Note: very inefficient to modify contents, except to add new items. 1263 """ 1264 1265 def __init__(self): 1266 super().__init__() 1267 self._key_to_pos = {} 1268 self._pos_to_key = {} 1269 1270 def _recalc_index(self): 1271 self._key_to_pos = {key: pos for (pos, key) in enumerate(self.keys())} 1272 self._pos_to_key = {pos: key for (pos, key) in enumerate(self.keys())} 1273 1274 def pos_from_key(self, key): 1275 return self._key_to_pos[key] 1276 1277 def value_from_pos(self, pos): 1278 key = self._pos_to_key[pos] 1279 return self[key] 1280 1281 def popitem(self, *args, **kwargs): 1282 ret = super().popitem(*args, **kwargs) 1283 self._recalc_index() 1284 return ret 1285 1286 def move_to_end(self, *args, **kwargs): 1287 ret = super().move_to_end(*args, **kwargs) 1288 self._recalc_index() 1289 return ret 1290 1291 def clear(self): 1292 ret = super().clear() 1293 self._recalc_index() 1294 return ret 1295 1296 def pop(self, *args, **kwargs): 1297 ret = super().pop(*args, **kwargs) 1298 self._recalc_index() 1299 return ret 1300 1301 def update(self, *args, **kwargs): 1302 ret = super().update(*args, **kwargs) 1303 self._recalc_index() 1304 return ret 1305 1306 def __delitem__(self, *args, **kwargs): 1307 ret = super().__delitem__(*args, **kwargs) 1308 self._recalc_index() 1309 return ret 1310 1311 def __setitem__(self, key, *args, **kwargs): 1312 is_new_key = key not in self 1313 ret = super().__setitem__(key, *args, **kwargs) 1314 if is_new_key: 1315 pos = len(self) - 1 1316 self._key_to_pos[key] = pos 1317 self._pos_to_key[pos] = key 1318 return ret 1319 1320 1321 def multisig_type(wallet_type): 1322 '''If wallet_type is mofn multi-sig, return [m, n], 1323 otherwise return None.''' 1324 if not wallet_type: 1325 return None 1326 match = re.match(r'(\d+)of(\d+)', wallet_type) 1327 if match: 1328 match = [int(x) for x in match.group(1, 2)] 1329 return match 1330 1331 1332 def is_ip_address(x: Union[str, bytes]) -> bool: 1333 if isinstance(x, bytes): 1334 x = x.decode("utf-8") 1335 try: 1336 ipaddress.ip_address(x) 1337 return True 1338 except ValueError: 1339 return False 1340 1341 1342 def is_private_netaddress(host: str) -> bool: 1343 if str(host) in ('localhost', 'localhost.',): 1344 return True 1345 if host[0] == '[' and host[-1] == ']': # IPv6 1346 host = host[1:-1] 1347 try: 1348 ip_addr = ipaddress.ip_address(host) # type: Union[IPv4Address, IPv6Address] 1349 return ip_addr.is_private 1350 except ValueError: 1351 pass # not an IP 1352 return False 1353 1354 1355 def list_enabled_bits(x: int) -> Sequence[int]: 1356 """e.g. 77 (0b1001101) --> (0, 2, 3, 6)""" 1357 binary = bin(x)[2:] 1358 rev_bin = reversed(binary) 1359 return tuple(i for i, b in enumerate(rev_bin) if b == '1') 1360 1361 1362 def resolve_dns_srv(host: str): 1363 srv_records = dns.resolver.resolve(host, 'SRV') 1364 # priority: prefer lower 1365 # weight: tie breaker; prefer higher 1366 srv_records = sorted(srv_records, key=lambda x: (x.priority, -x.weight)) 1367 1368 def dict_from_srv_record(srv): 1369 return { 1370 'host': str(srv.target), 1371 'port': srv.port, 1372 } 1373 return [dict_from_srv_record(srv) for srv in srv_records] 1374 1375 1376 def randrange(bound: int) -> int: 1377 """Return a random integer k such that 1 <= k < bound, uniformly 1378 distributed across that range.""" 1379 # secrets.randbelow(bound) returns a random int: 0 <= r < bound, 1380 # hence transformations: 1381 return secrets.randbelow(bound - 1) + 1 1382 1383 1384 class CallbackManager: 1385 # callbacks set by the GUI or any thread 1386 # guarantee: the callbacks will always get triggered from the asyncio thread. 1387 1388 def __init__(self): 1389 self.callback_lock = threading.Lock() 1390 self.callbacks = defaultdict(list) # note: needs self.callback_lock 1391 self.asyncio_loop = None 1392 1393 def register_callback(self, callback, events): 1394 with self.callback_lock: 1395 for event in events: 1396 self.callbacks[event].append(callback) 1397 1398 def unregister_callback(self, callback): 1399 with self.callback_lock: 1400 for callbacks in self.callbacks.values(): 1401 if callback in callbacks: 1402 callbacks.remove(callback) 1403 1404 def trigger_callback(self, event, *args): 1405 """Trigger a callback with given arguments. 1406 Can be called from any thread. The callback itself will get scheduled 1407 on the event loop. 1408 """ 1409 if self.asyncio_loop is None: 1410 self.asyncio_loop = asyncio.get_event_loop() 1411 assert self.asyncio_loop.is_running(), "event loop not running" 1412 with self.callback_lock: 1413 callbacks = self.callbacks[event][:] 1414 for callback in callbacks: 1415 # FIXME: if callback throws, we will lose the traceback 1416 if asyncio.iscoroutinefunction(callback): 1417 asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop) 1418 else: 1419 self.asyncio_loop.call_soon_threadsafe(callback, event, *args) 1420 1421 1422 callback_mgr = CallbackManager() 1423 trigger_callback = callback_mgr.trigger_callback 1424 register_callback = callback_mgr.register_callback 1425 unregister_callback = callback_mgr.unregister_callback 1426 1427 1428 _NetAddrType = TypeVar("_NetAddrType") 1429 1430 1431 class NetworkRetryManager(Generic[_NetAddrType]): 1432 """Truncated Exponential Backoff for network connections.""" 1433 1434 def __init__( 1435 self, *, 1436 max_retry_delay_normal: float, 1437 init_retry_delay_normal: float, 1438 max_retry_delay_urgent: float = None, 1439 init_retry_delay_urgent: float = None, 1440 ): 1441 self._last_tried_addr = {} # type: Dict[_NetAddrType, Tuple[float, int]] # (unix ts, num_attempts) 1442 1443 # note: these all use "seconds" as unit 1444 if max_retry_delay_urgent is None: 1445 max_retry_delay_urgent = max_retry_delay_normal 1446 if init_retry_delay_urgent is None: 1447 init_retry_delay_urgent = init_retry_delay_normal 1448 self._max_retry_delay_normal = max_retry_delay_normal 1449 self._init_retry_delay_normal = init_retry_delay_normal 1450 self._max_retry_delay_urgent = max_retry_delay_urgent 1451 self._init_retry_delay_urgent = init_retry_delay_urgent 1452 1453 def _trying_addr_now(self, addr: _NetAddrType) -> None: 1454 last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0)) 1455 # we add up to 1 second of noise to the time, so that clients are less likely 1456 # to get synchronised and bombard the remote in connection waves: 1457 cur_time = time.time() + random.random() 1458 self._last_tried_addr[addr] = cur_time, num_attempts + 1 1459 1460 def _on_connection_successfully_established(self, addr: _NetAddrType) -> None: 1461 self._last_tried_addr[addr] = time.time(), 0 1462 1463 def _can_retry_addr(self, addr: _NetAddrType, *, 1464 now: float = None, urgent: bool = False) -> bool: 1465 if now is None: 1466 now = time.time() 1467 last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0)) 1468 if urgent: 1469 max_delay = self._max_retry_delay_urgent 1470 init_delay = self._init_retry_delay_urgent 1471 else: 1472 max_delay = self._max_retry_delay_normal 1473 init_delay = self._init_retry_delay_normal 1474 delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts) 1475 next_time = last_time + delay 1476 return next_time < now 1477 1478 @classmethod 1479 def __calc_delay(cls, *, multiplier: float, max_delay: float, 1480 num_attempts: int) -> float: 1481 num_attempts = min(num_attempts, 100_000) 1482 try: 1483 res = multiplier * 2 ** num_attempts 1484 except OverflowError: 1485 return max_delay 1486 return max(0, min(max_delay, res)) 1487 1488 def _clear_addr_retry_times(self) -> None: 1489 self._last_tried_addr.clear() 1490 1491 1492 class MySocksProxy(aiorpcx.SOCKSProxy): 1493 1494 async def open_connection(self, host=None, port=None, **kwargs): 1495 loop = asyncio.get_event_loop() 1496 reader = asyncio.StreamReader(loop=loop) 1497 protocol = asyncio.StreamReaderProtocol(reader, loop=loop) 1498 transport, _ = await self.create_connection( 1499 lambda: protocol, host, port, **kwargs) 1500 writer = asyncio.StreamWriter(transport, protocol, reader, loop) 1501 return reader, writer 1502 1503 @classmethod 1504 def from_proxy_dict(cls, proxy: dict = None) -> Optional['MySocksProxy']: 1505 if not proxy: 1506 return None 1507 username, pw = proxy.get('user'), proxy.get('password') 1508 if not username or not pw: 1509 auth = None 1510 else: 1511 auth = aiorpcx.socks.SOCKSUserAuth(username, pw) 1512 addr = aiorpcx.NetAddress(proxy['host'], proxy['port']) 1513 if proxy['mode'] == "socks4": 1514 ret = cls(addr, aiorpcx.socks.SOCKS4a, auth) 1515 elif proxy['mode'] == "socks5": 1516 ret = cls(addr, aiorpcx.socks.SOCKS5, auth) 1517 else: 1518 raise NotImplementedError # http proxy not available with aiorpcx 1519 return ret 1520 1521 1522 class JsonRPCClient: 1523 1524 def __init__(self, session: aiohttp.ClientSession, url: str): 1525 self.session = session 1526 self.url = url 1527 self._id = 0 1528 1529 async def request(self, endpoint, *args): 1530 # TODO: libbitcoin 1531 self._id += 1 1532 data = ('{"jsonrpc": "2.0", "id":"%d", "method": "%s", "params": %s }' 1533 % (self._id, endpoint, json.dumps(args))) 1534 async with self.session.post(self.url, data=data) as resp: 1535 if resp.status == 200: 1536 r = await resp.json() 1537 result = r.get('result') 1538 error = r.get('error') 1539 if error: 1540 return 'Error: ' + str(error) 1541 else: 1542 return result 1543 else: 1544 text = await resp.text() 1545 return 'Error: ' + str(text) 1546 1547 def add_method(self, endpoint): 1548 async def coro(*args): 1549 return await self.request(endpoint, *args) 1550 setattr(self, endpoint, coro) 1551 1552 1553 T = TypeVar('T') 1554 1555 def random_shuffled_copy(x: Iterable[T]) -> List[T]: 1556 """Returns a shuffled copy of the input.""" 1557 x_copy = list(x) # copy 1558 random.shuffle(x_copy) # shuffle in-place 1559 return x_copy 1560 1561 1562 def test_read_write_permissions(path) -> None: 1563 # note: There might already be a file at 'path'. 1564 # Make sure we do NOT overwrite/corrupt that! 1565 temp_path = "%s.tmptest.%s" % (path, os.getpid()) 1566 echo = "fs r/w test" 1567 try: 1568 # test READ permissions for actual path 1569 if os.path.exists(path): 1570 with open(path, "rb") as f: 1571 f.read(1) # read 1 byte 1572 # test R/W sanity for "similar" path 1573 with open(temp_path, "w", encoding='utf-8') as f: 1574 f.write(echo) 1575 with open(temp_path, "r", encoding='utf-8') as f: 1576 echo2 = f.read() 1577 os.remove(temp_path) 1578 except Exception as e: 1579 raise IOError(e) from e 1580 if echo != echo2: 1581 raise IOError('echo sanity-check failed') 1582 1583 1584 class nullcontext: 1585 """Context manager that does no additional processing. 1586 This is a ~backport of contextlib.nullcontext from Python 3.10 1587 """ 1588 1589 def __init__(self, enter_result=None): 1590 self.enter_result = enter_result 1591 1592 def __enter__(self): 1593 return self.enter_result 1594 1595 def __exit__(self, *excinfo): 1596 pass 1597 1598 async def __aenter__(self): 1599 return self.enter_result 1600 1601 async def __aexit__(self, *excinfo): 1602 pass