commit dd9695403c88ae5e4b54ef682564a315203caf2d
parent fe44aa79ca54d3a1bf805e1ea203f9cb07ab9b14
Author: chris-belcher <chris-belcher@users.noreply.github.com>
Date: Mon, 24 Jun 2019 00:12:34 +0100
Move getnodeaddresses RPC calls to main thread
This avoids a RPC concurrency issue arising from having
8 threads connect to the node RPC port at once
Diffstat:
2 files changed, 50 insertions(+), 29 deletions(-)
diff --git a/electrumpersonalserver/server/common.py b/electrumpersonalserver/server/common.py
@@ -5,14 +5,13 @@ import traceback, sys, platform
from ipaddress import ip_network, ip_address
import logging
import tempfile
-import threading
from electrumpersonalserver.server.jsonrpc import JsonRpc, JsonRpcError
import electrumpersonalserver.server.hashes as hashes
import electrumpersonalserver.server.merkleproof as merkleproof
import electrumpersonalserver.server.deterministicwallet as deterministicwallet
import electrumpersonalserver.server.transactionmonitor as transactionmonitor
-import electrumpersonalserver.server.peertopeer as p2p
+import electrumpersonalserver.server.peertopeer as peertopeer
SERVER_VERSION_NUMBER = "0.1.7"
@@ -247,21 +246,17 @@ def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram,
try:
rpc.call("sendrawtransaction", [txhex])
except JsonRpcError as e:
- pass
+ logger.error("Error broadcasting: " + repr(e))
elif broadcast_method == "tor":
- TOR_CONNECTIONS = 8
network = "mainnet"
chaininfo = rpc.call("getblockchaininfo", [])
if chaininfo["chain"] == "test":
network = "testnet"
elif chaininfo["chain"] == "regtest":
network = "regtest"
- for i in range(TOR_CONNECTIONS):
- t = threading.Thread(target=p2p.tor_broadcast_tx,
- args=(txhex, tor_hostport, network, rpc, logger),
- daemon=True)
- t.start()
- time.sleep(0.1)
+ logger.debug("broadcasting to network: " + network)
+ peertopeer.tor_broadcast_tx(txhex, tor_hostport, network, rpc,
+ logger)
elif broadcast_method.startswith("system "):
with tempfile.NamedTemporaryFile() as fd:
system_line = broadcast_method[7:].replace("%s", fd.name)
diff --git a/electrumpersonalserver/server/peertopeer.py b/electrumpersonalserver/server/peertopeer.py
@@ -2,6 +2,7 @@
import socket, time
import base64
+import threading
from struct import pack, unpack
from datetime import datetime
@@ -13,8 +14,6 @@ from electrumpersonalserver.server.socks import (
)
from electrumpersonalserver.server.jsonrpc import JsonRpcError
-import logging
-
PROTOCOL_VERSION = 70012
DEFAULT_USER_AGENT = '/Satoshi:0.18.0/'
NODE_WITNESS = (1 << 3)
@@ -187,9 +186,9 @@ class P2PProtocol(object):
+ pack('<I', start_height)
+ b'\x01')
- self.logger.debug('Connecting to bitcoin peer (magic=' +
- hex(self.magic) + ') at ' + str(self.remote_hostport) +
- ' with proxy ' + str(self.socks5_hostport))
+ self.logger.debug('Connecting to bitcoin peer at ' +
+ str(self.remote_hostport) + ' with proxy ' +
+ str(self.socks5_hostport))
setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0],
self.socks5_hostport[1], True)
self.sock = socksocket()
@@ -325,23 +324,15 @@ class P2PBroadcastTx(P2PMessageHandler):
hash_id = payload[ptr[0] : ptr[0] + 32]
ptr[0] += 32
if hash_id == self.txid:
- self.logger.info("Uploading tx to " +
- str(p2p.remote_hostport))
p2p.sock.sendall(p2p.create_message('tx', self.txhex))
self.uploaded_tx = True
+ self.logger.info("Uploaded transaction via tor to peer at "
+ + str(p2p.remote_hostport))
p2p.close()
-def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
- ATTEMPTS = 8 # how many times to search for a node that accepts txes
- try:
- node_addrs = rpc.call("getnodeaddresses", [ATTEMPTS])
- except JsonRpcError as e:
- logger.debug(repr(e))
- logger.error("BitcoinCore v0.18.0 is required to broadcast through Tor")
- return False
- node_addrs = [a for a in node_addrs if a["services"] & NODE_WITNESS]
- for i in range(len(node_addrs)):
- remote_hostport = (node_addrs[i]["address"], node_addrs[i]["port"])
+def broadcaster_thread(txhex, node_addrs, tor_hostport, network, rpc, logger):
+ for node_addr in node_addrs:
+ remote_hostport = (node_addr["address"], node_addr["port"])
p2p_msg_handler = P2PBroadcastTx(txhex, logger)
p2p = P2PProtocol(p2p_msg_handler, remote_hostport=remote_hostport,
network=network, logger=logger, socks5_hostport=tor_hostport,
@@ -349,7 +340,7 @@ def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
try:
p2p.run()
except IOError as e:
- logger.debug("p2p.run(): " + repr(e))
+ logger.debug("p2p.run() exited: " + repr(e))
continue
if p2p_msg_handler.uploaded_tx:
break
@@ -358,3 +349,38 @@ def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
# return false if never found a node that accepted unconfirms
return p2p_msg_handler.uploaded_tx
+def chunk_list(d, n):
+ return [d[x:x + n] for x in range(0, len(d), n)]
+
+def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
+ CONNECTION_THREADS = 8
+ CONNECTION_ATTEMPTS_PER_THREAD = 10
+
+ required_address_count = CONNECTION_ATTEMPTS_PER_THREAD * CONNECTION_THREADS
+ node_addrs_witness = []
+ while True:
+ try:
+ new_node_addrs = rpc.call("getnodeaddresses",
+ [required_address_count])
+ except JsonRpcError as e:
+ logger.debug(repr(e))
+ logger.error("Bitcoin Core v0.18.0 or higher is required "
+ "to broadcast through Tor")
+ return False
+ node_addrs_witness.extend(
+ [a for a in new_node_addrs if a["services"] & NODE_WITNESS]
+ )
+ logger.debug("len(new_node_addrs) = " + str(len(new_node_addrs)) +
+ " len(node_addrs_witness) = " + str(len(node_addrs_witness)))
+ if len(node_addrs_witness) > required_address_count:
+ break
+ node_addrs_chunks = chunk_list(
+ node_addrs_witness[:required_address_count],
+ CONNECTION_ATTEMPTS_PER_THREAD
+ )
+ for node_addrs in node_addrs_chunks:
+ t = threading.Thread(target=broadcaster_thread,
+ args=(txhex, node_addrs, tor_hostport, network, rpc, logger),
+ daemon=True)
+ t.start()
+