electrum-personal-server

Maximally lightweight electrum server for a single user
git clone https://git.parazyd.org/electrum-personal-server
Log | Files | Refs | README

peertopeer.py (16716B)


      1 #! /usr/bin/env python
      2 
      3 import socket
      4 import time
      5 import base64
      6 import threading
      7 import queue
      8 import random
      9 from struct import pack, unpack
     10 from datetime import datetime
     11 
     12 import electrumpersonalserver.bitcoin as btc
     13 from electrumpersonalserver.server.socks import (
     14     socksocket,
     15     setdefaultproxy,
     16     PROXY_TYPE_SOCKS5
     17 )
     18 from electrumpersonalserver.server.jsonrpc import JsonRpcError
     19 
     20 PROTOCOL_VERSION = 70016
     21 DEFAULT_USER_AGENT = '/Satoshi:0.21.0/'
     22 
     23 #https://github.com/bitcoin/bitcoin/blob/master/src/protocol.h
     24 NODE_NETWORK = 1
     25 NODE_BLOOM = 1 << 2
     26 NODE_WITNESS = 1 << 3
     27 NODE_NETWORK_LIMITED = 1 << 10
     28 
     29 # protocol versions above this also send a relay boolean
     30 RELAY_TX_VERSION = 70001
     31 
     32 # length of bitcoin p2p packets
     33 HEADER_LENGTH = 24
     34 
     35 # if no message has been seen for this many seconds, send a ping
     36 KEEPALIVE_INTERVAL = 2 * 60
     37 
     38 # close connection if keep alive ping isnt responded to in this many seconds
     39 KEEPALIVE_TIMEOUT = 20 * 60
     40 
     41 
     42 def ip_to_hex(ip_str):
     43     # ipv4 only for now
     44     return socket.inet_pton(socket.AF_INET, ip_str)
     45 
     46 def create_net_addr(hexip, port): # doesnt contain time as in bitcoin wiki
     47     services = 0
     48     hex = bytes(10) + b'\xFF\xFF' + hexip
     49     return pack('<Q16s', services, hex) + pack('>H', port)
     50 
     51 def create_var_str(s):
     52     return btc.num_to_var_int(len(s)) + s.encode()
     53 
     54 def read_int(ptr, payload, n, littleendian=True):
     55     data = payload[ptr[0] : ptr[0]+n]
     56     if littleendian:
     57         data = data[::-1]
     58     ret =  btc.decode(data, 256)
     59     ptr[0] += n
     60     return ret
     61 
     62 def read_var_int(ptr, payload):
     63     val = payload[ptr[0]]
     64     ptr[0] += 1
     65     if val < 253:
     66         return val
     67     return read_int(ptr, payload, 2**(val - 252))
     68 
     69 def read_var_str(ptr, payload):
     70     l = read_var_int(ptr, payload)
     71     ret = payload[ptr[0]: ptr[0] + l]
     72     ptr[0] += l
     73     return ret
     74 
     75 def ip_hex_to_str(ip_hex):
     76     # https://en.wikipedia.org/wiki/IPv6#IPv4-mapped_IPv6_addresses
     77     # https://www.cypherpunk.at/onioncat_trac/wiki/OnionCat
     78     if ip_hex[:14] == '\x00'*10 + '\xff'*2:
     79         # ipv4 mapped ipv6 addr
     80         return socket.inet_ntoa(ip_hex[12:])
     81     elif ip_hex[:6] == '\xfd\x87\xd8\x7e\xeb\x43':
     82         return base64.b32encode(ip_hex[6:]).lower() + '.onion'
     83     else:
     84         return socket.inet_ntop(socket.AF_INET6, ip_hex)
     85 
     86 class P2PMessageHandler(object):
     87     def __init__(self, logger):
     88         self.last_message = datetime.now()
     89         self.waiting_for_keepalive = False
     90         self.logger = logger
     91 
     92     def check_keepalive(self, p2p):
     93         if self.waiting_for_keepalive:
     94             if ((datetime.now() - self.last_message).total_seconds()
     95                     < KEEPALIVE_TIMEOUT):
     96                 return
     97             self.logger.debug('keepalive timed out, closing')
     98             p2p.sock.close()
     99         else:
    100             if ((datetime.now() - self.last_message).total_seconds()
    101                     < KEEPALIVE_INTERVAL):
    102                 return
    103             self.logger.debug('sending keepalive to peer')
    104             self.waiting_for_keepalive = True
    105             p2p.sock.sendall(p2p.create_message('ping', '\x00'*8))
    106 
    107     def handle_message(self, p2p, command, length, payload):
    108         self.last_message = datetime.now()
    109         self.waiting_for_keepalive = False
    110         ptr = [0]
    111         if command == b'version':
    112             version = read_int(ptr, payload, 4)
    113             services = read_int(ptr, payload, 8)
    114             timestamp = read_int(ptr, payload, 8)
    115             addr_recv_services = read_int(ptr, payload, 8)
    116             addr_recv_ip = payload[ptr[0] : ptr[0]+16]
    117             ptr[0] += 16
    118             addr_recv_port = read_int(ptr, payload, 2, False)
    119             addr_trans_services = read_int(ptr, payload, 8)
    120             addr_trans_ip = payload[ptr[0] : ptr[0]+16]
    121             ptr[0] += 16
    122             addr_trans_port = read_int(ptr, payload, 2, False)
    123             ptr[0] += 8 # skip over nonce
    124             user_agent = read_var_str(ptr, payload)
    125             start_height = read_int(ptr, payload, 4)
    126             if version > RELAY_TX_VERSION:
    127                 relay = read_int(ptr, payload, 1) != 0
    128             else:
    129                 # must check node accepts unconfirmed txes before broadcasting
    130                 relay = True
    131             self.logger.debug(('Received peer version message: version=%d'
    132                 + ' services=0x%x'
    133                 + ' timestamp=%s user_agent=%s start_height=%d relay=%i'
    134                 + ' them=%s:%d us=%s:%d') % (version,
    135                 services, str(datetime.fromtimestamp(timestamp)),
    136                 user_agent, start_height, relay, ip_hex_to_str(addr_trans_ip)
    137                 , addr_trans_port, ip_hex_to_str(addr_recv_ip), addr_recv_port))
    138             p2p.sock.sendall(p2p.create_message('verack', b''))
    139             self.on_recv_version(p2p, version, services, timestamp,
    140                 addr_recv_services, addr_recv_ip, addr_trans_services,
    141                 addr_trans_ip, addr_trans_port, user_agent, start_height,
    142                 relay)
    143         elif command == b'verack':
    144             self.on_connected(p2p)
    145         elif command == b'ping':
    146             p2p.sock.sendall(p2p.create_message('pong', payload))
    147 
    148     # optional override these in a subclass
    149 
    150     def on_recv_version(self, p2p, version, services, timestamp,
    151             addr_recv_services, addr_recv_ip, addr_trans_services,
    152             addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
    153         pass
    154 
    155     def on_connected(self, p2p):
    156         pass
    157 
    158     def on_heartbeat(self, p2p):
    159         pass
    160 
    161 
    162 class P2PProtocol(object):
    163     def __init__(self, p2p_message_handler, remote_hostport,
    164                  network, logger, notify_queue, user_agent=DEFAULT_USER_AGENT,
    165                  socks5_hostport=("localhost", 9050), connect_timeout=30,
    166                  heartbeat_interval=15, start_height=0):
    167         self.p2p_message_handler = p2p_message_handler
    168         self.remote_hostport = remote_hostport
    169         self.logger = logger
    170         self.notify_queue = notify_queue
    171         self.user_agent = user_agent
    172         self.socks5_hostport = socks5_hostport
    173         self.connect_timeout = connect_timeout
    174         self.heartbeat_interval = heartbeat_interval
    175         self.start_height = start_height
    176         if network == "testnet":
    177             self.magic = 0x0709110b
    178         elif network == "regtest":
    179             self.magic = 0xdab5bffa
    180         else:
    181             self.magic = 0xd9b4bef9
    182         self.closed = False
    183 
    184     def run(self):
    185         services = (NODE_NETWORK | NODE_WITNESS | NODE_NETWORK_LIMITED)
    186         st = int(time.time())
    187         nonce = random.getrandbits(64)
    188 
    189         netaddr_them = create_net_addr(ip_to_hex('0.0.0.0'), 0)
    190         netaddr_us = create_net_addr(ip_to_hex('0.0.0.0'), 0)
    191         version_message = (pack('<iQQ', PROTOCOL_VERSION, services, st)
    192                            + netaddr_them
    193                            + netaddr_us
    194                            + pack('<Q', nonce)
    195                            + create_var_str(self.user_agent)
    196                            + pack('<I', self.start_height)
    197                            + b'\x01')
    198 
    199         self.logger.debug('Connecting to bitcoin peer at ' +
    200                 str(self.remote_hostport) + ' with proxy ' +
    201                 str(self.socks5_hostport))
    202         setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0],
    203                         self.socks5_hostport[1], True)
    204         self.sock = socksocket()
    205         self.sock.settimeout(self.connect_timeout)
    206         self.sock.connect(self.remote_hostport)
    207         self.sock.sendall(self.create_message('version', version_message))
    208 
    209         self.sock.settimeout(self.heartbeat_interval)
    210         self.closed = False
    211         try:
    212             recv_buffer = b''
    213             payload_length = -1  # -1 means waiting for header
    214             command = None
    215             checksum = None
    216             while not self.closed:
    217                 try:
    218                     recv_data = self.sock.recv(4096)
    219                     if not recv_data or len(recv_data) == 0:
    220                         raise EOFError()
    221                     recv_buffer += recv_data
    222                     # this is O(N^2) scaling in time, another way would be to
    223                     # store in a list and combine at the end with "".join()
    224                     # but this isnt really timing critical so didnt optimize it
    225 
    226                     data_remaining = True
    227                     while data_remaining and not self.closed:
    228                         if payload_length == -1 and (len(recv_buffer)
    229                                 >= HEADER_LENGTH):
    230                             net_magic, command, payload_length, checksum =\
    231                                 unpack('<I12sI4s', recv_buffer[:HEADER_LENGTH])
    232                             recv_buffer = recv_buffer[HEADER_LENGTH:]
    233                             if net_magic != self.magic:
    234                                 self.logger.debug('wrong MAGIC: ' +
    235                                     hex(net_magic))
    236                                 self.sock.close()
    237                                 break
    238                             command = command.strip(b'\0')
    239                         else:
    240                             if payload_length >= 0 and (len(recv_buffer)
    241                                     >= payload_length):
    242                                 payload = recv_buffer[:payload_length]
    243                                 recv_buffer = recv_buffer[payload_length:]
    244                                 if btc.bin_dbl_sha256(payload)[:4] == checksum:
    245                                     self.p2p_message_handler.handle_message(
    246                                         self, command, payload_length, payload)
    247                                 else:
    248                                     self.logger.debug("wrong checksum, " +
    249                                         "dropping " +
    250                                         "message, cmd=" + command +
    251                                         " payloadlen=" + str(payload_length))
    252                                 payload_length = -1
    253                                 data_remaining = True
    254                             else:
    255                                 data_remaining = False
    256                 except socket.timeout:
    257                     self.p2p_message_handler.check_keepalive(self)
    258                     self.p2p_message_handler.on_heartbeat(self)
    259         except EOFError as e:
    260             self.closed = True
    261         except IOError as e:
    262             import traceback
    263             self.logger.debug("logging traceback from %s: \n" %
    264                 traceback.format_exc())
    265             self.closed = True
    266         finally:
    267             try:
    268                 self.sock.close()
    269             except Exception as _:
    270                 pass
    271 
    272     def close(self):
    273         self.closed = True
    274 
    275     def create_message(self, command, payload):
    276         return (pack("<I12sI", self.magic, command.encode(), len(payload))
    277             + btc.bin_dbl_sha256(payload)[:4] + payload)
    278 
    279 class P2PBroadcastTx(P2PMessageHandler):
    280     def __init__(self, txhex, logger, notify_queue):
    281         P2PMessageHandler.__init__(self, logger)
    282         self.txhex = bytes.fromhex(txhex)
    283         self.txid = btc.bin_txhash(self.txhex)
    284         self.uploaded_tx = False
    285         self.time_marker = datetime.now()
    286         self.connected = False
    287         self.notify_queue = notify_queue
    288 
    289     def on_recv_version(self, p2p, version, services, timestamp,
    290             addr_recv_services, addr_recv_ip, addr_trans_services,
    291             addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
    292         if not relay:
    293             self.logger.debug('peer not accepting unconfirmed txes, trying ' +
    294                 'another')
    295             # this happens if the other node is using blockonly=1
    296             p2p.close()
    297         if not services & NODE_WITNESS:
    298             self.logger.debug('peer not accepting witness data, trying another')
    299             p2p.close()
    300 
    301     def on_connected(self, p2p):
    302         MSG = 1 #msg_tx
    303         inv_payload = pack('<BI', 1, MSG) + self.txid
    304         p2p.sock.sendall(p2p.create_message('inv', inv_payload))
    305         self.time_marker = datetime.now()
    306         self.uploaded_tx = False
    307         self.connected = True
    308 
    309     def on_heartbeat(self, p2p):
    310         self.logger.debug('broadcaster heartbeat')
    311         VERACK_TIMEOUT = 40
    312         GETDATA_TIMEOUT = 60
    313         if not self.connected:
    314             if ((datetime.now() - self.time_marker).total_seconds()
    315                     < VERACK_TIMEOUT):
    316                 return
    317             self.logger.debug('timed out of waiting for verack')
    318         else:
    319             if ((datetime.now() - self.time_marker).total_seconds()
    320                     < GETDATA_TIMEOUT):
    321                 return
    322             self.logger.debug('timed out in waiting for getdata, node ' +
    323                 'already has tx')
    324             self.uploaded_tx = True
    325         p2p.close()
    326 
    327     def handle_message(self, p2p, command, length, payload):
    328         P2PMessageHandler.handle_message(self, p2p, command, length, payload)
    329         ptr = [0]
    330         if command == b'getdata':
    331             count = read_var_int(ptr, payload)
    332             for _ in range(count):
    333                 ptr[0] += 4
    334                 hash_id = payload[ptr[0] : ptr[0] + 32]
    335                 ptr[0] += 32
    336                 if hash_id == self.txid:
    337                     p2p.sock.sendall(p2p.create_message('tx', self.txhex))
    338                     self.uploaded_tx = True
    339                     self.logger.debug("Uploaded transaction via tor to peer at "
    340                         + str(p2p.remote_hostport))
    341                     self.notify_queue.put(True)
    342                     ##make sure the packets really got through by sleeping
    343                     ##some kernels seem to send a RST packet on close() even
    344                     ##if theres still data in the send buffer
    345                     time.sleep(5)
    346                     p2p.close()
    347 
    348 def broadcaster_thread(txhex, node_addrs, tor_hostport, network, logger,
    349         start_height, notify_queue):
    350     for node_addr in node_addrs:
    351         remote_hostport = (node_addr["address"], node_addr["port"])
    352         p2p_msg_handler = P2PBroadcastTx(txhex, logger, notify_queue)
    353         p2p = P2PProtocol(p2p_msg_handler, remote_hostport,
    354             network, logger, notify_queue, socks5_hostport=tor_hostport,
    355             heartbeat_interval=20, start_height=start_height)
    356         try:
    357             p2p.run()
    358         except IOError as e:
    359             logger.debug("p2p.run() exited: " + repr(e))
    360             continue
    361         if p2p_msg_handler.uploaded_tx:
    362             break
    363     logger.debug("Exiting tor broadcast thread, uploaded_tx = " +
    364         str(p2p_msg_handler.uploaded_tx))
    365     if not p2p_msg_handler.uploaded_tx:
    366         notify_queue.put(False)
    367     return p2p_msg_handler.uploaded_tx
    368 
    369 def chunk_list(d, n):
    370     return [d[x:x + n] for x in range(0, len(d), n)]
    371 
    372 def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
    373     CONNECTION_THREADS = 8
    374     CONNECTION_ATTEMPTS_PER_THREAD = 10
    375 
    376     required_address_count = CONNECTION_ATTEMPTS_PER_THREAD * CONNECTION_THREADS
    377     node_addrs_witness = []
    378     while True:
    379         try:
    380             new_node_addrs = rpc.call("getnodeaddresses",
    381                 [3*required_address_count//2])
    382         except JsonRpcError as e:
    383             logger.debug(repr(e))
    384             logger.error("Bitcoin Core v0.18.0 or higher is required "
    385                 "to broadcast through Tor")
    386             return False
    387         node_addrs_witness.extend(
    388             [a for a in new_node_addrs if a["services"] & NODE_WITNESS]
    389         )
    390         logger.debug("len(new_node_addrs) = " + str(len(new_node_addrs)) +
    391             " len(node_addrs_witness) = " + str(len(node_addrs_witness)))
    392         if len(node_addrs_witness) > required_address_count:
    393             break
    394     node_addrs_chunks = chunk_list(
    395         node_addrs_witness[:required_address_count],
    396         CONNECTION_ATTEMPTS_PER_THREAD
    397     )
    398     notify_queue = queue.Queue()
    399     start_height = rpc.call("getblockcount", [])
    400     for node_addrs in node_addrs_chunks:
    401         t = threading.Thread(target=broadcaster_thread,
    402             args=(txhex, node_addrs, tor_hostport, network, logger,
    403                 start_height, notify_queue),
    404             daemon=True)
    405         t.start()
    406     try:
    407         success = notify_queue.get(block=True, timeout=20)
    408     except queue.Empty:
    409         logger.debug("Timed out getting notification for broadcasting "
    410             + "transaction")
    411         #the threads will maybe still continue to try broadcasting even
    412         # after this timeout
    413         #could time out at 20 seconds for any legitimate reason, tor is slow
    414         # so no point failing, this timeout is just so the user doesnt have
    415         # to stare at a seemingly-frozen dialog
    416         success = True
    417     return success