commit 916071ce618482d817dc31bfecc7d825b0e5003f
parent 291b57b99a3ede685323a847fff64358a390ca18
Author: ThomasV <electrumdev@gmail.com>
Date: Wed, 29 Jul 2015 10:57:04 +0200
Merge pull request #1377 from romanz/master
interface: fix check_host_name() pattern matching
Diffstat:
2 files changed, 54 insertions(+), 25 deletions(-)
diff --git a/lib/interface.py b/lib/interface.py
@@ -119,30 +119,6 @@ class TcpInterface(threading.Thread):
queue.put((self, {'method':method, 'params':params, 'result':result, 'id':_id}))
- def check_host_name(self, peercert, name):
- """Simple certificate/host name checker. Returns True if the
- certificate matches, False otherwise."""
- # Check that the peer has supplied a certificate.
- # None/{} is not acceptable.
- if not peercert:
- return False
- if peercert.has_key("subjectAltName"):
- for typ, val in peercert["subjectAltName"]:
- if typ == "DNS" and (val == name or (val.find('*.') == 0 and name.find(val[1:]) + len(val[1:]) == len(name))):
- return True
- else:
- # Only check the subject DN if there is no subject alternative
- # name.
- cn = None
- for attr, val in peercert["subject"]:
- # Use most-specific (last) commonName attribute.
- if attr == "commonName":
- cn = val
- if cn is not None:
- return (cn == name or (cn.find('*.') == 0 and name.find(cn[1:]) + len(cn[1:]) == len(name)))
- return False
-
-
def get_simple_socket(self):
try:
l = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
@@ -175,7 +151,7 @@ class TcpInterface(threading.Thread):
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_SSLv23, cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, do_handshake_on_connect=True)
except ssl.SSLError, e:
s = None
- if s and self.check_host_name(s.getpeercert(), self.host):
+ if s and check_host_name(s.getpeercert(), self.host):
self.print_error("SSL certificate signed by CA")
return s
@@ -334,6 +310,35 @@ class TcpInterface(threading.Thread):
self.response_queue.put((self, None))
+def _match_hostname(name, val):
+ if val == name:
+ return True
+
+ return val.startswith('*.') and name.endswith(val[1:])
+
+
+def check_host_name(peercert, name):
+ """Simple certificate/host name checker. Returns True if the
+ certificate matches, False otherwise."""
+ # Check that the peer has supplied a certificate.
+ # None/{} is not acceptable.
+ if not peercert:
+ return False
+ if peercert.has_key("subjectAltName"):
+ for typ, val in peercert["subjectAltName"]:
+ if typ == "DNS" and _match_hostname(name, val):
+ return True
+ else:
+ # Only check the subject DN if there is no subject alternative
+ # name.
+ cn = None
+ for attr, val in peercert["subject"]:
+ # Use most-specific (last) commonName attribute.
+ if attr == "commonName":
+ cn = val
+ if cn is not None:
+ return _match_hostname(name, cn)
+ return False
def check_cert(host, cert):
diff --git a/lib/tests/test_interface.py b/lib/tests/test_interface.py
@@ -0,0 +1,24 @@
+import unittest
+
+from lib import interface
+
+
+class TestInterface(unittest.TestCase):
+
+ def test_match_host_name(self):
+ self.assertTrue(interface._match_hostname('asd.fgh.com', 'asd.fgh.com'))
+ self.assertFalse(interface._match_hostname('asd.fgh.com', 'asd.zxc.com'))
+ self.assertTrue(interface._match_hostname('asd.fgh.com', '*.fgh.com'))
+ self.assertFalse(interface._match_hostname('asd.fgh.com', '*fgh.com'))
+ self.assertFalse(interface._match_hostname('asd.fgh.com', '*.zxc.com'))
+
+ def test_check_host_name(self):
+ self.assertFalse(interface.check_host_name(None, None))
+ self.assertFalse(interface.check_host_name(
+ peercert={'subjectAltName': []}, name=''))
+ self.assertTrue(interface.check_host_name(
+ peercert={'subjectAltName': [('DNS', '*.bar.com')]},
+ name='foo.bar.com'))
+ self.assertTrue(interface.check_host_name(
+ peercert={'subject': [('commonName', '*.bar.com')]},
+ name='foo.bar.com'))