electrum-personal-server

Maximally lightweight electrum server for a single user
git clone https://git.parazyd.org/electrum-personal-server
Log | Files | Refs | README

electrumprotocol.py (21819B)


      1 import json
      2 import datetime
      3 import time
      4 import binascii
      5 import os
      6 import struct
      7 import tempfile
      8 import socket
      9 from collections import defaultdict
     10 
     11 from electrumpersonalserver.server.hashes import (
     12     hash_merkle_root,
     13     get_status_electrum,
     14     bytes_fmt
     15 )
     16 from .jsonrpc import JsonRpc, JsonRpcError
     17 from electrumpersonalserver.server.peertopeer import tor_broadcast_tx
     18 from electrumpersonalserver.server.merkleproof import (
     19     convert_core_to_electrum_merkle_proof
     20 )
     21 
#protocol documentation
#https://github.com/kyuupichan/electrumx/blob/master/docs/protocol-methods.rst

#version string reported to clients by server.version and in the banner
SERVER_VERSION_NUMBER = "0.2.1.1"

#range of electrum protocol versions accepted when negotiating in
# server.version, they are compared numerically as floats
SERVER_PROTOCOL_VERSION_MAX = 1.4
SERVER_PROTOCOL_VERSION_MIN = 1.1

#address returned by server.donation_address and shown in the banner
DONATION_ADDR = "bc1qwt8kh83dpdj4yuquvsf28rhcft2rjh6jvy6678"

