x509.py (11489B)
1 #!/usr/bin/env python 2 # 3 # Electrum - lightweight Bitcoin client 4 # Copyright (C) 2014 Thomas Voegtlin 5 # 6 # Permission is hereby granted, free of charge, to any person 7 # obtaining a copy of this software and associated documentation files 8 # (the "Software"), to deal in the Software without restriction, 9 # including without limitation the rights to use, copy, modify, merge, 10 # publish, distribute, sublicense, and/or sell copies of the Software, 11 # and to permit persons to whom the Software is furnished to do so, 12 # subject to the following conditions: 13 # 14 # The above copyright notice and this permission notice shall be 15 # included in all copies or substantial portions of the Software. 16 # 17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 24 # SOFTWARE. 25 26 import hashlib 27 import time 28 from datetime import datetime 29 30 from . import util 31 from .util import profiler, bh2u 32 from .logging import get_logger 33 34 35 _logger = get_logger(__name__) 36 37 38 # algo OIDs 39 ALGO_RSA_SHA1 = '1.2.840.113549.1.1.5' 40 ALGO_RSA_SHA256 = '1.2.840.113549.1.1.11' 41 ALGO_RSA_SHA384 = '1.2.840.113549.1.1.12' 42 ALGO_RSA_SHA512 = '1.2.840.113549.1.1.13' 43 ALGO_ECDSA_SHA256 = '1.2.840.10045.4.3.2' 44 45 # prefixes, see http://stackoverflow.com/questions/3713774/c-sharp-how-to-calculate-asn-1-der-encoding-of-a-particular-hash-algorithm 46 PREFIX_RSA_SHA256 = bytearray( 47 [0x30, 0x31, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01, 0x05, 0x00, 0x04, 0x20]) 48 PREFIX_RSA_SHA384 = bytearray( 49 [0x30, 0x41, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x02, 0x05, 0x00, 0x04, 0x30]) 50 PREFIX_RSA_SHA512 = bytearray( 51 [0x30, 0x51, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03, 0x05, 0x00, 0x04, 0x40]) 52 53 # types used in ASN1 structured data 54 ASN1_TYPES = { 55 'BOOLEAN' : 0x01, 56 'INTEGER' : 0x02, 57 'BIT STRING' : 0x03, 58 'OCTET STRING' : 0x04, 59 'NULL' : 0x05, 60 'OBJECT IDENTIFIER': 0x06, 61 'SEQUENCE' : 0x70, 62 'SET' : 0x71, 63 'PrintableString' : 0x13, 64 'IA5String' : 0x16, 65 'UTCTime' : 0x17, 66 'GeneralizedTime' : 0x18, 67 'ENUMERATED' : 0x0A, 68 'UTF8String' : 0x0C, 69 } 70 71 72 class CertificateError(Exception): 73 pass 74 75 76 # helper functions 77 def bitstr_to_bytestr(s): 78 if s[0] != 0x00: 79 raise TypeError('no padding') 80 return s[1:] 81 82 83 def bytestr_to_int(s): 84 i = 0 85 for char in s: 86 i <<= 8 87 i |= char 88 return i 89 90 91 def decode_OID(s): 92 r = [] 93 r.append(s[0] // 40) 94 r.append(s[0] % 40) 95 k = 0 96 for i in s[1:]: 97 if i < 128: 98 r.append(i + 128 * k) 99 k = 0 100 else: 101 k = (i - 128) + 128 * k 102 return '.'.join(map(str, r)) 103 104 105 def encode_OID(oid): 106 x = [int(i) for i in oid.split('.')] 107 s = chr(x[0] * 40 + x[1]) 108 for i in x[2:]: 109 ss = chr(i % 128) 110 while i > 128: 111 i //= 128 112 ss = chr(128 + i % 128) + ss 113 s += ss 114 return s 115 116 117 class ASN1_Node(bytes): 118 def get_node(self, ix): 119 # return index of first byte, first content byte and last byte. 120 first = self[ix + 1] 121 if (first & 0x80) == 0: 122 length = first 123 ixf = ix + 2 124 ixl = ixf + length - 1 125 else: 126 lengthbytes = first & 0x7F 127 length = bytestr_to_int(self[ix + 2:ix + 2 + lengthbytes]) 128 ixf = ix + 2 + lengthbytes 129 ixl = ixf + length - 1 130 return ix, ixf, ixl 131 132 def root(self): 133 return self.get_node(0) 134 135 def next_node(self, node): 136 ixs, ixf, ixl = node 137 return self.get_node(ixl + 1) 138 139 def first_child(self, node): 140 ixs, ixf, ixl = node 141 if self[ixs] & 0x20 != 0x20: 142 raise TypeError('Can only open constructed types.', hex(self[ixs])) 143 return self.get_node(ixf) 144 145 def is_child_of(node1, node2): 146 ixs, ixf, ixl = node1 147 jxs, jxf, jxl = node2 148 return ((ixf <= jxs) and (jxl <= ixl)) or ((jxf <= ixs) and (ixl <= jxl)) 149 150 def get_all(self, node): 151 # return type + length + value 152 ixs, ixf, ixl = node 153 return self[ixs:ixl + 1] 154 155 def get_value_of_type(self, node, asn1_type): 156 # verify type byte and return content 157 ixs, ixf, ixl = node 158 if ASN1_TYPES[asn1_type] != self[ixs]: 159 raise TypeError('Wrong type:', hex(self[ixs]), hex(ASN1_TYPES[asn1_type])) 160 return self[ixf:ixl + 1] 161 162 def get_value(self, node): 163 ixs, ixf, ixl = node 164 return self[ixf:ixl + 1] 165 166 def get_children(self, node): 167 nodes = [] 168 ii = self.first_child(node) 169 nodes.append(ii) 170 while ii[2] < node[2]: 171 ii = self.next_node(ii) 172 nodes.append(ii) 173 return nodes 174 175 def get_sequence(self): 176 return list(map(lambda j: self.get_value(j), self.get_children(self.root()))) 177 178 def get_dict(self, node): 179 p = {} 180 for ii in self.get_children(node): 181 for iii in self.get_children(ii): 182 iiii = self.first_child(iii) 183 oid = decode_OID(self.get_value_of_type(iiii, 'OBJECT IDENTIFIER')) 184 iiii = self.next_node(iiii) 185 value = self.get_value(iiii) 186 p[oid] = value 187 return p 188 189 def decode_time(self, ii): 190 GENERALIZED_TIMESTAMP_FMT = '%Y%m%d%H%M%SZ' 191 UTCTIME_TIMESTAMP_FMT = '%y%m%d%H%M%SZ' 192 193 try: 194 return time.strptime(self.get_value_of_type(ii, 'UTCTime').decode('ascii'), UTCTIME_TIMESTAMP_FMT) 195 except TypeError: 196 return time.strptime(self.get_value_of_type(ii, 'GeneralizedTime').decode('ascii'), GENERALIZED_TIMESTAMP_FMT) 197 198 class X509(object): 199 def __init__(self, b): 200 201 self.bytes = bytearray(b) 202 203 der = ASN1_Node(b) 204 root = der.root() 205 cert = der.first_child(root) 206 # data for signature 207 self.data = der.get_all(cert) 208 209 # optional version field 210 if der.get_value(cert)[0] == 0xa0: 211 version = der.first_child(cert) 212 serial_number = der.next_node(version) 213 else: 214 serial_number = der.first_child(cert) 215 self.serial_number = bytestr_to_int(der.get_value_of_type(serial_number, 'INTEGER')) 216 217 # signature algorithm 218 sig_algo = der.next_node(serial_number) 219 ii = der.first_child(sig_algo) 220 self.sig_algo = decode_OID(der.get_value_of_type(ii, 'OBJECT IDENTIFIER')) 221 222 # issuer 223 issuer = der.next_node(sig_algo) 224 self.issuer = der.get_dict(issuer) 225 226 # validity 227 validity = der.next_node(issuer) 228 ii = der.first_child(validity) 229 self.notBefore = der.decode_time(ii) 230 ii = der.next_node(ii) 231 self.notAfter = der.decode_time(ii) 232 233 # subject 234 subject = der.next_node(validity) 235 self.subject = der.get_dict(subject) 236 subject_pki = der.next_node(subject) 237 public_key_algo = der.first_child(subject_pki) 238 ii = der.first_child(public_key_algo) 239 self.public_key_algo = decode_OID(der.get_value_of_type(ii, 'OBJECT IDENTIFIER')) 240 241 if self.public_key_algo != '1.2.840.10045.2.1': # for non EC public key 242 # pubkey modulus and exponent 243 subject_public_key = der.next_node(public_key_algo) 244 spk = der.get_value_of_type(subject_public_key, 'BIT STRING') 245 spk = ASN1_Node(bitstr_to_bytestr(spk)) 246 r = spk.root() 247 modulus = spk.first_child(r) 248 exponent = spk.next_node(modulus) 249 rsa_n = spk.get_value_of_type(modulus, 'INTEGER') 250 rsa_e = spk.get_value_of_type(exponent, 'INTEGER') 251 self.modulus = int.from_bytes(rsa_n, byteorder='big', signed=False) 252 self.exponent = int.from_bytes(rsa_e, byteorder='big', signed=False) 253 else: 254 subject_public_key = der.next_node(public_key_algo) 255 spk = der.get_value_of_type(subject_public_key, 'BIT STRING') 256 self.ec_public_key = spk 257 258 # extensions 259 self.CA = False 260 self.AKI = None 261 self.SKI = None 262 i = subject_pki 263 while i[2] < cert[2]: 264 i = der.next_node(i) 265 d = der.get_dict(i) 266 for oid, value in d.items(): 267 value = ASN1_Node(value) 268 if oid == '2.5.29.19': 269 # Basic Constraints 270 self.CA = bool(value) 271 elif oid == '2.5.29.14': 272 # Subject Key Identifier 273 r = value.root() 274 value = value.get_value_of_type(r, 'OCTET STRING') 275 self.SKI = bh2u(value) 276 elif oid == '2.5.29.35': 277 # Authority Key Identifier 278 self.AKI = bh2u(value.get_sequence()[0]) 279 else: 280 pass 281 282 # cert signature 283 cert_sig_algo = der.next_node(cert) 284 ii = der.first_child(cert_sig_algo) 285 self.cert_sig_algo = decode_OID(der.get_value_of_type(ii, 'OBJECT IDENTIFIER')) 286 cert_sig = der.next_node(cert_sig_algo) 287 self.signature = der.get_value(cert_sig)[1:] 288 289 def get_keyID(self): 290 # http://security.stackexchange.com/questions/72077/validating-an-ssl-certificate-chain-according-to-rfc-5280-am-i-understanding-th 291 return self.SKI if self.SKI else repr(self.subject) 292 293 def get_issuer_keyID(self): 294 return self.AKI if self.AKI else repr(self.issuer) 295 296 def get_common_name(self): 297 return self.subject.get('2.5.4.3', b'unknown').decode() 298 299 def get_signature(self): 300 return self.cert_sig_algo, self.signature, self.data 301 302 def check_ca(self): 303 return self.CA 304 305 def check_date(self): 306 now = time.gmtime() 307 if self.notBefore > now: 308 raise CertificateError('Certificate has not entered its valid date range. (%s)' % self.get_common_name()) 309 if self.notAfter <= now: 310 dt = datetime.utcfromtimestamp(time.mktime(self.notAfter)) 311 raise CertificateError(f'Certificate ({self.get_common_name()}) has expired (at {dt} UTC).') 312 313 def getFingerprint(self): 314 return hashlib.sha1(self.bytes).digest() 315 316 317 @profiler 318 def load_certificates(ca_path): 319 from . import pem 320 ca_list = {} 321 ca_keyID = {} 322 # ca_path = '/tmp/tmp.txt' 323 with open(ca_path, 'r', encoding='utf-8') as f: 324 s = f.read() 325 bList = pem.dePemList(s, "CERTIFICATE") 326 for b in bList: 327 try: 328 x = X509(b) 329 x.check_date() 330 except BaseException as e: 331 # with open('/tmp/tmp.txt', 'w') as f: 332 # f.write(pem.pem(b, 'CERTIFICATE').decode('ascii')) 333 _logger.info(f"cert error: {e}") 334 continue 335 336 fp = x.getFingerprint() 337 ca_list[fp] = x 338 ca_keyID[x.get_keyID()] = fp 339 340 return ca_list, ca_keyID 341 342 343 if __name__ == "__main__": 344 import certifi 345 346 ca_path = certifi.where() 347 ca_list, ca_keyID = load_certificates(ca_path)