2023-02-11 13:36:48 +01:00
#!/usr/bin/env python3
2023-02-12 02:26:32 +01:00
REGISTRY_PATH = " ../registry "
2023-02-11 13:36:48 +01:00
import hashlib
import base64
import struct
import sys
import os
import subprocess
import json
2023-02-12 00:26:37 +01:00
try :
import dns . name
import dns . query
import dns . dnssec
import dns . message
import dns . resolver
import dns . rdatatype
except ImportError :
print ( )
print ( " CRITICAL: this script requires the ' dnspython ' libary, please install it using `python3 -m pip install dnspython` " )
print ( )
exit ( 1 )
2023-02-11 22:15:41 +01:00
# import errors so they could be try/catched
import dns . exception
import binascii
2023-02-11 13:36:48 +01:00
# step1:
def get_domain_by_mntner ( mntner ) :
""" get a list of domains (and reverse ipv4/6) if a nserver is specified """
# grep for the given mntner in the dns,inetnum,inet6num directory of the registry and split it into a list; replace // with / in case REGISTRY_PATH ends with /
dns_files = subprocess . Popen ( [ " grep " , " -Ril " , f " { mntner } " , f " { REGISTRY_PATH } /data/dns/ " ] ,
stdout = subprocess . PIPE ) . communicate ( ) [ 0 ] . decode ( ) . replace ( " // " , " / " ) . split ( " \n " ) [ : - 1 ]
2023-02-11 22:15:41 +01:00
inetnums_files = subprocess . Popen ( [ " grep " , " -Ril " , f " { mntner } " , f " { REGISTRY_PATH } /data/inetnum/ " ] ,
stdout = subprocess . PIPE ) . communicate ( ) [ 0 ] . decode ( ) . replace ( " // " , " / " ) . split ( " \n " ) [ : - 1 ]
2023-02-11 13:36:48 +01:00
inet6nums_files = subprocess . Popen (
[ " grep " , " -Ril " , f " { mntner } " , f " { REGISTRY_PATH } /data/inet6num/ " ] , stdout = subprocess . PIPE ) . communicate ( ) [ 0 ] . decode ( ) . split ( " \n " ) [ : - 1 ]
2023-02-11 22:15:41 +01:00
2023-02-11 13:36:48 +01:00
# domains dict containing dns objects and inet(6)nums if they have nserver specified
domains = { }
2023-02-12 02:26:32 +01:00
def _parse_nserver ( line ) :
nserver = line [ 20 : ] . split ( " " )
# handle edge case where
if " \t " in nserver [ 0 ] :
nserver = nserver [ 0 ] . split ( " \t " )
# ignore registry-sync nservers
if " registry-sync.dn42 " in nserver [ 0 ] :
return
# nserver should be defined in an other dns file
if len ( nserver ) == 1 :
return [ nserver [ 0 ] , None ]
# nserver is defined in this file
elif len ( nserver ) == 2 :
return nserver
2023-02-11 13:36:48 +01:00
# read dns files
for domain in dns_files :
with open ( domain ) as d :
domain_name = domain . split ( " / " ) [ - 1 ]
# a dictionary for each domain with "nserver": {"ns1.domain.dn42": ["ns1 ipv4", "ns1 ipv6"], ...}, "ds-rdata": ["123 45 67 ...", "98 7 65 ..."]
domains [ domain_name ] = { " nserver " : { } , " ds-rdata " : [ ] }
for line in d . readlines ( ) :
line = line . replace ( " \n " , " " )
if line . startswith ( " nserver " ) :
2023-02-12 02:26:32 +01:00
_tmp = _parse_nserver ( line )
if _tmp == " break " : break
if _tmp [ 0 ] in domains [ domain_name ] [ " nserver " ] :
domains [ domain_name ] [ " nserver " ] [ _tmp [ 0 ] ] . append ( _tmp [ 1 ] )
else :
domains [ domain_name ] [ " nserver " ] [ _tmp [ 0 ]
] = [ _tmp [ 1 ] ]
2023-02-11 13:36:48 +01:00
elif line . startswith ( " ds-rdata: " ) :
2023-02-11 22:15:41 +01:00
domains [ domain_name ] [ " ds-rdata " ] . append ( line [ 20 : ] . lower ( ) )
2023-02-11 13:36:48 +01:00
# load inetnums
for inetnum in inetnums_files :
# temp variables in case there is no nserver
_nserver = { }
_ds_rdata = [ ]
_domain_name = " "
with open ( inetnum ) as i4 :
for line in i4 . readlines ( ) :
line = line . replace ( " \n " , " " )
if line . startswith ( " cidr " ) :
line = line [ 20 : ]
2023-02-11 22:15:41 +01:00
_domain_name = " . " . join (
line . split ( " . " ) [ : : - 1 ] ) + " .in-addr.arpa "
2023-02-12 00:26:37 +01:00
if int ( line . split ( " / " ) [ 1 ] ) == 24 :
_domain_name = _domain_name . replace ( " 0/24. " , " " )
elif int ( line . split ( " / " ) [ 1 ] ) == 16 :
_domain_name = _domain_name . replace ( " 0/16.0. " , " " )
elif int ( line . split ( " / " ) [ 1 ] ) == 8 :
_domain_name = _domain_name . replace ( " 0/8.0.0. " , " " )
2023-02-11 23:25:29 +01:00
elif int ( line . split ( " / " ) [ 1 ] ) < = 24 :
# TODO: implement creation of multiple zones for every /24 within
2023-02-12 00:26:37 +01:00
print ( f " WARN: currently only ipv4 subnets with length >=24 or 16 or 8 are possible to be checked: relavent inetnum { line } " )
2023-02-11 13:36:48 +01:00
elif line . startswith ( " nserver " ) :
2023-02-12 02:26:32 +01:00
_tmp = _parse_nserver ( line )
if _tmp == " break " : break
if _tmp [ 0 ] in _nserver :
_nserver [ _tmp [ 0 ] ] . append ( _tmp [ 1 ] )
else :
_nserver [ _tmp [ 0 ] ] = _tmp [ 1 ]
2023-02-11 13:36:48 +01:00
elif line . startswith ( " ds-rdata: " ) :
2023-02-11 22:15:41 +01:00
_ds_rdata . append ( line [ 20 : ] . lower ( ) )
2023-02-11 13:36:48 +01:00
# if nserver list is not empty add the reverse to the domain list
if not _nserver == { } :
2023-02-11 22:15:41 +01:00
domains [ _domain_name ] = {
" nserver " : _nserver , " ds-rdata " : _ds_rdata }
2023-02-11 13:36:48 +01:00
# load inet6nums
for inet6num in inet6nums_files :
# temp variables in case there is no nserver
_nserver = { }
_ds_rdata = [ ]
_domain_name = " "
with open ( inet6num ) as i6 :
for line in i6 . readlines ( ) :
line = line . replace ( " \n " , " " )
if line . startswith ( " inet6num " ) :
line = line [ 20 : ]
2023-02-11 22:15:41 +01:00
# generate the reverse ipv6
2023-02-11 13:36:48 +01:00
_domain_name = " ip6.arpa "
2023-02-11 22:15:41 +01:00
_lowest , _highest = line . replace ( " : " , " " ) . split ( " - " )
for _digit1 , _digit2 in zip ( _lowest , _highest ) :
2023-02-11 13:36:48 +01:00
if _digit1 != _digit2 :
break
_domain_name = _digit1 + " . " + _domain_name
elif line . startswith ( " nserver " ) :
2023-02-12 02:26:32 +01:00
_tmp = _parse_nserver ( line )
if _tmp == " break " : break
if _tmp [ 0 ] in _nserver :
_nserver [ _tmp [ 0 ] ] . append ( _tmp [ 1 ] )
else :
_nserver [ _tmp [ 0 ] ] = _tmp [ 1 ]
2023-02-11 13:36:48 +01:00
elif line . startswith ( " ds-rdata: " ) :
2023-02-11 22:15:41 +01:00
_ds_rdata . append ( line [ 20 : ] . lower ( ) )
2023-02-11 13:36:48 +01:00
# if nserver list is not empty add the reverse to the domain list
if not _nserver == { } :
2023-02-11 22:15:41 +01:00
domains [ _domain_name ] = {
" nserver " : _nserver , " ds-rdata " : _ds_rdata }
2023-02-11 13:36:48 +01:00
# add entries from main domain, if the nserver doesn't have an ip address (like in inet(6)nums)
for domain in domains :
for nserver in domains [ domain ] [ " nserver " ] :
# if the nserver isn't specified: ...
if domains [ domain ] [ " nserver " ] [ nserver ] == None :
2023-02-11 22:15:41 +01:00
# print(f"INFO: the nserver {nserver} isn't specified in {domain}, looking into the parent domain of it")
2023-02-11 13:36:48 +01:00
for i in range ( len ( nserver . split ( " . " ) ) , 1 , - 1 ) :
# check if the nserver is already in loaded database, starts with more specific
if " . " . join ( nserver . split ( " . " ) [ - i : ] ) in domains :
try :
2023-02-11 22:15:41 +01:00
domains [ domain ] [ " nserver " ] [ nserver ] = domains [ " . " . join (
nserver . split ( " . " ) [ - i : ] ) ] [ " nserver " ] [ nserver ]
2023-02-11 13:36:48 +01:00
except KeyError :
# reaches here if the domain for the nserver specified in the inet{6}num/domain is found, but the nserver itself not.
2023-02-11 22:15:41 +01:00
print (
f " Warn: the nserver { nserver } specified in { domain } wasn ' t found " )
2023-02-11 13:36:48 +01:00
break
return domains
2023-02-11 22:15:41 +01:00
def get_dnskey ( domain_name , nserver ) :
2023-02-11 13:36:48 +01:00
""" query dns server for DNSKEY """
2023-02-11 22:15:41 +01:00
try :
request = dns . message . make_query (
domain_name , dns . rdatatype . DNSKEY , want_dnssec = False )
2023-02-12 02:26:32 +01:00
response = dns . query . udp_with_fallback ( request , nserver , timeout = 2 )
2023-02-11 22:15:41 +01:00
except dns . exception . Timeout :
print ( f " WARN: querying { nserver } for { domain_name } timed out " )
return False
except dns . query . UnexpectedSource as e :
print ( f " ERROR: server replied with different different ip than requested: error: { e } " )
return False
2023-02-12 02:26:32 +01:00
if response [ 0 ] . rcode ( ) != 0 :
2023-02-11 22:15:41 +01:00
# HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD)
print (
2023-02-12 02:26:32 +01:00
f " WARN: query for a DNSKEY on { domain_name } failed on { nserver } , returncode: { response [ 0 ] . rcode ( ) } " )
2023-02-11 22:15:41 +01:00
return False
2023-02-12 02:26:32 +01:00
return [ dnskey . to_text ( ) . split ( " IN DNSKEY " ) [ 1 ] for dnskey in response [ 0 ] . answer ]
2023-02-11 22:15:41 +01:00
# if not nserver:
# # drill: use DNSSEC, fallback to tcp, for $domain
# drill_cmd = subprocess.Popen(
# ["drill", "-D", "-a", "DNSKEY", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# else:
# # drill: use DNSSEC, fallback to tcp, ask $nserver for $domain
# drill_cmd = subprocess.Popen(
# ["drill", "-D", "-a", "DNSKEY", f"@{nserver}", domain], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# # get the output of the drill command
# lines = drill_cmd.communicate()[0].decode().split("\n")
# # filter out comments (;;) and empty lines
# new_lines = []
# for line in lines:
# if line == "" or line.startswith(";;"):
# continue
# new_lines.append(line)
# lines = new_lines
# # split records into domain and key
# dnskeys = [[record.split("\t")[0], record.split("\t")[-1]]
# for record in lines]
# #print(len(dnskeys))
# return dnskeys
2023-02-11 13:36:48 +01:00
# end_step1
# step 2: <start dnskey_to_DS.py based on https://gist.github.com/wido/4c6288b2f5ba6d16fce37dca3fc2cb4a >
"""
Generate a DNSSEC DS record based on the incoming DNSKEY record
The DNSKEY can be found using for example ' dig ' :
$ dig DNSKEY secure . widodh . nl
The output can then be parsed with the following code to generate a DS record
for in the parent DNS zone
Author : Wido den Hollander < wido @widodh.nl >
Many thanks to this blogpost : https : / / www . v13 . gr / blog / ? p = 239
"""
def _calc_keyid ( flags , protocol , algorithm , dnskey ) :
st = struct . pack ( ' !HBB ' , int ( flags ) , int ( protocol ) , int ( algorithm ) )
st + = base64 . b64decode ( dnskey )
cnt = 0
for idx in range ( len ( st ) ) :
s = struct . unpack ( ' B ' , st [ idx : idx + 1 ] ) [ 0 ]
if ( idx % 2 ) == 0 :
cnt + = s << 8
else :
cnt + = s
return ( ( cnt & 0xFFFF ) + ( cnt >> 16 ) ) & 0xFFFF
def _calc_ds ( domain , flags , protocol , algorithm , dnskey ) :
if domain . endswith ( ' . ' ) is False :
domain + = ' . '
signature = bytes ( )
for i in domain . split ( ' . ' ) :
signature + = struct . pack ( ' B ' , len ( i ) ) + i . encode ( )
signature + = struct . pack ( ' !HBB ' , int ( flags ) , int ( protocol ) , int ( algorithm ) )
signature + = base64 . b64decode ( dnskey )
return {
' sha1 ' : hashlib . sha1 ( signature ) . hexdigest ( ) . upper ( ) ,
' sha256 ' : hashlib . sha256 ( signature ) . hexdigest ( ) . upper ( ) ,
}
def dnskey_to_ds ( domain , dnskey ) :
dnskeylist = dnskey . split ( ' ' , 3 )
flags = dnskeylist [ 0 ]
protocol = dnskeylist [ 1 ]
algorithm = dnskeylist [ 2 ]
key = dnskeylist [ 3 ] . replace ( ' ' , ' ' )
keyid = _calc_keyid ( flags , protocol , algorithm , key )
ds = _calc_ds ( domain , flags , protocol , algorithm , key )
ret = list ( )
ret . append ( str ( keyid ) + ' ' + str ( algorithm ) + ' ' + str ( 1 ) + ' '
+ ds [ ' sha1 ' ] . lower ( ) )
ret . append ( str ( keyid ) + ' ' + str ( algorithm ) + ' ' + str ( 2 ) + ' '
+ ds [ ' sha256 ' ] . lower ( ) )
return ret
# if __name__ == "__main__":
# print(len(sys.argv))
# if len(sys.argv) == 1:
# print("no data specified, please enter manually")
# DOMAIN = input("enter domain: ")
# DNSKEY = input("enter DNSKEY: ")
#
# print(dnskey_to_ds(DOMAIN, DNSKEY))
# elif len(sys.argv) in [9,10]:
# # user pasted full dig output
# DOMAIN = sys.argv[1]
# DNSKEY = " ".join(sys.argv[5:])
# print(DOMAIN)
# print(DNSKEY)
# print(dnskey_to_ds(DOMAIN,DNSKEY))
# step2: <end dnskey_to_DS.py>
# step3: start: partially stolen from: https://stackoverflow.com/questions/26137036/programmatically-check-if-domains-are-dnssec-protected
2023-02-11 22:15:41 +01:00
2023-02-11 13:36:48 +01:00
def check_dnssec ( domain_name , domain_data ) :
success = False
no_ds_rdata = domain_data [ " ds-rdata " ] == [ ]
if no_ds_rdata :
2023-02-11 22:15:41 +01:00
print (
2023-02-11 23:25:29 +01:00
f " NOTE: { domain_name } doesn ' t have ds-rdata configured, not checking it " )
2023-02-11 13:36:48 +01:00
for nserver in domain_data [ " nserver " ] :
2023-02-11 22:15:41 +01:00
2023-02-11 13:36:48 +01:00
# if the nserver is not set (i.e. not loaded from other dns file or "wrong" fqdn)
if domain_data [ " nserver " ] [ nserver ] == None :
2023-02-11 22:15:41 +01:00
print (
f " INFO: ip address(es) for nserver ' { nserver } ' in ' { domain_name } ' isn ' t specified/loaded " )
2023-02-11 13:36:48 +01:00
continue
for nsaddr in domain_data [ " nserver " ] [ nserver ] :
# get SOA
2023-02-11 22:15:41 +01:00
request = dns . message . make_query (
domain_name , dns . rdatatype . SOA , want_dnssec = False )
2023-02-11 13:36:48 +01:00
try :
# send the query
2023-02-12 02:26:32 +01:00
dns . query . udp_with_fallback ( request , nsaddr , timeout = 2 )
2023-02-11 13:36:48 +01:00
# if it timed out: tell the user
except dns . exception . Timeout :
2023-02-11 22:15:41 +01:00
print (
f " WARN: querying { nserver } ( { nsaddr } ) for { domain_name } timed out " )
2023-02-11 13:36:48 +01:00
continue
2023-02-11 22:15:41 +01:00
2023-02-11 13:36:48 +01:00
if no_ds_rdata :
2023-02-11 22:15:41 +01:00
print (
f " INFO: query for { domain_name } SOA on { nserver } ( { nsaddr } ) succeded, not checking DNSSEC " )
2023-02-11 13:36:48 +01:00
continue
# get DNSKEY for zone
2023-02-11 22:15:41 +01:00
request = dns . message . make_query (
domain_name , dns . rdatatype . DNSKEY , want_dnssec = True )
2023-02-12 02:26:32 +01:00
response = dns . query . udp_with_fallback ( request , nsaddr , timeout = 2 )
2023-02-11 13:36:48 +01:00
2023-02-12 02:26:32 +01:00
if response [ 0 ] . rcode ( ) != 0 :
2023-02-11 13:36:48 +01:00
# HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD)
2023-02-11 22:15:41 +01:00
print (
2023-02-12 02:26:32 +01:00
f " WARN: query for a DNSKEY on { domain_name } failed on { nserver } ( { nsaddr } ), returncode: { response [ 0 ] . rcode ( ) } " )
2023-02-11 13:36:48 +01:00
continue
# answer should contain two RRSET: DNSKEY and RRSIG(DNSKEY)
2023-02-12 02:26:32 +01:00
answer = response [ 0 ] . answer
2023-02-11 13:36:48 +01:00
if len ( answer ) != 2 :
# SOMETHING WENT WRONG
2023-02-11 22:15:41 +01:00
print (
f " ERROR: query for a DNSKEY on { domain_name } failed on { nserver } ( { nsaddr } ), invalid answer length: { len ( answer ) } " )
2023-02-11 13:36:48 +01:00
continue
# the DNSKEY should be self signed, validate it
name = dns . name . from_text ( domain_name )
try :
2023-02-11 22:15:41 +01:00
# print(f"DEBUG: answer[0]: {answer[0]}")
# print(f"DEBUG: answer[1]: {answer[1]}")
2023-02-11 13:36:48 +01:00
try :
2023-02-11 22:15:41 +01:00
dns . dnssec . validate (
answer [ 0 ] , answer [ 1 ] , { name : answer [ 0 ] } )
2023-02-11 13:36:48 +01:00
# it raises an AttributeError if the records are in the wrong order
except AttributeError as e :
2023-02-11 22:15:41 +01:00
dns . dnssec . validate (
answer [ 1 ] , answer [ 0 ] , { name : answer [ 0 ] } )
2023-02-11 13:36:48 +01:00
except dns . dnssec . ValidationFailure :
# BE SUSPICIOUS
2023-02-11 22:15:41 +01:00
print (
f " WARN: DNSSEC validation failed on { domain_name } failed on { nserver } ( { nsaddr } ), answer: { answer } " )
2023-02-11 13:36:48 +01:00
except AttributeError as e :
print ( f " ERROR: { e } " )
else :
# WE'RE GOOD, THERE'S A VALID DNSSEC SELF-SIGNED KEY FOR example.com
2023-02-11 22:15:41 +01:00
print (
f " INFO: DNSSEC validation succeded on { domain_name } failed on { nserver } ( { nsaddr } ) " )
2023-02-11 13:36:48 +01:00
success = True
2023-02-11 22:15:41 +01:00
2023-02-11 13:36:48 +01:00
return success
# step3: end
def main ( mntner ) :
2023-02-11 22:15:41 +01:00
# get all domains/inet(6)nums of the mntner
2023-02-11 13:36:48 +01:00
domains = get_domain_by_mntner ( mntner = mntner )
for domain_name in domains :
2023-02-11 22:15:41 +01:00
# check if the domain doesn't have DS data
if domains [ domain_name ] [ " ds-rdata " ] == [ ] :
2023-02-11 23:25:29 +01:00
print ( f " NOTE: { domain_name } doesn ' t have any ds-rdata specified " )
2023-02-11 22:15:41 +01:00
continue
for nserver in domains [ domain_name ] [ " nserver " ] :
# check for unset nserver ips -> dont check them
if domains [ domain_name ] [ " nserver " ] [ nserver ] == None :
continue
for ip in domains [ domain_name ] [ " nserver " ] [ nserver ] :
ds_candidates = [ ]
# load DNSKEYs from nserver: if False something failed (timeout)
_keys = get_dnskey ( domain_name , ip )
if _keys == False : continue
# convert all found keys to DS
for key in _keys :
try :
_ds_s = dnskey_to_ds ( domain_name , key )
except binascii . Error as e :
print ( f " ERROR: trying to convert ' { key } ' to DS failed: { e } " )
continue
ds_candidates . extend ( _ds_s )
found = False
# iterate over DS-rdata from the registry and check if they are found on the nserver
for ds in domains [ domain_name ] [ " ds-rdata " ] :
# print(ds)
if ds in ds_candidates :
found = True
# print(f"DEBUG: available: {domains[domain_name]['ds-rdata']}")
# print(f"DEBUG: generated: {ds_candidates}")
if found :
print (
f " INFO: correct ds-rdata specified and matching DNSKEY returned by { ip } for { domain_name } " )
else :
print (
f " ERROR: invalid ds-rdata specified and matching DNSKEY returned by { ip } for { domain_name } " )
# break
# print(check_dnssec(domain_name, domains[domain_name]))
2023-02-11 13:36:48 +01:00
if __name__ == " __main__ " :
if len ( sys . argv ) == 1 :
print ( f " please specify your mntner \n { sys . argv [ 0 ] } YOU-MNT " )
exit ( 1 )
main ( sys . argv [ 1 ] )
# commands to run:
# 1. drill -D <domain>.dn42 @ns1.<domain>.dn42 NS
# 2. dnskey_to_ds("<domain>.dn42"
# #<TTL> IN DNSKEY
# "257 3 13 <base64 ...>")
# 3. write dnskey to "trust-anchor"
# 4.delv @ns1.<domain.dn42 +root=<domain>.dn42 -a ./trust-anchor.tmp SOA <domain>.dn42