electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

commit 9ffd2de4923848f18e26b890fc0b77050894ab34
parent 19d4bd4837ae1ebb59ad9ea325897e43e8cc12c6
Author: SomberNight <somber.night@protonmail.com>
Date:   Tue, 11 Sep 2018 20:52:58 +0200

Merge branch 'aiorpcx'

Diffstat:
Mcontrib/deterministic-build/requirements.txt | 76++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcontrib/requirements/requirements.txt | 4+++-
Melectrum/__init__.py | 2+-
Melectrum/address_synchronizer.py | 48++++++++++++++++++++++++++++++++++++------------
Melectrum/base_crash_reporter.py | 18++++++++++++------
Melectrum/blockchain.py | 33++++++++++++++++++++++++++-------
Melectrum/commands.py | 2+-
Melectrum/constants.py | 11+++++++++--
Melectrum/daemon.py | 6+++---
Melectrum/exchange_rate.py | 225+++++++++++++++++++++++++++++++++++++++++++------------------------------------
Melectrum/gui/kivy/main_window.py | 27+++++++++++++++------------
Melectrum/gui/kivy/uix/dialogs/crash_reporter.py | 9++++++---
Melectrum/gui/kivy/uix/dialogs/settings.py | 10+++++++---
Melectrum/gui/kivy/uix/ui_screens/proxy.kv | 7++++---
Melectrum/gui/kivy/uix/ui_screens/server.kv | 7+++----
Melectrum/gui/qt/__init__.py | 2+-
Melectrum/gui/qt/exception_window.py | 6++++--
Melectrum/gui/qt/main_window.py | 6+++---
Melectrum/gui/qt/network_dialog.py | 36++++++++++++++++++++----------------
Melectrum/gui/stdio.py | 4++--
Melectrum/gui/text.py | 18+++++++++++-------
Melectrum/interface.py | 749+++++++++++++++++++++++++++++++++++++++++++++++--------------------------------
Melectrum/network.py | 980+++++++++++++++++++------------------------------------------------------------
Melectrum/plugins/labels/labels.py | 85++++++++++++++++++++++++++++++++++++++-----------------------------------------
Melectrum/plugins/labels/qt.py | 4++--
Melectrum/plugins/trustedcoin/trustedcoin.py | 63++++++++++++++++++++++++++++++++++++++-------------------------
Melectrum/synchronizer.py | 237++++++++++++++++++++++++++++++++++---------------------------------------------
Delectrum/tests/test_interface.py | 26--------------------------
Aelectrum/tests/test_network.py | 119+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Melectrum/util.py | 54+++++++++++++++++++++++++++++++++++++++++++++++++++---
Melectrum/verifier.py | 45+++++++++++++++++++--------------------------
Melectrum/version.py | 2+-
Mrun_electrum | 3++-
33 files changed, 1423 insertions(+), 1501 deletions(-)

