test_electrum_protocol.py (9449B)
1 2 import pytest 3 import logging 4 import json 5 6 from electrumpersonalserver.server import ( 7 TransactionMonitor, 8 JsonRpcError, 9 ElectrumProtocol, 10 get_block_header, 11 get_current_header, 12 get_block_headers_hex, 13 JsonRpcError, 14 get_status_electrum 15 ) 16 17 logger = logging.getLogger('ELECTRUMPERSONALSERVER-TEST') 18 logger.setLevel(logging.DEBUG) 19 20 DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT = 100000 21 22 def get_dummy_hash_from_height(height): 23 if height == 0: 24 return "00"*32 25 return str(height) + "a"*(64 - len(str(height))) 26 27 def get_height_from_dummy_hash(hhash): 28 if hhash == "00"*32: 29 return 0 30 return int(hhash[:hhash.index("a")]) 31 32 class DummyJsonRpc(object): 33 def __init__(self): 34 self.calls = {} 35 self.blockchain_height = DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT 36 37 def call(self, method, params): 38 if method not in self.calls: 39 self.calls[method] = [0, []] 40 self.calls[method][0] += 1 41 self.calls[method][1].append(params) 42 if method == "getbestblockhash": 43 return get_dummy_hash_from_height(self.blockchain_height) 44 elif method == "getblockhash": 45 height = params[0] 46 if height > self.blockchain_height: 47 raise JsonRpcError() 48 return get_dummy_hash_from_height(height) 49 elif method == "getblockheader": 50 blockhash = params[0] 51 height = get_height_from_dummy_hash(blockhash) 52 header = { 53 "hash": blockhash, 54 "confirmations": self.blockchain_height - height + 1, 55 "height": height, 56 "version": 536870912, 57 "versionHex": "20000000", 58 "merkleroot": "aa"*32, 59 "time": height*100, 60 "mediantime": height*100, 61 "nonce": 1, 62 "bits": "207fffff", 63 "difficulty": 4.656542373906925e-10, 64 "chainwork": "000000000000000000000000000000000000000000000" 65 + "00000000000000000da", 66 "nTx": 1, 67 } 68 if height > 1: 69 header["previousblockhash"] = get_dummy_hash_from_height( 70 height - 1) 71 elif height == 1: 72 header["previousblockhash"] = "00"*32 #genesis block 73 elif height == 0: 74 pass #no prevblock for genesis 75 else: 76 assert 0 77 if height < self.blockchain_height: 78 header["nextblockhash"] = get_dummy_hash_from_height(height + 1) 79 return header 80 elif method == "gettransaction": 81 for t in self.txlist: 82 if t["txid"] == params[0]: 83 return t 84 raise JsonRpcError() 85 else: 86 raise ValueError("unknown method in dummy jsonrpc") 87 88 def test_get_block_header(): 89 rpc = DummyJsonRpc() 90 for height in [0, 1000]: 91 for raw in [True, False]: 92 blockhash = rpc.call("getblockhash", [height]) 93 ret = get_block_header(rpc, blockhash, raw) 94 if raw: 95 assert type(ret) == dict 96 assert "hex" in ret 97 assert "height" in ret 98 assert len(ret["hex"]) == 160 99 else: 100 assert type(ret) == dict 101 assert len(ret) == 7 102 103 def test_get_current_header(): 104 rpc = DummyJsonRpc() 105 for raw in [True, False]: 106 ret = get_current_header(rpc, raw) 107 assert type(ret[0]) == str 108 assert len(ret[0]) == 64 109 if raw: 110 assert type(ret[1]) == dict 111 assert "hex" in ret[1] 112 assert "height" in ret[1] 113 assert len(ret[1]["hex"]) == 160 114 else: 115 assert type(ret[1]) == dict 116 assert len(ret[1]) == 7 117 118 @pytest.mark.parametrize( 119 "start_height, count", 120 [(100, 200), 121 (DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT + 10, 5), 122 (DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT - 10, 15), 123 (0, 250) 124 ] 125 ) 126 def test_get_block_headers_hex(start_height, count): 127 rpc = DummyJsonRpc() 128 ret = get_block_headers_hex(rpc, start_height, count) 129 print("start_height=" + str(start_height) + " count=" + str(count)) 130 assert len(ret) == 2 131 available_blocks = -min(0, start_height - DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT 132 - 1) 133 expected_count = min(available_blocks, count) 134 assert len(ret[0]) == expected_count*80*2 #80 bytes/header, 2 chars/byte 135 assert ret[1] == expected_count 136 137 @pytest.mark.parametrize( 138 "invalid_json_query", 139 [ 140 {"valid-json-no-method": 5} 141 ] 142 ) 143 def test_invalid_json_query_line(invalid_json_query): 144 protocol = ElectrumProtocol(None, None, logger, None, None, None) 145 with pytest.raises(IOError) as e: 146 protocol.handle_query(invalid_json_query) 147 148 def create_electrum_protocol_instance(broadcast_method="own-node", 149 tor_hostport=("127.0.0.1", 9050), 150 disable_mempool_fee_histogram=False): 151 protocol = ElectrumProtocol(DummyJsonRpc(), DummyTransactionMonitor(), 152 logger, broadcast_method, tor_hostport, disable_mempool_fee_histogram) 153 sent_replies = [] 154 protocol.set_send_reply_fun(lambda l: sent_replies.append(l)) 155 assert len(sent_replies) == 0 156 return protocol, sent_replies 157 158 def dummy_script_hash_to_history(scrhash): 159 index = int(scrhash[:scrhash.index("s")]) 160 tx_count = (index+2) % 5 161 height = 500 162 return [(index_to_dummy_txid(i), height) for i in range(tx_count)] 163 164 def index_to_dummy_script_hash(index): 165 return str(index) + "s"*(64 - len(str(index))) 166 167 def index_to_dummy_txid(index): 168 return str(index) + "t"*(64 - len(str(index))) 169 170 def dummy_txid_to_dummy_tx(txid): 171 return txid[::-1] * 6 172 173 class DummyTransactionMonitor(object): 174 def __init__(self): 175 self.deterministic_wallets = list(range(5)) 176 self.address_history = list(range(5)) 177 self.subscribed_addresses = [] 178 self.history_hashes = {} 179 180 def get_electrum_history_hash(self, scrhash): 181 history = dummy_script_hash_to_history(scrhash) 182 hhash = get_status_electrum(history) 183 self.history_hashes[scrhash] = history 184 return hhash 185 186 def get_electrum_history(self, scrhash): 187 return self.history_hashes[scrhash] 188 189 def unsubscribe_all_addresses(self): 190 self.subscribed_addresses = [] 191 192 def subscribe_address(self, scrhash): 193 self.subscribed_addresses.append(scrhash) 194 return True 195 196 def get_address_balance(self, scrhash): 197 pass 198 199 def test_script_hash_sync(): 200 protocol, sent_replies = create_electrum_protocol_instance() 201 scrhash_index = 0 202 scrhash = index_to_dummy_script_hash(scrhash_index) 203 protocol.handle_query({"method": "blockchain.scripthash.subscribe", 204 "params": [scrhash], "id": 0}) 205 assert len(sent_replies) == 1 206 assert len(protocol.txmonitor.subscribed_addresses) == 1 207 assert protocol.txmonitor.subscribed_addresses[0] == scrhash 208 assert len(sent_replies) == 1 209 assert len(sent_replies[0]["result"]) == 64 210 history_hash = sent_replies[0]["result"] 211 212 protocol.handle_query({"method": "blockchain.scripthash.get_history", 213 "params": [scrhash], "id": 0}) 214 assert len(sent_replies) == 2 215 assert get_status_electrum(sent_replies[1]["result"]) == history_hash 216 217 #updated scripthash but actually nothing changed, history_hash unchanged 218 protocol.on_updated_scripthashes([scrhash]) 219 assert len(sent_replies) == 3 220 assert sent_replies[2]["method"] == "blockchain.scripthash.subscribe" 221 assert sent_replies[2]["params"][0] == scrhash 222 assert sent_replies[2]["params"][1] == history_hash 223 224 protocol.on_disconnect() 225 assert len(protocol.txmonitor.subscribed_addresses) == 0 226 227 def test_headers_subscribe(): 228 protocol, sent_replies = create_electrum_protocol_instance() 229 230 protocol.handle_query({"method": "server.version", "params": ["test-code", 231 1.4], "id": 0}) #protocol version of 1.4 means only raw headers used 232 assert len(sent_replies) == 1 233 234 protocol.handle_query({"method": "blockchain.headers.subscribe", "params": 235 [], "id": 0}) 236 assert len(sent_replies) == 2 237 assert "height" in sent_replies[1]["result"] 238 assert sent_replies[1]["result"]["height"] == protocol.rpc.blockchain_height 239 assert "hex" in sent_replies[1]["result"] 240 assert len(sent_replies[1]["result"]["hex"]) == 80*2 #80 b/header, 2 b/char 241 242 protocol.rpc.blockchain_height += 1 243 new_bestblockhash, header = get_current_header(protocol.rpc, 244 protocol.are_headers_raw) 245 protocol.on_blockchain_tip_updated(header) 246 assert len(sent_replies) == 3 247 assert "method" in sent_replies[2] 248 assert sent_replies[2]["method"] == "blockchain.headers.subscribe" 249 assert "params" in sent_replies[2] 250 assert "height" in sent_replies[2]["params"][0] 251 assert sent_replies[2]["params"][0]["height"]\ 252 == protocol.rpc.blockchain_height 253 assert "hex" in sent_replies[2]["params"][0] 254 assert len(sent_replies[2]["params"][0]["hex"]) == 80*2 #80 b/header, 2 b/c 255 256 def test_server_ping(): 257 protocol, sent_replies = create_electrum_protocol_instance() 258 idd = 1 259 protocol.handle_query({"method": "server.ping", "id": idd}) 260 assert len(sent_replies) == 1 261 assert sent_replies[0]["result"] == None 262 assert sent_replies[0]["id"] == idd 263 264 #test scripthash.subscribe, scripthash.get_history transaction.get 265 # transaction.get_merkle 266