add error counting

lare 2023-02-12 11:48:20 +01:00
parent 68bf8a7704
commit b3b6524487


@ -27,11 +27,15 @@ except ImportError:
import dns.exception import dns.exception
import binascii import binascii
# counter of errors that occured
errors= 0
# step1: # step1:
def get_domain_by_mntner(mntner): def get_domain_by_mntner(mntner):
global errors
"""get a list of domains (and reverse ipv4/6) if a nserver is specified""" """get a list of domains (and reverse ipv4/6) if a nserver is specified"""
# grep for the given mntner in the dns,inetnum,inet6num directory of the registry and split it into a list; replace // with / in case REGISTRY_PATH ends with / # grep for the given mntner in the dns,inetnum,inet6num directory of the registry and split it into a list; replace // with / in case REGISTRY_PATH ends with /
dns_files = subprocess.Popen(["grep", "-Ril", f" {mntner}", f"{REGISTRY_PATH}/data/dns/"], dns_files = subprocess.Popen(["grep", "-Ril", f" {mntner}", f"{REGISTRY_PATH}/data/dns/"],
@ -173,21 +177,24 @@ def get_domain_by_mntner(mntner):
def get_dnskey(domain_name, nserver): def get_dnskey(domain_name, nserver):
"""query dns server for DNSKEY""" """query dns server for DNSKEY"""
global errors
try: try:
request = dns.message.make_query( request = dns.message.make_query(
domain_name, dns.rdatatype.DNSKEY, want_dnssec=False) domain_name, dns.rdatatype.DNSKEY, want_dnssec=False)
response = dns.query.udp_with_fallback(request, nserver, timeout=2) response = dns.query.udp_with_fallback(request, nserver, timeout=2)
except dns.exception.Timeout: except dns.exception.Timeout:
print(f"WARN: querying {nserver} for {domain_name} timed out") print(f"WARN: querying {nserver} for {domain_name} timed out")
errors += 1
return False return False
except dns.query.UnexpectedSource as e: except dns.query.UnexpectedSource as e:
print(f"ERROR: server replied with different different ip than requested: error: {e}") print(f"ERROR: server replied with different different ip than requested: error: {e}")
errors += 1
return False return False
if response[0].rcode() != 0: if response[0].rcode() != 0:
# HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD) # HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD)
print( print(
f"WARN: query for a DNSKEY on {domain_name} failed on {nserver}, returncode: {response[0].rcode()}") f"WARN: query for a DNSKEY on {domain_name} failed on {nserver}, returncode: {response[0].rcode()}")
errors += 1
return False return False
return [dnskey.to_text().split("IN DNSKEY ")[1] for dnskey in response[0].answer] return [dnskey.to_text().split("IN DNSKEY ")[1] for dnskey in response[0].answer]
# if not nserver: # if not nserver:
@ -299,13 +306,14 @@ def dnskey_to_ds(domain, dnskey):
def check_dnssec(domain_name, domain_data): def check_dnssec(domain_name, domain_data):
global errors
success = False success = False
no_ds_rdata = domain_data["ds-rdata"] == [] no_ds_rdata = domain_data["ds-rdata"] == []
if no_ds_rdata: if no_ds_rdata:
print( print(
f"NOTE: {domain_name} doesn't have ds-rdata configured, not checking it") f"NOTE: {domain_name} doesn't have ds-rdata configured, not checking it")
return True
for nserver in domain_data["nserver"]: for nserver in domain_data["nserver"]:
@ -341,6 +349,7 @@ def check_dnssec(domain_name, domain_data):
# HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD) # HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD)
print( print(
f"WARN: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), returncode: {response[0].rcode()}") f"WARN: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), returncode: {response[0].rcode()}")
errors += 1
continue continue
# answer should contain two RRSET: DNSKEY and RRSIG(DNSKEY) # answer should contain two RRSET: DNSKEY and RRSIG(DNSKEY)
answer = response[0].answer answer = response[0].answer
@ -348,6 +357,7 @@ def check_dnssec(domain_name, domain_data):
# SOMETHING WENT WRONG # SOMETHING WENT WRONG
print( print(
f"ERROR: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), invalid answer length: {len(answer)}") f"ERROR: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), invalid answer length: {len(answer)}")
errors += 1
continue continue
# the DNSKEY should be self signed, validate it # the DNSKEY should be self signed, validate it
name = dns.name.from_text(domain_name) name = dns.name.from_text(domain_name)
@ -366,8 +376,10 @@ def check_dnssec(domain_name, domain_data):
# BE SUSPICIOUS # BE SUSPICIOUS
print( print(
f"WARN: DNSSEC validation failed on {domain_name} failed on {nserver} ({nsaddr}), answer: {answer}") f"WARN: DNSSEC validation failed on {domain_name} failed on {nserver} ({nsaddr}), answer: {answer}")
errors += 1
except AttributeError as e: except AttributeError as e:
print(f"ERROR: {e}") print(f"ERROR: {e}")
errors += 1
else: else:
# WE'RE GOOD, THERE'S A VALID DNSSEC SELF-SIGNED KEY FOR example.com # WE'RE GOOD, THERE'S A VALID DNSSEC SELF-SIGNED KEY FOR example.com
print( print(
@ -380,7 +392,7 @@ def check_dnssec(domain_name, domain_data):
# step3: end # step3: end
def main(mntner): def main(mntner):
global errors
# get all domains/inet(6)nums of the mntner # get all domains/inet(6)nums of the mntner
domains = get_domain_by_mntner(mntner=mntner) domains = get_domain_by_mntner(mntner=mntner)
@ -422,6 +434,7 @@ def main(mntner):
else: else:
print( print(
f"ERROR: invalid ds-rdata specified and matching DNSKEY returned by {ip} for {domain_name}") f"ERROR: invalid ds-rdata specified and matching DNSKEY returned by {ip} for {domain_name}")
errors += 1
# break # break
# print(check_dnssec(domain_name, domains[domain_name])) # print(check_dnssec(domain_name, domains[domain_name]))
@ -432,6 +445,7 @@ if __name__ == "__main__":
print(f"please specify your mntner\n {sys.argv[0]} YOU-MNT") print(f"please specify your mntner\n {sys.argv[0]} YOU-MNT")
exit(1) exit(1)
main(sys.argv[1]) main(sys.argv[1])
exit(errors)
# commands to run: # commands to run:
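Review bot: `exit(errors)` in the last hunk feeds an unbounded counter into the process exit status, which POSIX truncates to 8 bits, so a run with 256 errors (or any multiple of 256) exits 0 and a caller, cron job or CI gate reads it as success; use `sys.exit(1 if errors else 0)` (or `sys.exit(min(errors, 255))` if the count itself is wanted) — `sys` is already imported, and `sys.exit` is preferred over the site-provided `exit` builtin in scripts. In the first hunk, `global errors` is inserted above the docstring of get_domain_by_mntner, which turns that string into a plain expression statement and leaves the function without a `__doc__`; move the declaration below the docstring, and drop it entirely if the function never assigns to `errors` (its body continues outside the hunk, so I cannot see whether it does). The `errors= 0` line would also read better as `errors = 0` with the comment's "occured" spelled "occurred".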