diff --git a/contrib/deterministic-build/requirements.txt b/contrib/deterministic-build/requirements.txt @@ -1,3 +1,37 @@ +aiohttp==3.4.4 \ + --hash=sha256:0419705a36b43c0ac6f15469f9c2a08cad5c939d78bd12a5c23ea167c8253b2b \ + --hash=sha256:1812fc4bc6ac1bde007daa05d2d0f61199324e0cc893b11523e646595047ca08 \ + --hash=sha256:2214b5c0153f45256d5d52d1e0cafe53f9905ed035a142191727a5fb620c03dd \ + --hash=sha256:275909137f0c92c61ba6bb1af856a522d5546f1de8ea01e4e726321c697754ac \ + --hash=sha256:3983611922b561868428ea1e7269e757803713f55b53502423decc509fef1650 \ + --hash=sha256:51afec6ffa50a9da4cdef188971a802beb1ca8e8edb40fa429e5e529db3475fa \ + --hash=sha256:589f2ec8a101a0f340453ee6945bdfea8e1cd84c8d88e5be08716c34c0799d95 \ + --hash=sha256:789820ddc65e1f5e71516adaca2e9022498fa5a837c79ba9c692a9f8f916c330 \ + --hash=sha256:7a968a0bdaaf9abacc260911775611c9a602214a23aeb846f2eb2eeaa350c4dc \ + --hash=sha256:7aeefbed253f59ea39e70c5848de42ed85cb941165357fc7e87ab5d8f1f9592b \ + --hash=sha256:7b2eb55c66512405103485bd7d285a839d53e7fdc261ab20e5bcc51d7aaff5de \ + --hash=sha256:87bc95d3d333bb689c8d755b4a9d7095a2356108002149523dfc8e607d5d32a4 \ + --hash=sha256:9d80e40db208e29168d3723d1440ecbb06054d349c5ece6a2c5a611490830dd7 \ + --hash=sha256:a1b442195c2a77d33e4dbee67c9877ccbdd3a1f686f91eb479a9577ed8cc326b \ + --hash=sha256:ab3d769413b322d6092f169f316f7b21cd261a7589f7e31db779d5731b0480d8 \ + --hash=sha256:b066d3dec5d0f5aee6e34e5765095dc3d6d78ef9839640141a2b20816a0642bd \ + --hash=sha256:b24e7845ae8de3e388ef4bcfcf7f96b05f52c8e633b33cf8003a6b1d726fc7c2 \ + --hash=sha256:c59a953c3f8524a7c86eaeaef5bf702555be12f5668f6384149fe4bb75c52698 \ + --hash=sha256:cf2cc6c2c10d242790412bea7ccf73726a9a44b4c4b073d2699ef3b48971fd95 \ + --hash=sha256:e0c9c8d4150ae904f308ff27b35446990d2b1dfc944702a21925937e937394c6 \ + --hash=sha256:f1839db4c2b08a9c8f9788112644f8a8557e8e0ecc77b07091afabb941dc55d0 \ + --hash=sha256:f3df52362be39908f9c028a65490fae0475e4898b43a03d8aa29d1e765b45e07 +aiohttp_socks==0.1.6 \ + --hash=sha256:943148a3797ba9ffb6df6ddb006ffdd40538885b410589d589bda42a8e8bcd5a +aiorpcX==0.7.3 \ + --hash=sha256:24dd4fe2f65f743cb74c8626570470e325bb777bb66d1932e7d2965ae71d1164 \ + --hash=sha256:5120ca40beef6b6a45d3a7055e343815401385dc607da2fd93baca2762c8a97d +async_timeout==3.0.0 \ + --hash=sha256:474d4bc64cee20603e225eb1ece15e248962958b45a3648a9f5cc29e827a610c \ + --hash=sha256:b3c0ddc416736619bd4a95ca31de8da6920c3b9a140c64dbef2b2fa7bf521287 +attrs==18.2.0 \ + --hash=sha256:10cbf6e27dbce8c30807caf056c8eb50917e0eaafe86347671b57254006c3e69 \ + --hash=sha256:ca4be454458f9dec299268d472aaa5a11f67a4ff70093396e1ceae9c76cf4bbb certifi==2018.8.24 \ --hash=sha256:376690d6f16d32f9d1fe8932551d80b23e9d393a8578c5633a2ed39a64861638 \ --hash=sha256:456048c7e371c089d0a77a5212fb37a2c2dce1e24146e3b7e0261736aaeaa22a @@ -13,9 +47,41 @@ ecdsa==0.13 \ idna==2.7 \ --hash=sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e \ --hash=sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16 +idna_ssl==1.1.0 \ + --hash=sha256:a933e3bb13da54383f9e8f35dc4f9cb9eb9b3b78c6b36f311254d6d0d92c6c7c jsonrpclib-pelix==0.3.1 \ --hash=sha256:5417b1508d5a50ec64f6e5b88907f111155d52607b218ff3ba9a777afb2e49e3 \ --hash=sha256:bd89a6093bc4d47dc8a096197aacb827359944a4533be5193f3845f57b9f91b4 +multidict==4.4.0 \ + --hash=sha256:112eeeddd226af681dc82b756ed34aa7b6d98f9c4a15760050298c21d715473d \ + --hash=sha256:13b64ecb692effcabc5e29569ba9b5eb69c35112f990a16d6833ec3a9d9f8ec0 \ + --hash=sha256:1725373fb8f18c2166f8e0e5789851ccf98453c849b403945fa4ef59a16ca44e \ + --hash=sha256:2061a50b7cae60a1f987503a995b2fc38e47027a937a355a124306ed9c629041 \ + --hash=sha256:35b062288a9a478f627c520fd27983160fc97591017d170f966805b428d17e07 \ + --hash=sha256:467b134bcc227b91b8e2ef8d2931f28b50bf7eb7a04c0403d102ded22e66dbfc \ + --hash=sha256:475a3ece8bb450e49385414ebfae7f8fdb33f62f1ac0c12935c1cfb1b7c1076a \ + --hash=sha256:49b885287e227a24545a1126d9ac17ae43138610713dc6219b781cc0ad5c6dfc \ + --hash=sha256:4c95b2725592adb5c46642be2875c1234c32af841732c5504c17726b92082021 \ + --hash=sha256:4ea7ed00f4be0f7335c9a2713a65ac3d986be789ce5ebc10821da9664cbe6b85 \ + --hash=sha256:5e2d5e1d999e941b4a626aea46bdc4206877cf727107fdaa9d46a8a773a6e49b \ + --hash=sha256:8039c520ef7bb9ec7c3db3df14c570be6362f43c200ae9854d2422d4ffe175a4 \ + --hash=sha256:81459a0ebcca09c1fcb8fe887ed13cf267d9b60fe33718fc5fd1a2a1ab49470a \ + --hash=sha256:847c3b7b9ca3268e883685dc1347a4d09f84de7bd7597310044d847590447492 \ + --hash=sha256:8551d1db45f0ca4e8ec99130767009a29a4e0dc6558a4a6808491bcd3472d325 \ + --hash=sha256:8fa7679ffe615e0c1c7b80946ab4194669be74848719adf2d7867b5e861eb073 \ + --hash=sha256:a42a36f09f0f907579ff0fde547f2fde8a739a69efe4a2728835979d2bb5e17b \ + --hash=sha256:a5fcad0070685c5b2d04b468bf5f4c735f5c176432f495ad055fcc4bc0a79b23 \ + --hash=sha256:ae22195b2a7494619b73c01129ddcddc0dfaa9e42727404b1d9a77253da3f420 \ + --hash=sha256:b360e82bdbbd862e1ce2a41cc3bbd0ab614350e813ca74801b34aac0f73465aa \ + --hash=sha256:b96417899344c5e96bef757f4963a72d02e52653a4e0f99bbea3a531cedac59f \ + --hash=sha256:b9e921140b797093edfc13ac08dc2a4fd016dd711dc42bb0e1aaf180e48425a7 \ + --hash=sha256:c5022b94fc330e6d177f3eb38097fb52c7df96ca0e04842c068cf0d9fc38b1e6 \ + --hash=sha256:cf2b117f2a8d951638efc7592fb72d3eeb2d38cc2194c26ba7f00e7190451d92 \ + --hash=sha256:d79620b542d9d0e23ae9790ca2fe44f1af40ffad9936efa37bd14954bc3e2818 \ + --hash=sha256:e2860691c11d10dac7c91bddae44f6211b3da4122d9a2ebb509c2247674d6070 \ + --hash=sha256:e3a293553715afecf7e10ea02da40593f9d7f48fe48a74fc5dd3ce08a0c46188 \ + --hash=sha256:e465be3fe7e992e5a6e16731afa6f41cb6ca53afccb4f28ea2fa6457783edf15 \ + --hash=sha256:e6d27895ef922bc859d969452f247bfbe5345d9aba69b9c8dbe1ea7704f0c5d9 pip==18.0 \ --hash=sha256:070e4bf493c7c2c9f6a08dd797dd3c066d64074c38e9e8a0fb4e6541f266d96c \ --hash=sha256:a0e11645ee37c90b40c46d607070c4fd583e2cd46231b1c06e389c5e814eed76 @@ -64,6 +130,16 @@ urllib3==1.23 \ wheel==0.31.1 \ --hash=sha256:0a2e54558a0628f2145d2fc822137e322412115173e8a2ddbe1c9024338ae83c \ --hash=sha256:80044e51ec5bbf6c894ba0bc48d26a8c20a9ba629f4ca19ea26ecfcf87685f5f +yarl==1.2.6 \ + --hash=sha256:2556b779125621b311844a072e0ed367e8409a18fa12cbd68eb1258d187820f9 \ + --hash=sha256:4aec0769f1799a9d4496827292c02a7b1f75c0bab56ab2b60dd94ebb57cbd5ee \ + --hash=sha256:55369d95afaacf2fa6b49c84d18b51f1704a6560c432a0f9a1aeb23f7b971308 \ + --hash=sha256:6c098b85442c8fe3303e708bbb775afd0f6b29f77612e8892627bcab4b939357 \ + --hash=sha256:9182cd6f93412d32e009020a44d6d170d2093646464a88aeec2aef50592f8c78 \ + --hash=sha256:c8cbc21bbfa1dd7d5386d48cc814fe3d35b80f60299cdde9279046f399c3b0d8 \ + --hash=sha256:db6f70a4b09cde813a4807843abaaa60f3b15fb4a2a06f9ae9c311472662daa1 \ + --hash=sha256:f17495e6fe3d377e3faac68121caef6f974fcb9e046bc075bcff40d8e5cc69a4 \ + --hash=sha256:f85900b9cca0c67767bb61b2b9bd53208aaa7373dae633dbe25d179b4bf38aa7 colorama==0.3.9 \ --hash=sha256:463f8483208e921368c9f306094eb6f725c6ca42b0f97e313cb5d5512459feda \ --hash=sha256:48eb22f4f8461b1df5734a074b57042430fb06e1d61bd1e11b078c0fe6d7a1f1 diff --git a/contrib/requirements/requirements.txt b/contrib/requirements/requirements.txt @@ -5,6 +5,8 @@ qrcode protobuf dnspython jsonrpclib-pelix -PySocks>=1.6.6 qdarkstyle<3.0 typing>=3.0.0 +aiorpcx>=0.7.1 +aiohttp +aiohttp_socks diff --git a/electrum/__init__.py b/electrum/__init__.py @@ -4,7 +4,7 @@ from .wallet import Wallet from .storage import WalletStorage from .coinchooser import COIN_CHOOSERS from .network import Network, pick_random_server -from .interface import Connection, Interface +from .interface import Interface from .simple_config import SimpleConfig, get_config, set_config from . import bitcoin from . import transaction diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py @@ -22,12 +22,13 @@ # SOFTWARE. import threading +import asyncio import itertools from collections import defaultdict from . import bitcoin from .bitcoin import COINBASE_MATURITY, TYPE_ADDRESS, TYPE_PUBKEY -from .util import PrintError, profiler, bfh, VerifiedTxInfo, TxMinedStatus +from .util import PrintError, profiler, bfh, VerifiedTxInfo, TxMinedStatus, aiosafe, CustomTaskGroup from .transaction import Transaction, TxOutput from .synchronizer import Synchronizer from .verifier import SPV @@ -58,6 +59,8 @@ class AddressSynchronizer(PrintError): # verifier (SPV) and synchronizer are started in start_threads self.synchronizer = None self.verifier = None + self.sync_restart_lock = asyncio.Lock() + self.group = None # locks: if you need to take multiple ones, acquire them in the order they are defined here! self.lock = threading.RLock() self.transaction_lock = threading.RLock() @@ -134,24 +137,45 @@ class AddressSynchronizer(PrintError): # add it in case it was previously unconfirmed self.add_unverified_tx(tx_hash, tx_height) - def start_threads(self, network): + @aiosafe + async def on_default_server_changed(self, event): + async with self.sync_restart_lock: + self.stop_threads() + await self._start_threads() + + def start_network(self, network): self.network = network if self.network is not None: - self.verifier = SPV(self.network, self) - self.synchronizer = Synchronizer(self, network) - network.add_jobs([self.verifier, self.synchronizer]) - else: - self.verifier = None - self.synchronizer = None + self.network.register_callback(self.on_default_server_changed, ['default_server_changed']) + asyncio.run_coroutine_threadsafe(self._start_threads(), network.asyncio_loop) + + async def _start_threads(self): + interface = self.network.interface + if interface is None: + return # we should get called again soon + + self.verifier = SPV(self.network, self) + self.synchronizer = synchronizer = Synchronizer(self) + assert self.group is None, 'group already exists' + self.group = CustomTaskGroup() + + async def job(): + async with self.group as group: + await group.spawn(self.verifier.main(group)) + await group.spawn(self.synchronizer.send_subscriptions(group)) + await group.spawn(self.synchronizer.handle_status(group)) + await group.spawn(self.synchronizer.main()) + # we are being cancelled now + interface.session.unsubscribe(synchronizer.status_queue) + await interface.group.spawn(job) def stop_threads(self): if self.network: - self.network.remove_jobs([self.synchronizer, self.verifier]) - self.synchronizer.release() self.synchronizer = None self.verifier = None - # Now no references to the synchronizer or verifier - # remain so they will be GC-ed + if self.group: + asyncio.run_coroutine_threadsafe(self.group.cancel_remaining(), self.network.asyncio_loop) + self.group = None self.storage.put('stored_height', self.get_local_height()) self.save_transactions() self.save_verified_tx() diff --git a/electrum/base_crash_reporter.py b/electrum/base_crash_reporter.py @@ -19,6 +19,7 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import asyncio import json import locale import traceback @@ -26,14 +27,13 @@ import subprocess import sys import os -import requests - from .version import ELECTRUM_VERSION from .import constants from .i18n import _ +from .util import make_aiohttp_session -class BaseCrashReporter(object): +class BaseCrashReporter: report_server = "https://crashhub.electrum.org" config_key = "show_crash_reporter" issue_template = """<h2>Traceback</h2> @@ -60,16 +60,22 @@ class BaseCrashReporter(object): def __init__(self, exctype, value, tb): self.exc_args = (exctype, value, tb) - def send_report(self, endpoint="/crash"): + def send_report(self, asyncio_loop, proxy, endpoint="/crash"): if constants.net.GENESIS[-4:] not in ["4943", "e26f"] and ".electrum.org" in BaseCrashReporter.report_server: # Gah! Some kind of altcoin wants to send us crash reports. raise Exception(_("Missing report URL.")) report = self.get_traceback_info() report.update(self.get_additional_info()) report = json.dumps(report) - response = requests.post(BaseCrashReporter.report_server + endpoint, data=report) + coro = self.do_post(proxy, BaseCrashReporter.report_server + endpoint, data=report) + response = asyncio.run_coroutine_threadsafe(coro, asyncio_loop).result(5) return response + async def do_post(self, proxy, url, data): + async with make_aiohttp_session(proxy) as session: + async with session.post(url, data=data) as resp: + return await resp.text() + def get_traceback_info(self): exc_string = str(self.exc_args[1]) stack = traceback.extract_tb(self.exc_args[2]) @@ -125,4 +131,4 @@ class BaseCrashReporter(object): raise NotImplementedError def get_os_version(self): - raise NotImplementedError + raise NotImplementedError diff --git a/electrum/blockchain.py b/electrum/blockchain.py @@ -28,6 +28,7 @@ from .bitcoin import Hash, hash_encode, int_to_hex, rev_hex from . import constants from .util import bfh, bh2u + MAX_TARGET = 0x00000000FFFF0000000000000000000000000000000000000000000000000000 @@ -146,7 +147,10 @@ class Blockchain(util.PrintError): def check_header(self, header): header_hash = hash_header(header) height = header.get('block_height') - return header_hash == self.get_hash(height) + try: + return header_hash == self.get_hash(height) + except MissingHeader: + return False def fork(parent, header): forkpoint = header.get('block_height') @@ -166,8 +170,10 @@ class Blockchain(util.PrintError): p = self.path() self._size = os.path.getsize(p)//80 if os.path.exists(p) else 0 - def verify_header(self, header, prev_hash, target): + def verify_header(self, header, prev_hash, target, expected_header_hash=None): _hash = hash_header(header) + if expected_header_hash and expected_header_hash != _hash: + raise Exception("hash mismatches with expected: {} vs {}".format(expected_header_hash, _hash)) if prev_hash != header.get('prev_block_hash'): raise Exception("prev hash mismatch: %s vs %s" % (prev_hash, header.get('prev_block_hash'))) if constants.net.TESTNET: @@ -180,12 +186,18 @@ class Blockchain(util.PrintError): def verify_chunk(self, index, data): num = len(data) // 80 - prev_hash = self.get_hash(index * 2016 - 1) + start_height = index * 2016 + prev_hash = self.get_hash(start_height - 1) target = self.get_target(index-1) for i in range(num): + height = start_height + i + try: + expected_header_hash = self.get_hash(height) + except MissingHeader: + expected_header_hash = None raw_header = data[i*80:(i+1) * 80] header = deserialize_header(raw_header, index*2016 + i) - self.verify_header(header, prev_hash, target) + self.verify_header(header, prev_hash, target, expected_header_hash) prev_hash = hash_header(header) def path(self): @@ -303,17 +315,24 @@ class Blockchain(util.PrintError): return deserialize_header(h, height) def get_hash(self, height): + def is_height_checkpoint(): + within_cp_range = height <= constants.net.max_checkpoint() + at_chunk_boundary = (height+1) % 2016 == 0 + return within_cp_range and at_chunk_boundary + if height == -1: return '0000000000000000000000000000000000000000000000000000000000000000' elif height == 0: return constants.net.GENESIS - elif height < len(self.checkpoints) * 2016: - assert (height+1) % 2016 == 0, height + elif is_height_checkpoint(): index = height // 2016 h, t = self.checkpoints[index] return h else: - return hash_header(self.read_header(height)) + header = self.read_header(height) + if header is None: + raise MissingHeader(height) + return hash_header(header) def get_target(self, index): # compute target from chunk x, used in chunk x+1 diff --git a/electrum/commands.py b/electrum/commands.py @@ -255,7 +255,7 @@ class Commands: def broadcast(self, tx): """Broadcast a transaction to the network. """ tx = Transaction(tx) - return self.network.broadcast_transaction(tx) + return self.network.broadcast_transaction_from_non_network_thread(tx) @command('') def createmultisig(self, num, pubkeys): diff --git a/electrum/constants.py b/electrum/constants.py @@ -37,7 +37,14 @@ def read_json(filename, default): return r -class BitcoinMainnet: +class AbstractNet: + + @classmethod + def max_checkpoint(cls) -> int: + return max(0, len(cls.CHECKPOINTS) * 2016 - 1) + + +class BitcoinMainnet(AbstractNet): TESTNET = False WIF_PREFIX = 0x80 @@ -66,7 +73,7 @@ class BitcoinMainnet: BIP44_COIN_TYPE = 0 -class BitcoinTestnet: +class BitcoinTestnet(AbstractNet): TESTNET = True WIF_PREFIX = 0xef diff --git a/electrum/daemon.py b/electrum/daemon.py @@ -22,6 +22,7 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import asyncio import ast import os import time @@ -126,10 +127,9 @@ class Daemon(DaemonThread): self.network = None else: self.network = Network(config) - self.network.start() self.fx = FxThread(config, self.network) if self.network: - self.network.add_jobs([self.fx]) + self.network.start(self.fx.run()) self.gui = None self.wallets = {} # Setup JSONRPC server @@ -243,7 +243,7 @@ class Daemon(DaemonThread): if storage.get_action(): return wallet = Wallet(storage) - wallet.start_threads(self.network) + wallet.start_network(self.network) self.wallets[path] = wallet return wallet diff --git a/electrum/exchange_rate.py b/electrum/exchange_rate.py @@ -1,19 +1,23 @@ +import asyncio +import aiohttp +from aiohttp_socks import SocksConnector, SocksVer from datetime import datetime import inspect -import requests import sys import os import json -from threading import Thread import time import csv import decimal from decimal import Decimal +import concurrent.futures +import traceback from .bitcoin import COIN from .i18n import _ -from .util import PrintError, ThreadJob, make_dir - +from .util import PrintError, ThreadJob, make_dir, aiosafe +from .util import make_aiohttp_session +from .network import Network # See https://en.wikipedia.org/wiki/ISO_4217 CCY_PRECISIONS = {'BHD': 3, 'BIF': 0, 'BYR': 0, 'CLF': 4, 'CLP': 0, @@ -23,7 +27,6 @@ CCY_PRECISIONS = {'BHD': 3, 'BIF': 0, 'BYR': 0, 'CLF': 4, 'CLP': 0, 'RWF': 0, 'TND': 3, 'UGX': 0, 'UYI': 0, 'VND': 0, 'VUV': 0, 'XAF': 0, 'XAU': 4, 'XOF': 0, 'XPF': 0} - class ExchangeBase(PrintError): def __init__(self, on_quotes, on_history): @@ -32,34 +35,41 @@ class ExchangeBase(PrintError): self.on_quotes = on_quotes self.on_history = on_history - def get_json(self, site, get_string): + async def get_raw(self, site, get_string): # APIs must have https url = ''.join(['https://', site, get_string]) - response = requests.request('GET', url, headers={'User-Agent' : 'Electrum'}, timeout=10) - return response.json() + async with make_aiohttp_session(Network.get_instance().proxy) as session: + async with session.get(url) as response: + return await response.text() - def get_csv(self, site, get_string): + async def get_json(self, site, get_string): + # APIs must have https url = ''.join(['https://', site, get_string]) - response = requests.request('GET', url, headers={'User-Agent' : 'Electrum'}) - reader = csv.DictReader(response.content.decode().split('\n')) + async with make_aiohttp_session(Network.get_instance().proxy) as session: + async with session.get(url) as response: + return await response.json() + + async def get_csv(self, site, get_string): + raw = await self.get_raw(site, get_string) + reader = csv.DictReader(raw.split('\n')) return list(reader) def name(self): return self.__class__.__name__ - def update_safe(self, ccy): + @aiosafe + async def update_safe(self, ccy): try: self.print_error("getting fx quotes for", ccy) - self.quotes = self.get_rates(ccy) + self.quotes = await self.get_rates(ccy) self.print_error("received fx quotes") except BaseException as e: self.print_error("failed fx quotes:", e) + self.quotes = {} self.on_quotes() def update(self, ccy): - t = Thread(target=self.update_safe, args=(ccy,)) - t.setDaemon(True) - t.start() + asyncio.get_event_loop().create_task(self.update_safe(ccy)) def read_historical_rates(self, ccy, cache_dir): filename = os.path.join(cache_dir, self.name() + '_'+ ccy) @@ -78,13 +88,15 @@ class ExchangeBase(PrintError): self.on_history() return h - def get_historical_rates_safe(self, ccy, cache_dir): + @aiosafe + async def get_historical_rates_safe(self, ccy, cache_dir): try: self.print_error("requesting fx history for", ccy) - h = self.request_history(ccy) + h = await self.request_history(ccy) self.print_error("received fx history for", ccy) except BaseException as e: self.print_error("failed fx history:", e) + #traceback.print_exc() return filename = os.path.join(cache_dir, self.name() + '_' + ccy) with open(filename, 'w', encoding='utf-8') as f: @@ -100,9 +112,7 @@ class ExchangeBase(PrintError): if h is None: h = self.read_historical_rates(ccy, cache_dir) if h is None or h['timestamp'] < time.time() - 24*3600: - t = Thread(target=self.get_historical_rates_safe, args=(ccy, cache_dir)) - t.setDaemon(True) - t.start() + asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir)) def history_ccys(self): return [] @@ -116,8 +126,8 @@ class ExchangeBase(PrintError): class BitcoinAverage(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('apiv2.bitcoinaverage.com', '/indices/global/ticker/short') + async def get_rates(self, ccy): + json = await self.get_json('apiv2.bitcoinaverage.com', '/indices/global/ticker/short') return dict([(r.replace("BTC", ""), Decimal(json[r]['last'])) for r in json if r != 'timestamp']) @@ -126,8 +136,8 @@ class BitcoinAverage(ExchangeBase): 'MXN', 'NOK', 'NZD', 'PLN', 'RON', 'RUB', 'SEK', 'SGD', 'USD', 'ZAR'] - def request_history(self, ccy): - history = self.get_csv('apiv2.bitcoinaverage.com', + async def request_history(self, ccy): + history = await self.get_csv('apiv2.bitcoinaverage.com', "/indices/global/history/BTC%s?period=alltime&format=csv" % ccy) return dict([(h['DateTime'][:10], h['Average']) for h in history]) @@ -135,8 +145,8 @@ class BitcoinAverage(ExchangeBase): class Bitcointoyou(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('bitcointoyou.com', "/API/ticker.aspx") + async def get_rates(self, ccy): + json = await self.get_json('bitcointoyou.com', "/API/ticker.aspx") return {'BRL': Decimal(json['ticker']['last'])} def history_ccys(self): @@ -145,8 +155,8 @@ class Bitcointoyou(ExchangeBase): class BitcoinVenezuela(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.bitcoinvenezuela.com', '/') + async def get_rates(self, ccy): + json = await self.get_json('api.bitcoinvenezuela.com', '/') rates = [(r, json['BTC'][r]) for r in json['BTC'] if json['BTC'][r] is not None] # Giving NULL for LTC return dict(rates) @@ -154,85 +164,86 @@ class BitcoinVenezuela(ExchangeBase): def history_ccys(self): return ['ARS', 'EUR', 'USD', 'VEF'] - def request_history(self, ccy): - return self.get_json('api.bitcoinvenezuela.com', - "/historical/index.php?coin=BTC")[ccy +'_BTC'] + async def request_history(self, ccy): + json = await self.get_json('api.bitcoinvenezuela.com', + "/historical/index.php?coin=BTC") + return json[ccy +'_BTC'] class Bitbank(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('public.bitbank.cc', '/btc_jpy/ticker') + async def get_rates(self, ccy): + json = await self.get_json('public.bitbank.cc', '/btc_jpy/ticker') return {'JPY': Decimal(json['data']['last'])} class BitFlyer(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('bitflyer.jp', '/api/echo/price') + async def get_rates(self, ccy): + json = await self.get_json('bitflyer.jp', '/api/echo/price') return {'JPY': Decimal(json['mid'])} class Bitmarket(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('www.bitmarket.pl', '/json/BTCPLN/ticker.json') + async def get_rates(self, ccy): + json = await self.get_json('www.bitmarket.pl', '/json/BTCPLN/ticker.json') return {'PLN': Decimal(json['last'])} class BitPay(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('bitpay.com', '/api/rates') + async def get_rates(self, ccy): + json = await self.get_json('bitpay.com', '/api/rates') return dict([(r['code'], Decimal(r['rate'])) for r in json]) class Bitso(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.bitso.com', '/v2/ticker') + async def get_rates(self, ccy): + json = await self.get_json('api.bitso.com', '/v2/ticker') return {'MXN': Decimal(json['last'])} class BitStamp(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('www.bitstamp.net', '/api/ticker/') + async def get_rates(self, ccy): + json = await self.get_json('www.bitstamp.net', '/api/ticker/') return {'USD': Decimal(json['last'])} class Bitvalor(ExchangeBase): - def get_rates(self,ccy): - json = self.get_json('api.bitvalor.com', '/v1/ticker.json') + async def get_rates(self,ccy): + json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') return {'BRL': Decimal(json['ticker_1h']['total']['last'])} class BlockchainInfo(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('blockchain.info', '/ticker') + async def get_rates(self, ccy): + json = await self.get_json('blockchain.info', '/ticker') return dict([(r, Decimal(json[r]['15m'])) for r in json]) class BTCChina(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('data.btcchina.com', '/data/ticker') + async def get_rates(self, ccy): + json = await self.get_json('data.btcchina.com', '/data/ticker') return {'CNY': Decimal(json['ticker']['last'])} class BTCParalelo(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('btcparalelo.com', '/api/price') + async def get_rates(self, ccy): + json = await self.get_json('btcparalelo.com', '/api/price') return {'VEF': Decimal(json['price'])} class Coinbase(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('coinbase.com', + async def get_rates(self, ccy): + json = await self.get_json('coinbase.com', '/api/v1/currencies/exchange_rates') return dict([(r[7:].upper(), Decimal(json[r])) for r in json if r.startswith('btc_to_')]) @@ -240,13 +251,13 @@ class Coinbase(ExchangeBase): class CoinDesk(ExchangeBase): - def get_currencies(self): - dicts = self.get_json('api.coindesk.com', + async def get_currencies(self): + dicts = await self.get_json('api.coindesk.com', '/v1/bpi/supported-currencies.json') return [d['currency'] for d in dicts] - def get_rates(self, ccy): - json = self.get_json('api.coindesk.com', + async def get_rates(self, ccy): + json = await self.get_json('api.coindesk.com', '/v1/bpi/currentprice/%s.json' % ccy) result = {ccy: Decimal(json['bpi'][ccy]['rate_float'])} return result @@ -257,35 +268,35 @@ class CoinDesk(ExchangeBase): def history_ccys(self): return self.history_starts().keys() - def request_history(self, ccy): + async def request_history(self, ccy): start = self.history_starts()[ccy] end = datetime.today().strftime('%Y-%m-%d') # Note ?currency and ?index don't work as documented. Sigh. query = ('/v1/bpi/historical/close.json?start=%s&end=%s' % (start, end)) - json = self.get_json('api.coindesk.com', query) + json = await self.get_json('api.coindesk.com', query) return json['bpi'] class Coinsecure(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.coinsecure.in', '/v0/noauth/newticker') + async def get_rates(self, ccy): + json = await self.get_json('api.coinsecure.in', '/v0/noauth/newticker') return {'INR': Decimal(json['lastprice'] / 100.0 )} class Foxbit(ExchangeBase): - def get_rates(self,ccy): - json = self.get_json('api.bitvalor.com', '/v1/ticker.json') + async def get_rates(self,ccy): + json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') return {'BRL': Decimal(json['ticker_1h']['exchanges']['FOX']['last'])} class itBit(ExchangeBase): - def get_rates(self, ccy): + async def get_rates(self, ccy): ccys = ['USD', 'EUR', 'SGD'] - json = self.get_json('api.itbit.com', '/v1/markets/XBT%s/ticker' % ccy) + json = await self.get_json('api.itbit.com', '/v1/markets/XBT%s/ticker' % ccy) result = dict.fromkeys(ccys) if ccy in ccys: result[ccy] = Decimal(json['lastPrice']) @@ -294,10 +305,10 @@ class itBit(ExchangeBase): class Kraken(ExchangeBase): - def get_rates(self, ccy): + async def get_rates(self, ccy): ccys = ['EUR', 'USD', 'CAD', 'GBP', 'JPY'] pairs = ['XBT%s' % c for c in ccys] - json = self.get_json('api.kraken.com', + json = await self.get_json('api.kraken.com', '/0/public/Ticker?pair=%s' % ','.join(pairs)) return dict((k[-3:], Decimal(float(v['c'][0]))) for k, v in json['result'].items()) @@ -305,45 +316,45 @@ class Kraken(ExchangeBase): class LocalBitcoins(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('localbitcoins.com', + async def get_rates(self, ccy): + json = await self.get_json('localbitcoins.com', '/bitcoinaverage/ticker-all-currencies/') return dict([(r, Decimal(json[r]['rates']['last'])) for r in json]) class MercadoBitcoin(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.bitvalor.com', '/v1/ticker.json') + async def get_rates(self, ccy): + json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') return {'BRL': Decimal(json['ticker_1h']['exchanges']['MBT']['last'])} class NegocieCoins(ExchangeBase): - def get_rates(self,ccy): - json = self.get_json('api.bitvalor.com', '/v1/ticker.json') + async def get_rates(self,ccy): + json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') return {'BRL': Decimal(json['ticker_1h']['exchanges']['NEG']['last'])} class TheRockTrading(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.therocktrading.com', + async def get_rates(self, ccy): + json = await self.get_json('api.therocktrading.com', '/v1/funds/BTCEUR/ticker') return {'EUR': Decimal(json['last'])} class Unocoin(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('www.unocoin.com', 'trade?buy') + async def get_rates(self, ccy): + json = await self.get_json('www.unocoin.com', 'trade?buy') return {'INR': Decimal(json)} class WEX(ExchangeBase): - def get_rates(self, ccy): - json_eur = self.get_json('wex.nz', '/api/3/ticker/btc_eur') - json_rub = self.get_json('wex.nz', '/api/3/ticker/btc_rur') - json_usd = self.get_json('wex.nz', '/api/3/ticker/btc_usd') + async def get_rates(self, ccy): + json_eur = await self.get_json('wex.nz', '/api/3/ticker/btc_eur') + json_rub = await self.get_json('wex.nz', '/api/3/ticker/btc_rur') + json_usd = await self.get_json('wex.nz', '/api/3/ticker/btc_usd') return {'EUR': Decimal(json_eur['btc_eur']['last']), 'RUB': Decimal(json_rub['btc_rur']['last']), 'USD': Decimal(json_usd['btc_usd']['last'])} @@ -351,15 +362,15 @@ class WEX(ExchangeBase): class Winkdex(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('winkdex.com', '/api/v0/price') + async def get_rates(self, ccy): + json = await self.get_json('winkdex.com', '/api/v0/price') return {'USD': Decimal(json['price'] / 100.0)} def history_ccys(self): return ['USD'] - def request_history(self, ccy): - json = self.get_json('winkdex.com', + async def request_history(self, ccy): + json = await self.get_json('winkdex.com', "/api/v0/series?start_time=1342915200") history = json['series'][0]['results'] return dict([(h['timestamp'][:10], h['price'] / 100.0) @@ -367,8 +378,8 @@ class Winkdex(ExchangeBase): class Zaif(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.zaif.jp', '/api/1/last_price/btc_jpy') + async def get_rates(self, ccy): + json = await self.get_json('api.zaif.jp', '/api/1/last_price/btc_jpy') return {'JPY': Decimal(json['last_price'])} @@ -381,7 +392,6 @@ def dictinvert(d): return inv def get_exchanges_and_currencies(): - import os, json path = os.path.join(os.path.dirname(__file__), 'currencies.json') try: with open(path, 'r', encoding='utf-8') as f: @@ -426,14 +436,21 @@ class FxThread(ThreadJob): def __init__(self, config, network): self.config = config self.network = network + if self.network: + self.network.register_callback(self.set_proxy, ['proxy_set']) self.ccy = self.get_currency() self.history_used_spot = False self.ccy_combo = None self.hist_checkbox = None self.cache_dir = os.path.join(config.path, 'cache') + self.trigger = asyncio.Event() + self.trigger.set() self.set_exchange(self.config_exchange()) make_dir(self.cache_dir) + def set_proxy(self, trigger_name, *args): + self.trigger.set() + def get_currencies(self, h): d = get_exchanges_by_ccy(h) return sorted(d.keys()) @@ -451,20 +468,26 @@ class FxThread(ThreadJob): rounded_amount = amount return fmt_str.format(rounded_amount) - def run(self): - # This runs from the plugins thread which catches exceptions - if self.is_enabled(): - if self.timeout ==0 and self.show_history(): - self.exchange.get_historical_rates(self.ccy, self.cache_dir) - if self.timeout <= time.time(): - self.timeout = time.time() + 150 + async def run(self): + while True: + try: + await asyncio.wait_for(self.trigger.wait(), 150) + except concurrent.futures.TimeoutError: + pass + else: + self.trigger.clear() + if self.is_enabled(): + if self.show_history(): + self.exchange.get_historical_rates(self.ccy, self.cache_dir) + if self.is_enabled(): self.exchange.update(self.ccy) def is_enabled(self): return bool(self.config.get('use_exchange_rate')) def set_enabled(self, b): - return self.config.set_key('use_exchange_rate', bool(b)) + self.config.set_key('use_exchange_rate', bool(b)) + self.trigger.set() def get_history_config(self): return bool(self.config.get('history_rates')) @@ -497,7 +520,7 @@ class FxThread(ThreadJob): def set_currency(self, ccy): self.ccy = ccy self.config.set_key('currency', ccy, True) - self.timeout = 0 # Because self.ccy changes + self.trigger.set() # Because self.ccy changes self.on_quotes() def set_exchange(self, name): @@ -508,7 +531,7 @@ class FxThread(ThreadJob): self.exchange = class_(self.on_quotes, self.on_history) # A new exchange means new fx quotes, initially empty. Force # a quote refresh - self.timeout = 0 + self.trigger.set() self.exchange.read_historical_rates(self.ccy, self.cache_dir) def on_quotes(self): @@ -519,8 +542,8 @@ class FxThread(ThreadJob): if self.network: self.network.trigger_callback('on_history') - def exchange_rate(self): - '''Returns None, or the exchange rate as a Decimal''' + def exchange_rate(self) -> Decimal: + """Returns the exchange rate as a Decimal""" rate = self.exchange.quotes.get(self.ccy) if rate is None: return Decimal('NaN') diff --git a/electrum/gui/kivy/main_window.py b/electrum/gui/kivy/main_window.py @@ -15,6 +15,7 @@ from electrum.util import profiler, InvalidPassword from electrum.plugin import run_hook from electrum.util import format_satoshis, format_satoshis_plain from electrum.paymentrequest import PR_UNPAID, PR_PAID, PR_UNKNOWN, PR_EXPIRED +from electrum import blockchain from .i18n import _ from kivy.app import App @@ -93,8 +94,9 @@ class ElectrumWindow(App): auto_connect = BooleanProperty(False) def on_auto_connect(self, instance, x): - host, port, protocol, proxy, auto_connect = self.network.get_parameters() - self.network.set_parameters(host, port, protocol, proxy, self.auto_connect) + net_params = self.network.get_parameters() + net_params = net_params._replace(auto_connect=self.auto_connect) + self.network.set_parameters(net_params) def toggle_auto_connect(self, x): self.auto_connect = not self.auto_connect @@ -114,10 +116,10 @@ class ElectrumWindow(App): from .uix.dialogs.choice_dialog import ChoiceDialog chains = self.network.get_blockchains() def cb(name): - for index, b in self.network.blockchains.items(): + for index, b in blockchain.blockchains.items(): if name == b.get_name(): self.network.follow_chain(index) - names = [self.network.blockchains[b].get_name() for b in chains] + names = [blockchain.blockchains[b].get_name() for b in chains] if len(names) > 1: cur_chain = self.network.blockchain().get_name() ChoiceDialog(_('Choose your chain'), names, cur_chain, cb).open() @@ -154,6 +156,7 @@ class ElectrumWindow(App): def on_quotes(self, d): Logger.info("on_quotes") + self._trigger_update_status() self._trigger_update_history() def on_history(self, d): @@ -266,11 +269,11 @@ class ElectrumWindow(App): if self.network: self.num_blocks = self.network.get_local_height() self.num_nodes = len(self.network.get_interfaces()) - host, port, protocol, proxy_config, auto_connect = self.network.get_parameters() - self.server_host = host - self.server_port = port - self.auto_connect = auto_connect - self.proxy_config = proxy_config if proxy_config else {} + net_params = self.network.get_parameters() + self.server_host = net_params.host + self.server_port = net_params.port + self.auto_connect = net_params.auto_connect + self.proxy_config = net_params.proxy if net_params.proxy else {} self.plugins = kwargs.get('plugins', []) self.gui_object = kwargs.get('gui_object', None) @@ -509,7 +512,7 @@ class ElectrumWindow(App): def on_wizard_complete(self, wizard, wallet): if wallet: # wizard returned a wallet - wallet.start_threads(self.daemon.network) + wallet.start_network(self.daemon.network) self.daemon.add_wallet(wallet) self.load_wallet(wallet) elif not self.wallet: @@ -696,7 +699,7 @@ class ElectrumWindow(App): if not self.wallet: self.status = _("No Wallet") return - if self.network is None or not self.network.is_running(): + if self.network is None or not self.network.is_connected(): status = _("Offline") elif self.network.is_connected(): server_height = self.network.get_server_height() @@ -877,7 +880,7 @@ class ElectrumWindow(App): Clock.schedule_once(lambda dt: on_success(tx)) def _broadcast_thread(self, tx, on_complete): - ok, txid = self.network.broadcast_transaction(tx) + ok, txid = self.network.broadcast_transaction_from_non_network_thread(tx) Clock.schedule_once(lambda dt: on_complete(ok, txid)) def broadcast(self, tx, pr=None): diff --git a/electrum/gui/kivy/uix/dialogs/crash_reporter.py b/electrum/gui/kivy/uix/dialogs/crash_reporter.py @@ -1,6 +1,7 @@ import sys +import json -import requests +from aiohttp.client_exceptions import ClientError from kivy import base, utils from kivy.clock import Clock from kivy.core.window import Window @@ -115,8 +116,10 @@ class CrashReporter(BaseCrashReporter, Factory.Popup): def send_report(self): try: - response = BaseCrashReporter.send_report(self, "/crash.json").json() - except requests.exceptions.RequestException: + loop = self.main_window.network.asyncio_loop + proxy = self.main_window.network.proxy + response = json.loads(BaseCrashReporter.send_report(self, loop, proxy, "/crash.json")) + except (ValueError, ClientError): self.show_popup(_('Unable to send report'), _("Please check your network connection.")) else: self.show_popup(_('Report sent'), response["text"]) diff --git a/electrum/gui/kivy/uix/dialogs/settings.py b/electrum/gui/kivy/uix/dialogs/settings.py @@ -154,13 +154,16 @@ class SettingsDialog(Factory.Popup): self._coinselect_dialog.open() def proxy_status(self): - server, port, protocol, proxy, auto_connect = self.app.network.get_parameters() + net_params = self.app.network.get_parameters() + proxy = net_params.proxy return proxy.get('host') +':' + proxy.get('port') if proxy else _('None') def proxy_dialog(self, item, dt): if self._proxy_dialog is None: - server, port, protocol, proxy, auto_connect = self.app.network.get_parameters() + net_params = self.app.network.get_parameters() + proxy = net_params.proxy def callback(popup): + nonlocal net_params if popup.ids.mode.text != 'None': proxy = { 'mode':popup.ids.mode.text, @@ -171,7 +174,8 @@ class SettingsDialog(Factory.Popup): } else: proxy = None - self.app.network.set_parameters(server, port, protocol, proxy, auto_connect) + net_params = net_params._replace(proxy=proxy) + self.app.network.set_parameters(net_params) item.status = self.proxy_status() popup = Builder.load_file('electrum/gui/kivy/uix/ui_screens/proxy.kv') popup.ids.mode.text = proxy.get('mode') if proxy else 'None' diff --git a/electrum/gui/kivy/uix/ui_screens/proxy.kv b/electrum/gui/kivy/uix/ui_screens/proxy.kv @@ -14,7 +14,7 @@ Popup: height: '48dp' size_hint_y: None text: app.proxy_config.get('mode', 'none') - values: ['none', 'socks4', 'socks5', 'http'] + values: ['none', 'socks4', 'socks5'] Label: text: _('Host') TextInput: @@ -63,7 +63,7 @@ Popup: height: '48dp' text: _('OK') on_release: - host, port, protocol, proxy, auto_connect = app.network.get_parameters() + net_params = app.network.get_parameters() proxy = {} proxy['mode']=str(root.ids.mode.text).lower() proxy['host']=str(root.ids.host.text) @@ -71,6 +71,7 @@ Popup: proxy['user']=str(root.ids.user.text) proxy['password']=str(root.ids.password.text) if proxy['mode']=='none': proxy = None - app.network.set_parameters(host, port, protocol, proxy, auto_connect) + net_params = net_params._replace(proxy=proxy) + app.network.set_parameters(net_params) app.proxy_config = proxy if proxy else {} nd.dismiss() diff --git a/electrum/gui/kivy/uix/ui_screens/server.kv b/electrum/gui/kivy/uix/ui_screens/server.kv @@ -56,8 +56,7 @@ Popup: height: '48dp' text: _('OK') on_release: - host, port, protocol, proxy, auto_connect = app.network.get_parameters() - host = str(root.ids.host.text) - port = str(root.ids.port.text) - app.network.set_parameters(host, port, protocol, proxy, auto_connect) + net_params = app.network.get_parameters() + net_params = net_params._replace(host=str(root.ids.host.text), port=str(root.ids.port.text)) + app.network.set_parameters(net_params) nd.dismiss() diff --git a/electrum/gui/qt/__init__.py b/electrum/gui/qt/__init__.py @@ -236,7 +236,7 @@ class ElectrumGui: if not self.daemon.get_wallet(wallet.storage.path): # wallet was not in memory - wallet.start_threads(self.daemon.network) + wallet.start_network(self.daemon.network) self.daemon.add_wallet(wallet) try: for w in self.windows: diff --git a/electrum/gui/qt/exception_window.py b/electrum/gui/qt/exception_window.py @@ -41,6 +41,7 @@ class Exception_Window(BaseCrashReporter, QWidget, MessageBoxMixin): def __init__(self, main_window, exctype, value, tb): BaseCrashReporter.__init__(self, exctype, value, tb) self.main_window = main_window + QWidget.__init__(self) self.setWindowTitle('Electrum - ' + _('An Error Occurred')) self.setMinimumSize(600, 300) @@ -90,14 +91,15 @@ class Exception_Window(BaseCrashReporter, QWidget, MessageBoxMixin): def send_report(self): try: - response = BaseCrashReporter.send_report(self) + proxy = self.main_window.network.proxy + response = BaseCrashReporter.send_report(self, self.main_window.network.asyncio_loop, proxy) except BaseException as e: traceback.print_exc(file=sys.stderr) self.main_window.show_critical(_('There was a problem with the automatic reporting:') + '\n' + str(e) + '\n' + _("Please report this issue manually.")) return - QMessageBox.about(self, _("Crash report"), response.text) + QMessageBox.about(self, _("Crash report"), response) self.close() def on_close(self): diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py @@ -563,7 +563,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): def donate_to_server(self): d = self.network.get_donation_address() if d: - host = self.network.get_parameters()[0] + host = self.network.get_parameters().host self.pay_to_URI('bitcoin:%s?message=donation for %s'%(d, host)) else: self.show_error(_('No donation address for this server')) @@ -723,7 +723,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): if not self.wallet: return - if self.network is None or not self.network.is_running(): + if self.network is None: text = _("Offline") icon = QIcon(":icons/status_disconnected.png") @@ -1627,7 +1627,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): if pr and pr.has_expired(): self.payment_request = None return False, _("Payment request has expired") - status, msg = self.network.broadcast_transaction(tx) + status, msg = self.network.broadcast_transaction_from_non_network_thread(tx) if pr and status is True: self.invoices.set_paid(pr, tx.txid()) self.invoices.save() diff --git a/electrum/gui/qt/network_dialog.py b/electrum/gui/qt/network_dialog.py @@ -31,9 +31,9 @@ from PyQt5.QtWidgets import * import PyQt5.QtCore as QtCore from electrum.i18n import _ -from electrum import constants +from electrum import constants, blockchain from electrum.util import print_error -from electrum.network import serialize_server, deserialize_server +from electrum.interface import serialize_server, deserialize_server from .util import * @@ -103,7 +103,7 @@ class NodesListWidget(QTreeWidget): chains = network.get_blockchains() n_chains = len(chains) for k, items in chains.items(): - b = network.blockchains[k] + b = blockchain.blockchains[k] name = b.get_name() if n_chains >1: x = QTreeWidgetItem([name + '@%d'%b.get_forkpoint(), '%d'%b.height()]) @@ -239,7 +239,7 @@ class NetworkChoiceLayout(object): self.proxy_cb.clicked.connect(self.set_proxy) self.proxy_mode = QComboBox() - self.proxy_mode.addItems(['SOCKS4', 'SOCKS5', 'HTTP']) + self.proxy_mode.addItems(['SOCKS4', 'SOCKS5']) self.proxy_host = QLineEdit() self.proxy_host.setFixedWidth(200) self.proxy_port = QLineEdit() @@ -335,9 +335,11 @@ class NetworkChoiceLayout(object): w.setEnabled(False) def update(self): - host, port, protocol, proxy_config, auto_connect = self.network.get_parameters() + net_params = self.network.get_parameters() + host, port, protocol = net_params.host, net_params.port, net_params.protocol + proxy_config, auto_connect = net_params.proxy, net_params.auto_connect self.server_host.setText(host) - self.server_port.setText(port) + self.server_port.setText(str(port)) self.autoconnect_cb.setChecked(auto_connect) interface = self.network.interface @@ -368,7 +370,7 @@ class NetworkChoiceLayout(object): self.nodes_list_widget.update(self.network) def fill_in_proxy_settings(self): - host, port, protocol, proxy_config, auto_connect = self.network.get_parameters() + proxy_config = self.network.get_parameters().proxy if not proxy_config: proxy_config = {"mode": "none", "host": "localhost", "port": "9050"} @@ -409,9 +411,10 @@ class NetworkChoiceLayout(object): def follow_server(self, server): self.network.switch_to_interface(server) - host, port, protocol, proxy, auto_connect = self.network.get_parameters() + net_params = self.network.get_parameters() host, port, protocol = deserialize_server(server) - self.network.set_parameters(host, port, protocol, proxy, auto_connect) + net_params = net_params._replace(host=host, port=port, protocol=protocol) + self.network.set_parameters(net_params) self.update() def server_changed(self, x): @@ -440,14 +443,14 @@ class NetworkChoiceLayout(object): pass def set_server(self): - host, port, protocol, proxy, auto_connect = self.network.get_parameters() - host = str(self.server_host.text()) - port = str(self.server_port.text()) - auto_connect = self.autoconnect_cb.isChecked() - self.network.set_parameters(host, port, protocol, proxy, auto_connect) + net_params = self.network.get_parameters() + net_params = net_params._replace(host=str(self.server_host.text()), + port=str(self.server_port.text()), + auto_connect=self.autoconnect_cb.isChecked()) + self.network.set_parameters(net_params) def set_proxy(self): - host, port, protocol, proxy, auto_connect = self.network.get_parameters() + net_params = self.network.get_parameters() if self.proxy_cb.isChecked(): proxy = { 'mode':str(self.proxy_mode.currentText()).lower(), 'host':str(self.proxy_host.text()), @@ -457,7 +460,8 @@ class NetworkChoiceLayout(object): else: proxy = None self.tor_cb.setChecked(False) - self.network.set_parameters(host, port, protocol, proxy, auto_connect) + net_params = net_params._replace(proxy=proxy) + self.network.set_parameters(net_params) def suggest_proxy(self, found_proxy): self.tor_proxy = found_proxy diff --git a/electrum/gui/stdio.py b/electrum/gui/stdio.py @@ -34,7 +34,7 @@ class ElectrumGui: self.str_fee = "" self.wallet = Wallet(storage) - self.wallet.start_threads(self.network) + self.wallet.start_network(self.network) self.contacts = self.wallet.contacts self.network.register_callback(self.on_network, ['updated', 'banner']) @@ -200,7 +200,7 @@ class ElectrumGui: self.wallet.labels[tx.txid()] = self.str_description print(_("Please wait...")) - status, msg = self.network.broadcast_transaction(tx) + status, msg = self.network.broadcast_transaction_from_non_network_thread(tx) if status: print(_('Payment sent.')) diff --git a/electrum/gui/text.py b/electrum/gui/text.py @@ -7,7 +7,10 @@ import electrum from electrum.util import format_satoshis, set_verbosity from electrum.bitcoin import is_address, COIN, TYPE_ADDRESS from electrum.transaction import TxOutput -from .. import Wallet, WalletStorage +from electrum.wallet import Wallet +from electrum.storage import WalletStorage +from electrum.network import NetworkParameters +from electrum.interface import deserialize_server _ = lambda x:x @@ -27,7 +30,7 @@ class ElectrumGui: password = getpass.getpass('Password:', stream=None) storage.decrypt(password) self.wallet = Wallet(storage) - self.wallet.start_threads(self.network) + self.wallet.start_network(self.network) self.contacts = self.wallet.contacts locale.setlocale(locale.LC_ALL, '') @@ -351,7 +354,7 @@ class ElectrumGui: self.wallet.labels[tx.txid()] = self.str_description self.show_message(_("Please wait..."), getchar=False) - status, msg = self.network.broadcast_transaction(tx) + status, msg = self.network.broadcast_transaction_from_non_network_thread(tx) if status: self.show_message(_('Payment sent.')) @@ -376,8 +379,9 @@ class ElectrumGui: def network_dialog(self): if not self.network: return - params = self.network.get_parameters() - host, port, protocol, proxy_config, auto_connect = params + net_params = self.network.get_parameters() + host, port, protocol = net_params.host, net_params.port, net_params.protocol + proxy_config, auto_connect = net_params.proxy, net_params.auto_connect srv = 'auto-connect' if auto_connect else self.network.default_server out = self.run_dialog('Network', [ {'label':'server', 'type':'str', 'value':srv}, @@ -389,13 +393,13 @@ class ElectrumGui: auto_connect = server == 'auto-connect' if not auto_connect: try: - host, port, protocol = server.split(':') + host, port, protocol = deserialize_server(server) except Exception: self.show_message("Error:" + server + "\nIn doubt, type \"auto-connect\"") return False if out.get('server') or out.get('proxy'): proxy = electrum.network.deserialize_proxy(out.get('proxy')) if out.get('proxy') else proxy_config - self.network.set_parameters(host, port, protocol, proxy, auto_connect) + self.network.set_parameters(NetworkParameters(host, port, protocol, proxy, auto_connect)) def settings_dialog(self): fee = str(Decimal(self.config.fee_per_kb()) / COIN) diff --git a/electrum/interface.py b/electrum/interface.py @@ -24,346 +24,493 @@ # SOFTWARE. import os import re -import socket import ssl import sys -import threading -import time import traceback +import asyncio +from typing import Tuple, Union -import requests - -from .util import print_error - -ca_path = requests.certs.where() +import aiorpcx +from aiorpcx import ClientSession, Notification +from .util import PrintError, aiosafe, bfh, AIOSafeSilentException, CustomTaskGroup from . import util from . import x509 from . import pem +from .version import ELECTRUM_VERSION, PROTOCOL_VERSION +from . import blockchain +from . import constants + + +class NotificationSession(ClientSession): + + def __init__(self, *args, **kwargs): + super(NotificationSession, self).__init__(*args, **kwargs) + self.subscriptions = {} + self.cache = {} + + async def handle_request(self, request): + # note: if server sends malformed request and we raise, the superclass + # will catch the exception, count errors, and at some point disconnect + if isinstance(request, Notification): + params, result = request.args[:-1], request.args[-1] + key = self.get_index(request.method, params) + if key in self.subscriptions: + self.cache[key] = result + for queue in self.subscriptions[key]: + await queue.put(request.args) + else: + assert False, request.method + + async def send_request(self, *args, timeout=-1, **kwargs): + if timeout == -1: + timeout = 20 if not self.proxy else 30 + return await asyncio.wait_for( + super().send_request(*args, **kwargs), + timeout) + + async def subscribe(self, method, params, queue): + key = self.get_index(method, params) + if key in self.subscriptions: + self.subscriptions[key].append(queue) + result = self.cache[key] + else: + self.subscriptions[key] = [queue] + result = await self.send_request(method, params) + self.cache[key] = result + await queue.put(params + [result]) + def unsubscribe(self, queue): + """Unsubscribe a callback to free object references to enable GC.""" + # note: we can't unsubscribe from the server, so we keep receiving + # subsequent notifications + for v in self.subscriptions.values(): + if queue in v: + v.remove(queue) -def Connection(server, queue, config_path): - """Makes asynchronous connections to a remote Electrum server. - Returns the running thread that is making the connection. + @classmethod + def get_index(cls, method, params): + """Hashable index for subscriptions and cache""" + return str(method) + repr(params) - Once the thread has connected, it finishes, placing a tuple on the - queue of the form (server, socket), where socket is None if - connection failed. - """ - host, port, protocol = server.rsplit(':', 2) - if not protocol in 'st': - raise Exception('Unknown protocol: %s' % protocol) - c = TcpConnection(server, queue, config_path) - c.start() - return c +# FIXME this is often raised inside a TaskGroup, but then it's not silent :( +class GracefulDisconnect(AIOSafeSilentException): pass -class TcpConnection(threading.Thread, util.PrintError): - verbosity_filter = 'i' - def __init__(self, server, queue, config_path): - threading.Thread.__init__(self) - self.config_path = config_path - self.queue = queue - self.server = server - self.host, self.port, self.protocol = self.server.rsplit(':', 2) - self.host = str(self.host) - self.port = int(self.port) - self.use_ssl = (self.protocol == 's') - self.daemon = True - - def diagnostic_name(self): - return self.host +class ErrorParsingSSLCert(Exception): pass - def check_host_name(self, peercert, name): - """Simple certificate/host name checker. Returns True if the - certificate matches, False otherwise. Does not support - wildcards.""" - # Check that the peer has supplied a certificate. - # None/{} is not acceptable. - if not peercert: - return False - if 'subjectAltName' in peercert: - for typ, val in peercert["subjectAltName"]: - if typ == "DNS" and val == name: - return True - else: - # Only check the subject DN if there is no subject alternative - # name. - cn = None - for attr, val in peercert["subject"]: - # Use most-specific (last) commonName attribute. - if attr == "commonName": - cn = val - if cn is not None: - return cn == name - return False - - def get_simple_socket(self): - try: - l = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM) - except socket.gaierror: - self.print_error("cannot resolve hostname") - return - e = None - for res in l: - try: - s = socket.socket(res[0], socket.SOCK_STREAM) - s.settimeout(10) - s.connect(res[4]) - s.settimeout(2) - s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - return s - except BaseException as _e: - e = _e - continue - else: - self.print_error("failed to connect", str(e)) - - @staticmethod - def get_ssl_context(cert_reqs, ca_certs): - context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_certs) - context.check_hostname = False - context.verify_mode = cert_reqs - - context.options |= ssl.OP_NO_SSLv2 - context.options |= ssl.OP_NO_SSLv3 - context.options |= ssl.OP_NO_TLSv1 - - return context - - def get_socket(self): - if self.use_ssl: - cert_path = os.path.join(self.config_path, 'certs', self.host) - if not os.path.exists(cert_path): - is_new = True - s = self.get_simple_socket() - if s is None: - return - # try with CA first - try: - context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path) - s = context.wrap_socket(s, do_handshake_on_connect=True) - except ssl.SSLError as e: - self.print_error(e) - except: - return - else: - try: - peer_cert = s.getpeercert() - except OSError: - return - if self.check_host_name(peer_cert, self.host): - self.print_error("SSL certificate signed by CA") - return s - # get server certificate. - # Do not use ssl.get_server_certificate because it does not work with proxy - s = self.get_simple_socket() - if s is None: - return - try: - context = self.get_ssl_context(cert_reqs=ssl.CERT_NONE, ca_certs=None) - s = context.wrap_socket(s) - except ssl.SSLError as e: - self.print_error("SSL error retrieving SSL certificate:", e) - return - except: - return - - try: - dercert = s.getpeercert(True) - except OSError: - return - s.close() - cert = ssl.DER_cert_to_PEM_cert(dercert) - # workaround android bug - cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert) - temporary_path = cert_path + '.temp' - util.assert_datadir_available(self.config_path) - with open(temporary_path, "w", encoding='utf-8') as f: - f.write(cert) - f.flush() - os.fsync(f.fileno()) - else: - is_new = False - s = self.get_simple_socket() - if s is None: - return +class ErrorGettingSSLCertFromServer(Exception): pass - if self.use_ssl: - try: - context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, - ca_certs=(temporary_path if is_new else cert_path)) - s = context.wrap_socket(s, do_handshake_on_connect=True) - except socket.timeout: - self.print_error('timeout') - return - except ssl.SSLError as e: - self.print_error("SSL error:", e) - if e.errno != 1: - return - if is_new: - rej = cert_path + '.rej' - if os.path.exists(rej): - os.unlink(rej) - os.rename(temporary_path, rej) - else: - util.assert_datadir_available(self.config_path) - with open(cert_path, encoding='utf-8') as f: - cert = f.read() - try: - b = pem.dePem(cert, 'CERTIFICATE') - x = x509.X509(b) - except: - traceback.print_exc(file=sys.stderr) - self.print_error("wrong certificate") - return - try: - x.check_date() - except: - self.print_error("certificate has expired:", cert_path) - os.unlink(cert_path) - return - self.print_error("wrong certificate") - if e.errno == 104: - return - return - except BaseException as e: - self.print_error(e) - traceback.print_exc(file=sys.stderr) - return - if is_new: - self.print_error("saving certificate") - os.rename(temporary_path, cert_path) - return s +def deserialize_server(server_str: str) -> Tuple[str, str, str]: + # host might be IPv6 address, hence do rsplit: + host, port, protocol = str(server_str).rsplit(':', 2) + if protocol not in ('s', 't'): + raise ValueError('invalid network protocol: {}'.format(protocol)) + int(port) # Throw if cannot be converted to int + if not (0 < int(port) < 2**16): + raise ValueError('port {} is out of valid range'.format(port)) + return host, port, protocol - def run(self): - socket = self.get_socket() - if socket: - self.print_error("connected") - self.queue.put((self.server, socket)) +def serialize_server(host: str, port: Union[str, int], protocol: str) -> str: + return str(':'.join([host, str(port), protocol])) -class Interface(util.PrintError): - """The Interface class handles a socket connected to a single remote - Electrum server. Its exposed API is: - - Member functions close(), fileno(), get_responses(), has_timed_out(), - ping_required(), queue_request(), send_requests() - - Member variable server. - """ +class Interface(PrintError): - def __init__(self, server, socket): + def __init__(self, network, server, config_path, proxy): + self.exception = None + self.ready = asyncio.Future() self.server = server - self.host, _, _ = server.rsplit(':', 2) - self.socket = socket - - self.pipe = util.SocketPipe(socket) - self.pipe.set_timeout(0.0) # Don't wait for data - # Dump network messages. Set at runtime from the console. - self.debug = False - self.unsent_requests = [] - self.unanswered_requests = {} - self.last_send = time.time() - self.closed_remotely = False + self.host, self.port, self.protocol = deserialize_server(self.server) + self.port = int(self.port) + self.config_path = config_path + self.cert_path = os.path.join(self.config_path, 'certs', self.host) + self.blockchain = None + self.network = network + + self.tip_header = None + self.tip = 0 + + # TODO combine? + self.fut = asyncio.get_event_loop().create_task(self.run()) + self.group = CustomTaskGroup() + + if proxy: + username, pw = proxy.get('user'), proxy.get('password') + if not username or not pw: + auth = None + else: + auth = aiorpcx.socks.SOCKSUserAuth(username, pw) + if proxy['mode'] == "socks4": + self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS4a, auth) + elif proxy['mode'] == "socks5": + self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS5, auth) + else: + raise NotImplementedError # http proxy not available with aiorpcx + else: + self.proxy = None def diagnostic_name(self): return self.host - def fileno(self): - # Needed for select - return self.socket.fileno() - - def close(self): - if not self.closed_remotely: - try: - self.socket.shutdown(socket.SHUT_RDWR) - except socket.error: - pass - self.socket.close() - - def queue_request(self, *args): # method, params, _id - '''Queue a request, later to be send with send_requests when the - socket is available for writing. - ''' - self.request_time = time.time() - self.unsent_requests.append(args) - - def num_requests(self): - '''Keep unanswered requests below 100''' - n = 100 - len(self.unanswered_requests) - return min(n, len(self.unsent_requests)) - - def send_requests(self): - '''Sends queued requests. Returns False on failure.''' - self.last_send = time.time() - make_dict = lambda m, p, i: {'method': m, 'params': p, 'id': i} - n = self.num_requests() - wire_requests = self.unsent_requests[0:n] + async def is_server_ca_signed(self, sslc): try: - self.pipe.send_all([make_dict(*r) for r in wire_requests]) - except BaseException as e: - self.print_error("pipe send error:", e) + await self.open_session(sslc, exit_early=True) + except ssl.SSLError as e: + assert e.reason == 'CERTIFICATE_VERIFY_FAILED' return False - self.unsent_requests = self.unsent_requests[n:] - for request in wire_requests: - if self.debug: - self.print_error("-->", request) - self.unanswered_requests[request[2]] = request return True - def ping_required(self): - '''Returns True if a ping should be sent.''' - return time.time() - self.last_send > 300 + async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context): + try: + ca_signed = await self.is_server_ca_signed(ca_ssl_context) + except (OSError, aiorpcx.socks.SOCKSFailure) as e: + raise ErrorGettingSSLCertFromServer(e) from e + if ca_signed: + with open(self.cert_path, 'w') as f: + # empty file means this is CA signed, not self-signed + f.write('') + else: + await self.save_certificate() - def has_timed_out(self): - '''Returns True if the interface has timed out.''' - if (self.unanswered_requests and time.time() - self.request_time > 10 - and self.pipe.idle_time() > 10): - self.print_error("timeout", len(self.unanswered_requests)) + def _is_saved_ssl_cert_available(self): + if not os.path.exists(self.cert_path): + return False + with open(self.cert_path, 'r') as f: + contents = f.read() + if contents == '': # CA signed return True + # pinned self-signed cert + try: + b = pem.dePem(contents, 'CERTIFICATE') + except SyntaxError as e: + self.print_error("error parsing already saved cert:", e) + raise ErrorParsingSSLCert(e) from e + try: + x = x509.X509(b) + except Exception as e: + self.print_error("error parsing already saved cert:", e) + raise ErrorParsingSSLCert(e) from e + try: + x.check_date() + return True + except x509.CertificateError as e: + self.print_error("certificate has expired:", e) + os.unlink(self.cert_path) # delete pinned cert only in this case + return False - return False - - def get_responses(self): - '''Call if there is data available on the socket. Returns a list of - (request, response) pairs. Notifications are singleton - unsolicited responses presumably as a result of prior - subscriptions, so request is None and there is no 'id' member. - Otherwise it is a response, which has an 'id' member and a - corresponding request. If the connection was closed remotely - or the remote server is misbehaving, a (None, None) will appear. - ''' - responses = [] - while True: + async def _get_ssl_context(self): + if self.protocol != 's': + # using plaintext TCP + return None + + # see if we already have cert for this server; or get it for the first time + ca_sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) + if not self._is_saved_ssl_cert_available(): + await self._try_saving_ssl_cert_for_first_time(ca_sslc) + # now we have a file saved in our certificate store + siz = os.stat(self.cert_path).st_size + if siz == 0: + # CA signed cert + sslc = ca_sslc + else: + # pinned self-signed cert + sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path) + sslc.check_hostname = 0 + return sslc + + @aiosafe + async def run(self): + try: + ssl_context = await self._get_ssl_context() + except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e: + self.exception = e + return + try: + await self.open_session(ssl_context, exit_early=False) + except (asyncio.CancelledError, OSError, aiorpcx.socks.SOCKSFailure) as e: + self.print_error('disconnecting due to: {} {}'.format(e, type(e))) + self.exception = e + return + # should never get here (can only exit via exception) + assert False + + def mark_ready(self): + assert self.tip_header + chain = blockchain.check_header(self.tip_header) + if not chain: + self.blockchain = blockchain.blockchains[0] + else: + self.blockchain = chain + + self.print_error("set blockchain with height", self.blockchain.height()) + + if not self.ready.done(): + self.ready.set_result(1) + + async def save_certificate(self): + if not os.path.exists(self.cert_path): + # we may need to retry this a few times, in case the handshake hasn't completed + for _ in range(10): + dercert = await self.get_certificate() + if dercert: + self.print_error("succeeded in getting cert") + with open(self.cert_path, 'w') as f: + cert = ssl.DER_cert_to_PEM_cert(dercert) + # workaround android bug + cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert) + f.write(cert) + # even though close flushes we can't fsync when closed. + # and we must flush before fsyncing, cause flush flushes to OS buffer + # fsync writes to OS buffer to disk + f.flush() + os.fsync(f.fileno()) + break + await asyncio.sleep(1) + else: + raise Exception("could not get certificate") + + async def get_certificate(self): + sslc = ssl.SSLContext() + try: + async with aiorpcx.ClientSession(self.host, self.port, ssl=sslc, proxy=self.proxy) as session: + return session.transport._ssl_protocol._sslpipe._sslobj.getpeercert(True) + except ValueError: + return None + + async def get_block_header(self, height, assert_mode): + res = await self.session.send_request('blockchain.block.header', [height], timeout=5) + return blockchain.deserialize_header(bytes.fromhex(res), height) + + async def request_chunk(self, idx, tip): + return await self.network.request_chunk(idx, tip, self.session) + + async def open_session(self, sslc, exit_early): + header_queue = asyncio.Queue() + self.session = NotificationSession(self.host, self.port, ssl=sslc, proxy=self.proxy) + async with self.session as session: try: - response = self.pipe.get() - except util.timeout: - break - if not type(response) is dict: - responses.append((None, None)) - if response is None: - self.closed_remotely = True - self.print_error("connection closed remotely") - break - if self.debug: - self.print_error("<--", response) - wire_id = response.get('id', None) - if wire_id is None: # Notification - responses.append((None, response)) + ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) + except aiorpcx.jsonrpc.RPCError as e: + raise GracefulDisconnect(e) # probably 'unsupported protocol version' + if exit_early: + return + self.print_error(ver, self.host) + await session.subscribe('blockchain.headers.subscribe', [], header_queue) + async with self.group as group: + await group.spawn(self.ping()) + await group.spawn(self.run_fetch_blocks(header_queue)) + await group.spawn(self.monitor_connection()) + # NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group! + + async def monitor_connection(self): + while True: + await asyncio.sleep(1) + if not self.session or self.session.is_closing(): + raise GracefulDisconnect('server closed session') + + async def ping(self): + while True: + await asyncio.sleep(300) + await self.session.send_request('server.ping', timeout=10) + + def close(self): + self.fut.cancel() + asyncio.get_event_loop().create_task(self.group.cancel_remaining()) + + async def run_fetch_blocks(self, header_queue): + while True: + self.network.notify('updated') + item = await header_queue.get() + item = item[0] + height = item['height'] + item = blockchain.deserialize_header(bfh(item['hex']), item['height']) + self.tip_header = item + self.tip = height + if self.tip < constants.net.max_checkpoint(): + raise GracefulDisconnect('server tip below max checkpoint') + if not self.ready.done(): + self.mark_ready() + async with self.network.bhi_lock: + if self.blockchain.height() < item['block_height']-1: + _, height = await self.sync_until(height, None) + if self.blockchain.height() >= height and self.blockchain.check_header(item): + # another interface amended the blockchain + self.print_error("skipping header", height) + continue + if self.tip < height: + height = self.tip + _, height = await self.step(height, item) + + async def sync_until(self, height, next_height=None): + if next_height is None: + next_height = self.tip + last = None + while last is None or height < next_height: + if next_height > height + 10: + could_connect, num_headers = await self.request_chunk(height, next_height) + if not could_connect: + if height <= constants.net.max_checkpoint(): + raise Exception('server chain conflicts with checkpoints or genesis') + last, height = await self.step(height) + continue + height = (height // 2016 * 2016) + num_headers + if height > next_height: + assert False, (height, self.tip) + last = 'catchup' else: - request = self.unanswered_requests.pop(wire_id, None) - if request: - responses.append((request, response)) + last, height = await self.step(height) + return last, height + + async def step(self, height, header=None): + assert height != 0 + if header is None: + header = await self.get_block_header(height, 'catchup') + chain = self.blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + if chain: return 'catchup', height + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) + + bad_header = None + if not can_connect: + self.print_error("can't connect", height) + #backward + bad = height + bad_header = header + height -= 1 + checkp = False + if height <= constants.net.max_checkpoint(): + height = constants.net.max_checkpoint() + checkp = True + + header = await self.get_block_header(height, 'backward') + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) + if checkp and not (can_connect or chain): + raise Exception("server chain conflicts with checkpoints. {} {}".format(can_connect, chain)) + while not chain and not can_connect: + bad = height + bad_header = header + delta = self.tip - height + next_height = self.tip - 2 * delta + checkp = False + if next_height <= constants.net.max_checkpoint(): + next_height = constants.net.max_checkpoint() + checkp = True + height = next_height + + header = await self.get_block_header(height, 'backward') + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) + if checkp and not (can_connect or chain): + raise Exception("server chain conflicts with checkpoints. {} {}".format(can_connect, chain)) + self.print_error("exiting backward mode at", height) + if can_connect: + self.print_error("could connect", height) + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + + if type(can_connect) is bool: + # mock + height += 1 + if height > self.tip: + assert False + return 'catchup', height + self.blockchain = can_connect + height += 1 + self.blockchain.save_header(header) + return 'catchup', height + + if not chain: + raise Exception("not chain") # line 931 in 8e69174374aee87d73cd2f8005fbbe87c93eee9c's network.py + + # binary + if type(chain) in [int, bool]: + pass # mock + else: + self.blockchain = chain + good = height + height = (bad + good) // 2 + header = await self.get_block_header(height, 'binary') + while True: + self.print_error("binary step") + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + if chain: + assert bad != height, (bad, height) + good = height + self.blockchain = self.blockchain if type(chain) in [bool, int] else chain + else: + bad = height + assert good != height + bad_header = header + if bad != good + 1: + height = (bad + good) // 2 + header = await self.get_block_header(height, 'binary') + continue + mock = bad_header and 'mock' in bad_header and bad_header['mock']['connect'](height) + real = not mock and self.blockchain.can_connect(bad_header, check_height=False) + if not real and not mock: + raise Exception('unexpected bad header during binary' + str(bad_header)) # line 948 in 8e69174374aee87d73cd2f8005fbbe87c93eee9c's network.py + branch = blockchain.blockchains.get(bad) + if branch is not None: + ismocking = False + if type(branch) is dict: + ismocking = True + # FIXME: it does not seem sufficient to check that the branch + # contains the bad_header. what if self.blockchain doesn't? + # the chains shouldn't be joined then. observe the incorrect + # joining on regtest with a server that has a fork of height + # one. the problem is observed only if forking is not during + # electrum runtime + if not ismocking and branch.check_header(bad_header) \ + or ismocking and branch['check'](bad_header): + self.print_error('joining chain', bad) + height += 1 + return 'join', height else: - self.print_error("unknown wire ID", wire_id) - responses.append((None, None)) # Signal - break - - return responses + if not ismocking and branch.parent().check_header(header) \ + or ismocking and branch['parent']['check'](header): + self.print_error('reorg', bad, self.tip) + self.blockchain = branch.parent() if not ismocking else branch['parent'] + height = bad + header = await self.get_block_header(height, 'binary') + else: + if ismocking: + height = bad + 1 + self.print_error("TODO replace blockchain") + return 'conflict', height + self.print_error('forkpoint conflicts with existing fork', branch.path()) + branch.write(b'', 0) + branch.save_header(bad_header) + self.blockchain = branch + height = bad + 1 + return 'conflict', height + else: + bh = self.blockchain.height() + if bh > good: + forkfun = self.blockchain.fork + if 'mock' in bad_header: + chain = bad_header['mock']['check'](bad_header) + forkfun = bad_header['mock']['fork'] if 'fork' in bad_header['mock'] else forkfun + else: + chain = self.blockchain.check_header(bad_header) + if not chain: + b = forkfun(bad_header) + assert bad not in blockchain.blockchains, (bad, list(blockchain.blockchains.keys())) + blockchain.blockchains[bad] = b + self.blockchain = b + height = b.forkpoint + 1 + assert b.forkpoint == bad + return 'fork', height + else: + assert bh == good + if bh < self.tip: + self.print_error("catching up from %d"% (bh + 1)) + height = bh + 1 + return 'no_fork', height def check_cert(host, cert): diff --git a/electrum/network.py b/electrum/network.py @@ -23,32 +23,29 @@ import time import queue import os -import errno import random import re -import select from collections import defaultdict import threading import socket import json import sys import ipaddress +import asyncio +from typing import NamedTuple, Optional import dns import dns.resolver -import socks +from aiorpcx import TaskGroup from . import util -from .util import print_error -from . import bitcoin +from .util import PrintError, print_error, aiosafe, bfh from .bitcoin import COIN from . import constants -from .interface import Connection, Interface from . import blockchain -from .version import ELECTRUM_VERSION, PROTOCOL_VERSION -from .i18n import _ -from .blockchain import InvalidHeader - +from .interface import Interface, serialize_server, deserialize_server +from .version import PROTOCOL_VERSION +from .simple_config import SimpleConfig NODES_RETRY_INTERVAL = 60 SERVER_RETRY_INTERVAL = 10 @@ -111,9 +108,14 @@ def pick_random_server(hostmap = None, protocol = 's', exclude_set = set()): return random.choice(eligible) if eligible else None -from .simple_config import SimpleConfig +NetworkParameters = NamedTuple("NetworkParameters", [("host", str), + ("port", str), + ("protocol", str), + ("proxy", Optional[dict]), + ("auto_connect", bool)]) -proxy_modes = ['socks4', 'socks5', 'http'] + +proxy_modes = ['socks4', 'socks5'] def serialize_proxy(p): @@ -123,12 +125,13 @@ def serialize_proxy(p): p.get('user', ''), p.get('password', '')]) -def deserialize_proxy(s): +def deserialize_proxy(s: str) -> Optional[dict]: if not isinstance(s, str): return None if s.lower() == 'none': return None proxy = { "mode":"socks5", "host":"localhost" } + # FIXME raw IPv6 address fails here args = s.split(':') n = 0 if proxy_modes.count(args[n]) == 1: @@ -150,19 +153,10 @@ def deserialize_proxy(s): return proxy -def deserialize_server(server_str): - host, port, protocol = str(server_str).rsplit(':', 2) - if protocol not in 'st': - raise ValueError('invalid network protocol: {}'.format(protocol)) - int(port) # Throw if cannot be converted to int - return host, port, protocol - - -def serialize_server(host, port, protocol): - return str(':'.join([host, port, protocol])) +INSTANCE = None -class Network(util.DaemonThread): +class Network(PrintError): """The Network class manages a set of connections to remote electrum servers, each connected socket is handled by an Interface() object. Connections are initiated by a Connection() thread which stops once @@ -177,15 +171,16 @@ class Network(util.DaemonThread): verbosity_filter = 'n' def __init__(self, config=None): + global INSTANCE + INSTANCE = self if config is None: config = {} # Do not use mutables as default values! - util.DaemonThread.__init__(self) self.config = SimpleConfig(config) if isinstance(config, dict) else config self.num_server = 10 if not self.config.get('oneserver') else 0 - self.blockchains = blockchain.read_blockchains(self.config) # note: needs self.blockchains_lock - self.print_error("blockchains", self.blockchains.keys()) + blockchain.blockchains = blockchain.read_blockchains(self.config) # note: needs self.blockchains_lock + self.print_error("blockchains", list(blockchain.blockchains.keys())) self.blockchain_index = config.get('blockchain_index', 0) - if self.blockchain_index not in self.blockchains.keys(): + if self.blockchain_index not in blockchain.blockchains.keys(): self.blockchain_index = 0 # Server for addresses and transactions self.default_server = self.config.get('server', None) @@ -200,11 +195,11 @@ class Network(util.DaemonThread): self.default_server = pick_random_server() # locks: if you need to take multiple ones, acquire them in the order they are defined here! + self.bhi_lock = asyncio.Lock() self.interface_lock = threading.RLock() # <- re-entrant self.callback_lock = threading.Lock() self.pending_sends_lock = threading.Lock() self.recent_servers_lock = threading.RLock() # <- re-entrant - self.subscribed_addresses_lock = threading.Lock() self.blockchains_lock = threading.Lock() self.pending_sends = [] @@ -216,9 +211,6 @@ class Network(util.DaemonThread): self.banner = '' self.donation_address = '' self.relay_fee = None - # callbacks passed with subscriptions - self.subscriptions = defaultdict(list) # note: needs self.callback_lock - self.sub_cache = {} # note: needs self.interface_lock # callbacks set by the GUI self.callbacks = defaultdict(list) # note: needs self.callback_lock @@ -226,7 +218,6 @@ class Network(util.DaemonThread): util.make_dir(dir_path) # subscriptions and requests - self.subscribed_addresses = set() # note: needs self.subscribed_addresses_lock self.h2addr = {} # Requests from client we've not seen a response to self.unanswered_requests = {} @@ -244,6 +235,11 @@ class Network(util.DaemonThread): self.socket_queue = queue.Queue() self.start_network(deserialize_server(self.default_server)[2], deserialize_proxy(self.config.get('proxy'))) + self.asyncio_loop = asyncio.get_event_loop() + + @staticmethod + def get_instance(): + return INSTANCE def with_interface_lock(func): def func_wrapper(self, *args, **kwargs): @@ -271,7 +267,12 @@ class Network(util.DaemonThread): def trigger_callback(self, event, *args): with self.callback_lock: callbacks = self.callbacks[event][:] - [callback(event, *args) for callback in callbacks] + for callback in callbacks: + if asyncio.iscoroutinefunction(callback): + # FIXME: if callback throws, we will lose the traceback + asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop) + else: + callback(event, *args) def read_recent_servers(self): if not self.config.path: @@ -316,53 +317,39 @@ class Network(util.DaemonThread): self.notify('status') def is_connected(self): - return self.interface is not None + return self.interface is not None and self.interface.ready.done() def is_connecting(self): return self.connection_status == 'connecting' - @with_interface_lock - def queue_request(self, method, params, interface=None): - # If you want to queue a request on any interface it must go - # through this function so message ids are properly tracked - if interface is None: - interface = self.interface - if interface is None: - self.print_error('warning: dropping request', method, params) - return - message_id = self.message_id - self.message_id += 1 - if self.debug: - self.print_error(interface.host, "-->", method, params, message_id) - interface.queue_request(method, params, message_id) - return message_id - - @with_interface_lock - def send_subscriptions(self): - assert self.interface - self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses)) - self.sub_cache.clear() - # Resend unanswered requests - requests = self.unanswered_requests.values() - self.unanswered_requests = {} - for request in requests: - message_id = self.queue_request(request[0], request[1]) - self.unanswered_requests[message_id] = request - self.queue_request('server.banner', []) - self.queue_request('server.donation_address', []) - self.queue_request('server.peers.subscribe', []) - self.request_fee_estimates() - self.queue_request('blockchain.relayfee', []) - with self.subscribed_addresses_lock: - for h in self.subscribed_addresses: - self.queue_request('blockchain.scripthash.subscribe', [h]) - - def request_fee_estimates(self): + async def request_server_info(self, interface): + await interface.ready + session = interface.session + self.banner = await session.send_request('server.banner') + self.notify('banner') + self.donation_address = await session.send_request('server.donation_address') + self.irc_servers = parse_servers(await session.send_request('server.peers.subscribe')) + self.notify('servers') + await self.request_fee_estimates(interface) + relayfee = await session.send_request('blockchain.relayfee') + self.relay_fee = int(relayfee * COIN) if relayfee is not None else None + + async def request_fee_estimates(self, interface): + session = interface.session from .simple_config import FEE_ETA_TARGETS self.config.requested_fee_estimates() - self.queue_request('mempool.get_fee_histogram', []) - for i in FEE_ETA_TARGETS: - self.queue_request('blockchain.estimatefee', [i]) + async with TaskGroup() as group: + histogram_task = await group.spawn(session.send_request('mempool.get_fee_histogram')) + fee_tasks = [] + for i in FEE_ETA_TARGETS: + fee_tasks.append((i, await group.spawn(session.send_request('blockchain.estimatefee', [i])))) + self.config.mempool_fees = histogram_task.result() + self.notify('fee_histogram') + for i, task in fee_tasks: + fee = int(task.result() * COIN) + self.config.update_fee_estimates(i, fee) + self.print_error("fee_estimates[%d]" % i, fee) + self.notify('fee') def get_status_value(self, key): if key == 'status': @@ -387,9 +374,9 @@ class Network(util.DaemonThread): else: self.trigger_callback(key, self.get_status_value(key)) - def get_parameters(self): + def get_parameters(self) -> NetworkParameters: host, port, protocol = deserialize_server(self.default_server) - return host, port, protocol, self.proxy, self.auto_connect + return NetworkParameters(host, port, protocol, self.proxy, self.auto_connect) def get_donation_address(self): if self.is_connected(): @@ -424,7 +411,7 @@ class Network(util.DaemonThread): self.print_error("connecting to %s as new interface" % server) self.set_status('connecting') self.connecting.add(server) - Connection(server, self.socket_queue, self.config.path) + self.socket_queue.put(server) def start_random_interface(self): with self.interface_lock: @@ -432,32 +419,18 @@ class Network(util.DaemonThread): server = pick_random_server(self.get_servers(), self.protocol, exclude_set) if server: self.start_interface(server) + return server - def start_interfaces(self): - self.start_interface(self.default_server) - for i in range(self.num_server - 1): - self.start_random_interface() - - def set_proxy(self, proxy): + def set_proxy(self, proxy: Optional[dict]): self.proxy = proxy # Store these somewhere so we can un-monkey-patch - if not hasattr(socket, "_socketobject"): - socket._socketobject = socket.socket + if not hasattr(socket, "_getaddrinfo"): socket._getaddrinfo = socket.getaddrinfo if proxy: self.print_error('setting proxy', proxy) - proxy_mode = proxy_modes.index(proxy["mode"]) + 1 - socks.setdefaultproxy(proxy_mode, - proxy["host"], - int(proxy["port"]), - # socks.py seems to want either None or a non-empty string - username=(proxy.get("user", "") or None), - password=(proxy.get("password", "") or None)) - socket.socket = socks.socksocket # prevent dns leaks, see http://stackoverflow.com/questions/13184205/dns-over-proxy socket.getaddrinfo = lambda *args: [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (args[0], args[1]))] else: - socket.socket = socket._socketobject if sys.platform == 'win32': # On Windows, socket.getaddrinfo takes a mutex, and might hold it for up to 10 seconds # when dns-resolving. To speed it up drastically, we resolve dns ourselves, outside that lock. @@ -465,6 +438,7 @@ class Network(util.DaemonThread): socket.getaddrinfo = self._fast_getaddrinfo else: socket.getaddrinfo = socket._getaddrinfo + self.trigger_callback('proxy_set', self.proxy) @staticmethod def _fast_getaddrinfo(host, *args, **kwargs): @@ -495,14 +469,14 @@ class Network(util.DaemonThread): return socket._getaddrinfo(addr, *args, **kwargs) @with_interface_lock - def start_network(self, protocol, proxy): + def start_network(self, protocol: str, proxy: Optional[dict]): assert not self.interface and not self.interfaces assert not self.connecting and self.socket_queue.empty() self.print_error('starting network') self.disconnected_servers = set([]) # note: needs self.interface_lock self.protocol = protocol self.set_proxy(proxy) - self.start_interfaces() + self.start_interface(self.default_server) @with_interface_lock def stop_network(self): @@ -513,13 +487,15 @@ class Network(util.DaemonThread): self.close_interface(self.interface) assert self.interface is None assert not self.interfaces - self.connecting = set() + self.connecting.clear() # Get a new queue - no old pending connections thanks! self.socket_queue = queue.Queue() - def set_parameters(self, host, port, protocol, proxy, auto_connect): + def set_parameters(self, net_params: NetworkParameters): + proxy = net_params.proxy proxy_str = serialize_proxy(proxy) - server = serialize_server(host, port, protocol) + host, port, protocol = net_params.host, net_params.port, net_params.protocol + server_str = serialize_server(host, port, protocol) # sanitize parameters try: deserialize_server(serialize_server(host, port, protocol)) @@ -528,21 +504,21 @@ class Network(util.DaemonThread): int(proxy['port']) except: return - self.config.set_key('auto_connect', auto_connect, False) + self.config.set_key('auto_connect', net_params.auto_connect, False) self.config.set_key("proxy", proxy_str, False) - self.config.set_key("server", server, True) + self.config.set_key("server", server_str, True) # abort if changes were not allowed by config - if self.config.get('server') != server or self.config.get('proxy') != proxy_str: + if self.config.get('server') != server_str or self.config.get('proxy') != proxy_str: return - self.auto_connect = auto_connect + self.auto_connect = net_params.auto_connect if self.proxy != proxy or self.protocol != protocol: # Restart the network defaulting to the given server with self.interface_lock: self.stop_network() - self.default_server = server + self.default_server = server_str self.start_network(protocol, proxy) - elif self.default_server != server: - self.switch_to_interface(server) + elif self.default_server != server_str: + self.switch_to_interface(server_str) else: self.switch_lagging_interface() self.notify('updated') @@ -561,7 +537,12 @@ class Network(util.DaemonThread): if self.server_is_lagging() and self.auto_connect: # switch to one that has the correct header (not height) header = self.blockchain().read_header(self.get_local_height()) - filtered = list(map(lambda x: x[0], filter(lambda x: x[1].tip_header == header, self.interfaces.items()))) + def filt(x): + a = x[1].tip_header + b = header + assert type(a) is type(b) + return a == b + filtered = list(map(lambda x: x[0], filter(filt, self.interfaces.items()))) if filtered: choice = random.choice(filtered) self.switch_to_interface(choice) @@ -581,11 +562,20 @@ class Network(util.DaemonThread): i = self.interfaces[server] if self.interface != i: self.print_error("switching to", server) - # stop any current interface in order to terminate subscriptions - # fixme: we don't want to close headers sub - #self.close_interface(self.interface) + if self.interface is not None: + # Stop any current interface in order to terminate subscriptions, + # and to cancel tasks in interface.group. + # However, for headers sub, give preference to this interface + # over unknown ones, i.e. start it again right away. + old_server = self.interface.server + self.close_interface(self.interface) + if len(self.interfaces) <= self.num_server: + self.start_interface(old_server) + self.interface = i - self.send_subscriptions() + asyncio.get_event_loop().create_task( + i.group.spawn(self.request_server_info(i))) + self.trigger_callback('default_server_changed') self.set_status('connected') self.notify('updated') self.notify('interfaces') @@ -608,164 +598,6 @@ class Network(util.DaemonThread): self.recent_servers = self.recent_servers[0:20] self.save_recent_servers() - def process_response(self, interface, response, callbacks): - if self.debug: - self.print_error(interface.host, "<--", response) - error = response.get('error') - result = response.get('result') - method = response.get('method') - params = response.get('params') - - # We handle some responses; return the rest to the client. - if method == 'server.version': - interface.server_version = result - elif method == 'blockchain.headers.subscribe': - if error is None: - self.on_notify_header(interface, result) - else: - # no point in keeping this connection without headers sub - self.connection_down(interface.server) - return - elif method == 'server.peers.subscribe': - if error is None: - self.irc_servers = parse_servers(result) - self.notify('servers') - elif method == 'server.banner': - if error is None: - self.banner = result - self.notify('banner') - elif method == 'server.donation_address': - if error is None: - self.donation_address = result - elif method == 'mempool.get_fee_histogram': - if error is None: - self.print_error('fee_histogram', result) - self.config.mempool_fees = result - self.notify('fee_histogram') - elif method == 'blockchain.estimatefee': - if error is None and result > 0: - i = params[0] - fee = int(result*COIN) - self.config.update_fee_estimates(i, fee) - self.print_error("fee_estimates[%d]" % i, fee) - self.notify('fee') - elif method == 'blockchain.relayfee': - if error is None: - self.relay_fee = int(result * COIN) if result is not None else None - self.print_error("relayfee", self.relay_fee) - elif method == 'blockchain.block.headers': - self.on_block_headers(interface, response) - elif method == 'blockchain.block.get_header': - self.on_get_header(interface, response) - - for callback in callbacks: - callback(response) - - @classmethod - def get_index(cls, method, params): - """ hashable index for subscriptions and cache""" - return str(method) + (':' + str(params[0]) if params else '') - - def process_responses(self, interface): - responses = interface.get_responses() - for request, response in responses: - if request: - method, params, message_id = request - k = self.get_index(method, params) - # client requests go through self.send() with a - # callback, are only sent to the current interface, - # and are placed in the unanswered_requests dictionary - client_req = self.unanswered_requests.pop(message_id, None) - if client_req: - if interface != self.interface: - # we probably changed the current interface - # in the meantime; drop this. - return - callbacks = [client_req[2]] - else: - # fixme: will only work for subscriptions - k = self.get_index(method, params) - callbacks = list(self.subscriptions.get(k, [])) - - # Copy the request method and params to the response - response['method'] = method - response['params'] = params - # Only once we've received a response to an addr subscription - # add it to the list; avoids double-sends on reconnection - if method == 'blockchain.scripthash.subscribe': - with self.subscribed_addresses_lock: - self.subscribed_addresses.add(params[0]) - else: - if not response: # Closed remotely / misbehaving - self.connection_down(interface.server) - break - # Rewrite response shape to match subscription request response - method = response.get('method') - params = response.get('params') - k = self.get_index(method, params) - if method == 'blockchain.headers.subscribe': - response['result'] = params[0] - response['params'] = [] - elif method == 'blockchain.scripthash.subscribe': - response['params'] = [params[0]] # addr - response['result'] = params[1] - callbacks = list(self.subscriptions.get(k, [])) - - # update cache if it's a subscription - if method.endswith('.subscribe'): - with self.interface_lock: - self.sub_cache[k] = response - # Response is now in canonical form - self.process_response(interface, response, callbacks) - - def send(self, messages, callback): - '''Messages is a list of (method, params) tuples''' - messages = list(messages) - with self.pending_sends_lock: - self.pending_sends.append((messages, callback)) - - @with_interface_lock - def process_pending_sends(self): - # Requests needs connectivity. If we don't have an interface, - # we cannot process them. - if not self.interface: - return - - with self.pending_sends_lock: - sends = self.pending_sends - self.pending_sends = [] - - for messages, callback in sends: - for method, params in messages: - r = None - if method.endswith('.subscribe'): - k = self.get_index(method, params) - # add callback to list - l = list(self.subscriptions.get(k, [])) - if callback not in l: - l.append(callback) - with self.callback_lock: - self.subscriptions[k] = l - # check cached response for subscriptions - r = self.sub_cache.get(k) - - if r is not None: - self.print_error("cache hit", k) - callback(r) - else: - message_id = self.queue_request(method, params) - self.unanswered_requests[message_id] = method, params, callback - - def unsubscribe(self, callback): - '''Unsubscribe a callback to free object references to enable GC.''' - # Note: we can't unsubscribe from the server, so if we receive - # subsequent notifications process_response() will emit a harmless - # "received unexpected notification" warning - with self.callback_lock: - for v in self.subscriptions.values(): - if callback in v: - v.remove(callback) - @with_interface_lock def connection_down(self, server): '''A connection to server either went down, or was never made. @@ -777,297 +609,39 @@ class Network(util.DaemonThread): self.close_interface(self.interfaces[server]) self.notify('interfaces') with self.blockchains_lock: - for b in self.blockchains.values(): + for b in blockchain.blockchains.values(): if b.catch_up == server: b.catch_up = None - def new_interface(self, server, socket): + @aiosafe + async def new_interface(self, server): # todo: get tip first, then decide which checkpoint to use. self.add_recent_server(server) - interface = Interface(server, socket) - interface.blockchain = None - interface.tip_header = None - interface.tip = 0 - interface.mode = 'default' - interface.request = None - with self.interface_lock: - self.interfaces[server] = interface - # server.version should be the first message - params = [ELECTRUM_VERSION, PROTOCOL_VERSION] - self.queue_request('server.version', params, interface) - self.queue_request('blockchain.headers.subscribe', [True], interface) - if server == self.default_server: - self.switch_to_interface(server) - #self.notify('interfaces') - - def maintain_sockets(self): - '''Socket maintenance.''' - # Responses to connection attempts? - while not self.socket_queue.empty(): - server, socket = self.socket_queue.get() - if server in self.connecting: - self.connecting.remove(server) - - if socket: - self.new_interface(server, socket) - else: - self.connection_down(server) - - # Send pings and shut down stale interfaces - # must use copy of values - with self.interface_lock: - interfaces = list(self.interfaces.values()) - for interface in interfaces: - if interface.has_timed_out(): - self.connection_down(interface.server) - elif interface.ping_required(): - self.queue_request('server.ping', [], interface) - - now = time.time() - # nodes - with self.interface_lock: - if len(self.interfaces) + len(self.connecting) < self.num_server: - self.start_random_interface() - if now - self.nodes_retry_time > NODES_RETRY_INTERVAL: - self.print_error('network: retrying connections') - self.disconnected_servers = set([]) - self.nodes_retry_time = now - # main interface - with self.interface_lock: - if not self.is_connected(): - if self.auto_connect: - if not self.is_connecting(): - self.switch_to_random_interface() - else: - if self.default_server in self.disconnected_servers: - if now - self.server_retry_time > SERVER_RETRY_INTERVAL: - self.disconnected_servers.remove(self.default_server) - self.server_retry_time = now - else: - self.switch_to_interface(self.default_server) - else: - if self.config.is_fee_estimates_update_required(): - self.request_fee_estimates() - - def request_chunk(self, interface, index): - if index in self.requested_chunks: - return - interface.print_error("requesting chunk %d" % index) - self.requested_chunks.add(index) - height = index * 2016 - self.queue_request('blockchain.block.headers', [height, 2016], - interface) - - def on_block_headers(self, interface, response): - '''Handle receiving a chunk of block headers''' - error = response.get('error') - result = response.get('result') - params = response.get('params') - blockchain = interface.blockchain - if result is None or params is None or error is not None: - interface.print_error(error or 'bad response') - return - # Ignore unsolicited chunks - height = params[0] - index = height // 2016 - if index * 2016 != height or index not in self.requested_chunks: - interface.print_error("received chunk %d (unsolicited)" % index) - return - else: - interface.print_error("received chunk %d" % index) - self.requested_chunks.remove(index) - hexdata = result['hex'] - connect = blockchain.connect_chunk(index, hexdata) - if not connect: + interface = Interface(self, server, self.config.path, self.proxy) + timeout = 10 if not self.proxy else 20 + try: + await asyncio.wait_for(interface.ready, timeout) + except BaseException as e: + #import traceback + #traceback.print_exc() + self.print_error(interface.server, "couldn't launch because", str(e), str(type(e))) self.connection_down(interface.server) return - if index >= len(blockchain.checkpoints): - # If not finished, get the next chunk - if blockchain.height() < interface.tip: - self.request_chunk(interface, index+1) - else: - interface.mode = 'default' - interface.print_error('catch up done', blockchain.height()) - blockchain.catch_up = None - else: - # the verifier must have asked for this chunk - pass - self.notify('updated') + finally: + try: self.connecting.remove(server) + except KeyError: pass - def on_get_header(self, interface, response): - '''Handle receiving a single block header''' - header = response.get('result') - if not header: - interface.print_error(response) - self.connection_down(interface.server) - return - height = header.get('block_height') - #interface.print_error('got header', height, blockchain.hash_header(header)) - if interface.request != height: - interface.print_error("unsolicited header",interface.request, height) - self.connection_down(interface.server) - return - chain = blockchain.check_header(header) - if interface.mode == 'backward': - can_connect = blockchain.can_connect(header) - if can_connect and can_connect.catch_up is None: - interface.mode = 'catch_up' - interface.blockchain = can_connect - interface.blockchain.save_header(header) - next_height = height + 1 - interface.blockchain.catch_up = interface.server - elif chain: - # FIXME should await "initial chunk download". - # binary search will NOT do the correct thing if we don't yet - # have all headers up to the fork height - interface.print_error("binary search") - interface.mode = 'binary' - interface.blockchain = chain - interface.good = height - next_height = (interface.bad + interface.good) // 2 - assert next_height >= self.max_checkpoint(), (interface.bad, interface.good) - else: - if height == 0: - self.connection_down(interface.server) - next_height = None - else: - interface.bad = height - interface.bad_header = header - delta = interface.tip - height - next_height = max(self.max_checkpoint(), interface.tip - 2 * delta) - if height == next_height: - self.connection_down(interface.server) - next_height = None - - elif interface.mode == 'binary': - if chain: - interface.good = height - interface.blockchain = chain - else: - interface.bad = height - interface.bad_header = header - if interface.bad != interface.good + 1: - next_height = (interface.bad + interface.good) // 2 - assert next_height >= self.max_checkpoint() - elif not interface.blockchain.can_connect(interface.bad_header, check_height=False): - self.connection_down(interface.server) - next_height = None - else: - branch = self.blockchains.get(interface.bad) - if branch is not None: - if branch.check_header(interface.bad_header): - interface.print_error('joining chain', interface.bad) - next_height = None - elif branch.parent().check_header(header): - interface.print_error('reorg', interface.bad, interface.tip) - interface.blockchain = branch.parent() - next_height = interface.bad - else: - interface.print_error('forkpoint conflicts with existing fork', branch.path()) - branch.write(b'', 0) - branch.save_header(interface.bad_header) - interface.mode = 'catch_up' - interface.blockchain = branch - next_height = interface.bad + 1 - interface.blockchain.catch_up = interface.server - else: - bh = interface.blockchain.height() - next_height = None - if bh > interface.good: - if not interface.blockchain.check_header(interface.bad_header): - b = interface.blockchain.fork(interface.bad_header) - with self.blockchains_lock: - self.blockchains[interface.bad] = b - interface.blockchain = b - interface.print_error("new chain", b.forkpoint) - interface.mode = 'catch_up' - maybe_next_height = interface.bad + 1 - if maybe_next_height <= interface.tip: - next_height = maybe_next_height - interface.blockchain.catch_up = interface.server - else: - assert bh == interface.good - if interface.blockchain.catch_up is None and bh < interface.tip: - interface.print_error("catching up from %d"% (bh + 1)) - interface.mode = 'catch_up' - next_height = bh + 1 - interface.blockchain.catch_up = interface.server - - self.notify('updated') - - elif interface.mode == 'catch_up': - can_connect = interface.blockchain.can_connect(header) - if can_connect: - interface.blockchain.save_header(header) - next_height = height + 1 if height < interface.tip else None - else: - # go back - interface.print_error("cannot connect", height) - interface.mode = 'backward' - interface.bad = height - interface.bad_header = header - next_height = height - 1 - - if next_height is None: - # exit catch_up state - interface.print_error('catch up done', interface.blockchain.height()) - interface.blockchain.catch_up = None - self.switch_lagging_interface() - self.notify('updated') + with self.interface_lock: + self.interfaces[server] = interface - else: - raise Exception(interface.mode) - # If not finished, get the next header - if next_height is not None: - if next_height < 0: - self.connection_down(interface.server) - next_height = None - elif interface.mode == 'catch_up' and interface.tip > next_height + 50: - self.request_chunk(interface, next_height // 2016) - else: - self.request_header(interface, next_height) - if next_height is None: - interface.mode = 'default' - interface.request = None - self.notify('updated') + if server == self.default_server: + self.switch_to_interface(server) - # refresh network dialog self.notify('interfaces') - def maintain_requests(self): - with self.interface_lock: - interfaces = list(self.interfaces.values()) - for interface in interfaces: - if interface.request and time.time() - interface.request_time > 20: - interface.print_error("blockchain request timed out") - self.connection_down(interface.server) - continue - - def wait_on_sockets(self): - # Python docs say Windows doesn't like empty selects. - # Sleep to prevent busy looping - if not self.interfaces: - time.sleep(0.1) - return - with self.interface_lock: - interfaces = list(self.interfaces.values()) - rin = [i for i in interfaces] - win = [i for i in interfaces if i.num_requests()] - try: - rout, wout, xout = select.select(rin, win, [], 0.1) - except socket.error as e: - if e.errno == errno.EINTR: - return - raise - assert not xout - for interface in wout: - interface.send_requests() - for interface in rout: - self.process_responses(interface) - def init_headers_file(self): - b = self.blockchains[0] + b = blockchain.blockchains[0] filename = b.path() length = 80 * len(constants.net.CHECKPOINTS) * 2016 if not os.path.exists(filename) or os.path.getsize(filename) < length: @@ -1078,82 +652,57 @@ class Network(util.DaemonThread): with b.lock: b.update_size() - def run(self): - self.init_headers_file() - while self.is_running(): - self.maintain_sockets() - self.wait_on_sockets() - self.maintain_requests() - self.run_jobs() # Synchronizer and Verifier - self.process_pending_sends() - self.stop_network() - self.on_stop() - - def on_notify_header(self, interface, header_dict): + async def get_merkle_for_transaction(self, tx_hash, tx_height): + return await self.interface.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height]) + + def broadcast_transaction_from_non_network_thread(self, tx, timeout=10): + # note: calling this from the network thread will deadlock it + fut = asyncio.run_coroutine_threadsafe(self.broadcast_transaction(tx, timeout=timeout), self.asyncio_loop) + return fut.result() + + async def broadcast_transaction(self, tx, timeout=10): try: - header_hex, height = header_dict['hex'], header_dict['height'] - except KeyError: - # no point in keeping this connection without headers sub - self.connection_down(interface.server) + out = await self.interface.session.send_request('blockchain.transaction.broadcast', [str(tx)], timeout=timeout) + except asyncio.TimeoutError as e: + return False, "error: operation timed out" + except Exception as e: + return False, "error: " + str(e) + + if out != tx.txid(): + return False, "error: " + out + return True, out + + async def request_chunk(self, height, tip, session=None, can_return_early=False): + if session is None: session = self.interface.session + index = height // 2016 + if can_return_early and index in self.requested_chunks: return + size = 2016 + if tip is not None: + size = min(size, tip - index * 2016) + size = max(size, 0) try: - header = blockchain.deserialize_header(util.bfh(header_hex), height) - except InvalidHeader: - self.connection_down(interface.server) - return - #interface.print_error('notified of header', height, blockchain.hash_header(header)) - if height < self.max_checkpoint(): - self.connection_down(interface.server) - return - interface.tip_header = header - interface.tip = height - if interface.mode != 'default': - return - b = blockchain.check_header(header) - if b: - interface.blockchain = b - self.switch_lagging_interface() - self.notify('updated') - self.notify('interfaces') - return - b = blockchain.can_connect(header) - if b: - interface.blockchain = b - b.save_header(header) - self.switch_lagging_interface() - self.notify('updated') - self.notify('interfaces') - return - with self.blockchains_lock: - tip = max([x.height() for x in self.blockchains.values()]) - if tip >=0: - interface.mode = 'backward' - interface.bad = height - interface.bad_header = header - self.request_header(interface, min(tip +1, height - 1)) - else: - chain = self.blockchains[0] - if chain.catch_up is None: - chain.catch_up = interface - interface.mode = 'catch_up' - interface.blockchain = chain - with self.blockchains_lock: - self.print_error("switching to catchup mode", tip, self.blockchains) - self.request_header(interface, 0) - else: - self.print_error("chain already catching up with", chain.catch_up.server) + self.requested_chunks.add(index) + res = await session.send_request('blockchain.block.headers', [index * 2016, size]) + finally: + try: self.requested_chunks.remove(index) + except KeyError: pass + conn = self.blockchain().connect_chunk(index, res['hex']) + if not conn: + return conn, 0 + return conn, res['count'] @with_interface_lock def blockchain(self): if self.interface and self.interface.blockchain is not None: self.blockchain_index = self.interface.blockchain.forkpoint - return self.blockchains[self.blockchain_index] + return blockchain.blockchains[self.blockchain_index] @with_interface_lock def get_blockchains(self): out = {} with self.blockchains_lock: - blockchain_items = list(self.blockchains.items()) + blockchain_items = list(blockchain.blockchains.items()) for k, b in blockchain_items: r = list(filter(lambda i: i.blockchain==b, list(self.interfaces.values()))) if r: @@ -1161,14 +710,14 @@ class Network(util.DaemonThread): return out def follow_chain(self, index): - blockchain = self.blockchains.get(index) - if blockchain: + bc = blockchain.blockchains.get(index) + if bc: self.blockchain_index = index self.config.set_key('blockchain_index', index) with self.interface_lock: interfaces = list(self.interfaces.values()) for i in interfaces: - if i.blockchain == blockchain: + if i.blockchain == bc: self.switch_to_interface(i.server) break else: @@ -1176,148 +725,83 @@ class Network(util.DaemonThread): with self.interface_lock: if self.interface: - server = self.interface.server - host, port, protocol, proxy, auto_connect = self.get_parameters() - host, port, protocol = server.split(':') - self.set_parameters(host, port, protocol, proxy, auto_connect) + net_params = self.get_parameters() + host, port, protocol = deserialize_server(self.interface.server) + net_params = net_params._replace(host=host, port=port, protocol=protocol) + self.set_parameters(net_params) def get_local_height(self): return self.blockchain().height() - @staticmethod - def __wait_for(it): - """Wait for the result of calling lambda `it`.""" - q = queue.Queue() - it(q.put) - try: - result = q.get(block=True, timeout=30) - except queue.Empty: - raise util.TimeoutException(_('Server did not answer')) - - if result.get('error'): - raise Exception(result.get('error')) - - return result.get('result') - - @staticmethod - def __with_default_synchronous_callback(invocation, callback): - """ Use this method if you want to make the network request - synchronous. """ - if not callback: - return Network.__wait_for(invocation) - - invocation(callback) - - def request_header(self, interface, height): - self.queue_request('blockchain.block.get_header', [height], interface) - interface.request = height - interface.req_time = time.time() - - def map_scripthash_to_address(self, callback): - def cb2(x): - x2 = x.copy() - p = x2.pop('params') - addr = self.h2addr[p[0]] - x2['params'] = [addr] - callback(x2) - return cb2 - - def subscribe_to_addresses(self, addresses, callback): - hash2address = { - bitcoin.address_to_scripthash(address): address - for address in addresses} - self.h2addr.update(hash2address) - msgs = [ - ('blockchain.scripthash.subscribe', [x]) - for x in hash2address.keys()] - self.send(msgs, self.map_scripthash_to_address(callback)) - - def request_address_history(self, address, callback): - h = bitcoin.address_to_scripthash(address) - self.h2addr.update({h: address}) - self.send([('blockchain.scripthash.get_history', [h])], self.map_scripthash_to_address(callback)) - - # NOTE this method handles exceptions and a special edge case, counter to - # what the other ElectrumX methods do. This is unexpected. - def broadcast_transaction(self, transaction, callback=None): - command = 'blockchain.transaction.broadcast' - invocation = lambda c: self.send([(command, [str(transaction)])], c) - - if callback: - invocation(callback) - return - - try: - out = Network.__wait_for(invocation) - except BaseException as e: - return False, "error: " + str(e) - - if out != transaction.txid(): - return False, "error: " + out - - return True, out - - def get_history_for_scripthash(self, hash, callback=None): - command = 'blockchain.scripthash.get_history' - invocation = lambda c: self.send([(command, [hash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def subscribe_to_headers(self, callback=None): - command = 'blockchain.headers.subscribe' - invocation = lambda c: self.send([(command, [True])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def subscribe_to_address(self, address, callback=None): - command = 'blockchain.address.subscribe' - invocation = lambda c: self.send([(command, [address])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def get_merkle_for_transaction(self, tx_hash, tx_height, callback=None): - command = 'blockchain.transaction.get_merkle' - invocation = lambda c: self.send([(command, [tx_hash, tx_height])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def subscribe_to_scripthash(self, scripthash, callback=None): - command = 'blockchain.scripthash.subscribe' - invocation = lambda c: self.send([(command, [scripthash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def get_transaction(self, transaction_hash, callback=None): - command = 'blockchain.transaction.get' - invocation = lambda c: self.send([(command, [transaction_hash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def get_transactions(self, transaction_hashes, callback=None): - command = 'blockchain.transaction.get' - messages = [(command, [tx_hash]) for tx_hash in transaction_hashes] - invocation = lambda c: self.send(messages, c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def listunspent_for_scripthash(self, scripthash, callback=None): - command = 'blockchain.scripthash.listunspent' - invocation = lambda c: self.send([(command, [scripthash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def get_balance_for_scripthash(self, scripthash, callback=None): - command = 'blockchain.scripthash.get_balance' - invocation = lambda c: self.send([(command, [scripthash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - def export_checkpoints(self, path): # run manually from the console to generate checkpoints cp = self.blockchain().get_checkpoints() with open(path, 'w', encoding='utf-8') as f: f.write(json.dumps(cp, indent=4)) - @classmethod - def max_checkpoint(cls): - return max(0, len(constants.net.CHECKPOINTS) * 2016 - 1) + def start(self, fx=None): + self.main_taskgroup = TaskGroup() + async def main(): + self.init_headers_file() + async with self.main_taskgroup as group: + await group.spawn(self.maintain_sessions()) + if fx: await group.spawn(fx) + self._wrapper_thread = threading.Thread(target=self.asyncio_loop.run_until_complete, args=(main(),)) + self._wrapper_thread.start() + + def stop(self): + asyncio.run_coroutine_threadsafe(self.main_taskgroup.cancel_remaining(), self.asyncio_loop) + + def join(self): + self._wrapper_thread.join(1) + + async def maintain_sessions(self): + while True: + while self.socket_queue.qsize() > 0: + server = self.socket_queue.get() + asyncio.get_event_loop().create_task(self.new_interface(server)) + remove = [] + for k, i in self.interfaces.items(): + if i.fut.done() and not i.exception: + assert False, "interface future should not finish without exception" + if i.exception: + if not i.fut.done(): + try: i.fut.cancel() + except Exception as e: self.print_error('exception while cancelling fut', e) + try: + raise i.exception + except BaseException as e: + self.print_error(i.server, "errored because:", str(e), str(type(e))) + remove.append(k) + for k in remove: + self.connection_down(k) + + # nodes + now = time.time() + for i in range(self.num_server - len(self.interfaces) - len(self.connecting)): + self.start_random_interface() + if now - self.nodes_retry_time > NODES_RETRY_INTERVAL: + self.print_error('network: retrying connections') + self.disconnected_servers = set([]) + self.nodes_retry_time = now + + # main interface + if not self.is_connected(): + if self.auto_connect: + if not self.is_connecting(): + self.switch_to_random_interface() + else: + if self.default_server in self.disconnected_servers: + if now - self.server_retry_time > SERVER_RETRY_INTERVAL: + self.disconnected_servers.remove(self.default_server) + self.server_retry_time = now + else: + self.switch_to_interface(self.default_server) + else: + if self.config.is_fee_estimates_update_required(): + await self.interface.group.spawn(self.attempt_fee_estimate_update()) + + await asyncio.sleep(0.1) + + async def attempt_fee_estimate_update(self): + await self.request_fee_estimates(self.interface) diff --git a/electrum/plugins/labels/labels.py b/electrum/plugins/labels/labels.py @@ -1,6 +1,5 @@ +import asyncio import hashlib -import requests -import threading import json import sys import traceback @@ -10,7 +9,7 @@ import base64 from electrum.plugin import BasePlugin, hook from electrum.crypto import aes_encrypt_with_iv, aes_decrypt_with_iv from electrum.i18n import _ - +from electrum.util import aiosafe, make_aiohttp_session class LabelsPlugin(BasePlugin): @@ -18,11 +17,11 @@ class LabelsPlugin(BasePlugin): BasePlugin.__init__(self, parent, config, name) self.target_host = 'labels.electrum.org' self.wallets = {} + self.proxy = None def encode(self, wallet, msg): password, iv, wallet_id = self.wallets[wallet] - encrypted = aes_encrypt_with_iv(password, iv, - msg.encode('utf8')) + encrypted = aes_encrypt_with_iv(password, iv, msg.encode('utf8')) return base64.b64encode(encrypted).decode() def decode(self, wallet, message): @@ -55,37 +54,27 @@ class LabelsPlugin(BasePlugin): "walletNonce": nonce, "externalId": self.encode(wallet, item), "encryptedLabel": self.encode(wallet, label)} - t = threading.Thread(target=self.do_request_safe, - args=["POST", "/label", False, bundle]) - t.setDaemon(True) - t.start() + asyncio.get_event_loop().create_task(self.do_post_safe("/label", bundle)) # Caller will write the wallet self.set_nonce(wallet, nonce + 1) - def do_request(self, method, url = "/labels", is_batch=False, data=None): + @aiosafe + async def do_post_safe(self, *args): + await self.do_post(*args) + + async def do_get(self, url = "/labels"): + url = 'https://' + self.target_host + url + async with make_aiohttp_session(self.proxy) as session: + async with session.get(url) as result: + return await result.json() + + async def do_post(self, url = "/labels", data=None): url = 'https://' + self.target_host + url - kwargs = {'headers': {}} - if method == 'GET' and data: - kwargs['params'] = data - elif method == 'POST' and data: - kwargs['data'] = json.dumps(data) - kwargs['headers']['Content-Type'] = 'application/json' - response = requests.request(method, url, **kwargs) - if response.status_code != 200: - raise Exception(response.status_code, response.text) - response = response.json() - if "error" in response: - raise Exception(response["error"]) - return response - - def do_request_safe(self, *args, **kwargs): - try: - self.do_request(*args, **kwargs) - except BaseException as e: - #traceback.print_exc(file=sys.stderr) - self.print_error('error doing request') - - def push_thread(self, wallet): + async with make_aiohttp_session(self.proxy) as session: + async with session.post(url, json=data) as result: + return await result.json() + + async def push_thread(self, wallet): wallet_data = self.wallets.get(wallet, None) if not wallet_data: raise Exception('Wallet {} not loaded'.format(wallet)) @@ -102,16 +91,16 @@ class LabelsPlugin(BasePlugin): continue bundle["labels"].append({'encryptedLabel': encoded_value, 'externalId': encoded_key}) - self.do_request("POST", "/labels", True, bundle) + await self.do_post("/labels", bundle) - def pull_thread(self, wallet, force): + async def pull_thread(self, wallet, force): wallet_data = self.wallets.get(wallet, None) if not wallet_data: raise Exception('Wallet {} not loaded'.format(wallet)) wallet_id = wallet_data[2] nonce = 1 if force else self.get_nonce(wallet) - 1 self.print_error("asking for labels since nonce", nonce) - response = self.do_request("GET", ("/labels/since/%d/for/%s" % (nonce, wallet_id) )) + response = await self.do_get("/labels/since/%d/for/%s" % (nonce, wallet_id)) if response["labels"] is None: self.print_error('no new labels') return @@ -140,12 +129,15 @@ class LabelsPlugin(BasePlugin): self.set_nonce(wallet, response["nonce"] + 1) self.on_pulled(wallet) - def pull_thread_safe(self, wallet, force): - try: - self.pull_thread(wallet, force) - except BaseException as e: - # traceback.print_exc(file=sys.stderr) - self.print_error('could not retrieve labels') + @aiosafe + async def pull_safe_thread(self, wallet, force): + await self.pull_thread(wallet, force) + + def pull(self, wallet, force): + return asyncio.run_coroutine_threadsafe(self.pull_thread(wallet, force), wallet.network.asyncio_loop).result() + + def push(self, wallet): + return asyncio.run_coroutine_threadsafe(self.push_thread(wallet), wallet.network.asyncio_loop).result() def start_wallet(self, wallet): nonce = self.get_nonce(wallet) @@ -159,9 +151,14 @@ class LabelsPlugin(BasePlugin): wallet_id = hashlib.sha256(mpk).hexdigest() self.wallets[wallet] = (password, iv, wallet_id) # If there is an auth token we can try to actually start syncing - t = threading.Thread(target=self.pull_thread_safe, args=(wallet, False)) - t.setDaemon(True) - t.start() + asyncio.get_event_loop().create_task(self.pull_safe_thread(wallet, False)) + self.proxy = wallet.network.proxy + wallet.network.register_callback(self.set_proxy, ['proxy_set']) def stop_wallet(self, wallet): + wallet.network.unregister_callback('proxy_set') self.wallets.pop(wallet, None) + + def set_proxy(self, evt_name, new_proxy): + self.proxy = new_proxy + self.print_error("proxy set") diff --git a/electrum/plugins/labels/qt.py b/electrum/plugins/labels/qt.py @@ -38,11 +38,11 @@ class Plugin(LabelsPlugin): hbox = QHBoxLayout() hbox.addWidget(QLabel("Label sync options:")) upload = ThreadedButton("Force upload", - partial(self.push_thread, wallet), + partial(self.push, wallet), partial(self.done_processing_success, d), partial(self.done_processing_error, d)) download = ThreadedButton("Force download", - partial(self.pull_thread, wallet, True), + partial(self.pull, wallet, True), partial(self.done_processing_success, d), partial(self.done_processing_error, d)) vbox = QVBoxLayout() diff --git a/electrum/plugins/trustedcoin/trustedcoin.py b/electrum/plugins/trustedcoin/trustedcoin.py @@ -22,10 +22,9 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. - +import asyncio import socket import os -import requests import json import base64 from urllib.parse import urljoin @@ -38,8 +37,9 @@ from electrum.mnemonic import Mnemonic from electrum.wallet import Multisig_Wallet, Deterministic_Wallet from electrum.i18n import _ from electrum.plugin import BasePlugin, hook -from electrum.util import NotEnoughFunds +from electrum.util import NotEnoughFunds, make_aiohttp_session from electrum.storage import STO_EV_USER_PW +from electrum.network import Network # signing_xpub is hardcoded so that the wallet can be restored from seed, without TrustedCoin's server def get_signing_xpub(): @@ -104,34 +104,47 @@ class TrustedCoinCosignerClient(object): self.user_agent = user_agent def send_request(self, method, relative_url, data=None): - kwargs = {'headers': {}} - if self.user_agent: - kwargs['headers']['user-agent'] = self.user_agent - if method == 'get' and data: - kwargs['params'] = data - elif method == 'post' and data: - kwargs['data'] = json.dumps(data) - kwargs['headers']['content-type'] = 'application/json' + network = Network.get_instance() + if network: + return asyncio.run_coroutine_threadsafe(self._send_request(method, relative_url, data), network.asyncio_loop).result() + else: + raise ErrorConnectingServer('You are offline.') + + async def handle_response(self, resp): + if resp.status != 200: + try: + r = await resp.json() + message = r['message'] + except: + message = await resp.text() + raise TrustedCoinException(message, resp.status) + try: + return await resp.json() + except: + return await resp.text() + + async def _send_request(self, method, relative_url, data): url = urljoin(self.base_url, relative_url) if self.debug: print('%s %s %s' % (method, url, data)) + headers = {} + if self.user_agent: + headers['user-agent'] = self.user_agent try: - response = requests.request(method, url, **kwargs) + proxy = Network.get_instance().proxy + async with make_aiohttp_session(proxy) as session: + if method == 'get': + async with session.get(url, params=data, headers=headers) as resp: + return await self.handle_response(resp) + elif method == 'post': + async with session.post(url, json=data, headers=headers) as resp: + return await self.handle_response(resp) + else: + assert False + except TrustedCoinException: + raise except Exception as e: raise ErrorConnectingServer(e) - if self.debug: - print(response.text) - if response.status_code != 200: - message = str(response.text) - if response.headers.get('content-type') == 'application/json': - r = response.json() - if 'message' in r: - message = r['message'] - raise TrustedCoinException(message, response.status_code) - if response.headers.get('content-type') == 'application/json': - return response.json() - else: - return response.text def get_terms_of_service(self, billing_plan='electrum-per-tx-otp'): """ diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py @@ -22,102 +22,65 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from threading import Lock +import asyncio import hashlib -# from .bitcoin import Hash, hash_encode +from aiorpcx import TaskGroup + from .transaction import Transaction -from .util import ThreadJob, bh2u +from .util import bh2u, PrintError +from .bitcoin import address_to_scripthash + + +def history_status(h): + if not h: + return None + status = '' + for tx_hash, height in h: + status += tx_hash + ':%d:' % height + return bh2u(hashlib.sha256(status.encode('ascii')).digest()) -class Synchronizer(ThreadJob): +class Synchronizer(PrintError): '''The synchronizer keeps the wallet up-to-date with its set of addresses and their transactions. It subscribes over the network to wallet addresses, gets the wallet to generate new addresses when necessary, requests the transaction history of any addresses we don't have the full history of, and requests binary transaction data of any transactions the wallet doesn't have. - - External interface: __init__() and add() member functions. ''' - - def __init__(self, wallet, network): + def __init__(self, wallet): self.wallet = wallet - self.network = network - self.new_addresses = set() - # Entries are (tx_hash, tx_height) tuples self.requested_tx = {} self.requested_histories = {} self.requested_addrs = set() - self.lock = Lock() + self.scripthash_to_address = {} + # Queues + self.add_queue = asyncio.Queue() + self.status_queue = asyncio.Queue() - self.initialized = False - self.initialize() + def is_up_to_date(self): + return (not self.requested_addrs + and not self.requested_histories + and not self.requested_tx) - def parse_response(self, response): - if response.get('error'): - self.print_error("response error:", response) - return None, None - return response['params'], response['result'] + def add(self, addr): + self.requested_addrs.add(addr) + self.add_queue.put_nowait(addr) - def is_up_to_date(self): - return (not self.requested_tx and not self.requested_histories - and not self.requested_addrs) - - def release(self): - self.network.unsubscribe(self.on_address_status) - - def add(self, address): - '''This can be called from the proxy or GUI threads.''' - with self.lock: - self.new_addresses.add(address) - - def subscribe_to_addresses(self, addresses): - if addresses: - self.requested_addrs |= addresses - self.network.subscribe_to_addresses(addresses, self.on_address_status) - - def get_status(self, h): - if not h: - return None - status = '' - for tx_hash, height in h: - status += tx_hash + ':%d:' % height - return bh2u(hashlib.sha256(status.encode('ascii')).digest()) - - def on_address_status(self, response): - if self.wallet.synchronizer is None and self.initialized: - return # we have been killed, this was just an orphan callback - params, result = self.parse_response(response) - if not params: - return - addr = params[0] + async def on_address_status(self, addr, status): history = self.wallet.history.get(addr, []) - if self.get_status(history) != result: - # note that at this point 'result' can be None; - # if we had a history for addr but now the server is telling us - # there is no history - if addr not in self.requested_histories: - self.requested_histories[addr] = result - self.network.request_address_history(addr, self.on_address_history) - # remove addr from list only after it is added to requested_histories - if addr in self.requested_addrs: # Notifications won't be in - self.requested_addrs.remove(addr) - - def on_address_history(self, response): - if self.wallet.synchronizer is None and self.initialized: - return # we have been killed, this was just an orphan callback - params, result = self.parse_response(response) - if not params: + if history_status(history) == status: return - addr = params[0] - try: - server_status = self.requested_histories[addr] - except KeyError: - # note: server_status can be None even if we asked for the history, - # so it is not sufficient to test that - self.print_error("receiving history (unsolicited)", addr, len(result)) + # note that at this point 'result' can be None; + # if we had a history for addr but now the server is telling us + # there is no history + if addr in self.requested_histories: return + # request address history + self.requested_histories[addr] = status + h = address_to_scripthash(addr) + result = await self.session.send_request("blockchain.scripthash.get_history", [h]) self.print_error("receiving history", addr, len(result)) hashes = set(map(lambda item: item['tx_hash'], result)) hist = list(map(lambda item: (item['tx_hash'], item['height']), result)) @@ -128,23 +91,36 @@ class Synchronizer(ThreadJob): if len(hashes) != len(result): self.print_error("error: server history has non-unique txids: %s"% addr) # Check that the status corresponds to what was announced - elif self.get_status(hist) != server_status: + elif history_status(hist) != status: self.print_error("error: status mismatch: %s" % addr) else: # Store received history self.wallet.receive_history_callback(addr, hist, tx_fees) # Request transactions we don't have - self.request_missing_txs(hist) + await self.request_missing_txs(hist) + # Remove request; this allows up_to_date to be True self.requested_histories.pop(addr) - def on_tx_response(self, response): - if self.wallet.synchronizer is None and self.initialized: - return # we have been killed, this was just an orphan callback - params, result = self.parse_response(response) - if not params: - return - tx_hash = params[0] + if self.wallet.network: self.wallet.network.notify('updated') + + async def request_missing_txs(self, hist): + # "hist" is a list of [tx_hash, tx_height] lists + transaction_hashes = [] + for tx_hash, tx_height in hist: + if tx_hash in self.requested_tx: + continue + if tx_hash in self.wallet.transactions: + continue + transaction_hashes.append(tx_hash) + self.requested_tx[tx_hash] = tx_height + + async with TaskGroup() as group: + for tx_hash in transaction_hashes: + await group.spawn(self.get_transaction, tx_hash) + + async def get_transaction(self, tx_hash): + result = await self.session.send_request('blockchain.transaction.get', [tx_hash]) tx = Transaction(result) try: tx.deserialize() @@ -160,54 +136,47 @@ class Synchronizer(ThreadJob): self.print_error("received tx %s height: %d bytes: %d" % (tx_hash, tx_height, len(tx.raw))) # callbacks - self.network.trigger_callback('new_transaction', tx) - if not self.requested_tx: - self.network.trigger_callback('updated') - - def request_missing_txs(self, hist): - # "hist" is a list of [tx_hash, tx_height] lists - transaction_hashes = [] - for tx_hash, tx_height in hist: - if tx_hash in self.requested_tx: - continue - if tx_hash in self.wallet.transactions: - continue - transaction_hashes.append(tx_hash) - self.requested_tx[tx_hash] = tx_height - - self.network.get_transactions(transaction_hashes, self.on_tx_response) - - def initialize(self): - '''Check the initial state of the wallet. Subscribe to all its - addresses, and request any transactions in its address history - we don't have. - ''' - for history in self.wallet.history.values(): - # Old electrum servers returned ['*'] when all history for - # the address was pruned. This no longer happens but may - # remain in old wallets. - if history == ['*']: - continue - self.request_missing_txs(history) - - if self.requested_tx: - self.print_error("missing tx", self.requested_tx) - self.subscribe_to_addresses(set(self.wallet.get_addresses())) - self.initialized = True - - def run(self): - '''Called from the network proxy thread main loop.''' - # 1. Create new addresses - self.wallet.synchronize() - - # 2. Subscribe to new addresses - with self.lock: - addresses = self.new_addresses - self.new_addresses = set() - self.subscribe_to_addresses(addresses) - - # 3. Detect if situation has changed - up_to_date = self.is_up_to_date() - if up_to_date != self.wallet.is_up_to_date(): - self.wallet.set_up_to_date(up_to_date) - self.network.trigger_callback('updated') + self.wallet.network.trigger_callback('new_transaction', tx) + + async def subscribe_to_address(self, addr): + h = address_to_scripthash(addr) + self.scripthash_to_address[h] = addr + await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) + self.requested_addrs.remove(addr) + + async def send_subscriptions(self, group: TaskGroup): + while True: + addr = await self.add_queue.get() + await group.spawn(self.subscribe_to_address, addr) + + async def handle_status(self, group: TaskGroup): + while True: + h, status = await self.status_queue.get() + addr = self.scripthash_to_address[h] + await group.spawn(self.on_address_status, addr, status) + + @property + def session(self): + s = self.wallet.network.interface.session + assert s is not None + return s + + async def main(self): + # request missing txns, if any + async with TaskGroup() as group: + for history in self.wallet.history.values(): + # Old electrum servers returned ['*'] when all history for the address + # was pruned. This no longer happens but may remain in old wallets. + if history == ['*']: continue + await group.spawn(self.request_missing_txs, history) + # add addresses to bootstrap + for addr in self.wallet.get_addresses(): + self.add(addr) + # main loop + while True: + await asyncio.sleep(0.1) + self.wallet.synchronize() + up_to_date = self.is_up_to_date() + if up_to_date != self.wallet.is_up_to_date(): + self.wallet.set_up_to_date(up_to_date) + self.wallet.network.trigger_callback('updated') diff --git a/electrum/tests/test_interface.py b/electrum/tests/test_interface.py @@ -1,26 +0,0 @@ -from electrum import interface - -from . import SequentialTestCase - - -class TestInterface(SequentialTestCase): - - def test_match_host_name(self): - self.assertTrue(interface._match_hostname('asd.fgh.com', 'asd.fgh.com')) - self.assertFalse(interface._match_hostname('asd.fgh.com', 'asd.zxc.com')) - self.assertTrue(interface._match_hostname('asd.fgh.com', '*.fgh.com')) - self.assertFalse(interface._match_hostname('asd.fgh.com', '*fgh.com')) - self.assertFalse(interface._match_hostname('asd.fgh.com', '*.zxc.com')) - - def test_check_host_name(self): - i = interface.TcpConnection(server=':1:', queue=None, config_path=None) - - self.assertFalse(i.check_host_name(None, None)) - self.assertFalse(i.check_host_name( - peercert={'subjectAltName': []}, name='')) - self.assertTrue(i.check_host_name( - peercert={'subjectAltName': [('DNS', 'foo.bar.com')]}, - name='foo.bar.com')) - self.assertTrue(i.check_host_name( - peercert={'subject': [('commonName', 'foo.bar.com')]}, - name='foo.bar.com')) diff --git a/electrum/tests/test_network.py b/electrum/tests/test_network.py @@ -0,0 +1,119 @@ +import asyncio +import tempfile +import unittest + +from electrum import constants +from electrum.simple_config import SimpleConfig +from electrum import blockchain +from electrum.interface import Interface + +class MockInterface(Interface): + def __init__(self, config): + self.config = config + super().__init__(None, 'mock-server:50000:t', self.config.electrum_path(), None) + self.q = asyncio.Queue() + self.blockchain = blockchain.Blockchain(self.config, 2002, None) + self.tip = 12 + async def get_block_header(self, height, assert_mode): + assert self.q.qsize() > 0, (height, assert_mode) + item = await self.q.get() + print("step with height", height, item) + assert item['block_height'] == height, (item['block_height'], height) + assert assert_mode in item['mock'], (assert_mode, item) + return item + +class TestNetwork(unittest.TestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + constants.set_regtest() + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + constants.set_mainnet() + + def setUp(self): + self.config = SimpleConfig({'electrum_path': tempfile.mkdtemp(prefix="test_network")}) + self.interface = MockInterface(self.config) + + def test_new_fork(self): + blockchain.blockchains = {} + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + def mock_connect(height): + return height == 6 + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1,'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1,'check':lambda x: True, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=8))) + self.assertEqual(self.interface.q.qsize(), 0) + + def test_new_can_connect_during_backward(self): + blockchain.blockchains = {} + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + def mock_connect(height): + return height == 2 + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + self.interface.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=5))) + self.assertEqual(self.interface.q.qsize(), 0) + + def mock_fork(self, bad_header): + return blockchain.Blockchain(self.config, bad_header['block_height'], None) + + def test_new_chain_false_during_binary(self): + blockchain.blockchains = {} + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + mock_connect = lambda height: height == 3 + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': mock_connect}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: False, 'fork': self.mock_fork, 'connect': mock_connect}}) + self.interface.q.put_nowait({'block_height': 3, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + self.assertEqual(self.interface.q.qsize(), 0) + + def test_new_join(self): + blockchain.blockchains = {7: {'check': lambda bad_header: True}} + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda height: height == 6}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('join', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + self.assertEqual(self.interface.q.qsize(), 0) + + def test_new_reorg(self): + times = 0 + def check(header): + nonlocal times + self.assertEqual(header['block_height'], 7) + times += 1 + return times != 1 + blockchain.blockchains = {7: {'check': check, 'parent': {'check': lambda x: True}}} + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda height: height == 6}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: 1, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 7, 'mock': {'binary':1, 'check': lambda x: False, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('join', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=8))) + self.assertEqual(self.interface.q.qsize(), 0) + self.assertEqual(times, 2) + +if __name__=="__main__": + constants.set_regtest() + unittest.main() diff --git a/electrum/util.py b/electrum/util.py @@ -34,12 +34,16 @@ import hmac import stat import inspect from locale import localeconv +import asyncio +import urllib.request, urllib.parse, urllib.error +import queue -from .i18n import _ +import aiohttp +from aiohttp_socks import SocksConnector, SocksVer +from aiorpcx import TaskGroup +from .i18n import _ -import urllib.request, urllib.parse, urllib.error -import queue def inv_dict(d): return {v: k for k, v in d.items()} @@ -926,6 +930,27 @@ def make_dir(path, allow_symlink=True): os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) +class AIOSafeSilentException(Exception): pass + + +def aiosafe(f): + # save exception in object. + # f must be a method of a PrintError instance. + # aiosafe calls should not be nested + async def f2(*args, **kwargs): + self = args[0] + try: + return await f(*args, **kwargs) + except AIOSafeSilentException as e: + self.exception = e + except asyncio.CancelledError as e: + self.exception = e + except BaseException as e: + self.print_error("Exception in", f.__name__, ":", e.__class__.__name__, str(e)) + traceback.print_exc(file=sys.stderr) + self.exception = e + return f2 + TxMinedStatus = NamedTuple("TxMinedStatus", [("height", int), ("conf", int), ("timestamp", int), @@ -934,3 +959,26 @@ VerifiedTxInfo = NamedTuple("VerifiedTxInfo", [("height", int), ("timestamp", int), ("txpos", int), ("header_hash", str)]) + +def make_aiohttp_session(proxy): + if proxy: + connector = SocksConnector( + socks_ver=SocksVer.SOCKS5 if proxy['mode'] == 'socks5' else SocksVer.SOCKS4, + host=proxy['host'], + port=int(proxy['port']), + username=proxy.get('user', None), + password=proxy.get('password', None), + rdns=True + ) + return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10), connector=connector) + else: + return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10)) + + +class CustomTaskGroup(TaskGroup): + + def spawn(self, *args, **kwargs): + # don't complain if group is already closed. + if self._closed: + raise asyncio.CancelledError() + return super().spawn(*args, **kwargs) diff --git a/electrum/verifier.py b/electrum/verifier.py @@ -21,12 +21,16 @@ # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import asyncio from typing import Sequence, Optional +from aiorpcx import TaskGroup + from .util import ThreadJob, bh2u, VerifiedTxInfo from .bitcoin import Hash, hash_decode, hash_encode from .transaction import Transaction from .blockchain import hash_header +from .interface import GracefulDisconnect class MerkleVerificationFailure(Exception): pass @@ -45,17 +49,20 @@ class SPV(ThreadJob): self.merkle_roots = {} # txid -> merkle root (once it has been verified) self.requested_merkle = set() # txid set of pending requests - def run(self): - interface = self.network.interface - if not interface: - return + async def main(self, group: TaskGroup): + while True: + await self._request_proofs(group) + await asyncio.sleep(0.1) - blockchain = interface.blockchain + async def _request_proofs(self, group: TaskGroup): + blockchain = self.network.blockchain() if not blockchain: + self.print_error("no blockchain") return local_height = self.network.get_local_height() unverified = self.wallet.get_unverified_txs() + for tx_hash, tx_height in unverified.items(): # do not request merkle branch before headers are available if tx_height <= 0 or tx_height > local_height: @@ -65,31 +72,21 @@ class SPV(ThreadJob): if header is None: index = tx_height // 2016 if index < len(blockchain.checkpoints): - self.network.request_chunk(interface, index) + await group.spawn(self.network.request_chunk(tx_height, None, can_return_early=True)) elif (tx_hash not in self.requested_merkle and tx_hash not in self.merkle_roots): - self.network.get_merkle_for_transaction( - tx_hash, - tx_height, - self.verify_merkle) self.print_error('requested merkle', tx_hash) self.requested_merkle.add(tx_hash) + await group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height) if self.network.blockchain() != self.blockchain: self.blockchain = self.network.blockchain() - self.undo_verifications() + self._undo_verifications() - def verify_merkle(self, response): - if self.wallet.verifier is None: - return # we have been killed, this was just an orphan callback - if response.get('error'): - self.print_error('received an error:', response) - return - params = response['params'] - merkle = response['result'] + async def _request_and_verify_single_proof(self, tx_hash, tx_height): + merkle = await self.network.get_merkle_for_transaction(tx_hash, tx_height) # Verify the hash of the server-provided merkle branch to a # transaction matches the merkle root of its block - tx_hash = params[0] tx_height = merkle.get('block_height') pos = merkle.get('pos') merkle_branch = merkle.get('merkle') @@ -98,14 +95,10 @@ class SPV(ThreadJob): verify_tx_is_in_block(tx_hash, merkle_branch, pos, header, tx_height) except MerkleVerificationFailure as e: self.print_error(str(e)) - # FIXME: we should make a fresh connection to a server - # to recover from this, as this TX will now never verify - return + raise GracefulDisconnect(e) # we passed all the tests self.merkle_roots[tx_hash] = header.get('merkle_root') try: - # note: we could pop in the beginning, but then we would request - # this proof again in case of verification failure from the same server self.requested_merkle.remove(tx_hash) except KeyError: pass self.print_error("verified %s" % tx_hash) @@ -144,7 +137,7 @@ class SPV(ThreadJob): else: raise InnerNodeOfSpvProofIsValidTx() - def undo_verifications(self): + def _undo_verifications(self): height = self.blockchain.get_forkpoint() tx_hashes = self.wallet.undo_verifications(self.blockchain, height) for tx_hash in tx_hashes: diff --git a/electrum/version.py b/electrum/version.py @@ -1,7 +1,7 @@ ELECTRUM_VERSION = '3.2.3' # version of the client package APK_VERSION = '3.2.3.1' # read by buildozer.spec -PROTOCOL_VERSION = '1.2' # protocol version requested +PROTOCOL_VERSION = '1.4' # protocol version requested # The hash of the mnemonic seed must begin with this SEED_PREFIX = '01' # Standard wallet diff --git a/run_electrum b/run_electrum @@ -48,6 +48,7 @@ def check_imports(): import qrcode import google.protobuf import jsonrpclib + import aiorpcx except ImportError as e: sys.exit("Error: %s. Try 'sudo pip install <module-name>'"%str(e)) # the following imports are for pyinstaller @@ -134,7 +135,7 @@ def run_non_RPC(config): if not config.get('offline'): network = Network(config) network.start() - wallet.start_threads(network) + wallet.start_network(network) print_msg("Recovering wallet...") wallet.synchronize() wallet.wait_until_synchronized()