peertopeer.py (16716B)
#! /usr/bin/env python
"""Minimal bitcoin p2p protocol client used to broadcast transactions.

A transaction is announced to several peers in parallel, each connection
being made through a SOCKS5 proxy (tor), see tor_broadcast_tx().
"""

import socket
import time
import base64
import threading
import queue
import random
from struct import pack, unpack
from datetime import datetime

import electrumpersonalserver.bitcoin as btc
from electrumpersonalserver.server.socks import (
    socksocket,
    setdefaultproxy,
    PROXY_TYPE_SOCKS5
)
from electrumpersonalserver.server.jsonrpc import JsonRpcError

PROTOCOL_VERSION = 70016
DEFAULT_USER_AGENT = '/Satoshi:0.21.0/'

#https://github.com/bitcoin/bitcoin/blob/master/src/protocol.h
NODE_NETWORK = 1
NODE_BLOOM = 1 << 2
NODE_WITNESS = 1 << 3
NODE_NETWORK_LIMITED = 1 << 10

# protocol versions from this one onwards also send a relay boolean (BIP37)
RELAY_TX_VERSION = 70001

# length of bitcoin p2p packets
HEADER_LENGTH = 24

# if no message has been seen for this many seconds, send a ping
KEEPALIVE_INTERVAL = 2 * 60

# close connection if keep alive ping isnt responded to in this many seconds
KEEPALIVE_TIMEOUT = 20 * 60


def ip_to_hex(ip_str):
    """Pack a dotted-quad IPv4 address string into its raw bytes."""
    # ipv4 only for now
    return socket.inet_pton(socket.AF_INET, ip_str)


def create_net_addr(hexip, port): # doesnt contain time as in bitcoin wiki
    """Serialize a network address: services, 16 byte ip and port.

    hexip is the packed IPv4 address, it is stored as an IPv4-mapped IPv6
    address. services is always zero and the port is written big endian.
    """
    services = 0
    # named mapped_ip rather than hex to avoid shadowing the builtin
    mapped_ip = bytes(10) + b'\xFF\xFF' + hexip
    return pack('<Q16s', services, mapped_ip) + pack('>H', port)


def create_var_str(s):
    """Serialize the str s as a var_int length followed by its bytes."""
    encoded = s.encode()
    # the length prefix counts encoded bytes, which differs from len(s)
    # as soon as s contains non-ascii characters
    return btc.num_to_var_int(len(encoded)) + encoded


def read_int(ptr, payload, n, littleendian=True):
    """Read an n byte integer from payload at ptr[0] and advance ptr[0].

    ptr is a one-element list used as a mutable read position.
    """
    data = payload[ptr[0] : ptr[0]+n]
    if littleendian:
        # btc.decode() is given the most significant byte first
        data = data[::-1]
    ret = btc.decode(data, 256)
    ptr[0] += n
    return ret


def read_var_int(ptr, payload):
    """Read a variable length integer from payload and advance ptr[0]."""
    val = payload[ptr[0]]
    ptr[0] += 1
    if val < 253:
        return val
    # prefix 253, 254, 255 is followed by a 2, 4, 8 byte integer
    return read_int(ptr, payload, 2**(val - 252))


def read_var_str(ptr, payload):
    """Read a var_int prefixed byte string from payload, advancing ptr[0].

    Returns the raw bytes, they are not decoded.
    """
    l = read_var_int(ptr, payload)
    ret = payload[ptr[0]: ptr[0] + l]
    ptr[0] += l
    return ret


def ip_hex_to_str(ip_hex):
    """Convert a 16 byte address from a p2p message to a printable str.

    IPv4-mapped addresses give a dotted quad, addresses with the OnionCat
    prefix give a .onion hostname, anything else is formatted as IPv6.
    """
    # https://en.wikipedia.org/wiki/IPv6#IPv4-mapped_IPv6_addresses
    # https://www.cypherpunk.at/onioncat_trac/wiki/OnionCat
    # ip_hex is bytes so the prefixes must be bytes too, comparing against
    # str never matches on python3
    if ip_hex[:12] == b'\x00'*10 + b'\xff'*2:
        # ipv4 mapped ipv6 addr
        return socket.inet_ntoa(ip_hex[12:])
    elif ip_hex[:6] == b'\xfd\x87\xd8\x7e\xeb\x43':
        return base64.b32encode(ip_hex[6:]).lower().decode() + '.onion'
    else:
        return socket.inet_ntop(socket.AF_INET6, ip_hex)


