From 54ff5d65f24c2a3849ee1c5bd24efd889954a8ba Mon Sep 17 00:00:00 2001 From: lare Date: Sat, 11 Feb 2023 13:36:48 +0100 Subject: [PATCH] a few hours of work --- validate-my-dns.py | 366 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 366 insertions(+) create mode 100644 validate-my-dns.py diff --git a/validate-my-dns.py b/validate-my-dns.py new file mode 100644 index 0000000..40f1c90 --- /dev/null +++ b/validate-my-dns.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 + +import hashlib +import base64 +import struct +import sys +import os +import subprocess +import json + +REGISTRY_PATH = "../registry" + +# step1: + + +def get_domain_by_mntner(mntner): + """get a list of domains (and reverse ipv4/6) if a nserver is specified""" + # grep for the given mntner in the dns,inetnum,inet6num directory of the registry and split it into a list; replace // with / in case REGISTRY_PATH ends with / + dns_files = subprocess.Popen(["grep", "-Ril", f" {mntner}", f"{REGISTRY_PATH}/data/dns/"], + stdout=subprocess.PIPE).communicate()[0].decode().replace("//", "/").split("\n")[:-1] + inetnums_files = subprocess.Popen(["grep", "-Ril", f" {mntner}", f"{REGISTRY_PATH}/data/inetnum/"], + stdout=subprocess.PIPE).communicate()[0].decode().replace("//", "/").split("\n")[:-1] + inet6nums_files = subprocess.Popen( + ["grep", "-Ril", f" {mntner}", f"{REGISTRY_PATH}/data/inet6num/"], stdout=subprocess.PIPE).communicate()[0].decode().split("\n")[:-1] + + # domains dict containing dns objects and inet(6)nums if they have nserver specified + domains = {} + # read dns files + for domain in dns_files: + with open(domain) as d: + domain_name = domain.split("/")[-1] + # a dictionary for each domain with "nserver": {"ns1.domain.dn42": ["ns1 ipv4", "ns1 ipv6"], ...}, "ds-rdata": ["123 45 67 ...", "98 7 65 ..."] + domains[domain_name] = {"nserver": {}, "ds-rdata": []} + for line in d.readlines(): + line = line.replace("\n", "") + if line.startswith("nserver"): + nserver = line[20:].split(" ") + # handle edge case where + if "\t" in nserver[0]: + nserver = nserver[0].split("\t") + # ignore registry-sync nservers + if "registry-sync.dn42" in nserver[0]: + break + # nserver should be defined in an other dns file + if len(nserver) == 1: + domains[domain_name]["nserver"][nserver[0]] = None + # nserver is defined in this file + elif len(nserver) == 2: + if nserver[0] not in domains[domain_name]["nserver"]: + domains[domain_name]["nserver"][nserver[0]] = [nserver[1]] + else: + domains[domain_name]["nserver"][nserver[0]].append(nserver[1]) + + elif line.startswith("ds-rdata:"): + domains[domain_name]["ds-rdata"].append(line[20:]) + # load inetnums + for inetnum in inetnums_files: + # temp variables in case there is no nserver + _nserver = {} + _ds_rdata = [] + _domain_name = "" + with open(inetnum) as i4: + for line in i4.readlines(): + line = line.replace("\n", "") + if line.startswith("cidr"): + line = line[20:] + _domain_name = ".".join(line.split(".")[::-1]) + ".in-addr.arpa" + elif line.startswith("nserver"): + nserver = line[20:].split(" ") + # handle edge case where + if "\t" in nserver[0]: + nserver = nserver[0].split("\t") + # ignore registry-sync nservers + if "registry-sync.dn42" in nserver[0]: + break + # nserver should be defined in an other dns file + if len(nserver) == 1: + _nserver[nserver[0]] = None + # nserver is defined in this file + elif len(nserver) == 2: + if nserver[0] not in _nserver: + _nserver[nserver[0]] = [nserver[1]] + else: + _nserver[nserver[0]].append(nserver[1]) + + elif line.startswith("ds-rdata:"): + _ds_rdata.append(line[20:]) + # if nserver list is not empty add the reverse to the domain list + if not _nserver == {}: + domains[_domain_name] = {"nserver": _nserver, "ds-rdata":_ds_rdata} + # load inet6nums + for inet6num in inet6nums_files: + # temp variables in case there is no nserver + _nserver = {} + _ds_rdata = [] + _domain_name = "" + with open(inet6num) as i6: + for line in i6.readlines(): + line = line.replace("\n", "") + if line.startswith("inet6num"): + line = line[20:] + + # generate the reverse ipv6 + _domain_name = "ip6.arpa" + _lowest, _highest = line.replace(":","").split(" - ") + for _digit1, _digit2 in zip(_lowest,_highest): + if _digit1 != _digit2: + break + _domain_name = _digit1 + "." + _domain_name + elif line.startswith("nserver"): + nserver = line[20:].split(" ") + + # handle edge case where + if "\t" in nserver[0]: + nserver = nserver[0].split("\t") + # ignore registry-sync nservers + if "registry-sync.dn42" in nserver[0]: + break + + # nserver should be defined in an other dns file + if len(nserver) == 1: + _nserver[nserver[0]] = None + # nserver is defined in this file + elif len(nserver) == 2: + if nserver[0] not in _nserver: + _nserver[nserver[0]] = [nserver[1]] + else: + _nserver[nserver[0]].append(nserver[1]) + + elif line.startswith("ds-rdata:"): + _ds_rdata.append(line[20:]) + # if nserver list is not empty add the reverse to the domain list + if not _nserver == {}: + domains[_domain_name] = {"nserver": _nserver, "ds-rdata":_ds_rdata} + + # add entries from main domain, if the nserver doesn't have an ip address (like in inet(6)nums) + for domain in domains: + for nserver in domains[domain]["nserver"]: + # if the nserver isn't specified: ... + if domains[domain]["nserver"][nserver] == None: + #print(f"INFO: the nserver {nserver} isn't specified in {domain}, looking into the parent domain of it") + for i in range(len(nserver.split(".")), 1, -1): + # check if the nserver is already in loaded database, starts with more specific + if ".".join(nserver.split(".")[-i:]) in domains: + try: + domains[domain]["nserver"][nserver] = domains[".".join(nserver.split(".")[-i:])]["nserver"][nserver] + except KeyError: + # reaches here if the domain for the nserver specified in the inet{6}num/domain is found, but the nserver itself not. + print(f"Warn: the nserver {nserver} specified in {domain} wasn't found") + break + + + + return domains + +def get_dnskey(domain, nserver=None): + """query dns server for DNSKEY""" + + if not nserver: + # drill: use DNSSEC, fallback to tcp, for $domain + drill_cmd = subprocess.Popen( + ["drill", "-D", "-a", "DNSKEY", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + # drill: use DNSSEC, fallback to tcp, ask $nserver for $domain + drill_cmd = subprocess.Popen( + ["drill", "-D", "-a", "DNSKEY", f"@{nserver}", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # get the output of the drill command + lines = drill_cmd.communicate()[0].decode().split("\n") + # filter out comments (;;) and empty lines + new_lines = [] + for line in lines: + if line == "" or line.startswith(";;"): + continue + new_lines.append(line) + lines = new_lines + # split records into domain and key + dnskeys = [[record.split("\t")[0], record.split("\t")[-1]] + for record in lines] + #print(len(dnskeys)) + return dnskeys +# end_step1 + + +# step 2: +""" +Generate a DNSSEC DS record based on the incoming DNSKEY record +The DNSKEY can be found using for example 'dig': +$ dig DNSKEY secure.widodh.nl +The output can then be parsed with the following code to generate a DS record +for in the parent DNS zone +Author: Wido den Hollander +Many thanks to this blogpost: https://www.v13.gr/blog/?p=239 +""" + + +def _calc_keyid(flags, protocol, algorithm, dnskey): + st = struct.pack('!HBB', int(flags), int(protocol), int(algorithm)) + st += base64.b64decode(dnskey) + + cnt = 0 + for idx in range(len(st)): + s = struct.unpack('B', st[idx:idx+1])[0] + if (idx % 2) == 0: + cnt += s << 8 + else: + cnt += s + + return ((cnt & 0xFFFF) + (cnt >> 16)) & 0xFFFF + + +def _calc_ds(domain, flags, protocol, algorithm, dnskey): + if domain.endswith('.') is False: + domain += '.' + + signature = bytes() + for i in domain.split('.'): + signature += struct.pack('B', len(i)) + i.encode() + + signature += struct.pack('!HBB', int(flags), int(protocol), int(algorithm)) + signature += base64.b64decode(dnskey) + + return { + 'sha1': hashlib.sha1(signature).hexdigest().upper(), + 'sha256': hashlib.sha256(signature).hexdigest().upper(), + } + + +def dnskey_to_ds(domain, dnskey): + dnskeylist = dnskey.split(' ', 3) + + flags = dnskeylist[0] + protocol = dnskeylist[1] + algorithm = dnskeylist[2] + key = dnskeylist[3].replace(' ', '') + + keyid = _calc_keyid(flags, protocol, algorithm, key) + ds = _calc_ds(domain, flags, protocol, algorithm, key) + + ret = list() + ret.append(str(keyid) + ' ' + str(algorithm) + ' ' + str(1) + ' ' + + ds['sha1'].lower()) + ret.append(str(keyid) + ' ' + str(algorithm) + ' ' + str(2) + ' ' + + ds['sha256'].lower()) + return ret + +# if __name__ == "__main__": +# print(len(sys.argv)) +# if len(sys.argv) == 1: +# print("no data specified, please enter manually") +# DOMAIN = input("enter domain: ") +# DNSKEY = input("enter DNSKEY: ") +# +# print(dnskey_to_ds(DOMAIN, DNSKEY)) +# elif len(sys.argv) in [9,10]: +# # user pasted full dig output +# DOMAIN = sys.argv[1] +# DNSKEY = " ".join(sys.argv[5:]) +# print(DOMAIN) +# print(DNSKEY) +# print(dnskey_to_ds(DOMAIN,DNSKEY)) + +# step2: + +# step3: start: partially stolen from: https://stackoverflow.com/questions/26137036/programmatically-check-if-domains-are-dnssec-protected + +def check_dnssec(domain_name, domain_data): + import dns.name + import dns.query + import dns.dnssec + import dns.message + import dns.resolver + import dns.rdatatype + import dns.exception + + success = False + + no_ds_rdata = domain_data["ds-rdata"] == [] + if no_ds_rdata: + print(f"INFO: {domain_name} doesn't have ds-rdata configured, not checking it") + + for nserver in domain_data["nserver"]: + + # if the nserver is not set (i.e. not loaded from other dns file or "wrong" fqdn) + if domain_data["nserver"][nserver] == None: + print(f"INFO: ip address(es) for nserver '{nserver}' in '{domain_name}' isn't specified/loaded") + continue + for nsaddr in domain_data["nserver"][nserver]: + + # get SOA + request = dns.message.make_query(domain_name, dns.rdatatype.SOA,want_dnssec=False) + try: + # send the query + dns.query.udp(request,nsaddr, timeout=2) + # if it timed out: tell the user + except dns.exception.Timeout: + print(f"WARN: querying {nserver} ({nsaddr}) for {domain_name} timed out") + continue + + if no_ds_rdata: + print(f"INFO: query for {domain_name} SOA on {nserver} ({nsaddr}) succeded, not checking DNSSEC") + continue + # get DNSKEY for zone + request = dns.message.make_query(domain_name, dns.rdatatype.DNSKEY,want_dnssec=True) + response = dns.query.udp(request,nsaddr, timeout=2) + + if response.rcode() != 0: + # HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD) + print(f"WARN: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), returncode: {response.rcode()}") + continue + # answer should contain two RRSET: DNSKEY and RRSIG(DNSKEY) + answer = response.answer + if len(answer) != 2: + # SOMETHING WENT WRONG + print(f"ERROR: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), invalid answer length: {len(answer)}") + continue + # the DNSKEY should be self signed, validate it + name = dns.name.from_text(domain_name) + try: + #print(f"DEBUG: answer[0]: {answer[0]}") + #print(f"DEBUG: answer[1]: {answer[1]}") + try: + dns.dnssec.validate(answer[0],answer[1],{name:answer[0]}) + # it raises an AttributeError if the records are in the wrong order + except AttributeError as e: + dns.dnssec.validate(answer[1],answer[0],{name:answer[0]}) + + except dns.dnssec.ValidationFailure: + # BE SUSPICIOUS + print(f"WARN: DNSSEC validation failed on {domain_name} failed on {nserver} ({nsaddr}), answer: {answer}") + except AttributeError as e: + print(f"ERROR: {e}") + else: + # WE'RE GOOD, THERE'S A VALID DNSSEC SELF-SIGNED KEY FOR example.com + print(f"INFO: DNSSEC validation succeded on {domain_name} failed on {nserver} ({nsaddr})") + success = True + + return success + + +# step3: end + +def main(mntner): + + domains = get_domain_by_mntner(mntner=mntner) + + #with open("domains.tmp", "w") as domain_file: + # json.dump(domains, domain_file, indent=4) + # print(domains) + + for domain_name in domains: + print(check_dnssec(domain_name, domains[domain_name])) + +if __name__ == "__main__": + if len(sys.argv) == 1: + print(f"please specify your mntner\n {sys.argv[0]} YOU-MNT") + exit(1) + main(sys.argv[1]) + + +# commands to run: +# 1. drill -D .dn42 @ns1..dn42 NS +# 2. dnskey_to_ds(".dn42" +# # IN DNSKEY +# "257 3 13 ") +# 3. write dnskey to "trust-anchor" +# 4.delv @ns1..dn42 -a ./trust-anchor.tmp SOA .dn42