diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2f73596 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +dnspython diff --git a/validate-my-dns.py b/validate-my-dns.py index 40f1c90..6b6856d 100644 --- a/validate-my-dns.py +++ b/validate-my-dns.py @@ -8,6 +8,17 @@ import os import subprocess import json +import dns.name +import dns.query +import dns.dnssec +import dns.message +import dns.resolver +import dns.rdatatype + +# import errors so they could be try/catched +import dns.exception +import binascii + REGISTRY_PATH = "../registry" # step1: @@ -18,11 +29,11 @@ def get_domain_by_mntner(mntner): # grep for the given mntner in the dns,inetnum,inet6num directory of the registry and split it into a list; replace // with / in case REGISTRY_PATH ends with / dns_files = subprocess.Popen(["grep", "-Ril", f" {mntner}", f"{REGISTRY_PATH}/data/dns/"], stdout=subprocess.PIPE).communicate()[0].decode().replace("//", "/").split("\n")[:-1] - inetnums_files = subprocess.Popen(["grep", "-Ril", f" {mntner}", f"{REGISTRY_PATH}/data/inetnum/"], - stdout=subprocess.PIPE).communicate()[0].decode().replace("//", "/").split("\n")[:-1] + inetnums_files = subprocess.Popen(["grep", "-Ril", f" {mntner}", f"{REGISTRY_PATH}/data/inetnum/"], + stdout=subprocess.PIPE).communicate()[0].decode().replace("//", "/").split("\n")[:-1] inet6nums_files = subprocess.Popen( ["grep", "-Ril", f" {mntner}", f"{REGISTRY_PATH}/data/inet6num/"], stdout=subprocess.PIPE).communicate()[0].decode().split("\n")[:-1] - + # domains dict containing dns objects and inet(6)nums if they have nserver specified domains = {} # read dns files @@ -35,7 +46,7 @@ def get_domain_by_mntner(mntner): line = line.replace("\n", "") if line.startswith("nserver"): nserver = line[20:].split(" ") - # handle edge case where + # handle edge case where if "\t" in nserver[0]: nserver = nserver[0].split("\t") # ignore registry-sync nservers @@ -47,12 +58,14 @@ def get_domain_by_mntner(mntner): # nserver is defined in this file elif len(nserver) == 2: if nserver[0] not in domains[domain_name]["nserver"]: - domains[domain_name]["nserver"][nserver[0]] = [nserver[1]] + domains[domain_name]["nserver"][nserver[0]] = [ + nserver[1]] else: - domains[domain_name]["nserver"][nserver[0]].append(nserver[1]) + domains[domain_name]["nserver"][nserver[0]].append( + nserver[1]) elif line.startswith("ds-rdata:"): - domains[domain_name]["ds-rdata"].append(line[20:]) + domains[domain_name]["ds-rdata"].append(line[20:].lower()) # load inetnums for inetnum in inetnums_files: # temp variables in case there is no nserver @@ -64,10 +77,11 @@ def get_domain_by_mntner(mntner): line = line.replace("\n", "") if line.startswith("cidr"): line = line[20:] - _domain_name = ".".join(line.split(".")[::-1]) + ".in-addr.arpa" + _domain_name = ".".join( + line.split(".")[::-1]) + ".in-addr.arpa" elif line.startswith("nserver"): nserver = line[20:].split(" ") - # handle edge case where + # handle edge case where if "\t" in nserver[0]: nserver = nserver[0].split("\t") # ignore registry-sync nservers @@ -84,10 +98,11 @@ def get_domain_by_mntner(mntner): _nserver[nserver[0]].append(nserver[1]) elif line.startswith("ds-rdata:"): - _ds_rdata.append(line[20:]) + _ds_rdata.append(line[20:].lower()) # if nserver list is not empty add the reverse to the domain list if not _nserver == {}: - domains[_domain_name] = {"nserver": _nserver, "ds-rdata":_ds_rdata} + domains[_domain_name] = { + "nserver": _nserver, "ds-rdata": _ds_rdata} # load inet6nums for inet6num in inet6nums_files: # temp variables in case there is no nserver @@ -99,18 +114,18 @@ def get_domain_by_mntner(mntner): line = line.replace("\n", "") if line.startswith("inet6num"): line = line[20:] - - # generate the reverse ipv6 + + # generate the reverse ipv6 _domain_name = "ip6.arpa" - _lowest, _highest = line.replace(":","").split(" - ") - for _digit1, _digit2 in zip(_lowest,_highest): + _lowest, _highest = line.replace(":", "").split(" - ") + for _digit1, _digit2 in zip(_lowest, _highest): if _digit1 != _digit2: break _domain_name = _digit1 + "." + _domain_name elif line.startswith("nserver"): nserver = line[20:].split(" ") - # handle edge case where + # handle edge case where if "\t" in nserver[0]: nserver = nserver[0].split("\t") # ignore registry-sync nservers @@ -128,56 +143,74 @@ def get_domain_by_mntner(mntner): _nserver[nserver[0]].append(nserver[1]) elif line.startswith("ds-rdata:"): - _ds_rdata.append(line[20:]) + _ds_rdata.append(line[20:].lower()) # if nserver list is not empty add the reverse to the domain list if not _nserver == {}: - domains[_domain_name] = {"nserver": _nserver, "ds-rdata":_ds_rdata} + domains[_domain_name] = { + "nserver": _nserver, "ds-rdata": _ds_rdata} # add entries from main domain, if the nserver doesn't have an ip address (like in inet(6)nums) for domain in domains: for nserver in domains[domain]["nserver"]: # if the nserver isn't specified: ... if domains[domain]["nserver"][nserver] == None: - #print(f"INFO: the nserver {nserver} isn't specified in {domain}, looking into the parent domain of it") + # print(f"INFO: the nserver {nserver} isn't specified in {domain}, looking into the parent domain of it") for i in range(len(nserver.split(".")), 1, -1): # check if the nserver is already in loaded database, starts with more specific if ".".join(nserver.split(".")[-i:]) in domains: try: - domains[domain]["nserver"][nserver] = domains[".".join(nserver.split(".")[-i:])]["nserver"][nserver] + domains[domain]["nserver"][nserver] = domains[".".join( + nserver.split(".")[-i:])]["nserver"][nserver] except KeyError: # reaches here if the domain for the nserver specified in the inet{6}num/domain is found, but the nserver itself not. - print(f"Warn: the nserver {nserver} specified in {domain} wasn't found") + print( + f"Warn: the nserver {nserver} specified in {domain} wasn't found") break - - return domains -def get_dnskey(domain, nserver=None): + +def get_dnskey(domain_name, nserver): """query dns server for DNSKEY""" - if not nserver: - # drill: use DNSSEC, fallback to tcp, for $domain - drill_cmd = subprocess.Popen( - ["drill", "-D", "-a", "DNSKEY", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - else: - # drill: use DNSSEC, fallback to tcp, ask $nserver for $domain - drill_cmd = subprocess.Popen( - ["drill", "-D", "-a", "DNSKEY", f"@{nserver}", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # get the output of the drill command - lines = drill_cmd.communicate()[0].decode().split("\n") - # filter out comments (;;) and empty lines - new_lines = [] - for line in lines: - if line == "" or line.startswith(";;"): - continue - new_lines.append(line) - lines = new_lines - # split records into domain and key - dnskeys = [[record.split("\t")[0], record.split("\t")[-1]] - for record in lines] - #print(len(dnskeys)) - return dnskeys + try: + request = dns.message.make_query( + domain_name, dns.rdatatype.DNSKEY, want_dnssec=False) + response = dns.query.udp(request, nserver, timeout=2) + except dns.exception.Timeout: + print(f"WARN: querying {nserver} for {domain_name} timed out") + return False + except dns.query.UnexpectedSource as e: + print(f"ERROR: server replied with different different ip than requested: error: {e}") + return False + if response.rcode() != 0: + # HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD) + print( + f"WARN: query for a DNSKEY on {domain_name} failed on {nserver}, returncode: {response.rcode()}") + return False + return [dnskey.to_text().split("IN DNSKEY ")[1] for dnskey in response.answer] + # if not nserver: + # # drill: use DNSSEC, fallback to tcp, for $domain + # drill_cmd = subprocess.Popen( + # ["drill", "-D", "-a", "DNSKEY", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # else: + # # drill: use DNSSEC, fallback to tcp, ask $nserver for $domain + # drill_cmd = subprocess.Popen( + # ["drill", "-D", "-a", "DNSKEY", f"@{nserver}", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # # get the output of the drill command + # lines = drill_cmd.communicate()[0].decode().split("\n") + # # filter out comments (;;) and empty lines + # new_lines = [] + # for line in lines: + # if line == "" or line.startswith(";;"): + # continue + # new_lines.append(line) + # lines = new_lines + # # split records into domain and key + # dnskeys = [[record.split("\t")[0], record.split("\t")[-1]] + # for record in lines] + # #print(len(dnskeys)) + # return dnskeys # end_step1 @@ -263,77 +296,83 @@ def dnskey_to_ds(domain, dnskey): # step3: start: partially stolen from: https://stackoverflow.com/questions/26137036/programmatically-check-if-domains-are-dnssec-protected + def check_dnssec(domain_name, domain_data): - import dns.name - import dns.query - import dns.dnssec - import dns.message - import dns.resolver - import dns.rdatatype - import dns.exception success = False no_ds_rdata = domain_data["ds-rdata"] == [] if no_ds_rdata: - print(f"INFO: {domain_name} doesn't have ds-rdata configured, not checking it") + print( + f"INFO: {domain_name} doesn't have ds-rdata configured, not checking it") for nserver in domain_data["nserver"]: - + # if the nserver is not set (i.e. not loaded from other dns file or "wrong" fqdn) if domain_data["nserver"][nserver] == None: - print(f"INFO: ip address(es) for nserver '{nserver}' in '{domain_name}' isn't specified/loaded") + print( + f"INFO: ip address(es) for nserver '{nserver}' in '{domain_name}' isn't specified/loaded") continue for nsaddr in domain_data["nserver"][nserver]: # get SOA - request = dns.message.make_query(domain_name, dns.rdatatype.SOA,want_dnssec=False) + request = dns.message.make_query( + domain_name, dns.rdatatype.SOA, want_dnssec=False) try: # send the query - dns.query.udp(request,nsaddr, timeout=2) + dns.query.udp(request, nsaddr, timeout=2) # if it timed out: tell the user except dns.exception.Timeout: - print(f"WARN: querying {nserver} ({nsaddr}) for {domain_name} timed out") + print( + f"WARN: querying {nserver} ({nsaddr}) for {domain_name} timed out") continue - + if no_ds_rdata: - print(f"INFO: query for {domain_name} SOA on {nserver} ({nsaddr}) succeded, not checking DNSSEC") + print( + f"INFO: query for {domain_name} SOA on {nserver} ({nsaddr}) succeded, not checking DNSSEC") continue # get DNSKEY for zone - request = dns.message.make_query(domain_name, dns.rdatatype.DNSKEY,want_dnssec=True) - response = dns.query.udp(request,nsaddr, timeout=2) + request = dns.message.make_query( + domain_name, dns.rdatatype.DNSKEY, want_dnssec=True) + response = dns.query.udp(request, nsaddr, timeout=2) if response.rcode() != 0: # HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD) - print(f"WARN: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), returncode: {response.rcode()}") + print( + f"WARN: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), returncode: {response.rcode()}") continue # answer should contain two RRSET: DNSKEY and RRSIG(DNSKEY) answer = response.answer if len(answer) != 2: # SOMETHING WENT WRONG - print(f"ERROR: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), invalid answer length: {len(answer)}") + print( + f"ERROR: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), invalid answer length: {len(answer)}") continue # the DNSKEY should be self signed, validate it name = dns.name.from_text(domain_name) try: - #print(f"DEBUG: answer[0]: {answer[0]}") - #print(f"DEBUG: answer[1]: {answer[1]}") + # print(f"DEBUG: answer[0]: {answer[0]}") + # print(f"DEBUG: answer[1]: {answer[1]}") try: - dns.dnssec.validate(answer[0],answer[1],{name:answer[0]}) + dns.dnssec.validate( + answer[0], answer[1], {name: answer[0]}) # it raises an AttributeError if the records are in the wrong order except AttributeError as e: - dns.dnssec.validate(answer[1],answer[0],{name:answer[0]}) - + dns.dnssec.validate( + answer[1], answer[0], {name: answer[0]}) + except dns.dnssec.ValidationFailure: # BE SUSPICIOUS - print(f"WARN: DNSSEC validation failed on {domain_name} failed on {nserver} ({nsaddr}), answer: {answer}") + print( + f"WARN: DNSSEC validation failed on {domain_name} failed on {nserver} ({nsaddr}), answer: {answer}") except AttributeError as e: print(f"ERROR: {e}") else: # WE'RE GOOD, THERE'S A VALID DNSSEC SELF-SIGNED KEY FOR example.com - print(f"INFO: DNSSEC validation succeded on {domain_name} failed on {nserver} ({nsaddr})") + print( + f"INFO: DNSSEC validation succeded on {domain_name} failed on {nserver} ({nsaddr})") success = True - + return success @@ -341,14 +380,51 @@ def check_dnssec(domain_name, domain_data): def main(mntner): + # get all domains/inet(6)nums of the mntner domains = get_domain_by_mntner(mntner=mntner) - #with open("domains.tmp", "w") as domain_file: - # json.dump(domains, domain_file, indent=4) - # print(domains) - for domain_name in domains: - print(check_dnssec(domain_name, domains[domain_name])) + + # check if the domain doesn't have DS data + if domains[domain_name]["ds-rdata"] == []: + print(f"INFO: {domain_name} doens't have any ds-rdata specified") + continue + + for nserver in domains[domain_name]["nserver"]: + # check for unset nserver ips -> dont check them + if domains[domain_name]["nserver"][nserver] == None: + continue + for ip in domains[domain_name]["nserver"][nserver]: + ds_candidates = [] + # load DNSKEYs from nserver: if False something failed (timeout) + _keys = get_dnskey(domain_name, ip) + if _keys == False: continue + # convert all found keys to DS + for key in _keys: + try: + _ds_s = dnskey_to_ds(domain_name, key) + except binascii.Error as e: + print(f"ERROR: trying to convert '{key}' to DS failed: {e}") + continue + ds_candidates.extend(_ds_s) + found = False + # iterate over DS-rdata from the registry and check if they are found on the nserver + for ds in domains[domain_name]["ds-rdata"]: + # print(ds) + if ds in ds_candidates: + found = True + # print(f"DEBUG: available: {domains[domain_name]['ds-rdata']}") + # print(f"DEBUG: generated: {ds_candidates}") + if found: + print( + f"INFO: correct ds-rdata specified and matching DNSKEY returned by {ip} for {domain_name}") + else: + print( + f"ERROR: invalid ds-rdata specified and matching DNSKEY returned by {ip} for {domain_name}") + # break + + # print(check_dnssec(domain_name, domains[domain_name])) + if __name__ == "__main__": if len(sys.argv) == 1: