test_lntransport.py (3302B)
import asyncio

from electrum.ecc import ECPrivkey
from electrum.lnutil import LNPeerAddr
from electrum.lntransport import LNResponderTransport, LNTransport

from . import ElectrumTestCase
from .test_bitcoin import needs_test_with_all_chacha20_implementations


class TestLNTransport(ElectrumTestCase):
    """Tests for the Lightning transport handshake and message exchange."""

    @needs_test_with_all_chacha20_implementations
    def test_responder(self):
        """Run the responder handshake against scripted stream stubs.

        The stub reader hands out two fixed messages (50 bytes, then 66 bytes)
        and checks the sizes requested; the stub writer accepts exactly one
        50-byte write. The handshake is run with fixed static and ephemeral
        private keys so the exchange is deterministic.
        """
        # local static
        ls_priv = bytes.fromhex('2121212121212121212121212121212121212121212121212121212121212121')
        # ephemeral
        e_priv = bytes.fromhex('2222222222222222222222222222222222222222222222222222222222222222')

        class Writer:
            """Stream-writer stub: allows a single write of exactly 50 bytes."""

            def __init__(self):
                self.state = 0

            def write(self, data):
                assert self.state == 0
                self.state += 1
                assert len(data) == 50

        class Reader:
            """Stream-reader stub: serves two canned messages, in order."""

            def __init__(self):
                self.state = 0

            async def read(self, num_bytes):
                step = self.state
                assert step in (0, 1)
                self.state += 1
                if step == 0:
                    # first incoming handshake message
                    assert num_bytes == 50
                    return bytes.fromhex('00036360e856310ce5d294e8be33fc807077dc56ac80d95d9cd4ddbd21325eff73f70df6086551151f58b8afe6c195782c6a')
                # second incoming handshake message
                assert num_bytes == 66
                return bytes.fromhex('00b9e3a702e93e3a9948c2ed6e5fd7590a6e1c3a0344cfc9d5b57357049aa22355361aa02e55a8fc28fef5bd6d71ad0c38228dc68b1c466263b47fdf31e560e139ba')

        transport = LNResponderTransport(ls_priv, Reader(), Writer())
        asyncio.get_event_loop().run_until_complete(transport.handshake(epriv=e_priv))

    @needs_test_with_all_chacha20_implementations
    def test_loop(self):
        """Handshake and swap one message each way over a local TCP socket.

        A responder is served on 127.0.0.1 and an initiator connects to it.
        Both sides use freshly generated keys. The responder checks that the
        handshake yields the initiator's public key; each side then sends one
        message and checks the one it receives.

        Failures on either side are re-raised in the test, and the whole
        exchange is bounded by a timeout, so a broken handshake fails the
        test instead of blocking the event loop forever.
        """
        timeout_sec = 30  # generous upper bound; the exchange is local
        loop = asyncio.get_event_loop()
        responder_key = ECPrivkey.generate_random_key()
        initiator_key = ECPrivkey.generate_random_key()
        # Completed by cb(): holds None on success, or the responder-side error.
        responder_done = loop.create_future()

        async def cb(reader, writer):
            try:
                t = LNResponderTransport(responder_key.get_secret_bytes(), reader, writer)
                self.assertEqual(await t.handshake(), initiator_key.get_public_key_bytes())
                t.send_bytes(b'hello from server')
                self.assertEqual(await t.read_messages().__anext__(), b'hello from client')
            except Exception as e:
                # Exceptions raised in a start_server callback are not seen by
                # the test on their own; hand them over explicitly.
                if not responder_done.done():
                    responder_done.set_exception(e)
            else:
                if not responder_done.done():
                    responder_done.set_result(None)
            finally:
                # Release the accepted connection. NOTE(review): on newer
                # Python versions Server.wait_closed() also waits for active
                # connections, so leaving this open may block teardown.
                writer.close()

        async def connect(port):
            peer_addr = LNPeerAddr('127.0.0.1', port, responder_key.get_public_key_bytes())
            t = LNTransport(initiator_key.get_secret_bytes(), peer_addr, proxy=None)
            await t.handshake()
            t.send_bytes(b'hello from client')
            self.assertEqual(await t.read_messages().__anext__(), b'hello from server')

        async def run_both_sides(port):
            client = asyncio.ensure_future(connect(port))
            try:
                # gather() raises as soon as either side fails.
                await asyncio.wait_for(asyncio.gather(client, responder_done), timeout=timeout_sec)
            finally:
                # No-op if the client finished; otherwise do not leave it pending.
                client.cancel()

        # Port 0 lets the OS pick a free port, avoiding clashes with other
        # processes or parallel test runs.
        server = loop.run_until_complete(asyncio.start_server(cb, '127.0.0.1', 0))  # type: asyncio.Server
        try:
            server_port = server.sockets[0].getsockname()[1]
            loop.run_until_complete(run_both_sides(server_port))
        finally:
            server.close()
            loop.run_until_complete(server.wait_closed())