class P2PMessageHandler(object):
    """Base handler for messages received by a P2PProtocol connection.

    Answers version and ping messages and implements a keepalive. The
    on_*() methods do nothing here and are meant to be overridden.
    """

    def __init__(self, logger):
        self.last_message = datetime.now()
        self.waiting_for_keepalive = False
        self.logger = logger

    def check_keepalive(self, p2p):
        """Ping a silent peer, or close the connection if a ping went
        unanswered for KEEPALIVE_TIMEOUT seconds.

        Called by P2PProtocol.run() whenever a socket read times out.
        """
        idle_seconds = (datetime.now() - self.last_message).total_seconds()
        if self.waiting_for_keepalive:
            if idle_seconds < KEEPALIVE_TIMEOUT:
                return
            self.logger.debug('keepalive timed out, closing')
            # stops the receive loop, run() then closes the socket
            p2p.close()
        else:
            if idle_seconds < KEEPALIVE_INTERVAL:
                return
            self.logger.debug('sending keepalive to peer')
            self.waiting_for_keepalive = True
            # the payload must be bytes, create_message() appends it to the
            # packed header
            p2p.sock.sendall(p2p.create_message('ping', b'\x00'*8))

    def handle_message(self, p2p, command, length, payload):
        """Process one checksum-verified message received from the peer.

        command is the command name as bytes with padding removed, payload
        the raw message body and length its size.
        """
        self.last_message = datetime.now()
        self.waiting_for_keepalive = False
        ptr = [0]
        if command == b'version':
            version = read_int(ptr, payload, 4)
            services = read_int(ptr, payload, 8)
            timestamp = read_int(ptr, payload, 8)
            addr_recv_services = read_int(ptr, payload, 8)
            addr_recv_ip = payload[ptr[0] : ptr[0]+16]
            ptr[0] += 16
            addr_recv_port = read_int(ptr, payload, 2, False)
            addr_trans_services = read_int(ptr, payload, 8)
            addr_trans_ip = payload[ptr[0] : ptr[0]+16]
            ptr[0] += 16
            addr_trans_port = read_int(ptr, payload, 2, False)
            ptr[0] += 8 # skip over nonce
            user_agent = read_var_str(ptr, payload)
            start_height = read_int(ptr, payload, 4)
            if version >= RELAY_TX_VERSION and ptr[0] < len(payload):
                relay = read_int(ptr, payload, 1) != 0
            else:
                # must check node accepts unconfirmed txes before
                # broadcasting, a peer not sending the flag relays them
                relay = True
            self.logger.debug(('Received peer version message: version=%d'
                + ' services=0x%x'
                + ' timestamp=%s user_agent=%s start_height=%d relay=%i'
                + ' them=%s:%d us=%s:%d') % (version,
                services, str(datetime.fromtimestamp(timestamp)),
                user_agent, start_height, relay, ip_hex_to_str(addr_trans_ip)
                , addr_trans_port, ip_hex_to_str(addr_recv_ip), addr_recv_port))
            p2p.sock.sendall(p2p.create_message('verack', b''))
            self.on_recv_version(p2p, version, services, timestamp,
                addr_recv_services, addr_recv_ip, addr_trans_services,
                addr_trans_ip, addr_trans_port, user_agent, start_height,
                relay)
        elif command == b'verack':
            self.on_connected(p2p)
        elif command == b'ping':
            p2p.sock.sendall(p2p.create_message('pong', payload))

    # optional override these in a subclass

    def on_recv_version(self, p2p, version, services, timestamp,
            addr_recv_services, addr_recv_ip, addr_trans_services,
            addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
        """Called with the parsed fields of the peer's version message."""
        pass

    def on_connected(self, p2p):
        """Called when the peer's verack message is received."""
        pass

    def on_heartbeat(self, p2p):
        """Called by P2PProtocol.run() whenever a socket read times out."""
        pass


class P2PProtocol(object):
    """One outgoing bitcoin p2p connection made through a SOCKS5 proxy.

    run() blocks, reading messages and passing them to p2p_message_handler
    until close() is called or the connection fails.
    """

    def __init__(self, p2p_message_handler, remote_hostport,
            network, logger, notify_queue, user_agent=DEFAULT_USER_AGENT,
            socks5_hostport=("localhost", 9050), connect_timeout=30,
            heartbeat_interval=15, start_height=0):
        self.p2p_message_handler = p2p_message_handler
        self.remote_hostport = remote_hostport
        self.logger = logger
        self.notify_queue = notify_queue
        self.user_agent = user_agent
        self.socks5_hostport = socks5_hostport
        self.connect_timeout = connect_timeout
        self.heartbeat_interval = heartbeat_interval
        self.start_height = start_height
        # network magic which starts every message, any network name other
        # than testnet or regtest selects the last value
        if network == "testnet":
            self.magic = 0x0709110b
        elif network == "regtest":
            self.magic = 0xdab5bffa
        else:
            self.magic = 0xd9b4bef9
        self.closed = False

    def run(self):
        """Connect to the peer, send our version message and handle
        incoming messages until the connection is closed.

        Errors while connecting or sending the version message propagate
        to the caller. Once the receive loop is running, IOError and a
        connection closed by the peer just end the loop. The socket is
        always closed before returning.
        """
        services = (NODE_NETWORK | NODE_WITNESS | NODE_NETWORK_LIMITED)
        st = int(time.time())
        nonce = random.getrandbits(64)

        netaddr_them = create_net_addr(ip_to_hex('0.0.0.0'), 0)
        netaddr_us = create_net_addr(ip_to_hex('0.0.0.0'), 0)
        version_message = (pack('<iQQ', PROTOCOL_VERSION, services, st)
            + netaddr_them
            + netaddr_us
            + pack('<Q', nonce)
            + create_var_str(self.user_agent)
            + pack('<I', self.start_height)
            + b'\x01')

        self.logger.debug('Connecting to bitcoin peer at ' +
            str(self.remote_hostport) + ' with proxy ' +
            str(self.socks5_hostport))
        setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0],
            self.socks5_hostport[1], True)
        self.sock = socksocket()
        try:
            # connecting happens inside the try block so that the socket is
            # also closed when the connection attempt itself fails
            self.sock.settimeout(self.connect_timeout)
            self.sock.connect(self.remote_hostport)
            self.sock.sendall(self.create_message('version', version_message))

            self.sock.settimeout(self.heartbeat_interval)
            self.closed = False
            recv_buffer = b''
            payload_length = -1 # -1 means waiting for header
            command = None
            checksum = None
            while not self.closed:
                try:
                    recv_data = self.sock.recv(4096)
                    if not recv_data:
                        raise EOFError()
                    recv_buffer += recv_data
                    # this is O(N^2) scaling in time, another way would be to
                    # store in a list and combine at the end with "".join()
                    # but this isnt really timing critical so didnt optimize it

                    data_remaining = True
                    while data_remaining and not self.closed:
                        if payload_length == -1 and (len(recv_buffer)
                                >= HEADER_LENGTH):
                            net_magic, command, payload_length, checksum =\
                                unpack('<I12sI4s', recv_buffer[:HEADER_LENGTH])
                            recv_buffer = recv_buffer[HEADER_LENGTH:]
                            if net_magic != self.magic:
                                self.logger.debug('wrong MAGIC: ' +
                                    hex(net_magic))
                                # the stream cant be trusted anymore, end
                                # the outer loop too
                                self.closed = True
                                break
                            command = command.strip(b'\0')
                        else:
                            if payload_length >= 0 and (len(recv_buffer)
                                    >= payload_length):
                                payload = recv_buffer[:payload_length]
                                recv_buffer = recv_buffer[payload_length:]
                                if btc.bin_dbl_sha256(payload)[:4] == checksum:
                                    self.p2p_message_handler.handle_message(
                                        self, command, payload_length, payload)
                                else:
                                    # command is bytes and must be decoded
                                    # before joining it with str
                                    self.logger.debug("wrong checksum, " +
                                        "dropping " +
                                        "message, cmd=" +
                                        command.decode('ascii', 'replace') +
                                        " payloadlen=" + str(payload_length))
                                payload_length = -1
                                data_remaining = True
                            else:
                                data_remaining = False
                except socket.timeout:
                    self.p2p_message_handler.check_keepalive(self)
                    self.p2p_message_handler.on_heartbeat(self)
                except EOFError as e:
                    self.closed = True
                except IOError as e:
                    import traceback
                    self.logger.debug("logging traceback from %s: \n" %
                        traceback.format_exc())
                    self.closed = True
        finally:
            try:
                self.sock.close()
            except Exception as _:
                pass

    def close(self):
        """Ask run() to stop, it checks the flag between socket reads."""
        self.closed = True

    def create_message(self, command, payload):
        """Build a p2p message: magic, command, payload length, checksum
        and finally the payload itself.

        command is a str, payload must be bytes.
        """
        return (pack("<I12sI", self.magic, command.encode(), len(payload))
            + btc.bin_dbl_sha256(payload)[:4] + payload)


