electrum-personal-server

Maximally lightweight electrum server for a single user
git clone https://git.parazyd.org/electrum-personal-server
Log | Files | Refs | README

test_electrum_protocol.py (9449B)


      1 
      2 import pytest
      3 import logging
      4 import json
      5 
      6 from electrumpersonalserver.server import (
      7     TransactionMonitor,
      8     JsonRpcError,
      9     ElectrumProtocol,
     10     get_block_header,
     11     get_current_header,
     12     get_block_headers_hex,
     13     JsonRpcError,
     14     get_status_electrum
     15 )
     16 
# Logger passed to every ElectrumProtocol instance constructed in this module;
# its level is set to DEBUG.
logger = logging.getLogger('ELECTRUMPERSONALSERVER-TEST')
logger.setLevel(logging.DEBUG)

# Initial chain-tip height that DummyJsonRpc starts out with.
DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT = 100000
     21 
     22 def get_dummy_hash_from_height(height):
     23     if height == 0:
     24         return "00"*32
     25     return str(height) + "a"*(64 - len(str(height)))
     26 
     27 def get_height_from_dummy_hash(hhash):
     28     if hhash == "00"*32:
     29         return 0
     30     return int(hhash[:hhash.index("a")])
     31 
     32 class DummyJsonRpc(object):
     33     def __init__(self):
     34         self.calls = {}
     35         self.blockchain_height = DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT
     36 
     37     def call(self, method, params):
     38         if method not in self.calls:
     39             self.calls[method] = [0, []]
     40         self.calls[method][0] += 1
     41         self.calls[method][1].append(params)
     42         if method == "getbestblockhash":
     43             return get_dummy_hash_from_height(self.blockchain_height)
     44         elif method == "getblockhash":
     45             height = params[0]
     46             if height > self.blockchain_height:
     47                 raise JsonRpcError()
     48             return get_dummy_hash_from_height(height)
     49         elif method == "getblockheader":
     50             blockhash = params[0]
     51             height = get_height_from_dummy_hash(blockhash)
     52             header = {
     53                 "hash": blockhash,
     54                 "confirmations": self.blockchain_height - height + 1,
     55                 "height": height,
     56                 "version": 536870912,
     57                 "versionHex": "20000000",
     58                 "merkleroot": "aa"*32,
     59                 "time": height*100,
     60                 "mediantime": height*100,
     61                 "nonce": 1,
     62                 "bits": "207fffff",
     63                 "difficulty": 4.656542373906925e-10,
     64                 "chainwork": "000000000000000000000000000000000000000000000"
     65                     + "00000000000000000da",
     66                 "nTx": 1,
     67             }
     68             if height > 1:
     69                 header["previousblockhash"] = get_dummy_hash_from_height(
     70                     height - 1)
     71             elif height == 1:
     72                 header["previousblockhash"] = "00"*32 #genesis block
     73             elif height == 0:
     74                 pass #no prevblock for genesis
     75             else:
     76                 assert 0
     77             if height < self.blockchain_height:
     78                 header["nextblockhash"] = get_dummy_hash_from_height(height + 1)
     79             return header
     80         elif method == "gettransaction":
     81             for t in self.txlist:
     82                 if t["txid"] == params[0]:
     83                     return t
     84             raise JsonRpcError()
     85         else:
     86             raise ValueError("unknown method in dummy jsonrpc")
     87 
     88 def test_get_block_header():
     89     rpc = DummyJsonRpc()
     90     for height in [0, 1000]:
     91         for raw in [True, False]:
     92             blockhash = rpc.call("getblockhash", [height])
     93             ret = get_block_header(rpc, blockhash, raw)
     94             if raw:
     95                 assert type(ret) == dict
     96                 assert "hex" in ret
     97                 assert "height" in ret
     98                 assert len(ret["hex"]) == 160
     99             else:
    100                 assert type(ret) == dict
    101                 assert len(ret) == 7
    102 
    103 def test_get_current_header():
    104     rpc = DummyJsonRpc()
    105     for raw in [True, False]:
    106         ret = get_current_header(rpc, raw)
    107         assert type(ret[0]) == str
    108         assert len(ret[0]) == 64
    109         if raw:
    110             assert type(ret[1]) == dict
    111             assert "hex" in ret[1]
    112             assert "height" in ret[1]
    113             assert len(ret[1]["hex"]) == 160
    114         else:
    115             assert type(ret[1]) == dict
    116             assert len(ret[1]) == 7
    117 
    118 @pytest.mark.parametrize(
    119     "start_height, count",
    120     [(100, 200),
    121     (DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT + 10, 5),
    122     (DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT - 10, 15),
    123     (0, 250)
    124     ]
    125 )
    126 def test_get_block_headers_hex(start_height, count):
    127     rpc = DummyJsonRpc()
    128     ret = get_block_headers_hex(rpc, start_height, count)
    129     print("start_height=" + str(start_height) + " count=" + str(count))
    130     assert len(ret) == 2
    131     available_blocks = -min(0, start_height - DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT
    132         - 1)
    133     expected_count = min(available_blocks, count)
    134     assert len(ret[0]) == expected_count*80*2 #80 bytes/header, 2 chars/byte
    135     assert ret[1] == expected_count
    136 
    137 @pytest.mark.parametrize(
    138     "invalid_json_query",
    139     [
    140         {"valid-json-no-method": 5}
    141     ]
    142 ) 
    143 def test_invalid_json_query_line(invalid_json_query):
    144     protocol = ElectrumProtocol(None, None, logger, None, None, None)
    145     with pytest.raises(IOError) as e:
    146         protocol.handle_query(invalid_json_query)
    147 
    148 def create_electrum_protocol_instance(broadcast_method="own-node",
    149         tor_hostport=("127.0.0.1", 9050),
    150         disable_mempool_fee_histogram=False):
    151     protocol = ElectrumProtocol(DummyJsonRpc(), DummyTransactionMonitor(),
    152         logger, broadcast_method, tor_hostport, disable_mempool_fee_histogram)
    153     sent_replies = []
    154     protocol.set_send_reply_fun(lambda l: sent_replies.append(l))
    155     assert len(sent_replies) == 0
    156     return protocol, sent_replies
    157 
    158 def dummy_script_hash_to_history(scrhash):
    159     index = int(scrhash[:scrhash.index("s")])
    160     tx_count = (index+2) % 5
    161     height = 500
    162     return [(index_to_dummy_txid(i), height) for i in range(tx_count)]
    163 
    164 def index_to_dummy_script_hash(index):
    165     return str(index) + "s"*(64 - len(str(index)))
    166 
    167 def index_to_dummy_txid(index):
    168     return str(index) + "t"*(64 - len(str(index)))
    169 
    170 def dummy_txid_to_dummy_tx(txid):
    171     return txid[::-1] * 6
    172 
    173 class DummyTransactionMonitor(object):
    174     def __init__(self):
    175         self.deterministic_wallets = list(range(5))
    176         self.address_history = list(range(5))
    177         self.subscribed_addresses = []
    178         self.history_hashes = {}
    179 
    180     def get_electrum_history_hash(self, scrhash):
    181         history = dummy_script_hash_to_history(scrhash)
    182         hhash = get_status_electrum(history)
    183         self.history_hashes[scrhash] = history
    184         return hhash
    185 
    186     def get_electrum_history(self, scrhash):
    187         return self.history_hashes[scrhash]
    188 
    189     def unsubscribe_all_addresses(self):
    190         self.subscribed_addresses = []
    191 
    192     def subscribe_address(self, scrhash):
    193         self.subscribed_addresses.append(scrhash)
    194         return True
    195 
    196     def get_address_balance(self, scrhash):
    197         pass
    198 
    199 def test_script_hash_sync():
    200     protocol, sent_replies = create_electrum_protocol_instance()
    201     scrhash_index = 0
    202     scrhash = index_to_dummy_script_hash(scrhash_index)
    203     protocol.handle_query({"method": "blockchain.scripthash.subscribe",
    204         "params": [scrhash], "id": 0})
    205     assert len(sent_replies) == 1
    206     assert len(protocol.txmonitor.subscribed_addresses) == 1
    207     assert protocol.txmonitor.subscribed_addresses[0] == scrhash
    208     assert len(sent_replies) == 1
    209     assert len(sent_replies[0]["result"]) == 64
    210     history_hash = sent_replies[0]["result"]
    211 
    212     protocol.handle_query({"method": "blockchain.scripthash.get_history",
    213         "params": [scrhash], "id": 0})
    214     assert len(sent_replies) == 2
    215     assert get_status_electrum(sent_replies[1]["result"]) == history_hash
    216 
    217     #updated scripthash but actually nothing changed, history_hash unchanged
    218     protocol.on_updated_scripthashes([scrhash])
    219     assert len(sent_replies) == 3
    220     assert sent_replies[2]["method"] == "blockchain.scripthash.subscribe"
    221     assert sent_replies[2]["params"][0] == scrhash
    222     assert sent_replies[2]["params"][1] == history_hash
    223 
    224     protocol.on_disconnect()
    225     assert len(protocol.txmonitor.subscribed_addresses) == 0
    226 
    227 def test_headers_subscribe():
    228     protocol, sent_replies = create_electrum_protocol_instance()
    229 
    230     protocol.handle_query({"method": "server.version", "params": ["test-code",
    231         1.4], "id": 0}) #protocol version of 1.4 means only raw headers used
    232     assert len(sent_replies) == 1
    233 
    234     protocol.handle_query({"method": "blockchain.headers.subscribe", "params":
    235         [], "id": 0})
    236     assert len(sent_replies) == 2
    237     assert "height" in sent_replies[1]["result"]
    238     assert sent_replies[1]["result"]["height"] == protocol.rpc.blockchain_height
    239     assert "hex" in sent_replies[1]["result"]
    240     assert len(sent_replies[1]["result"]["hex"]) == 80*2 #80 b/header, 2 b/char
    241 
    242     protocol.rpc.blockchain_height += 1
    243     new_bestblockhash, header = get_current_header(protocol.rpc,
    244         protocol.are_headers_raw)
    245     protocol.on_blockchain_tip_updated(header)
    246     assert len(sent_replies) == 3
    247     assert "method" in sent_replies[2]
    248     assert sent_replies[2]["method"] == "blockchain.headers.subscribe"
    249     assert "params" in sent_replies[2]
    250     assert "height" in sent_replies[2]["params"][0]
    251     assert sent_replies[2]["params"][0]["height"]\
    252         == protocol.rpc.blockchain_height
    253     assert "hex" in sent_replies[2]["params"][0]
    254     assert len(sent_replies[2]["params"][0]["hex"]) == 80*2 #80 b/header, 2 b/c
    255 
    256 def test_server_ping():
    257     protocol, sent_replies = create_electrum_protocol_instance()
    258     idd = 1
    259     protocol.handle_query({"method": "server.ping", "id": idd})
    260     assert len(sent_replies) == 1
    261     assert sent_replies[0]["result"] == None
    262     assert sent_replies[0]["id"] == idd
    263 
#test scripthash.subscribe, scripthash.get_history, transaction.get,
# transaction.get_merkle
    266