electrumprotocol.py (21819B)
import json
import datetime
import time
import binascii
import os
import struct
import tempfile
import socket
from collections import defaultdict

from electrumpersonalserver.server.hashes import (
    hash_merkle_root,
    get_status_electrum,
    bytes_fmt
)
from .jsonrpc import JsonRpc, JsonRpcError
from electrumpersonalserver.server.peertopeer import tor_broadcast_tx
from electrumpersonalserver.server.merkleproof import (
    convert_core_to_electrum_merkle_proof
)

#protocol documentation
#https://github.com/kyuupichan/electrumx/blob/master/docs/protocol-methods.rst

SERVER_VERSION_NUMBER = "0.2.1.1"

SERVER_PROTOCOL_VERSION_MAX = 1.4
SERVER_PROTOCOL_VERSION_MIN = 1.1

DONATION_ADDR = "bc1qwt8kh83dpdj4yuquvsf28rhcft2rjh6jvy6678"

BANNER = \
"""Welcome to Electrum Personal Server {serverversion}

Monitoring {detwallets} deterministic wallets, in total {addr} addresses.

Connected bitcoin node: {useragent}
Uptime: {uptime}
Peers: {peers}
Download: {recvbytes} ({recvbytesperday} per day)
Upload: {sentbytes} ({sentbytesperday} per day)
Blocksonly: {blocksonly}
Pruning: {pruning}
Blockchain size: {blockchainsizeondisk}
{firstunprunedblock}
https://github.com/chris-belcher/electrum-personal-server

Donate to help improve Electrum Personal Server:
{donationaddr}

"""

class UnknownScripthashError(Exception):
    """Raised when a client asks about a scripthash the server doesn't know"""
    pass

