add further checks and helpers

This commit is contained in:
lare 2023-02-11 22:15:41 +01:00
parent 54ff5d65f2
commit cf4b99a408
2 changed files with 155 additions and 78 deletions

1
requirements.txt Normal file
View file

@ -0,0 +1 @@
dnspython

View file

@ -8,6 +8,17 @@ import os
import subprocess import subprocess
import json import json
import dns.name
import dns.query
import dns.dnssec
import dns.message
import dns.resolver
import dns.rdatatype
# import errors so they could be try/catched
import dns.exception
import binascii
REGISTRY_PATH = "../registry" REGISTRY_PATH = "../registry"
# step1: # step1:
@ -47,12 +58,14 @@ def get_domain_by_mntner(mntner):
# nserver is defined in this file # nserver is defined in this file
elif len(nserver) == 2: elif len(nserver) == 2:
if nserver[0] not in domains[domain_name]["nserver"]: if nserver[0] not in domains[domain_name]["nserver"]:
domains[domain_name]["nserver"][nserver[0]] = [nserver[1]] domains[domain_name]["nserver"][nserver[0]] = [
nserver[1]]
else: else:
domains[domain_name]["nserver"][nserver[0]].append(nserver[1]) domains[domain_name]["nserver"][nserver[0]].append(
nserver[1])
elif line.startswith("ds-rdata:"): elif line.startswith("ds-rdata:"):
domains[domain_name]["ds-rdata"].append(line[20:]) domains[domain_name]["ds-rdata"].append(line[20:].lower())
# load inetnums # load inetnums
for inetnum in inetnums_files: for inetnum in inetnums_files:
# temp variables in case there is no nserver # temp variables in case there is no nserver
@ -64,7 +77,8 @@ def get_domain_by_mntner(mntner):
line = line.replace("\n", "") line = line.replace("\n", "")
if line.startswith("cidr"): if line.startswith("cidr"):
line = line[20:] line = line[20:]
_domain_name = ".".join(line.split(".")[::-1]) + ".in-addr.arpa" _domain_name = ".".join(
line.split(".")[::-1]) + ".in-addr.arpa"
elif line.startswith("nserver"): elif line.startswith("nserver"):
nserver = line[20:].split(" ") nserver = line[20:].split(" ")
# handle edge case where # handle edge case where
@ -84,10 +98,11 @@ def get_domain_by_mntner(mntner):
_nserver[nserver[0]].append(nserver[1]) _nserver[nserver[0]].append(nserver[1])
elif line.startswith("ds-rdata:"): elif line.startswith("ds-rdata:"):
_ds_rdata.append(line[20:]) _ds_rdata.append(line[20:].lower())
# if nserver list is not empty add the reverse to the domain list # if nserver list is not empty add the reverse to the domain list
if not _nserver == {}: if not _nserver == {}:
domains[_domain_name] = {"nserver": _nserver, "ds-rdata":_ds_rdata} domains[_domain_name] = {
"nserver": _nserver, "ds-rdata": _ds_rdata}
# load inet6nums # load inet6nums
for inet6num in inet6nums_files: for inet6num in inet6nums_files:
# temp variables in case there is no nserver # temp variables in case there is no nserver
@ -128,10 +143,11 @@ def get_domain_by_mntner(mntner):
_nserver[nserver[0]].append(nserver[1]) _nserver[nserver[0]].append(nserver[1])
elif line.startswith("ds-rdata:"): elif line.startswith("ds-rdata:"):
_ds_rdata.append(line[20:]) _ds_rdata.append(line[20:].lower())
# if nserver list is not empty add the reverse to the domain list # if nserver list is not empty add the reverse to the domain list
if not _nserver == {}: if not _nserver == {}:
domains[_domain_name] = {"nserver": _nserver, "ds-rdata":_ds_rdata} domains[_domain_name] = {
"nserver": _nserver, "ds-rdata": _ds_rdata}
# add entries from main domain, if the nserver doesn't have an ip address (like in inet(6)nums) # add entries from main domain, if the nserver doesn't have an ip address (like in inet(6)nums)
for domain in domains: for domain in domains:
@ -143,41 +159,58 @@ def get_domain_by_mntner(mntner):
# check if the nserver is already in loaded database, starts with more specific # check if the nserver is already in loaded database, starts with more specific
if ".".join(nserver.split(".")[-i:]) in domains: if ".".join(nserver.split(".")[-i:]) in domains:
try: try:
domains[domain]["nserver"][nserver] = domains[".".join(nserver.split(".")[-i:])]["nserver"][nserver] domains[domain]["nserver"][nserver] = domains[".".join(
nserver.split(".")[-i:])]["nserver"][nserver]
except KeyError: except KeyError:
# reaches here if the domain for the nserver specified in the inet{6}num/domain is found, but the nserver itself not. # reaches here if the domain for the nserver specified in the inet{6}num/domain is found, but the nserver itself not.
print(f"Warn: the nserver {nserver} specified in {domain} wasn't found") print(
f"Warn: the nserver {nserver} specified in {domain} wasn't found")
break break
return domains return domains
def get_dnskey(domain, nserver=None):
def get_dnskey(domain_name, nserver):
"""query dns server for DNSKEY""" """query dns server for DNSKEY"""
if not nserver: try:
# drill: use DNSSEC, fallback to tcp, for $domain request = dns.message.make_query(
drill_cmd = subprocess.Popen( domain_name, dns.rdatatype.DNSKEY, want_dnssec=False)
["drill", "-D", "-a", "DNSKEY", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE) response = dns.query.udp(request, nserver, timeout=2)
else: except dns.exception.Timeout:
# drill: use DNSSEC, fallback to tcp, ask $nserver for $domain print(f"WARN: querying {nserver} for {domain_name} timed out")
drill_cmd = subprocess.Popen( return False
["drill", "-D", "-a", "DNSKEY", f"@{nserver}", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE) except dns.query.UnexpectedSource as e:
# get the output of the drill command print(f"ERROR: server replied with different different ip than requested: error: {e}")
lines = drill_cmd.communicate()[0].decode().split("\n") return False
# filter out comments (;;) and empty lines if response.rcode() != 0:
new_lines = [] # HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD)
for line in lines: print(
if line == "" or line.startswith(";;"): f"WARN: query for a DNSKEY on {domain_name} failed on {nserver}, returncode: {response.rcode()}")
continue return False
new_lines.append(line) return [dnskey.to_text().split("IN DNSKEY ")[1] for dnskey in response.answer]
lines = new_lines # if not nserver:
# split records into domain and key # # drill: use DNSSEC, fallback to tcp, for $domain
dnskeys = [[record.split("\t")[0], record.split("\t")[-1]] # drill_cmd = subprocess.Popen(
for record in lines] # ["drill", "-D", "-a", "DNSKEY", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#print(len(dnskeys)) # else:
return dnskeys # # drill: use DNSSEC, fallback to tcp, ask $nserver for $domain
# drill_cmd = subprocess.Popen(
# ["drill", "-D", "-a", "DNSKEY", f"@{nserver}", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# # get the output of the drill command
# lines = drill_cmd.communicate()[0].decode().split("\n")
# # filter out comments (;;) and empty lines
# new_lines = []
# for line in lines:
# if line == "" or line.startswith(";;"):
# continue
# new_lines.append(line)
# lines = new_lines
# # split records into domain and key
# dnskeys = [[record.split("\t")[0], record.split("\t")[-1]]
# for record in lines]
# #print(len(dnskeys))
# return dnskeys
# end_step1 # end_step1
@ -263,55 +296,57 @@ def dnskey_to_ds(domain, dnskey):
# step3: start: partially stolen from: https://stackoverflow.com/questions/26137036/programmatically-check-if-domains-are-dnssec-protected # step3: start: partially stolen from: https://stackoverflow.com/questions/26137036/programmatically-check-if-domains-are-dnssec-protected
def check_dnssec(domain_name, domain_data): def check_dnssec(domain_name, domain_data):
import dns.name
import dns.query
import dns.dnssec
import dns.message
import dns.resolver
import dns.rdatatype
import dns.exception
success = False success = False
no_ds_rdata = domain_data["ds-rdata"] == [] no_ds_rdata = domain_data["ds-rdata"] == []
if no_ds_rdata: if no_ds_rdata:
print(f"INFO: {domain_name} doesn't have ds-rdata configured, not checking it") print(
f"INFO: {domain_name} doesn't have ds-rdata configured, not checking it")
for nserver in domain_data["nserver"]: for nserver in domain_data["nserver"]:
# if the nserver is not set (i.e. not loaded from other dns file or "wrong" fqdn) # if the nserver is not set (i.e. not loaded from other dns file or "wrong" fqdn)
if domain_data["nserver"][nserver] == None: if domain_data["nserver"][nserver] == None:
print(f"INFO: ip address(es) for nserver '{nserver}' in '{domain_name}' isn't specified/loaded") print(
f"INFO: ip address(es) for nserver '{nserver}' in '{domain_name}' isn't specified/loaded")
continue continue
for nsaddr in domain_data["nserver"][nserver]: for nsaddr in domain_data["nserver"][nserver]:
# get SOA # get SOA
request = dns.message.make_query(domain_name, dns.rdatatype.SOA,want_dnssec=False) request = dns.message.make_query(
domain_name, dns.rdatatype.SOA, want_dnssec=False)
try: try:
# send the query # send the query
dns.query.udp(request, nsaddr, timeout=2) dns.query.udp(request, nsaddr, timeout=2)
# if it timed out: tell the user # if it timed out: tell the user
except dns.exception.Timeout: except dns.exception.Timeout:
print(f"WARN: querying {nserver} ({nsaddr}) for {domain_name} timed out") print(
f"WARN: querying {nserver} ({nsaddr}) for {domain_name} timed out")
continue continue
if no_ds_rdata: if no_ds_rdata:
print(f"INFO: query for {domain_name} SOA on {nserver} ({nsaddr}) succeded, not checking DNSSEC") print(
f"INFO: query for {domain_name} SOA on {nserver} ({nsaddr}) succeded, not checking DNSSEC")
continue continue
# get DNSKEY for zone # get DNSKEY for zone
request = dns.message.make_query(domain_name, dns.rdatatype.DNSKEY,want_dnssec=True) request = dns.message.make_query(
domain_name, dns.rdatatype.DNSKEY, want_dnssec=True)
response = dns.query.udp(request, nsaddr, timeout=2) response = dns.query.udp(request, nsaddr, timeout=2)
if response.rcode() != 0: if response.rcode() != 0:
# HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD) # HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD)
print(f"WARN: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), returncode: {response.rcode()}") print(
f"WARN: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), returncode: {response.rcode()}")
continue continue
# answer should contain two RRSET: DNSKEY and RRSIG(DNSKEY) # answer should contain two RRSET: DNSKEY and RRSIG(DNSKEY)
answer = response.answer answer = response.answer
if len(answer) != 2: if len(answer) != 2:
# SOMETHING WENT WRONG # SOMETHING WENT WRONG
print(f"ERROR: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), invalid answer length: {len(answer)}") print(
f"ERROR: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), invalid answer length: {len(answer)}")
continue continue
# the DNSKEY should be self signed, validate it # the DNSKEY should be self signed, validate it
name = dns.name.from_text(domain_name) name = dns.name.from_text(domain_name)
@ -319,19 +354,23 @@ def check_dnssec(domain_name, domain_data):
# print(f"DEBUG: answer[0]: {answer[0]}") # print(f"DEBUG: answer[0]: {answer[0]}")
# print(f"DEBUG: answer[1]: {answer[1]}") # print(f"DEBUG: answer[1]: {answer[1]}")
try: try:
dns.dnssec.validate(answer[0],answer[1],{name:answer[0]}) dns.dnssec.validate(
answer[0], answer[1], {name: answer[0]})
# it raises an AttributeError if the records are in the wrong order # it raises an AttributeError if the records are in the wrong order
except AttributeError as e: except AttributeError as e:
dns.dnssec.validate(answer[1],answer[0],{name:answer[0]}) dns.dnssec.validate(
answer[1], answer[0], {name: answer[0]})
except dns.dnssec.ValidationFailure: except dns.dnssec.ValidationFailure:
# BE SUSPICIOUS # BE SUSPICIOUS
print(f"WARN: DNSSEC validation failed on {domain_name} failed on {nserver} ({nsaddr}), answer: {answer}") print(
f"WARN: DNSSEC validation failed on {domain_name} failed on {nserver} ({nsaddr}), answer: {answer}")
except AttributeError as e: except AttributeError as e:
print(f"ERROR: {e}") print(f"ERROR: {e}")
else: else:
# WE'RE GOOD, THERE'S A VALID DNSSEC SELF-SIGNED KEY FOR example.com # WE'RE GOOD, THERE'S A VALID DNSSEC SELF-SIGNED KEY FOR example.com
print(f"INFO: DNSSEC validation succeded on {domain_name} failed on {nserver} ({nsaddr})") print(
f"INFO: DNSSEC validation succeded on {domain_name} failed on {nserver} ({nsaddr})")
success = True success = True
return success return success
@ -341,14 +380,51 @@ def check_dnssec(domain_name, domain_data):
def main(mntner): def main(mntner):
# get all domains/inet(6)nums of the mntner
domains = get_domain_by_mntner(mntner=mntner) domains = get_domain_by_mntner(mntner=mntner)
#with open("domains.tmp", "w") as domain_file:
# json.dump(domains, domain_file, indent=4)
# print(domains)
for domain_name in domains: for domain_name in domains:
print(check_dnssec(domain_name, domains[domain_name]))
# check if the domain doesn't have DS data
if domains[domain_name]["ds-rdata"] == []:
print(f"INFO: {domain_name} doens't have any ds-rdata specified")
continue
for nserver in domains[domain_name]["nserver"]:
# check for unset nserver ips -> dont check them
if domains[domain_name]["nserver"][nserver] == None:
continue
for ip in domains[domain_name]["nserver"][nserver]:
ds_candidates = []
# load DNSKEYs from nserver: if False something failed (timeout)
_keys = get_dnskey(domain_name, ip)
if _keys == False: continue
# convert all found keys to DS
for key in _keys:
try:
_ds_s = dnskey_to_ds(domain_name, key)
except binascii.Error as e:
print(f"ERROR: trying to convert '{key}' to DS failed: {e}")
continue
ds_candidates.extend(_ds_s)
found = False
# iterate over DS-rdata from the registry and check if they are found on the nserver
for ds in domains[domain_name]["ds-rdata"]:
# print(ds)
if ds in ds_candidates:
found = True
# print(f"DEBUG: available: {domains[domain_name]['ds-rdata']}")
# print(f"DEBUG: generated: {ds_candidates}")
if found:
print(
f"INFO: correct ds-rdata specified and matching DNSKEY returned by {ip} for {domain_name}")
else:
print(
f"ERROR: invalid ds-rdata specified and matching DNSKEY returned by {ip} for {domain_name}")
# break
# print(check_dnssec(domain_name, domains[domain_name]))
if __name__ == "__main__": if __name__ == "__main__":
if len(sys.argv) == 1: if len(sys.argv) == 1: