add (for our cases) sufficient dns zone generation

(it doesn't add anything except NS, glue, DS and CNAME(for reverse ipv4))
This commit is contained in:
lare 2025-03-09 02:04:37 +01:00
parent c9e595ab99
commit 40970932d6

View file

@ -1,7 +1,9 @@
import os
import time
import sys
from ipaddress import ip_address, ip_network, IPv4Network, IPv6Network
class Singleton(object):
def __new__(cls, *args, **kw):
@ -195,6 +197,149 @@ class Registry(object):
self._cache["expired"] = True
self._cache["mntner-objects"] = {}
def _build_records(self, parent_zone:str, record_name:str, TTL:int, nservers:[str], ds_rdata:[str] = []) -> [str]:
records = []
servers = dict()
for nserver in nservers:
server = nserver.split("\t", 1) if "\t" in nserver else nserver.split(" ", 1)
if server[0] not in servers:
servers[server[0]] = []
if len(server) == 2:
servers[server[0]].append(server[1])
for server in servers:
records.append(f"{record_name}. {TTL} IN NS {server}.")
if not server.endswith(parent_zone):
# nserver outside of your zone (also shouldn't have ip addresses, but who knows
continue
elif not server.endswith(record_name):
# nserver address is not in this zone, won't add A/AAAA records for it
continue
for ip in servers[server]:
# if there is a ip specified for this nserver
try:
# try parsing the ip to an ip_address
# (it has to be stripped, because sometimes there are multiple whitespace between nserver hostname and ip)
address = ip_address(ip.strip())
if address.version == 6:
#records.append(f"{server}. {TTL} IN AAAA {address.compressed}")
records.append(f"{server}. {TTL} IN AAAA {ip.strip()}") # the java implementation of the dn42 master just copies the (strriped) ip ...
elif address.version == 4:
records.append(f"{server}. {TTL} IN A {address}")
else:
print(f"WARN: unknown ip version of '{ip}' for {server}")
except ValueError:
print(f"WARN: '{ip}' for {server} isn't a a valid ip address")
for ds in ds_rdata:
records.append(f"{record_name}. {TTL} IN DS {ds}")
return records
def _generate_forward_zone(self, zone:str, TTL:int) -> [str]:
records = []
zone = zone[:-1] if zone.endswith(".") else zone
for domain in self.index["dns"]:
if not domain.endswith(zone):
# ignore none $zone domains
continue
domain_data = self.index["dns"][domain]
records += self._build_records(zone, domain, TTL, domain_data["nserver"], domain_data["ds-rdata"] if "ds-rdata" in domain_data else [])
return records
def _generate_reverseV6_zone(self, zone:str, TTL:int) -> [str]:
records = []
zone = zone[:-1] if zone.endswith(".") else zone
for objectFile in self.index["inet6num"]:
net = IPv6Network(objectFile.replace("_", "/"))
# generate domain from the network
domain = ".".join(net.exploded.split("/")[0].replace(":", "")[(net.prefixlen//4)-1::-1]) + ".ip6.arpa"
if not domain.endswith(zone):
# ignore none $zone domains
continue
domain_data = self.index["inet6num"][objectFile]
# ignore inet6nums without nservers
if not "nserver" in domain_data:
continue
records += self._build_records(zone, domain, TTL, domain_data["nserver"], domain_data["ds-rdata"] if "ds-rdata" in domain_data else [])
return records
def _generate_reverseV4_zone(self, zone:str, TTL:int) -> [str]:
records = []
zone = zone[:-1] if zone.endswith(".") else zone
for objectFile in self.index["inetnum"]:
net = IPv4Network(objectFile.replace("_", "/"))
if net.prefixlen > 24:
domain = net.reverse_pointer
if not domain.endswith(zone):
# ignore none $zone domains
continue
domain_data = self.index["inetnum"][objectFile]
# ignore inetnums without nservers
if not "nserver" in domain_data:
continue
records += self._build_records(zone, domain, TTL, domain_data["nserver"], domain_data["ds-rdata"] if "ds-rdata" in domain_data else [])
# generate the CNAMEs for the single ips (because we don't have a full /24)
if net.prefixlen == 32:
records.append(f"{net.network_address.reverse_pointer}. {TTL} IN CNAME {net.network_address.reverse_pointer.split('.',1)[0]}.{domain}.")
else:
records += [f"{host.reverse_pointer}. {TTL} IN CNAME {host.reverse_pointer.split('.',1)[0]}.{domain}." for host in [net.network_address, *net.hosts(), net.broadcast_address]]
elif net.prefixlen % 8 == 0:
# this is a /8, /16, or /24 (/32s are handled above)
net = IPv4Network(objectFile.replace("_", "/"))
domain = ".".join(net.reverse_pointer.split(".")[(4-net.prefixlen//8):])
if not domain.endswith(zone):
# ignore none $zone domains
continue
domain_data = self.index["inetnum"][objectFile]
# ignore inetnums without nservers
if not "nserver" in domain_data:
continue
records += self._build_records(zone, domain, TTL, domain_data["nserver"], domain_data["ds-rdata"] if "ds-rdata" in domain_data else [])
else:
# we now only have larger than /24 (but not "whole" subnets) remaining => multiple /24 zones
net = IPv4Network(objectFile.replace("_", "/"))
domain = ".".join(net.reverse_pointer.split(".")[(3-net.prefixlen//8):])
if not domain.endswith(zone):
# ignore none $zone domains
continue
domain_data = self.index["inetnum"][objectFile]
# ignore inetnums without nservers
if not "nserver" in domain_data:
continue
for subnet in net.subnets(8-(net.prefixlen % 8)):
domain = ".".join(subnet.reverse_pointer.split(".")[(3-net.prefixlen//8):])
records += self._build_records(zone, domain, TTL, domain_data["nserver"], domain_data["ds-rdata"] if "ds-rdata" in domain_data else [])
return records
def generate_dns_zone(self, zone:str, TTL:int=900) -> [str]:
# check if index is not yet initialized
if self.index == {}:
print("INFO: requested generate_dns_zone, but index not yet created")
self._build_index()
if zone.endswith("ip6.arpa."):
# ipv6 reverse zone -> inet6num
return self._generate_reverseV6_zone(zone, TTL)
elif zone.endswith("in-addr.arpa."):
# ipv4 reverse zone -> inetnum
return self._generate_reverseV4_zone(zone, TTL)
else:
# other zone -> dns
return self._generate_forward_zone(zone, TTL)
if __name__ == "__main__":
reg = Registry("dn42-registry")