commit 094ce2e4b391c187ba38b3706fbe8b0340790605
parent 538846ee0ba8aac60330d08723fdb77e65d3d93f
Author: ThomasV <thomasv@gitorious>
Date: Wed, 8 Jul 2015 19:20:54 +0200
add DNSSEC chain validation
Diffstat:
M | lib/contacts.py | | | 62 | ++++++++++++++++---------------------------------------------- |
A | lib/dnssec.py | | | 123 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
2 files changed, 139 insertions(+), 46 deletions(-)
diff --git a/lib/contacts.py b/lib/contacts.py
@@ -1,12 +1,13 @@
import sys
import re
import dns
-import traceback
import bitcoin
+import dnssec
from util import StoreDict, print_error
from i18n import _
+
class Contacts(StoreDict):
def __init__(self, config):
@@ -18,7 +19,6 @@ class Contacts(StoreDict):
'address': k,
'type': 'address'
}
-
if k in self.keys():
_type, addr = self[k]
if _type == 'address':
@@ -26,62 +26,32 @@ class Contacts(StoreDict):
'address': addr,
'type': 'contact'
}
-
out = self.resolve_openalias(k)
if out:
- address, name = out
- validated = False
+ address, name, validated = out
return {
'address': address,
'name': name,
'type': 'openalias',
'validated': validated
}
-
raise Exception("Invalid Bitcoin address or alias", k)
def resolve_openalias(self, url):
- '''Resolve OpenAlias address using url.'''
- print_error('[OA] Attempting to resolve OpenAlias data for ' + url)
-
- url = url.replace('@', '.') # support email-style addresses, per the OA standard
+ # support email-style addresses, per the OA standard
+ url = url.replace('@', '.')
+ records, validated = dnssec.query(url, dns.rdatatype.TXT)
prefix = 'btc'
- retries = 3
- err = None
- for i in range(0, retries):
- try:
- resolver = dns.resolver.Resolver()
- resolver.timeout = 2.0
- resolver.lifetime = 4.0
- records = resolver.query(url, dns.rdatatype.TXT)
- for record in records:
- string = record.strings[0]
- if string.startswith('oa1:' + prefix):
- address = self.find_regex(string, r'recipient_address=([A-Za-z0-9]+)')
- name = self.find_regex(string, r'recipient_name=([^;]+)')
- if not name:
- name = address
- if not address:
- continue
- return (address, name)
- err = _('No OpenAlias record found.')
- break
- except dns.resolver.NXDOMAIN:
- err = _('No such domain.')
- continue
- except dns.resolver.Timeout:
- err = _('Timed out while resolving.')
- continue
- except DNSException:
- err = _('Unhandled exception.')
- continue
- except Exception, e:
- err = _('Unexpected error: ' + str(e))
- continue
- break
- if err:
- print_error(err)
- return 0
+ for record in records:
+ string = record.strings[0]
+ if string.startswith('oa1:' + prefix):
+ address = self.find_regex(string, r'recipient_address=([A-Za-z0-9]+)')
+ name = self.find_regex(string, r'recipient_name=([^;]+)')
+ if not name:
+ name = address
+ if not address:
+ continue
+ return address, name, validated
def find_regex(self, haystack, needle):
regex = re.compile(needle)
diff --git a/lib/dnssec.py b/lib/dnssec.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+#
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2015 Thomas Voegtlin
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+
+# Check DNSSEC trust chain.
+# Todo: verify expiration dates
+#
+# Based on
+# http://backreference.org/2010/11/17/dnssec-verification-with-dig/
+# https://github.com/rthalley/dnspython/blob/master/tests/test_dnssec.py
+
+
+import traceback
+import sys
+
+import dns.name
+import dns.query
+import dns.dnssec
+import dns.message
+import dns.resolver
+import dns.rdatatype
+import dns.rdtypes.ANY.NS
+import dns.rdtypes.ANY.CNAME
+import dns.rdtypes.ANY.DLV
+import dns.rdtypes.ANY.DNSKEY
+import dns.rdtypes.ANY.DS
+import dns.rdtypes.ANY.NSEC
+import dns.rdtypes.ANY.NSEC3
+import dns.rdtypes.ANY.NSEC3PARAM
+import dns.rdtypes.ANY.RRSIG
+import dns.rdtypes.ANY.SOA
+import dns.rdtypes.ANY.TXT
+import dns.rdtypes.IN.A
+import dns.rdtypes.IN.AAAA
+from dns.exception import DNSException
+
+
+from util import print_error
+
+
+# hard-coded root KSK
+root_KSK = dns.rrset.from_text('.', 15202, 'IN', 'DNSKEY', '257 3 8 AwEAAagAIKlVZrpC6Ia7gEzahOR+9W29euxhJhVVLOyQbSEW0O8gcCjF FVQUTf6v58fLjwBd0YI0EzrAcQqBGCzh/RStIoO8g0NfnfL2MTJRkxoX bfDaUeVPQuYEhg37NZWAJQ9VnMVDxP/VHL496M/QZxkjf5/Efucp2gaD X6RS6CXpoY68LsvPVjR0ZSwzz1apAzvN9dlzEheX7ICJBBtuA6G3LQpz W5hOA2hzCTMjJPJ8LbqF6dsV6DoBQzgul0sGIcGOYl7OyQdXfZ57relS Qageu+ipAdTTJ25AsRTAoub8ONGcLmqrAmRLKBP1dfwhYB4N7knNnulq QxA+Uk1ihz0=')
+
+
+
+def check_query(ns, sub, _type, keys):
+ q = dns.message.make_query(sub, _type, want_dnssec=True)
+ response = dns.query.tcp(q, ns, timeout=5)
+ assert response.rcode() == 0, 'No answer'
+ answer = response.answer
+ assert len(answer) == 2, 'No DNSSEC record found'
+ if answer[0].rdtype == dns.rdatatype.RRSIG:
+ rrsig, rrset = answer
+ else:
+ rrset, rrsig = answer
+ if keys is None:
+ keys = {dns.name.from_text(sub):rrset}
+ dns.dnssec.validate(rrset, rrsig, keys)
+ return rrset
+
+
+def get_and_validate(ns, url, _type):
+ # get trusted root keys
+ root_rrset = check_query(ns, '', dns.rdatatype.DNSKEY, {dns.name.root: root_KSK})
+ keys = {dns.name.root: root_rrset}
+ # top-down verification
+ parts = url.split('.')
+ for i in range(len(parts), 0, -1):
+ sub = '.'.join(parts[i-1:])
+ name = dns.name.from_text(sub)
+ # get DNSKEY (self-signed)
+ rrset = check_query(ns, sub, dns.rdatatype.DNSKEY, None)
+ # get DS (signed by parent)
+ ds_rrset = check_query(ns, sub, dns.rdatatype.DS, keys)
+ # verify that a signed DS validates DNSKEY
+ for ds in ds_rrset:
+ for dnskey in rrset:
+ good_ds = dns.dnssec.make_ds(name, dnskey, 'SHA256')
+ if ds == good_ds:
+ break
+ else:
+ continue
+ break
+ else:
+ print ds_rrset
+ raise BaseException("DS does not match DNSKEY")
+ # set key for next iteration
+ keys = {name: rrset}
+ # get TXT record (signed by zone)
+ rrset = check_query(ns, url, _type, keys)
+ return rrset
+
+
+def query(url, rtype):
+ resolver = dns.resolver.get_default_resolver()
+ # 8.8.8.8 is Google's public DNS server
+ resolver.nameservers = ['8.8.8.8']
+ ns = resolver.nameservers[0]
+ try:
+ out = get_and_validate(ns, url, rtype)
+ validated = True
+ except BaseException as e:
+ #traceback.print_exc(file=sys.stderr)
+ print_error("DNSSEC error:", str(e))
+ out = resolver.query(url, rtype)
+ validated = False
+ return out, validated