commit 7986bd66635cbabcb2dc135aebd7612ebdb7efbe
parent 8add436172282a018fbcc9e664c703d2b3191a20
Author: chris-belcher <chris-belcher@users.noreply.github.com>
Date: Thu, 25 Feb 2021 13:52:46 +0000
Notify client the status of a tor broadcasted tx
See issue #220
Previously if Tor broadcasting was enabled the server would always reply
with a success message to the client as long as the transaction was valid,
regardless of what actually happened with the broadcasting.
One consequence was that if Tor was not running the broadcast would silently
fail, all the while displaying a success message to the user.
This commit uses a thread-safe queue to wait for a message from one of the
broadcasting threads. Once a thread uploads the transaction to the first peer
it will notify and wake up the server thread. Only then will the client see
a success message. For network failures or Tor not running a fail message
will be shown. There is also a timeout of 20 seconds after which a success
message will be shown, because Tor can be slow and theres no point making
the user stare at a "Please wait" dialog for so long.
Diffstat:
2 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/electrumpersonalserver/server/electrumprotocol.py b/electrumpersonalserver/server/electrumprotocol.py
@@ -357,8 +357,10 @@ class ElectrumProtocol(object):
elif chaininfo["chain"] == "regtest":
network = "regtest"
self.logger.debug("broadcasting to network: " + network)
- tor_broadcast_tx(txhex, self.tor_hostport, network,
- self.rpc, self.logger)
+ success = tor_broadcast_tx(txhex, self.tor_hostport,
+ network, self.rpc, self.logger)
+ if not success:
+ result = None
elif broadcast_method.startswith("system "):
with tempfile.NamedTemporaryFile() as fd:
system_line = broadcast_method[7:].replace("%s",
diff --git a/electrumpersonalserver/server/peertopeer.py b/electrumpersonalserver/server/peertopeer.py
@@ -4,6 +4,7 @@ import socket
import time
import base64
import threading
+import queue
import random
from struct import pack, unpack
from datetime import datetime
@@ -37,6 +38,7 @@ KEEPALIVE_INTERVAL = 2 * 60
# close connection if keep alive ping isnt responded to in this many seconds
KEEPALIVE_TIMEOUT = 20 * 60
+
def ip_to_hex(ip_str):
# ipv4 only for now
return socket.inet_pton(socket.AF_INET, ip_str)
@@ -159,12 +161,13 @@ class P2PMessageHandler(object):
class P2PProtocol(object):
def __init__(self, p2p_message_handler, remote_hostport,
- network, logger, user_agent=DEFAULT_USER_AGENT,
+ network, logger, notify_queue, user_agent=DEFAULT_USER_AGENT,
socks5_hostport=("localhost", 9050), connect_timeout=30,
heartbeat_interval=15, start_height=0):
self.p2p_message_handler = p2p_message_handler
self.remote_hostport = remote_hostport
self.logger = logger
+ self.notify_queue = notify_queue
self.user_agent = user_agent
self.socks5_hostport = socks5_hostport
self.connect_timeout = connect_timeout
@@ -204,7 +207,6 @@ class P2PProtocol(object):
self.sock.connect(self.remote_hostport)
self.sock.sendall(self.create_message('version', version_message))
- self.logger.debug('Connected to bitcoin peer')
self.sock.settimeout(self.heartbeat_interval)
self.closed = False
try:
@@ -276,13 +278,14 @@ class P2PProtocol(object):
+ btc.bin_dbl_sha256(payload)[:4] + payload)
class P2PBroadcastTx(P2PMessageHandler):
- def __init__(self, txhex, logger):
+ def __init__(self, txhex, logger, notify_queue):
P2PMessageHandler.__init__(self, logger)
self.txhex = bytes.fromhex(txhex)
self.txid = btc.bin_txhash(self.txhex)
self.uploaded_tx = False
self.time_marker = datetime.now()
self.connected = False
+ self.notify_queue = notify_queue
def on_recv_version(self, p2p, version, services, timestamp,
addr_recv_services, addr_recv_ip, addr_trans_services,
@@ -336,6 +339,7 @@ class P2PBroadcastTx(P2PMessageHandler):
self.uploaded_tx = True
self.logger.debug("Uploaded transaction via tor to peer at "
+ str(p2p.remote_hostport))
+ self.notify_queue.put(True)
##make sure the packets really got through by sleeping
##some kernels seem to send a RST packet on close() even
##if theres still data in the send buffer
@@ -343,12 +347,12 @@ class P2PBroadcastTx(P2PMessageHandler):
p2p.close()
def broadcaster_thread(txhex, node_addrs, tor_hostport, network, logger,
- start_height):
+ start_height, notify_queue):
for node_addr in node_addrs:
remote_hostport = (node_addr["address"], node_addr["port"])
- p2p_msg_handler = P2PBroadcastTx(txhex, logger)
- p2p = P2PProtocol(p2p_msg_handler, remote_hostport=remote_hostport,
- network=network, logger=logger, socks5_hostport=tor_hostport,
+ p2p_msg_handler = P2PBroadcastTx(txhex, logger, notify_queue)
+ p2p = P2PProtocol(p2p_msg_handler, remote_hostport,
+ network, logger, notify_queue, socks5_hostport=tor_hostport,
heartbeat_interval=20, start_height=start_height)
try:
p2p.run()
@@ -359,7 +363,8 @@ def broadcaster_thread(txhex, node_addrs, tor_hostport, network, logger,
break
logger.debug("Exiting tor broadcast thread, uploaded_tx = " +
str(p2p_msg_handler.uploaded_tx))
- # return false if never found a node that accepted unconfirms
+ if not p2p_msg_handler.uploaded_tx:
+ notify_queue.put(False)
return p2p_msg_handler.uploaded_tx
def chunk_list(d, n):
@@ -391,11 +396,23 @@ def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
node_addrs_witness[:required_address_count],
CONNECTION_ATTEMPTS_PER_THREAD
)
+ notify_queue = queue.Queue()
start_height = rpc.call("getblockcount", [])
for node_addrs in node_addrs_chunks:
t = threading.Thread(target=broadcaster_thread,
args=(txhex, node_addrs, tor_hostport, network, logger,
- start_height),
+ start_height, notify_queue),
daemon=True)
t.start()
-
+ try:
+ success = notify_queue.get(block=True, timeout=20)
+ except queue.Empty:
+ logger.debug("Timed out getting notification for broadcasting "
+ + "transaction")
+ #the threads will maybe still continue to try broadcasting even
+ # after this timeout
+ #could time out at 20 seconds for any legitimate reason, tor is slow
+ # so no point failing, this timeout is just so the user doesnt have
+ # to stare at a seemingly-frozen dialog
+ success = True
+ return success