#template for the server.banner reply, the {placeholders} are filled in
# with str.format() using values obtained from the bitcoin node and the
# transaction monitor
BANNER = \
"""Welcome to Electrum Personal Server {serverversion}

Monitoring {detwallets} deterministic wallets, in total {addr} addresses.

Connected bitcoin node: {useragent}
Uptime: {uptime}
Peers: {peers}
Download: {recvbytes} ({recvbytesperday} per day)
Upload: {sentbytes} ({sentbytesperday} per day)
Blocksonly: {blocksonly}
Pruning: {pruning}
Blockchain size: {blockchainsizeondisk}
{firstunprunedblock}
https://github.com/chris-belcher/electrum-personal-server

Donate to help improve Electrum Personal Server:
{donationaddr}

"""
     52 
     53 class UnknownScripthashError(Exception):
     54     pass
     55 
     56 def get_tor_hostport():
     57     # Probable ports for Tor to listen at
     58     host = "127.0.0.1"
     59     ports = [9050, 9150]
     60     for port in ports:
     61         try:
     62             s = (socket._socketobject if hasattr(socket, "_socketobject")
     63                  else socket.socket)(socket.AF_INET, socket.SOCK_STREAM)
     64             s.settimeout(0.1)
     65             s.connect((host, port))
     66             # Tor responds uniquely to HTTP-like requests
     67             s.send(b"GET\n")
     68             if b"Tor is not an HTTP Proxy" in s.recv(1024):
     69                 return (host, port)
     70         except socket.error:
     71             pass
     72     return None
     73 
     74 
     75 def get_block_header(rpc, blockhash, raw=False):
     76     rpc_head = rpc.call("getblockheader", [blockhash])
     77     if "previousblockhash" in rpc_head:
     78         prevblockhash = rpc_head["previousblockhash"]
     79     else:
     80         prevblockhash = "00"*32 #genesis block
     81     if raw:
     82         head_hex = struct.pack("<i32s32sIII", rpc_head["version"],
     83             binascii.unhexlify(prevblockhash)[::-1],
     84             binascii.unhexlify(rpc_head["merkleroot"])[::-1],
     85             rpc_head["time"], int(rpc_head["bits"], 16), rpc_head["nonce"])
     86         head_hex = binascii.hexlify(head_hex).decode("utf-8")
     87         header = {"hex": head_hex, "height": rpc_head["height"]}
     88     else:
     89         header = {"block_height": rpc_head["height"],
     90                 "prev_block_hash": prevblockhash,
     91                 "timestamp": rpc_head["time"],
     92                 "merkle_root": rpc_head["merkleroot"],
     93                 "version": rpc_head["version"],
     94                 "nonce": rpc_head["nonce"],
     95                 "bits": int(rpc_head["bits"], 16)}
     96     return header
     97 
     98 def get_current_header(rpc, raw):
     99     bestblockhash = rpc.call("getbestblockhash", [])
    100     header = get_block_header(rpc, bestblockhash, raw)
    101     return bestblockhash, header
    102 
    103 def get_block_headers_hex(rpc, start_height, count):
    104     #read count number of headers starting from start_height
    105     result = bytearray()
    106     try:
    107         the_hash = rpc.call("getblockhash", [start_height])
    108     except JsonRpcError as e:
    109         return "", 0
    110     for i in range(count):
    111         header = rpc.call("getblockheader", [the_hash])
    112         #add header hex to result
    113         if "previousblockhash" in header:
    114             prevblockhash = header["previousblockhash"]
    115         else:
    116             prevblockhash = "00"*32 #genesis block
    117         h1 = struct.pack("<i32s32sIII", header["version"],
    118             binascii.unhexlify(prevblockhash)[::-1],
    119             binascii.unhexlify(header["merkleroot"])[::-1],
    120             header["time"], int(header["bits"], 16), header["nonce"])
    121         result.extend(h1)
    122         if "nextblockhash" not in header:
    123             break
    124         the_hash = header["nextblockhash"]
    125     return binascii.hexlify(result).decode("utf-8"), len(result)//80
    126 
    127 class ElectrumProtocol(object):
    128     """
    129     Class which implements the electrum protocol for one single connection. It
    130     does not handle the actual sockets, which could be any combination of 
    131     blocking/non-blocking, asyncio, twisted, etc
    132     This class may be instantized multiple times if the server accepts multiple
    133     client connections at once
    134     """
    135 
    136     def __init__(self, rpc, txmonitor, logger, broadcast_method,
    137             tor_hostport, mempool_sync):
    138         self.rpc = rpc
    139         self.txmonitor = txmonitor
    140         self.logger = logger
    141         self.broadcast_method = broadcast_method
    142         self.tor_hostport = tor_hostport
    143         self.mempool_sync = mempool_sync
    144 
    145         self.protocol_version = 0   
    146         self.subscribed_to_headers = False
    147         self.are_headers_raw = False
    148         self.txid_blockhash_map = {}
    149 
    150     def set_send_reply_fun(self, send_reply_fun):
    151         self.send_reply_fun = send_reply_fun
    152 
    153     def on_blockchain_tip_updated(self, header):
    154         if self.subscribed_to_headers:
    155             update = {"method": "blockchain.headers.subscribe", "params":
    156                 [header]}
    157             self._send_update(update)
    158 
    159     def on_updated_scripthashes(self, updated_scripthashes):
    160         for scrhash in updated_scripthashes:
    161             history_hash = self.txmonitor.get_electrum_history_hash(scrhash)
    162             update = {"method": "blockchain.scripthash.subscribe", "params": 
    163                 [scrhash, history_hash]}
    164             self._send_update(update)
    165 
    166     def on_disconnect(self):
    167         self.subscribed_to_headers = False
    168         self.txmonitor.unsubscribe_all_addresses()
    169 
    170     def _send_response(self, query, result):
    171         response = {"jsonrpc": "2.0", "result": result, "id": query["id"]}
    172         self.send_reply_fun(response)
    173 
    174     def _send_update(self, update):
    175         update["jsonrpc"] = "2.0"
    176         self.send_reply_fun(update)
    177 
    178     def _send_error(self, nid, error):
    179         payload = {"error": error, "jsonrpc": "2.0", "id": nid}
    180         self.send_reply_fun(payload)
    181 
    182     def handle_query(self, query):
    183         if "method" not in query:
    184             raise IOError("Bad client query, no \"method\"")
    185         method = query["method"]
    186 
    187         if method == "blockchain.transaction.get":
    188             txid = query["params"][0]
    189             tx = None
    190             try:
    191                 tx = self.rpc.call("gettransaction", [txid])["hex"]
    192             except JsonRpcError:
    193                 if txid in self.txid_blockhash_map:
    194                     tx = self.rpc.call("getrawtransaction", [txid, False,
    195                         self.txid_blockhash_map[txid]])
    196             if tx is not None:
    197                 self._send_response(query, tx)
    198             else:
    199                 self._send_error(query["id"], {"message": "txid not found"})
    200         elif method == "blockchain.transaction.get_merkle":
    201             txid = query["params"][0]
    202             try:
    203                 tx = self.rpc.call("gettransaction", [txid])
    204                 txheader = get_block_header(self.rpc, tx["blockhash"], False)
    205             except JsonRpcError as e:
    206                 self._send_error(query["id"], {"message": "txid not found"})
    207             else:
    208                 try:
    209                     core_proof = self.rpc.call("gettxoutproof", [[txid],
    210                         tx["blockhash"]])
    211                     electrum_proof = \
    212                         convert_core_to_electrum_merkle_proof(core_proof)
    213                     implied_merkle_root = hash_merkle_root(
    214                         electrum_proof["merkle"], txid, electrum_proof["pos"])
    215                     if implied_merkle_root != electrum_proof["merkleroot"]:
    216                         raise ValueError
    217                     reply = {"block_height": txheader["block_height"], "pos":
    218                         electrum_proof["pos"], "merkle":
    219                         electrum_proof["merkle"]}
    220                 except (ValueError, JsonRpcError) as e:
    221                     self.logger.info("merkle proof not found for " + txid
    222                         + " sending a dummy, Electrum client should be run "
    223                         + "with --skipmerklecheck")
    224                     #reply with a proof that the client with accept if
    225                     # its configured to not check the merkle proof
    226                     reply = {"block_height": txheader["block_height"], "pos": 0,
    227                         "merkle": [txid]}
    228                 self._send_response(query, reply)
    229         elif method == "blockchain.scripthash.subscribe":
    230             scrhash = query["params"][0]
    231             if self.txmonitor.subscribe_address(scrhash):
    232                 history_hash = self.txmonitor.get_electrum_history_hash(scrhash)
    233             else:
    234                 self.logger.warning("Address not known to server, hash(address)"
    235                     + " = " + scrhash + ".\nCheck that you've imported the "
    236                     + "master public key(s) correctly. The first three "
    237                     + "addresses of each key are printed out on startup,\nso "
    238                     + "check that they really are addresses you expect. In "
    239                     + "Electrum go to Wallet -> Information to get the right "
    240                     + "master public key.")
    241                 raise UnknownScripthashError(scrhash)
    242             self._send_response(query, history_hash)
    243         elif method == "blockchain.scripthash.get_history":
    244             scrhash = query["params"][0]
    245             history = self.txmonitor.get_electrum_history(scrhash)
    246             if history == None:
    247                 self.logger.warning("Address history not known to server, "
    248                     + "hash(address) = " + scrhash)
    249                 raise UnknownScripthashError(scrhash)
    250             self._send_response(query, history)
    251         elif method == "blockchain.scripthash.get_balance":
    252             scrhash = query["params"][0]
    253             balance = self.txmonitor.get_address_balance(scrhash)
    254             if balance == None:
    255                 self.logger.warning("Address history not known to server, "
    256                     + "hash(address) = " + scrhash)
    257                 raise UnknownScripthashError(scrhash)
    258             self._send_response(query, balance)
    259         elif method == "server.ping":
    260             self._send_response(query, None)
    261         elif method == "blockchain.headers.subscribe":
    262             if self.protocol_version in (1.2, 1.3):
    263                 if len(query["params"]) > 0:
    264                     self.are_headers_raw = query["params"][0]
    265                 else:
    266                     self.are_headers_raw = (False if self.protocol_version ==
    267                         1.2 else True)
    268             elif self.protocol_version == 1.4:
    269                 self.are_headers_raw = True
    270             self.logger.debug("are_headers_raw = " + str(self.are_headers_raw))
    271             self.subscribed_to_headers = True
    272             new_bestblockhash, header = get_current_header(self.rpc,
    273                 self.are_headers_raw)
    274             self._send_response(query, header)
    275         elif method == "blockchain.block.get_header":
    276             height = query["params"][0]
    277             try:
    278                 blockhash = self.rpc.call("getblockhash", [height])
    279                 #this deprecated method (as of 1.3) can only
    280                 # return non-raw headers
    281                 header = get_block_header(self.rpc, blockhash, False)
    282                 self._send_response(query, header)
    283             except JsonRpcError:
    284                 error = {"message": "height " + str(height) + " out of range",
    285                     "code": -1}
    286                 self._send_error(query["id"], error)
    287         elif method == "blockchain.block.header":
    288             height = query["params"][0]
    289             try:
    290                 blockhash = self.rpc.call("getblockhash", [height])
    291                 header = get_block_header(self.rpc, blockhash, True)
    292                 self._send_response(query, header["hex"])
    293             except JsonRpcError:
    294                 error = {"message": "height " + str(height) + " out of range",
    295                     "code": -1}
    296                 self._send_error(query["id"], error)
    297         elif method == "blockchain.block.headers":
    298             MAX_CHUNK_SIZE = 2016
    299             start_height = query["params"][0]
    300             count = query["params"][1]
    301             count = min(count, MAX_CHUNK_SIZE)
    302             headers_hex, n = get_block_headers_hex(self.rpc, start_height,
    303                 count)
    304             self._send_response(query, {'hex': headers_hex, 'count': n, 'max':
    305                 MAX_CHUNK_SIZE})
    306         elif method == "blockchain.block.get_chunk":
    307             RETARGET_INTERVAL = 2016
    308             index = query["params"][0]
    309             tip_height = self.rpc.call("getblockchaininfo", [])["headers"]
    310             #logic copied from electrumx get_chunk() in controller.py
    311             next_height = tip_height + 1
    312             start_height = min(index*RETARGET_INTERVAL, next_height)
    313             count = min(next_height - start_height, RETARGET_INTERVAL)
    314             headers_hex, n = get_block_headers_hex(self.rpc, start_height,
    315                 count)
    316             self._send_response(query, headers_hex)
    317         elif method == "blockchain.transaction.broadcast":
    318             txhex = query["params"][0]
    319             result = None
    320             error = None
    321             txreport = self.rpc.call("testmempoolaccept", [[txhex]])[0]
    322             if not txreport["allowed"]:
    323                 error = txreport["reject-reason"]
    324             else:
    325                 result = txreport["txid"]
    326                 broadcast_method = self.broadcast_method
    327                 self.logger.info('Broadcasting tx ' + txreport["txid"]
    328                     + " with broadcast method: " + broadcast_method)
    329                 if broadcast_method == "tor-or-own-node":
    330                     tor_hostport = get_tor_hostport()
    331                     if tor_hostport is not None:
    332                         self.logger.info("Tor detected at " + str(tor_hostport)
    333                             + ". Broadcasting through tor.")
    334                         broadcast_method = "tor"
    335                         self.tor_hostport = tor_hostport
    336                     else:
    337                         self.logger.info("Could not detect tor. Broadcasting "
    338                             + "through own node.")
    339                         broadcast_method = "own-node"
    340                 if broadcast_method == "own-node":
    341                     if not self.rpc.call("getnetworkinfo", [])["localrelay"]:
    342                         error = "Broadcast disabled when using blocksonly"
    343                         result = None
    344                         self.logger.warning("Transaction broadcasting disabled"
    345                             + " when blocksonly")
    346                     else:
    347                         try:
    348                             self.rpc.call("sendrawtransaction", [txhex])
    349                         except JsonRpcError as e:
    350                             self.logger.error("Error broadcasting: " + repr(e))
    351                 elif broadcast_method == "tor":
    352                     network = "mainnet"
    353                     chaininfo = self.rpc.call("getblockchaininfo", [])
    354                     if chaininfo["chain"] == "test":
    355                         network = "testnet"
    356                     elif chaininfo["chain"] == "regtest":
    357                         network = "regtest"
    358                     self.logger.debug("broadcasting to network: " + network)
    359                     success = tor_broadcast_tx(txhex, self.tor_hostport,
    360                         network, self.rpc, self.logger)
    361                     if not success:
    362                         result = None
    363                 elif broadcast_method.startswith("system "):
    364                     with tempfile.NamedTemporaryFile() as fd:
    365                         system_line = broadcast_method[7:].replace("%s",
    366                             fd.name)
    367                         fd.write(txhex.encode())
    368                         fd.flush()
    369                         self.logger.debug("running command: " + system_line)
    370                         os.system(system_line)
    371                 else:
    372                     self.logger.error("Unrecognized broadcast method = "
    373                         + broadcast_method)
    374                     result = None
    375                     error = "Unrecognized broadcast method"
    376             if result != None:
    377                 self._send_response(query, result)
    378             else:
    379                 self._send_error(query["id"], error)
    380         elif method == "mempool.get_fee_histogram":
    381             result = self.mempool_sync.get_fee_histogram()
    382             self.logger.debug("mempool entry count = "
    383                 + str(len(self.mempool_sync.mempool)))
    384             self._send_response(query, result)
    385         elif method == "blockchain.estimatefee":
    386             estimate = self.rpc.call("estimatesmartfee", [query["params"][0]])
    387             feerate = 0.0001
    388             if "feerate" in estimate:
    389                 feerate = estimate["feerate"]
    390             self._send_response(query, feerate)
    391         elif method == "blockchain.relayfee":
    392             networkinfo = self.rpc.call("getnetworkinfo", [])
    393             self._send_response(query, networkinfo["relayfee"])
    394         elif method == "server.banner":
    395             networkinfo = self.rpc.call("getnetworkinfo", [])
    396             blockchaininfo = self.rpc.call("getblockchaininfo", [])
    397             uptime = self.rpc.call("uptime", [])
    398             nettotals = self.rpc.call("getnettotals", [])
    399             uptime_days = uptime / 60.0 / 60 / 24
    400             first_unpruned_block_text = ""
    401             if blockchaininfo["pruned"]:
    402                 first_unpruned_block_time = self.rpc.call("getblockheader", [
    403                     self.rpc.call("getblockhash", [blockchaininfo[
    404                     "pruneheight"]])])["time"]
    405                 first_unpruned_block_text = ("First unpruned block: "
    406                     + str(blockchaininfo["pruneheight"]) + " ("
    407                     + str(
    408                     datetime.datetime.fromtimestamp(first_unpruned_block_time))
    409                     + ")\n")
    410             self._send_response(query, BANNER.format(
    411                 serverversion=SERVER_VERSION_NUMBER,
    412                 detwallets=len(self.txmonitor.deterministic_wallets),
    413                 addr=len(self.txmonitor.address_history),
    414                 useragent=networkinfo["subversion"],
    415                 uptime=str(datetime.timedelta(seconds=uptime)),
    416                 peers=networkinfo["connections"],
    417                 recvbytes=bytes_fmt(nettotals["totalbytesrecv"]),
    418                 recvbytesperday=bytes_fmt(
    419                     nettotals["totalbytesrecv"]/uptime_days),
    420                 sentbytes=bytes_fmt(nettotals["totalbytessent"]),
    421                 sentbytesperday=bytes_fmt(
    422                     nettotals["totalbytessent"]/uptime_days),
    423                 blocksonly=not networkinfo["localrelay"],
    424                 pruning=blockchaininfo["pruned"],
    425                 blockchainsizeondisk=bytes_fmt(
    426                     blockchaininfo["size_on_disk"]),
    427                 firstunprunedblock=first_unpruned_block_text,
    428                 donationaddr=DONATION_ADDR))
    429         elif method == "server.donation_address":
    430             self._send_response(query, DONATION_ADDR)
    431         elif method == "server.version":
    432             if len(query["params"]) > 0:
    433                 client_protocol_version = query["params"][1]
    434                 if isinstance(client_protocol_version, list):
    435                     client_min, client_max = float(client_min)
    436                 else:
    437                     client_min = float(query["params"][1])
    438                     client_max = client_min
    439             else:
    440                 #it seems some clients like bluewallet dont provide a version
    441                 #just assume the client is compatible with us then
    442                 client_min = SERVER_PROTOCOL_VERSION_MIN
    443                 client_max = SERVER_PROTOCOL_VERSION_MAX
    444             self.protocol_version = min(client_max, SERVER_PROTOCOL_VERSION_MAX)
    445             if self.protocol_version < max(client_min,
    446                     SERVER_PROTOCOL_VERSION_MIN):
    447                 logging.error("*** Client protocol version " + str(
    448                     client_protocol_version) + " not supported, update needed")
    449                 raise ConnectionRefusedError()
    450             self._send_response(query, ["ElectrumPersonalServer "
    451                 + SERVER_VERSION_NUMBER, str(self.protocol_version)])
    452         elif method == "server.peers.subscribe":
    453             self._send_response(query, []) #no peers to report
    454         elif method == "blockchain.transaction.id_from_pos":
    455             height = query["params"][0]
    456             tx_pos = query["params"][1]
    457             merkle = False
    458             if len(query["params"]) > 2:
    459                 merkle = query["params"][2]
    460             try:
    461                 blockhash = self.rpc.call("getblockhash", [height])
    462                 block = self.rpc.call("getblock", [blockhash, 1])
    463                 txid = block["tx"][tx_pos]
    464                 self.txid_blockhash_map[txid] = blockhash
    465                 if not merkle:
    466                     result = txid
    467                 else:
    468                     core_proof = self.rpc.call("gettxoutproof", [[txid],
    469                         blockhash])
    470                     electrum_proof =\
    471                         convert_core_to_electrum_merkle_proof(core_proof)
    472                     result = {"tx_hash": txid, "merkle": electrum_proof[
    473                         "merkle"]}
    474                 self._send_response(query, result)
    475             except JsonRpcError as e:
    476                 error = {"message": repr(e)}
    477                 self._send_error(query["id"], error)
    478         else:
    479             self.logger.error("*** BUG! Not handling method: " + method
    480                 + " query=" + str(query))
    481