def get_tor_hostport():
    """
    Probe the local machine for a listening Tor SOCKS port.

    Returns a (host, port) tuple for the first port that answers like Tor,
    or None if no probed port does.
    """
    # Probable ports for Tor to listen at
    host = "127.0.0.1"
    ports = [9050, 9150]
    socket_class = (socket._socketobject if hasattr(socket, "_socketobject")
        else socket.socket)
    for port in ports:
        sock = None
        try:
            sock = socket_class(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(0.1)
            sock.connect((host, port))
            # Tor responds uniquely to HTTP-like requests
            sock.send(b"GET\n")
            if b"Tor is not an HTTP Proxy" in sock.recv(1024):
                return (host, port)
        except socket.error:
            pass
        finally:
            #always release the probe socket, whether or not tor answered
            if sock is not None:
                sock.close()
    return None


def _get_prev_block_hash(rpc_head):
    """Return the previous block hash of a getblockheader result as hex"""
    if "previousblockhash" in rpc_head:
        return rpc_head["previousblockhash"]
    return "00"*32 #genesis block

def _serialize_header(rpc_head):
    """Pack a getblockheader result into the 80 byte serialized header"""
    return struct.pack("<i32s32sIII", rpc_head["version"],
        binascii.unhexlify(_get_prev_block_hash(rpc_head))[::-1],
        binascii.unhexlify(rpc_head["merkleroot"])[::-1],
        rpc_head["time"], int(rpc_head["bits"], 16), rpc_head["nonce"])

def _parse_protocol_version(version):
    """
    Convert a client supplied protocol version into a float.

    Only the major.minor part is used, so a three-part string such as
    "1.4.2" is read as 1.4 instead of making float() raise.
    """
    components = str(version).strip().split(".")
    return float(".".join(components[:2]))

def get_block_header(rpc, blockhash, raw=False):
    """
    Fetch the header of the block with the given hash from the node.

    If raw is true the result is {"hex": <serialized header as hex>,
    "height": <height>}, otherwise it is a dict of the individual header
    fields keyed by their electrum names.
    """
    rpc_head = rpc.call("getblockheader", [blockhash])
    if raw:
        head_hex = binascii.hexlify(_serialize_header(rpc_head)).decode(
            "utf-8")
        header = {"hex": head_hex, "height": rpc_head["height"]}
    else:
        header = {"block_height": rpc_head["height"],
            "prev_block_hash": _get_prev_block_hash(rpc_head),
            "timestamp": rpc_head["time"],
            "merkle_root": rpc_head["merkleroot"],
            "version": rpc_head["version"],
            "nonce": rpc_head["nonce"],
            "bits": int(rpc_head["bits"], 16)}
    return header

def get_current_header(rpc, raw):
    """Return (best block hash, header of that block), see get_block_header"""
    bestblockhash = rpc.call("getbestblockhash", [])
    header = get_block_header(rpc, bestblockhash, raw)
    return bestblockhash, header

def get_block_headers_hex(rpc, start_height, count):
    """
    Read up to count headers starting from start_height.

    Returns (concatenated serialized headers as hex, number of headers).
    If start_height cannot be resolved to a block hash the result is
    ("", 0). Fewer than count headers are returned when the chain tip is
    reached first.
    """
    result = bytearray()
    try:
        the_hash = rpc.call("getblockhash", [start_height])
    except JsonRpcError:
        return "", 0
    for _ in range(count):
        header = rpc.call("getblockheader", [the_hash])
        #add header hex to result
        result.extend(_serialize_header(header))
        if "nextblockhash" not in header:
            break
        the_hash = header["nextblockhash"]
    return binascii.hexlify(result).decode("utf-8"), len(result)//80

class ElectrumProtocol(object):
    """
    Class which implements the electrum protocol for one single connection. It
    does not handle the actual sockets, which could be any combination of
    blocking/non-blocking, asyncio, twisted, etc
    This class may be instantiated multiple times if the server accepts
    multiple client connections at once
    """

    def __init__(self, rpc, txmonitor, logger, broadcast_method,
            tor_hostport, mempool_sync):
        self.rpc = rpc
        self.txmonitor = txmonitor
        self.logger = logger
        self.broadcast_method = broadcast_method
        self.tor_hostport = tor_hostport
        self.mempool_sync = mempool_sync

        #negotiated by the client's server.version query
        self.protocol_version = 0
        self.subscribed_to_headers = False
        self.are_headers_raw = False
        #txid -> blockhash, filled in by blockchain.transaction.id_from_pos
        # and used as a fallback by blockchain.transaction.get
        self.txid_blockhash_map = {}

    def set_send_reply_fun(self, send_reply_fun):
        """Set the callable used to send every reply and notification"""
        self.send_reply_fun = send_reply_fun

    def on_blockchain_tip_updated(self, header):
        """Notify the client of a new tip if it subscribed to headers"""
        if self.subscribed_to_headers:
            update = {"method": "blockchain.headers.subscribe", "params":
                [header]}
            self._send_update(update)

    def on_updated_scripthashes(self, updated_scripthashes):
        """Send a notification for each of the updated scripthashes"""
        for scrhash in updated_scripthashes:
            history_hash = self.txmonitor.get_electrum_history_hash(scrhash)
            update = {"method": "blockchain.scripthash.subscribe", "params":
                [scrhash, history_hash]}
            self._send_update(update)

    def on_disconnect(self):
        """Drop the header and address subscriptions of this connection"""
        self.subscribed_to_headers = False
        self.txmonitor.unsubscribe_all_addresses()

    def _send_response(self, query, result):
        response = {"jsonrpc": "2.0", "result": result, "id": query["id"]}
        self.send_reply_fun(response)

    def _send_update(self, update):
        update["jsonrpc"] = "2.0"
        self.send_reply_fun(update)

    def _send_error(self, nid, error):
        payload = {"error": error, "jsonrpc": "2.0", "id": nid}
        self.send_reply_fun(payload)

    def handle_query(self, query):
        """
        Handle one decoded client query and send the reply through the
        function given to set_send_reply_fun().

        Raises IOError if the query has no "method", UnknownScripthashError
        if the client asks about a scripthash the server isn't monitoring and
        ConnectionRefusedError if no common protocol version exists.
        """
        if "method" not in query:
            raise IOError("Bad client query, no \"method\"")
        method = query["method"]

        if method == "blockchain.transaction.get":
            txid = query["params"][0]
            tx = None
            try:
                tx = self.rpc.call("gettransaction", [txid])["hex"]
            except JsonRpcError:
                #not a wallet transaction, but the node can still look it up
                # if we learned earlier which block it is in
                if txid in self.txid_blockhash_map:
                    tx = self.rpc.call("getrawtransaction", [txid, False,
                        self.txid_blockhash_map[txid]])
            if tx is not None:
                self._send_response(query, tx)
            else:
                self._send_error(query["id"], {"message": "txid not found"})
        elif method == "blockchain.transaction.get_merkle":
            txid = query["params"][0]
            try:
                tx = self.rpc.call("gettransaction", [txid])
                txheader = get_block_header(self.rpc, tx["blockhash"], False)
            except JsonRpcError:
                self._send_error(query["id"], {"message": "txid not found"})
            else:
                try:
                    core_proof = self.rpc.call("gettxoutproof", [[txid],
                        tx["blockhash"]])
                    electrum_proof = \
                        convert_core_to_electrum_merkle_proof(core_proof)
                    implied_merkle_root = hash_merkle_root(
                        electrum_proof["merkle"], txid, electrum_proof["pos"])
                    if implied_merkle_root != electrum_proof["merkleroot"]:
                        raise ValueError
                    reply = {"block_height": txheader["block_height"], "pos":
                        electrum_proof["pos"], "merkle":
                        electrum_proof["merkle"]}
                except (ValueError, JsonRpcError):
                    self.logger.info("merkle proof not found for " + txid
                        + " sending a dummy, Electrum client should be run "
                        + "with --skipmerklecheck")
                    #reply with a proof that the client will accept if
                    # its configured to not check the merkle proof
                    reply = {"block_height": txheader["block_height"], "pos": 0,
                        "merkle": [txid]}
                self._send_response(query, reply)
        elif method == "blockchain.scripthash.subscribe":
            scrhash = query["params"][0]
            if self.txmonitor.subscribe_address(scrhash):
                history_hash = self.txmonitor.get_electrum_history_hash(scrhash)
            else:
                self.logger.warning("Address not known to server, hash(address)"
                    + " = " + scrhash + ".\nCheck that you've imported the "
                    + "master public key(s) correctly. The first three "
                    + "addresses of each key are printed out on startup,\nso "
                    + "check that they really are addresses you expect. In "
                    + "Electrum go to Wallet -> Information to get the right "
                    + "master public key.")
                raise UnknownScripthashError(scrhash)
            self._send_response(query, history_hash)
        elif method == "blockchain.scripthash.get_history":
            scrhash = query["params"][0]
            history = self.txmonitor.get_electrum_history(scrhash)
            if history is None:
                self.logger.warning("Address history not known to server, "
                    + "hash(address) = " + scrhash)
                raise UnknownScripthashError(scrhash)
            self._send_response(query, history)
        elif method == "blockchain.scripthash.get_balance":
            scrhash = query["params"][0]
            balance = self.txmonitor.get_address_balance(scrhash)
            if balance is None:
                self.logger.warning("Address history not known to server, "
                    + "hash(address) = " + scrhash)
                raise UnknownScripthashError(scrhash)
            self._send_response(query, balance)
        elif method == "server.ping":
            self._send_response(query, None)
        elif method == "blockchain.headers.subscribe":
            if self.protocol_version in (1.2, 1.3):
                if len(query["params"]) > 0:
                    self.are_headers_raw = query["params"][0]
                else:
                    self.are_headers_raw = (False if self.protocol_version ==
                        1.2 else True)
            elif self.protocol_version == 1.4:
                self.are_headers_raw = True
            self.logger.debug("are_headers_raw = " + str(self.are_headers_raw))
            self.subscribed_to_headers = True
            _, header = get_current_header(self.rpc, self.are_headers_raw)
            self._send_response(query, header)
        elif method == "blockchain.block.get_header":
            height = query["params"][0]
            try:
                blockhash = self.rpc.call("getblockhash", [height])
                #this deprecated method (as of 1.3) can only
                # return non-raw headers
                header = get_block_header(self.rpc, blockhash, False)
                self._send_response(query, header)
            except JsonRpcError:
                error = {"message": "height " + str(height) + " out of range",
                    "code": -1}
                self._send_error(query["id"], error)
        elif method == "blockchain.block.header":
            height = query["params"][0]
            try:
                blockhash = self.rpc.call("getblockhash", [height])
                header = get_block_header(self.rpc, blockhash, True)
                self._send_response(query, header["hex"])
            except JsonRpcError:
                error = {"message": "height " + str(height) + " out of range",
                    "code": -1}
                self._send_error(query["id"], error)
        elif method == "blockchain.block.headers":
            MAX_CHUNK_SIZE = 2016
            start_height = query["params"][0]
            count = query["params"][1]
            count = min(count, MAX_CHUNK_SIZE)
            headers_hex, n = get_block_headers_hex(self.rpc, start_height,
                count)
            self._send_response(query, {'hex': headers_hex, 'count': n, 'max':
                MAX_CHUNK_SIZE})
        elif method == "blockchain.block.get_chunk":
            RETARGET_INTERVAL = 2016
            index = query["params"][0]
            tip_height = self.rpc.call("getblockchaininfo", [])["headers"]
            #logic copied from electrumx get_chunk() in controller.py
            next_height = tip_height + 1
            start_height = min(index*RETARGET_INTERVAL, next_height)
            count = min(next_height - start_height, RETARGET_INTERVAL)
            headers_hex, n = get_block_headers_hex(self.rpc, start_height,
                count)
            self._send_response(query, headers_hex)
        elif method == "blockchain.transaction.broadcast":
            self._handle_transaction_broadcast(query)
        elif method == "mempool.get_fee_histogram":
            result = self.mempool_sync.get_fee_histogram()
            self.logger.debug("mempool entry count = "
                + str(len(self.mempool_sync.mempool)))
            self._send_response(query, result)
        elif method == "blockchain.estimatefee":
            estimate = self.rpc.call("estimatesmartfee", [query["params"][0]])
            #fallback used when the node's reply contains no "feerate"
            feerate = 0.0001
            if "feerate" in estimate:
                feerate = estimate["feerate"]
            self._send_response(query, feerate)
        elif method == "blockchain.relayfee":
            networkinfo = self.rpc.call("getnetworkinfo", [])
            self._send_response(query, networkinfo["relayfee"])
        elif method == "server.banner":
            self._send_response(query, self._build_banner())
        elif method == "server.donation_address":
            self._send_response(query, DONATION_ADDR)
        elif method == "server.version":
            self._handle_server_version(query)
        elif method == "server.peers.subscribe":
            self._send_response(query, []) #no peers to report
        elif method == "blockchain.transaction.id_from_pos":
            height = query["params"][0]
            tx_pos = query["params"][1]
            merkle = False
            if len(query["params"]) > 2:
                merkle = query["params"][2]
            try:
                blockhash = self.rpc.call("getblockhash", [height])
                block = self.rpc.call("getblock", [blockhash, 1])
                txid = block["tx"][tx_pos]
                self.txid_blockhash_map[txid] = blockhash
                if not merkle:
                    result = txid
                else:
                    core_proof = self.rpc.call("gettxoutproof", [[txid],
                        blockhash])
                    electrum_proof =\
                        convert_core_to_electrum_merkle_proof(core_proof)
                    result = {"tx_hash": txid, "merkle": electrum_proof[
                        "merkle"]}
                self._send_response(query, result)
            except (JsonRpcError, IndexError) as e:
                #IndexError: tx_pos is beyond the last transaction of the
                # block, report it to the client instead of crashing
                error = {"message": repr(e)}
                self._send_error(query["id"], error)
        else:
            self.logger.error("*** BUG! Not handling method: " + method
                + " query=" + str(query))

    def _handle_transaction_broadcast(self, query):
        """Check a transaction against the mempool policy and broadcast it"""
        txhex = query["params"][0]
        result = None
        error = None
        txreport = self.rpc.call("testmempoolaccept", [[txhex]])[0]
        if not txreport["allowed"]:
            error = txreport["reject-reason"]
        else:
            result = txreport["txid"]
            broadcast_method = self.broadcast_method
            self.logger.info('Broadcasting tx ' + txreport["txid"]
                + " with broadcast method: " + broadcast_method)
            if broadcast_method == "tor-or-own-node":
                tor_hostport = get_tor_hostport()
                if tor_hostport is not None:
                    self.logger.info("Tor detected at " + str(tor_hostport)
                        + ". Broadcasting through tor.")
                    broadcast_method = "tor"
                    self.tor_hostport = tor_hostport
                else:
                    self.logger.info("Could not detect tor. Broadcasting "
                        + "through own node.")
                    broadcast_method = "own-node"
            if broadcast_method == "own-node":
                if not self.rpc.call("getnetworkinfo", [])["localrelay"]:
                    error = "Broadcast disabled when using blocksonly"
                    result = None
                    self.logger.warning("Transaction broadcasting disabled"
                        + " when blocksonly")
                else:
                    try:
                        self.rpc.call("sendrawtransaction", [txhex])
                    except JsonRpcError as e:
                        self.logger.error("Error broadcasting: " + repr(e))
            elif broadcast_method == "tor":
                network = "mainnet"
                chaininfo = self.rpc.call("getblockchaininfo", [])
                if chaininfo["chain"] == "test":
                    network = "testnet"
                elif chaininfo["chain"] == "regtest":
                    network = "regtest"
                self.logger.debug("broadcasting to network: " + network)
                success = tor_broadcast_tx(txhex, self.tor_hostport,
                    network, self.rpc, self.logger)
                if not success:
                    result = None
                    #without a message the client would be sent
                    # "error": null, which says nothing about what failed
                    error = "Unable to broadcast transaction through tor"
            elif broadcast_method.startswith("system "):
                #NOTE(review): the command line comes from the server's own
                # broadcast_method setting and is run through the shell
                with tempfile.NamedTemporaryFile() as fd:
                    system_line = broadcast_method[7:].replace("%s",
                        fd.name)
                    fd.write(txhex.encode())
                    fd.flush()
                    self.logger.debug("running command: " + system_line)
                    os.system(system_line)
            else:
                self.logger.error("Unrecognized broadcast method = "
                    + broadcast_method)
                result = None
                error = "Unrecognized broadcast method"
        if result is not None:
            self._send_response(query, result)
        else:
            self._send_error(query["id"], error)

    def _build_banner(self):
        """Return the server.banner text filled in with current node stats"""
        networkinfo = self.rpc.call("getnetworkinfo", [])
        blockchaininfo = self.rpc.call("getblockchaininfo", [])
        uptime = self.rpc.call("uptime", [])
        nettotals = self.rpc.call("getnettotals", [])
        #treat an uptime of zero as one second so the per-day figures
        # below never divide by zero
        uptime_days = max(uptime, 1) / 60.0 / 60 / 24
        first_unpruned_block_text = ""
        if blockchaininfo["pruned"]:
            first_unpruned_block_hash = self.rpc.call("getblockhash",
                [blockchaininfo["pruneheight"]])
            first_unpruned_block_time = self.rpc.call("getblockheader",
                [first_unpruned_block_hash])["time"]
            first_unpruned_block_text = ("First unpruned block: "
                + str(blockchaininfo["pruneheight"]) + " ("
                + str(
                datetime.datetime.fromtimestamp(first_unpruned_block_time))
                + ")\n")
        return BANNER.format(
            serverversion=SERVER_VERSION_NUMBER,
            detwallets=len(self.txmonitor.deterministic_wallets),
            addr=len(self.txmonitor.address_history),
            useragent=networkinfo["subversion"],
            uptime=str(datetime.timedelta(seconds=uptime)),
            peers=networkinfo["connections"],
            recvbytes=bytes_fmt(nettotals["totalbytesrecv"]),
            recvbytesperday=bytes_fmt(
                nettotals["totalbytesrecv"]/uptime_days),
            sentbytes=bytes_fmt(nettotals["totalbytessent"]),
            sentbytesperday=bytes_fmt(
                nettotals["totalbytessent"]/uptime_days),
            blocksonly=not networkinfo["localrelay"],
            pruning=blockchaininfo["pruned"],
            blockchainsizeondisk=bytes_fmt(
                blockchaininfo["size_on_disk"]),
            firstunprunedblock=first_unpruned_block_text,
            donationaddr=DONATION_ADDR)

    def _handle_server_version(self, query):
        """
        Negotiate the protocol version for this connection.

        The second parameter of the query is either a single version or a
        [min, max] pair. Raises ConnectionRefusedError if the client's range
        doesn't overlap with the versions this server supports.
        """
        params = query["params"]
        client_protocol_version = None
        if len(params) > 1:
            client_protocol_version = params[1]
            if isinstance(client_protocol_version, list):
                client_min = _parse_protocol_version(
                    client_protocol_version[0])
                client_max = _parse_protocol_version(
                    client_protocol_version[1])
            else:
                client_min = _parse_protocol_version(client_protocol_version)
                client_max = client_min
        else:
            #it seems some clients like bluewallet dont provide a version
            #just assume the client is compatible with us then
            client_min = SERVER_PROTOCOL_VERSION_MIN
            client_max = SERVER_PROTOCOL_VERSION_MAX
        self.protocol_version = min(client_max, SERVER_PROTOCOL_VERSION_MAX)
        if self.protocol_version < max(client_min,
                SERVER_PROTOCOL_VERSION_MIN):
            self.logger.error("*** Client protocol version " + str(
                client_protocol_version) + " not supported, update needed")
            raise ConnectionRefusedError()
        self._send_response(query, ["ElectrumPersonalServer "
            + SERVER_VERSION_NUMBER, str(self.protocol_version)])