commit e37da62a1ce68ed2374ddc18796a89bd21529b70
parent 5c4a6c0f2b531bd84d4a0c5dad91ace4b8ad3596
Author: SomberNight <somber.night@protonmail.com>
Date: Fri, 2 Nov 2018 20:14:59 +0100
fix most "scripts"
related: #4754
Diffstat:
15 files changed, 266 insertions(+), 194 deletions(-)
diff --git a/electrum/bitcoin.py b/electrum/bitcoin.py
@@ -284,13 +284,15 @@ def script_to_address(script, *, net=None):
assert t == TYPE_ADDRESS
return addr
-def address_to_script(addr, *, net=None):
+def address_to_script(addr: str, *, net=None) -> str:
if net is None:
net = constants.net
+ if not is_address(addr, net=net):
+ raise BitcoinException(f"invalid bitcoin address: {addr}")
witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr)
if witprog is not None:
if not (0 <= witver <= 16):
- raise BitcoinException('impossible witness version: {}'.format(witver))
+ raise BitcoinException(f'impossible witness version: {witver}')
OP_n = witver + 0x50 if witver > 0 else 0
script = bh2u(bytes([OP_n]))
script += push_script(bh2u(bytes(witprog)))
@@ -305,7 +307,7 @@ def address_to_script(addr, *, net=None):
script += push_script(bh2u(hash_160_))
script += '87' # op_equal
else:
- raise BitcoinException('unknown address type: {}'.format(addrtype))
+ raise BitcoinException(f'unknown address type: {addrtype}')
return script
def address_to_scripthash(addr):
@@ -491,24 +493,28 @@ def address_from_private_key(sec):
public_key = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
return pubkey_to_address(txin_type, public_key)
-def is_segwit_address(addr):
+def is_segwit_address(addr, *, net=None):
+ if net is None: net = constants.net
try:
- witver, witprog = segwit_addr.decode(constants.net.SEGWIT_HRP, addr)
+ witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr)
except Exception as e:
return False
return witprog is not None
-def is_b58_address(addr):
+def is_b58_address(addr, *, net=None):
+ if net is None: net = constants.net
try:
addrtype, h = b58_address_to_hash160(addr)
except Exception as e:
return False
- if addrtype not in [constants.net.ADDRTYPE_P2PKH, constants.net.ADDRTYPE_P2SH]:
+ if addrtype not in [net.ADDRTYPE_P2PKH, net.ADDRTYPE_P2SH]:
return False
return addr == hash160_to_b58_address(h, addrtype)
-def is_address(addr):
- return is_segwit_address(addr) or is_b58_address(addr)
+def is_address(addr, *, net=None):
+ if net is None: net = constants.net
+ return is_segwit_address(addr, net=net) \
+ or is_b58_address(addr, net=net)
def is_private_key(key):
diff --git a/electrum/blockchain.py b/electrum/blockchain.py
@@ -72,7 +72,11 @@ def hash_header(header: dict) -> str:
return '0' * 64
if header.get('prev_block_hash') is None:
header['prev_block_hash'] = '00'*32
- return hash_encode(sha256d(bfh(serialize_header(header))))
+ return hash_raw_header(serialize_header(header))
+
+
+def hash_raw_header(header: str) -> str:
+ return hash_encode(sha256d(bfh(header)))
blockchains = {} # type: Dict[int, Blockchain]
diff --git a/electrum/daemon.py b/electrum/daemon.py
@@ -30,15 +30,14 @@ import traceback
import sys
import threading
from typing import Dict, Optional, Tuple
-import re
import jsonrpclib
from .jsonrpc import VerifyingJSONRPCServer
from .version import ELECTRUM_VERSION
from .network import Network
-from .util import json_decode, DaemonThread
-from .util import print_error, to_string
+from .util import (json_decode, DaemonThread, print_error, to_string,
+ create_and_start_event_loop)
from .wallet import Wallet, Abstract_Wallet
from .storage import WalletStorage
from .commands import known_commands, Commands
@@ -128,7 +127,7 @@ class Daemon(DaemonThread):
if fd is None and listen_jsonrpc:
fd, server = get_fd_or_server(config)
if fd is None: raise Exception('failed to lock daemon; already running?')
- self.create_and_start_event_loop()
+ self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop()
if config.get('offline'):
self.network = None
else:
@@ -330,22 +329,3 @@ class Daemon(DaemonThread):
except BaseException as e:
traceback.print_exc(file=sys.stdout)
# app will exit now
-
- def create_and_start_event_loop(self):
- def on_exception(loop, context):
- """Suppress spurious messages it appears we cannot control."""
- SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
- 'SSL error in data received')
- message = context.get('message')
- if message and SUPPRESS_MESSAGE_REGEX.match(message):
- return
- loop.default_exception_handler(context)
-
- self.asyncio_loop = asyncio.get_event_loop()
- self.asyncio_loop.set_exception_handler(on_exception)
- # self.asyncio_loop.set_debug(1)
- self._stop_loop = asyncio.Future()
- self._loop_thread = threading.Thread(target=self.asyncio_loop.run_until_complete,
- args=(self._stop_loop,),
- name='EventLoop')
- self._loop_thread.start()
diff --git a/electrum/network.py b/electrum/network.py
@@ -32,7 +32,7 @@ import json
import sys
import ipaddress
import asyncio
-from typing import NamedTuple, Optional, Sequence, List, Dict
+from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple
import traceback
import dns
@@ -53,7 +53,7 @@ NODES_RETRY_INTERVAL = 60
SERVER_RETRY_INTERVAL = 10
-def parse_servers(result):
+def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
""" parse servers list into dict format"""
servers = {}
for item in result:
@@ -170,6 +170,7 @@ class Network(PrintError):
INSTANCE = self
self.asyncio_loop = asyncio.get_event_loop()
+ assert self.asyncio_loop.is_running(), "event loop not running"
self._loop_thread = None # type: threading.Thread # set by caller; only used for sanity checks
if config is None:
@@ -225,6 +226,8 @@ class Network(PrintError):
self.server_queue = None
self.proxy = None
+ self._set_status('disconnected')
+
def run_from_another_thread(self, coro):
assert self._loop_thread != threading.current_thread(), 'must not be called from network thread'
fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
@@ -411,10 +414,10 @@ class Network(PrintError):
out = filter_noonion(out)
return out
- def _start_interface(self, server):
+ def _start_interface(self, server: str):
if server not in self.interfaces and server not in self.connecting:
if server == self.default_server:
- self.print_error("connecting to %s as new interface" % server)
+ self.print_error(f"connecting to {server} as new interface")
self._set_status('connecting')
self.connecting.add(server)
self.server_queue.put(server)
diff --git a/electrum/scripts/bip70.py b/electrum/scripts/bip70.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
# create a BIP70 payment request signed with a certificate
+# FIXME: the code here is outdated, and no longer working
import tlslite
diff --git a/electrum/scripts/block_headers.py b/electrum/scripts/block_headers.py
@@ -3,29 +3,33 @@
# A simple script that connects to a server and displays block headers
import time
-import sys
+import asyncio
-from .. import SimpleConfig, Network
-from electrum.util import print_msg, json_encode
+from electrum.network import Network
+from electrum.util import print_msg, json_encode, create_and_start_event_loop, log_exceptions
# start network
-c = SimpleConfig()
-network = Network(c)
+loop, stopping_fut, loop_thread = create_and_start_event_loop()
+network = Network()
network.start()
# wait until connected
-while network.is_connecting():
- time.sleep(0.1)
+while not network.is_connected():
+ time.sleep(1)
+ print_msg("waiting for network to get connected...")
-if not network.is_connected():
- print_msg("daemon is not connected")
- sys.exit(1)
+header_queue = asyncio.Queue()
-# 2. send the subscription
-callback = lambda response: print_msg(json_encode(response.get('result')))
-network.send([('server.version',["block_headers script", "1.2"])], callback)
-network.subscribe_to_headers(callback)
+@log_exceptions
+async def f():
+ try:
+ await network.interface.session.subscribe('blockchain.headers.subscribe', [], header_queue)
+ # 3. wait for results
+ while network.is_connected():
+ header = await header_queue.get()
+ print_msg(json_encode(header))
+ finally:
+ stopping_fut.set_result(1)
-# 3. wait for results
-while network.is_connected():
- time.sleep(1)
+# 2. send the subscription
+asyncio.run_coroutine_threadsafe(f(), loop)
diff --git a/electrum/scripts/estimate_fee.py b/electrum/scripts/estimate_fee.py
@@ -1,7 +1,29 @@
#!/usr/bin/env python3
-from . import util
import json
-from electrum.network import filter_protocol
-peers = filter_protocol(util.get_peers())
-results = util.send_request(peers, 'blockchain.estimatefee', [2])
-print(json.dumps(results, indent=4))
+import asyncio
+from statistics import median
+from numbers import Number
+
+from electrum.network import filter_protocol, Network
+from electrum.util import create_and_start_event_loop, log_exceptions
+
+import util
+
+
+loop, stopping_fut, loop_thread = create_and_start_event_loop()
+network = Network()
+network.start()
+
+@log_exceptions
+async def f():
+ try:
+ peers = await util.get_peers(network)
+ peers = filter_protocol(peers)
+ results = await util.send_request(network, peers, 'blockchain.estimatefee', [2])
+ print(json.dumps(results, indent=4))
+ feerate_estimates = filter(lambda x: isinstance(x, Number), results.values())
+ print(f"median feerate: {median(feerate_estimates)}")
+ finally:
+ stopping_fut.set_result(1)
+
+asyncio.run_coroutine_threadsafe(f(), loop)
diff --git a/electrum/scripts/get_history.py b/electrum/scripts/get_history.py
@@ -1,9 +1,12 @@
#!/usr/bin/env python3
import sys
-from .. import Network
-from electrum.util import json_encode, print_msg
+import asyncio
+
from electrum import bitcoin
+from electrum.network import Network
+from electrum.util import json_encode, print_msg, create_and_start_event_loop, log_exceptions
+
try:
addr = sys.argv[1]
@@ -11,8 +14,17 @@ except Exception:
print("usage: get_history <bitcoin_address>")
sys.exit(1)
-n = Network()
-n.start()
-_hash = bitcoin.address_to_scripthash(addr)
-h = n.get_history_for_scripthash(_hash)
-print_msg(json_encode(h))
+loop, stopping_fut, loop_thread = create_and_start_event_loop()
+network = Network()
+network.start()
+
+@log_exceptions
+async def f():
+ try:
+ sh = bitcoin.address_to_scripthash(addr)
+ hist = await network.get_history_for_scripthash(sh)
+ print_msg(json_encode(hist))
+ finally:
+ stopping_fut.set_result(1)
+
+asyncio.run_coroutine_threadsafe(f(), loop)
diff --git a/electrum/scripts/peers.py b/electrum/scripts/peers.py
@@ -1,14 +1,28 @@
#!/usr/bin/env python3
+import asyncio
-from . import util
+from electrum.network import filter_protocol, Network
+from electrum.util import create_and_start_event_loop, log_exceptions
+from electrum.blockchain import hash_raw_header
-from electrum.network import filter_protocol
-from electrum.blockchain import hash_header
+import util
-peers = util.get_peers()
-peers = filter_protocol(peers, 's')
-results = util.send_request(peers, 'blockchain.headers.subscribe', [])
+loop, stopping_fut, loop_thread = create_and_start_event_loop()
+network = Network()
+network.start()
-for n,v in sorted(results.items(), key=lambda x:x[1].get('block_height')):
- print("%60s"%n, v.get('block_height'), hash_header(v))
+@log_exceptions
+async def f():
+ try:
+ peers = await util.get_peers(network)
+ peers = filter_protocol(peers, 's')
+ results = await util.send_request(network, peers, 'blockchain.headers.subscribe', [])
+ for server, header in sorted(results.items(), key=lambda x: x[1].get('height')):
+ height = header.get('height')
+ blockhash = hash_raw_header(header.get('hex'))
+ print("%60s" % server, height, blockhash)
+ finally:
+ stopping_fut.set_result(1)
+
+asyncio.run_coroutine_threadsafe(f(), loop)
diff --git a/electrum/scripts/servers.py b/electrum/scripts/servers.py
@@ -1,10 +1,27 @@
#!/usr/bin/env python3
-
-from .. import set_verbosity
-from electrum.network import filter_version
-from . import util
import json
-set_verbosity(False)
+import asyncio
+
+from electrum.network import filter_version, Network
+from electrum.util import create_and_start_event_loop, log_exceptions
+from electrum import constants
+
+import util
+
+
+#constants.set_testnet()
+
+loop, stopping_fut, loop_thread = create_and_start_event_loop()
+network = Network()
+network.start()
+
+@log_exceptions
+async def f():
+ try:
+ peers = await util.get_peers(network)
+ peers = filter_version(peers)
+ print(json.dumps(peers, sort_keys=True, indent=4))
+ finally:
+ stopping_fut.set_result(1)
-servers = filter_version(util.get_peers())
-print(json.dumps(servers, sort_keys = True, indent = 4))
+asyncio.run_coroutine_threadsafe(f(), loop)
diff --git a/electrum/scripts/txradar.py b/electrum/scripts/txradar.py
@@ -1,20 +1,38 @@
#!/usr/bin/env python3
-from . import util
import sys
+import asyncio
+
+from electrum.network import filter_protocol, Network
+from electrum.util import create_and_start_event_loop, log_exceptions
+
+import util
+
+
try:
- tx = sys.argv[1]
+ txid = sys.argv[1]
except:
print("usage: txradar txid")
sys.exit(1)
-peers = util.get_peers()
-results = util.send_request(peers, 'blockchain.transaction.get', [tx])
-r1 = []
-r2 = []
+loop, stopping_fut, loop_thread = create_and_start_event_loop()
+network = Network()
+network.start()
-for k, v in results.items():
- (r1 if v else r2).append(k)
+@log_exceptions
+async def f():
+ try:
+ peers = await util.get_peers(network)
+ peers = filter_protocol(peers, 's')
+ results = await util.send_request(network, peers, 'blockchain.transaction.get', [txid])
+ r1, r2 = [], []
+ for k, v in results.items():
+ (r1 if not isinstance(v, Exception) else r2).append(k)
+ print(f"Received {len(results)} answers")
+ try: propagation = len(r1) * 100. / (len(r1) + len(r2))
+ except ZeroDivisionError: propagation = 0
+ print(f"Propagation rate: {propagation:.1f} percent")
+ finally:
+ stopping_fut.set_result(1)
-print("Received %d answers"%len(results))
-print("Propagation rate: %.1f percent" % (len(r1) *100./(len(r1)+ len(r2))))
+asyncio.run_coroutine_threadsafe(f(), loop)
diff --git a/electrum/scripts/util.py b/electrum/scripts/util.py
@@ -1,87 +1,46 @@
-import select, time, queue
-# import electrum
-from .. import Connection, Interface, SimpleConfig
+import asyncio
+from typing import List, Sequence
-from electrum.network import parse_servers
-from collections import defaultdict
+from aiorpcx import TaskGroup
+
+from electrum.network import parse_servers, Network
+from electrum.interface import Interface
-# electrum.util.set_verbosity(1)
-def get_interfaces(servers, timeout=10):
- '''Returns a map of servers to connected interfaces. If any
- connections fail or timeout, they will be missing from the map.
- '''
- assert type(servers) is list
- socket_queue = queue.Queue()
- config = SimpleConfig()
- connecting = {}
- for server in servers:
- if server not in connecting:
- connecting[server] = Connection(server, socket_queue, config.path)
- interfaces = {}
- timeout = time.time() + timeout
- count = 0
- while time.time() < timeout and count < len(servers):
- try:
- server, socket = socket_queue.get(True, 0.3)
- except queue.Empty:
- continue
- if socket:
- interfaces[server] = Interface(server, socket)
- count += 1
- return interfaces
-def wait_on_interfaces(interfaces, timeout=10):
- '''Return a map of servers to a list of (request, response) tuples.
- Waits timeout seconds, or until each interface has a response'''
- result = defaultdict(list)
- timeout = time.time() + timeout
- while len(result) < len(interfaces) and time.time() < timeout:
- rin = [i for i in interfaces.values()]
- win = [i for i in interfaces.values() if i.unsent_requests]
- rout, wout, xout = select.select(rin, win, [], 1)
- for interface in wout:
- interface.send_requests()
- for interface in rout:
- responses = interface.get_responses()
- if responses:
- result[interface.server].extend(responses)
- return result
+#electrum.util.set_verbosity(True)
-def get_peers():
- config = SimpleConfig()
- peers = {}
- # 1. get connected interfaces
- server = config.get('server')
- if server is None:
- print("You need to set a secure server, for example (for mainnet): 'electrum setconfig server helicarrier.bauerj.eu:50002:s'")
- return []
- interfaces = get_interfaces([server])
- if not interfaces:
- print("No connection to", server)
- return []
- # 2. get list of peers
- interface = interfaces[server]
- interface.queue_request('server.peers.subscribe', [], 0)
- responses = wait_on_interfaces(interfaces).get(server)
- if responses:
- response = responses[0][1] # One response, (req, response) tuple
- peers = parse_servers(response.get('result'))
+async def get_peers(network: Network):
+ while not network.is_connected():
+ await asyncio.sleep(1)
+ interface = network.interface
+ session = interface.session
+ print(f"asking server {interface.server} for its peers")
+ peers = parse_servers(await session.send_request('server.peers.subscribe'))
+ print(f"got {len(peers)} servers")
return peers
-def send_request(peers, method, params):
- print("Contacting %d servers"%len(peers))
- interfaces = get_interfaces(peers)
- print("%d servers could be reached" % len(interfaces))
- for peer in peers:
- if not peer in interfaces:
- print("Connection failed:", peer)
- for msg_id, i in enumerate(interfaces.values()):
- i.queue_request(method, params, msg_id)
- responses = wait_on_interfaces(interfaces)
- for peer in interfaces:
- if not peer in responses:
- print(peer, "did not answer")
- results = dict(zip(responses.keys(), [t[0][1].get('result') for t in responses.values()]))
- print("%d answers"%len(results))
- return results
+async def send_request(network: Network, servers: List[str], method: str, params: Sequence):
+ print(f"contacting {len(servers)} servers")
+ num_connecting = len(network.connecting)
+ for server in servers:
+ network._start_interface(server)
+ # sleep a bit
+ for _ in range(10):
+ if len(network.connecting) < num_connecting:
+ break
+ await asyncio.sleep(1)
+ print(f"connected to {len(network.interfaces)} servers. sending request to all.")
+ responses = dict()
+ async def get_response(iface: Interface):
+ try:
+ res = await iface.session.send_request(method, params, timeout=10)
+ except Exception as e:
+ print(f"server {iface.server} errored or timed out: ({repr(e)})")
+ res = e
+ responses[iface.server] = res
+ async with TaskGroup() as group:
+ for interface in network.interfaces.values():
+ await group.spawn(get_response(interface))
+ print("%d answers" % len(responses))
+ return responses
diff --git a/electrum/scripts/watch_address.py b/electrum/scripts/watch_address.py
@@ -1,10 +1,12 @@
#!/usr/bin/env python3
import sys
-import time
-from electrum import bitcoin
-from .. import SimpleConfig, Network
-from electrum.util import print_msg, json_encode
+import asyncio
+
+from electrum.network import Network
+from electrum.util import print_msg, create_and_start_event_loop
+from electrum.synchronizer import SynchronizerBase
+
try:
addr = sys.argv[1]
@@ -12,25 +14,31 @@ except Exception:
print("usage: watch_address <bitcoin_address>")
sys.exit(1)
-sh = bitcoin.address_to_scripthash(addr)
-
# start network
-c = SimpleConfig()
-network = Network(c)
+loop = create_and_start_event_loop()[0]
+network = Network()
network.start()
-# wait until connected
-while network.is_connecting():
- time.sleep(0.1)
-if not network.is_connected():
- print_msg("daemon is not connected")
- sys.exit(1)
+class Notifier(SynchronizerBase):
+ def __init__(self, network):
+ SynchronizerBase.__init__(self, network)
+ self.watched_addresses = set()
+ self.watch_queue = asyncio.Queue()
+
+ async def main(self):
+ # resend existing subscriptions if we were restarted
+ for addr in self.watched_addresses:
+ await self._add_address(addr)
+ # main loop
+ while True:
+ addr = await self.watch_queue.get()
+ self.watched_addresses.add(addr)
+ await self._add_address(addr)
+
+ async def _on_address_status(self, addr, status):
+ print_msg(f"addr {addr}, status {status}")
-# 2. send the subscription
-callback = lambda response: print_msg(json_encode(response.get('result')))
-network.subscribe_to_address(addr, callback)
-# 3. wait for results
-while network.is_connected():
- time.sleep(1)
+notifier = Notifier(network)
+asyncio.run_coroutine_threadsafe(notifier.watch_queue.put(addr), loop)
diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py
@@ -31,7 +31,7 @@ from aiorpcx import TaskGroup, run_in_thread
from .transaction import Transaction
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer
-from .bitcoin import address_to_scripthash
+from .bitcoin import address_to_scripthash, is_address
if TYPE_CHECKING:
from .network import Network
@@ -77,7 +77,8 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
def add(self, addr):
asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)
- async def _add_address(self, addr):
+ async def _add_address(self, addr: str):
+ if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}")
if addr in self.requested_addrs: return
self.requested_addrs.add(addr)
await self.add_queue.put(addr)
diff --git a/electrum/util.py b/electrum/util.py
@@ -278,7 +278,7 @@ class DaemonThread(threading.Thread, PrintError):
self.print_error("stopped")
-verbosity = '*'
+verbosity = ''
def set_verbosity(filters: Union[str, bool]):
global verbosity
if type(filters) is bool: # backwards compat
@@ -983,3 +983,26 @@ class NetworkJobOnDefaultServer(PrintError):
s = self.interface.session
assert s is not None
return s
+
+
+def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
+ asyncio.Future,
+ threading.Thread]:
+ def on_exception(loop, context):
+ """Suppress spurious messages it appears we cannot control."""
+ SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
+ 'SSL error in data received')
+ message = context.get('message')
+ if message and SUPPRESS_MESSAGE_REGEX.match(message):
+ return
+ loop.default_exception_handler(context)
+
+ loop = asyncio.get_event_loop()
+ loop.set_exception_handler(on_exception)
+ # loop.set_debug(1)
+ stopping_fut = asyncio.Future()
+ loop_thread = threading.Thread(target=loop.run_until_complete,
+ args=(stopping_fut,),
+ name='EventLoop')
+ loop_thread.start()
+ return loop, stopping_fut, loop_thread