class P2PBroadcastTx(P2PMessageHandler):
    """Message handler which announces one transaction to the peer and
    uploads it when the peer requests it with getdata.

    uploaded_tx is set once the peer has requested the transaction, or if
    it never requests it, which is taken to mean it already has it.
    """

    def __init__(self, txhex, logger, notify_queue):
        P2PMessageHandler.__init__(self, logger)
        # despite the name this holds the raw transaction bytes
        self.txhex = bytes.fromhex(txhex)
        self.txid = btc.bin_txhash(self.txhex)
        self.uploaded_tx = False
        self.time_marker = datetime.now()
        self.connected = False
        self.notify_queue = notify_queue

    def on_recv_version(self, p2p, version, services, timestamp,
            addr_recv_services, addr_recv_ip, addr_trans_services,
            addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
        """Drop peers which cant be used for broadcasting."""
        if not relay:
            self.logger.debug('peer not accepting unconfirmed txes, trying ' +
                'another')
            # this happens if the other node is using blockonly=1
            p2p.close()
        if not services & NODE_WITNESS:
            self.logger.debug('peer not accepting witness data, trying another')
            p2p.close()

    def on_connected(self, p2p):
        """Announce the transaction to the peer with an inv message."""
        MSG = 1 #msg_tx
        # an inventory list with a single entry
        inv_payload = pack('<BI', 1, MSG) + self.txid
        p2p.sock.sendall(p2p.create_message('inv', inv_payload))
        self.time_marker = datetime.now()
        self.uploaded_tx = False
        self.connected = True

    def on_heartbeat(self, p2p):
        """Close the connection if the peer takes too long to answer.

        time_marker is the time of construction until the verack arrives,
        afterwards the time the inv message was sent.
        """
        self.logger.debug('broadcaster heartbeat')
        VERACK_TIMEOUT = 40
        GETDATA_TIMEOUT = 60
        waited = (datetime.now() - self.time_marker).total_seconds()
        if not self.connected:
            if waited < VERACK_TIMEOUT:
                return
            self.logger.debug('timed out of waiting for verack')
        else:
            if waited < GETDATA_TIMEOUT:
                return
            self.logger.debug('timed out in waiting for getdata, node ' +
                'already has tx')
            self.uploaded_tx = True
        p2p.close()

    def handle_message(self, p2p, command, length, payload):
        """Handle a message as the base class does, and additionally
        answer a getdata request for our transaction by uploading it.
        """
        P2PMessageHandler.handle_message(self, p2p, command, length, payload)
        ptr = [0]
        if command == b'getdata':
            count = read_var_int(ptr, payload)
            for _ in range(count):
                ptr[0] += 4 # skip over the inventory type
                hash_id = payload[ptr[0] : ptr[0] + 32]
                ptr[0] += 32
                if hash_id == self.txid:
                    p2p.sock.sendall(p2p.create_message('tx', self.txhex))
                    self.uploaded_tx = True
                    self.logger.debug("Uploaded transaction via tor to peer at "
                        + str(p2p.remote_hostport))
                    self.notify_queue.put(True)
                    ##make sure the packets really got through by sleeping
                    ##some kernels seem to send a RST packet on close() even
                    ##if theres still data in the send buffer
                    time.sleep(5)
                    p2p.close()
                    # the connection is going away, dont upload or notify
                    # a second time if the hash is listed again
                    break


def broadcaster_thread(txhex, node_addrs, tor_hostport, network, logger,
        start_height, notify_queue):
    """Try the peers in node_addrs one after another until one of them
    takes the transaction.

    Each entry of node_addrs is a dict with "address" and "port" keys.
    False is put on notify_queue if no peer took the transaction, a
    successful upload is reported to the queue by P2PBroadcastTx itself.
    Returns whether the transaction counts as uploaded.
    """
    # tracked separately from the handler so that an empty node_addrs
    # doesnt leave the result undefined
    uploaded_tx = False
    for node_addr in node_addrs:
        remote_hostport = (node_addr["address"], node_addr["port"])
        p2p_msg_handler = P2PBroadcastTx(txhex, logger, notify_queue)
        p2p = P2PProtocol(p2p_msg_handler, remote_hostport,
            network, logger, notify_queue, socks5_hostport=tor_hostport,
            heartbeat_interval=20, start_height=start_height)
        try:
            p2p.run()
        except IOError as e:
            logger.debug("p2p.run() exited: " + repr(e))
            continue
        if p2p_msg_handler.uploaded_tx:
            uploaded_tx = True
            break
    logger.debug("Exiting tor broadcast thread, uploaded_tx = " +
        str(uploaded_tx))
    if not uploaded_tx:
        notify_queue.put(False)
    return uploaded_tx


def chunk_list(d, n):
    """Split the sequence d into consecutive slices of at most n items."""
    return [d[x:x + n] for x in range(0, len(d), n)]


def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
    """Broadcast the transaction txhex to several peers through tor.

    Peer addresses come from the getnodeaddresses RPC call of the bitcoin
    node behind rpc. Worker threads are started which each try a number of
    peers in turn.

    Returns True if a peer took the transaction or if nothing conclusive
    happened within the timeout. Returns False if peer addresses couldnt
    be obtained or if every worker thread gave up.
    """
    CONNECTION_THREADS = 8
    CONNECTION_ATTEMPTS_PER_THREAD = 10
    # upper bound on getnodeaddresses calls, so a node which knows of too
    # few witness peers cant keep us looping forever
    MAX_ADDRESS_REQUESTS = 20
    NOTIFY_TIMEOUT = 20

    required_address_count = CONNECTION_ATTEMPTS_PER_THREAD * CONNECTION_THREADS
    node_addrs_witness = []
    for _ in range(MAX_ADDRESS_REQUESTS):
        try:
            new_node_addrs = rpc.call("getnodeaddresses",
                [3*required_address_count//2])
        except JsonRpcError as e:
            logger.debug(repr(e))
            logger.error("Bitcoin Core v0.18.0 or higher is required "
                "to broadcast through Tor")
            return False
        node_addrs_witness.extend(
            [a for a in new_node_addrs if a["services"] & NODE_WITNESS]
        )
        logger.debug("len(new_node_addrs) = " + str(len(new_node_addrs)) +
            " len(node_addrs_witness) = " + str(len(node_addrs_witness)))
        if len(node_addrs_witness) > required_address_count:
            break
        if not new_node_addrs:
            # the node has no addresses to give, asking again wont help
            break
    if not node_addrs_witness:
        logger.error("No witness-capable peer addresses available, unable "
            "to broadcast through Tor")
        return False
    node_addrs_chunks = chunk_list(
        node_addrs_witness[:required_address_count],
        CONNECTION_ATTEMPTS_PER_THREAD
    )
    notify_queue = queue.Queue()
    start_height = rpc.call("getblockcount", [])
    for node_addrs in node_addrs_chunks:
        t = threading.Thread(target=broadcaster_thread,
            args=(txhex, node_addrs, tor_hostport, network, logger,
            start_height, notify_queue),
            daemon=True)
        t.start()
    # every thread reports at most one failure, so the broadcast has only
    # failed once all of them did. one thread giving up early says nothing
    # about the others
    deadline = time.monotonic() + NOTIFY_TIMEOUT
    failed_threads = 0
    while failed_threads < len(node_addrs_chunks):
        try:
            uploaded = notify_queue.get(block=True,
                timeout=max(0.0, deadline - time.monotonic()))
        except queue.Empty:
            logger.debug("Timed out getting notification for broadcasting "
                + "transaction")
            #the threads will maybe still continue to try broadcasting even
            # after this timeout
            #could time out at 20 seconds for any legitimate reason, tor is
            # slow so no point failing, this timeout is just so the user
            # doesnt have to stare at a seemingly-frozen dialog
            return True
        if uploaded:
            return True
        failed_threads += 1
    return False