regtest.py (1834B)
1 import os 2 import sys 3 import unittest 4 import subprocess 5 6 class TestLightning(unittest.TestCase): 7 8 @staticmethod 9 def run_shell(args, timeout=30): 10 process = subprocess.Popen(['electrum/tests/regtest/regtest.sh'] + args, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) 11 for line in iter(process.stdout.readline, b''): 12 sys.stdout.write(line.decode(sys.stdout.encoding)) 13 process.wait(timeout=timeout) 14 process.stdout.close() 15 assert process.returncode == 0 16 17 def setUp(self): 18 test_name = self.id().split('.')[-1] 19 sys.stdout.write("***** %s ******\n" % test_name) 20 # initialize and get funds 21 for agent in self.agents: 22 self.run_shell(['init', agent]) 23 # mine a block so that funds are confirmed 24 self.run_shell(['new_block']) 25 # extra configuration (optional) 26 self.run_shell(['configure_' + test_name]) 27 # start daemons 28 for agent in self.agents: 29 self.run_shell(['start', agent]) 30 31 def tearDown(self): 32 for agent in self.agents: 33 self.run_shell(['stop', agent]) 34 35 36 class TestLightningAB(TestLightning): 37 agents = ['alice', 'bob'] 38 39 def test_backup(self): 40 self.run_shell(['backup']) 41 42 def test_breach(self): 43 self.run_shell(['breach']) 44 45 def test_extract_preimage(self): 46 self.run_shell(['extract_preimage']) 47 48 def test_redeem_htlcs(self): 49 self.run_shell(['redeem_htlcs']) 50 51 def test_breach_with_unspent_htlc(self): 52 self.run_shell(['breach_with_unspent_htlc']) 53 54 def test_breach_with_spent_htlc(self): 55 self.run_shell(['breach_with_spent_htlc']) 56 57 58 class TestLightningABC(TestLightning): 59 agents = ['alice', 'bob', 'carol'] 60 61 def test_watchtower(self): 62 self.run_shell(['watchtower'])