commit fe44aa79ca54d3a1bf805e1ea203f9cb07ab9b14
parent 90da1ce20cbea9b8ffed1a4c812cde3f7b63b99b
Author: chris-belcher <chris-belcher@users.noreply.github.com>
Date: Sat, 22 Jun 2019 01:00:50 +0100
Pass same logging instance to tor threads
Also move around lines a little bit for clarity. And edit,
remove or change logging level for some log messages.
Diffstat:
2 files changed, 54 insertions(+), 60 deletions(-)
diff --git a/electrumpersonalserver/server/common.py b/electrumpersonalserver/server/common.py
@@ -236,6 +236,8 @@ def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram,
result = txreport["reject-reason"]
else:
result = txreport["txid"]
+ logger.info('Broadcasting tx ' + txreport["txid"] + " with " +
+ "broadcast method: " + broadcast_method)
if broadcast_method == "own-node":
if not rpc.call("getnetworkinfo", [])["localrelay"]:
result = "Broadcast disabled when using blocksonly"
@@ -256,7 +258,8 @@ def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram,
network = "regtest"
for i in range(TOR_CONNECTIONS):
t = threading.Thread(target=p2p.tor_broadcast_tx,
- args=(txhex, tor_hostport, network, rpc,))
+ args=(txhex, tor_hostport, network, rpc, logger),
+ daemon=True)
t.start()
time.sleep(0.1)
elif broadcast_method.startswith("system "):
diff --git a/electrumpersonalserver/server/peertopeer.py b/electrumpersonalserver/server/peertopeer.py
@@ -13,7 +13,7 @@ from electrumpersonalserver.server.socks import (
)
from electrumpersonalserver.server.jsonrpc import JsonRpcError
-import logging as log
+import logging
PROTOCOL_VERSION = 70012
DEFAULT_USER_AGENT = '/Satoshi:0.18.0/'
@@ -35,17 +35,14 @@ def ip_to_hex(ip_str):
# ipv4 only for now
return socket.inet_pton(socket.AF_INET, ip_str)
-
def create_net_addr(hexip, port): # doesnt contain time as in bitcoin wiki
services = 0
hex = bytes(10) + b'\xFF\xFF' + hexip
return pack('<Q16s', services, hex) + pack('>H', port)
-
def create_var_str(s):
return btc.num_to_var_int(len(s)) + s.encode()
-
def read_int(ptr, payload, n, littleendian=True):
data = payload[ptr[0] : ptr[0]+n]
if littleendian:
@@ -54,7 +51,6 @@ def read_int(ptr, payload, n, littleendian=True):
ptr[0] += n
return ret
-
def read_var_int(ptr, payload):
val = payload[ptr[0]]
ptr[0] += 1
@@ -62,14 +58,12 @@ def read_var_int(ptr, payload):
return val
return read_int(ptr, payload, 2**(val - 252))
-
def read_var_str(ptr, payload):
l = read_var_int(ptr, payload)
ret = payload[ptr[0]: ptr[0] + l]
ptr[0] += l
return ret
-
def ip_hex_to_str(ip_hex):
# https://en.wikipedia.org/wiki/IPv6#IPv4-mapped_IPv6_addresses
# https://www.cypherpunk.at/onioncat_trac/wiki/OnionCat
@@ -81,26 +75,24 @@ def ip_hex_to_str(ip_hex):
else:
return socket.inet_ntop(socket.AF_INET6, ip_hex)
-
class P2PMessageHandler(object):
- def __init__(self):
+ def __init__(self, logger):
self.last_message = datetime.now()
self.waiting_for_keepalive = False
- self.log = (log if log else
- log.getLogger('ELECTRUMPERSONALSERVER'))
+ self.logger = logger
def check_keepalive(self, p2p):
if self.waiting_for_keepalive:
if ((datetime.now() - self.last_message).total_seconds()
< KEEPALIVE_TIMEOUT):
return
- log.info('keepalive timed out, closing')
+ self.logger.info('keepalive timed out, closing')
p2p.sock.close()
else:
if ((datetime.now() - self.last_message).total_seconds()
< KEEPALIVE_INTERVAL):
return
- log.debug('sending keepalive to peer')
+ self.logger.debug('sending keepalive to peer')
self.waiting_for_keepalive = True
p2p.sock.sendall(p2p.create_message('ping', '\x00'*8))
@@ -128,7 +120,8 @@ class P2PMessageHandler(object):
else:
# must check node accepts unconfirmed txes before broadcasting
relay = True
- log.debug(('peer version message: version=%d services=0x%x'
+ self.logger.debug(('Received peer version message: version=%d'
+ + ' services=0x%x'
+ ' timestamp=%s user_agent=%s start_height=%d relay=%i'
+ ' them=%s:%d us=%s:%d') % (version,
services, str(datetime.fromtimestamp(timestamp)),
@@ -160,11 +153,10 @@ class P2PMessageHandler(object):
class P2PProtocol(object):
def __init__(self, p2p_message_handler, remote_hostport,
- network, user_agent=DEFAULT_USER_AGENT,
+ network, logger, user_agent=DEFAULT_USER_AGENT,
socks5_hostport=("localhost", 9050), connect_timeout=30,
heartbeat_interval=15):
- self.log = (log if log else
- log.getLogger('ELECTRUMPERSONALSERVER'))
+ self.logger = logger
self.p2p_message_handler = p2p_message_handler
self.network = network
self.user_agent = user_agent
@@ -177,9 +169,7 @@ class P2PProtocol(object):
self.magic = 0xdab5bffa
else:
self.magic = 0xd9b4bef9
-
self.closed = False
-
self.remote_hostport = remote_hostport
def run(self):
@@ -196,24 +186,18 @@ class P2PProtocol(object):
+ create_var_str(self.user_agent)
+ pack('<I', start_height)
+ b'\x01')
- data = self.create_message('version', version_message)
- while True:
- log.info('connecting to bitcoin peer (magic=' + hex(self.magic)
- + ') at ' + str(self.remote_hostport) + ' with proxy ' +
- str(self.socks5_hostport))
- if self.socks5_hostport is None:
- self.sock = socket.socket(socket.AF_INET,
- socket.SOCK_STREAM)
- else:
- setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0],
- self.socks5_hostport[1], True)
- self.sock = socksocket()
- self.sock.settimeout(self.connect_timeout)
- self.sock.connect(self.remote_hostport)
- self.sock.sendall(data)
- break
- log.info('connected')
+ self.logger.debug('Connecting to bitcoin peer (magic=' +
+ hex(self.magic) + ') at ' + str(self.remote_hostport) +
+ ' with proxy ' + str(self.socks5_hostport))
+ setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0],
+ self.socks5_hostport[1], True)
+ self.sock = socksocket()
+ self.sock.settimeout(self.connect_timeout)
+ self.sock.connect(self.remote_hostport)
+ self.sock.sendall(self.create_message('version', version_message))
+
+ self.logger.debug('Connected to bitcoin peer')
self.sock.settimeout(self.heartbeat_interval)
self.closed = False
try:
@@ -239,7 +223,8 @@ class P2PProtocol(object):
unpack('<I12sI4s', recv_buffer[:HEADER_LENGTH])
recv_buffer = recv_buffer[HEADER_LENGTH:]
if net_magic != self.magic:
- log.debug('wrong MAGIC: ' + hex(net_magic))
+ self.logger.debug('wrong MAGIC: ' +
+ hex(net_magic))
self.sock.close()
break
command = command.strip(b'\0')
@@ -252,7 +237,8 @@ class P2PProtocol(object):
self.p2p_message_handler.handle_message(
self, command, payload_length, payload)
else:
- log.debug("wrong checksum, dropping " +
+ self.logger.debug("wrong checksum, " +
+ "dropping " +
"message, cmd=" + command +
" payloadlen=" + str(payload_length))
payload_length = -1
@@ -266,7 +252,7 @@ class P2PProtocol(object):
self.closed = True
except IOError as e:
import traceback
- log.debug("logging traceback from %s: \n" %
+ self.logger.debug("logging traceback from %s: \n" %
traceback.format_exc())
self.closed = True
finally:
@@ -283,11 +269,10 @@ class P2PProtocol(object):
+ btc.bin_dbl_sha256(payload)[:4] + payload)
class P2PBroadcastTx(P2PMessageHandler):
- def __init__(self, txhex):
- P2PMessageHandler.__init__(self)
+ def __init__(self, txhex, logger):
+ P2PMessageHandler.__init__(self, logger)
self.txhex = bytes.fromhex(txhex)
self.txid = btc.bin_txhash(self.txhex)
- log.debug('broadcasting txid ' + str(self.txid))
self.uploaded_tx = False
self.time_marker = datetime.now()
self.connected = False
@@ -296,15 +281,15 @@ class P2PBroadcastTx(P2PMessageHandler):
addr_recv_services, addr_recv_ip, addr_trans_services,
addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
if not relay:
- log.debug('peer not accepting unconfirmed txes, trying another')
+ self.logger.debug('peer not accepting unconfirmed txes, trying ' +
+ 'another')
# this happens if the other node is using blockonly=1
p2p.close()
if not services & NODE_WITNESS:
- log.debug('peer not accepting witness data, trying another')
+ self.logger.debug('peer not accepting witness data, trying another')
p2p.close()
def on_connected(self, p2p):
- log.debug('sending inv')
MSG = 1 #msg_tx
inv_payload = pack('<BI', 1, MSG) + self.txid
p2p.sock.sendall(p2p.create_message('inv', inv_payload))
@@ -313,19 +298,20 @@ class P2PBroadcastTx(P2PMessageHandler):
self.connected = True
def on_heartbeat(self, p2p):
- log.debug('broadcaster heartbeat')
+ self.logger.debug('broadcaster heartbeat')
VERACK_TIMEOUT = 40
GETDATA_TIMEOUT = 60
if not self.connected:
if ((datetime.now() - self.time_marker).total_seconds()
< VERACK_TIMEOUT):
return
- log.debug('timed out of waiting for verack')
+ self.logger.debug('timed out of waiting for verack')
else:
if ((datetime.now() - self.time_marker).total_seconds()
< GETDATA_TIMEOUT):
return
- log.debug('timed out of waiting for getdata, node already has tx')
+ self.logger.debug('timed out in waiting for getdata, node ' +
+ 'already has tx')
self.uploaded_tx = True
p2p.close()
@@ -338,32 +324,37 @@ class P2PBroadcastTx(P2PMessageHandler):
ptr[0] += 4
hash_id = payload[ptr[0] : ptr[0] + 32]
ptr[0] += 32
- log.debug('hashid=' + hash_id.hex())
if hash_id == self.txid:
- log.debug('uploading tx')
+ self.logger.info("Uploading tx to " +
+ str(p2p.remote_hostport))
p2p.sock.sendall(p2p.create_message('tx', self.txhex))
self.uploaded_tx = True
p2p.close()
-def tor_broadcast_tx(txhex, tor_hostport, network, rpc):
+def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
ATTEMPTS = 8 # how many times to search for a node that accepts txes
try:
node_addrs = rpc.call("getnodeaddresses", [ATTEMPTS])
- except JsonRpcError:
- log.error("BitcoinCore v0.18.0 must be used to broadcast through Tor")
+ except JsonRpcError as e:
+ logger.debug(repr(e))
+ logger.error("BitcoinCore v0.18.0 is required to broadcast through Tor")
return False
node_addrs = [a for a in node_addrs if a["services"] & NODE_WITNESS]
for i in range(len(node_addrs)):
remote_hostport = (node_addrs[i]["address"], node_addrs[i]["port"])
- p2p_msg_handler = P2PBroadcastTx(txhex)
+ p2p_msg_handler = P2PBroadcastTx(txhex, logger)
p2p = P2PProtocol(p2p_msg_handler, remote_hostport=remote_hostport,
- network=network, socks5_hostport=tor_hostport,
+ network=network, logger=logger, socks5_hostport=tor_hostport,
heartbeat_interval=20)
try:
p2p.run()
- except IOError:
+ except IOError as e:
+ logger.debug("p2p.run(): " + repr(e))
continue
- log.debug('uploaded={}'.format(p2p_msg_handler.uploaded_tx))
if p2p_msg_handler.uploaded_tx:
- return True
- return False # never find a node that accepted unconfirms
+ break
+ logger.debug("Exiting tor broadcast thread, uploaded_tx = " +
+ str(p2p_msg_handler.uploaded_tx))
+ # return false if never found a node that accepted unconfirms
+ return p2p_msg_handler.uploaded_tx
+