commit 1dc6efe56009bf1db87cfe9d6672387e671cc4c5 Author: lare Date: Sun Mar 10 21:35:34 2024 +0100 initial diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..665869b --- /dev/null +++ b/__init__.py @@ -0,0 +1,2 @@ +"""module for accessing (dn42)-registry data""" +from .__main__ import * \ No newline at end of file diff --git a/__main__.py b/__main__.py new file mode 100644 index 0000000..a92e3c7 --- /dev/null +++ b/__main__.py @@ -0,0 +1,198 @@ + +import os +import time + + +class Singleton(object): + def __new__(cls, *args, **kw): + if not hasattr(cls, '_instance'): + orig = super(Singleton, cls) + cls._instance = orig.__new__(cls, *args, **kw) + return cls._instance + + +def singleton(cls, *args, **kw): + instances = {} + + def _singleton(*args, **kw): + if cls not in instances: + instances[cls] = cls(*args, **kw) + return instances[cls] + return _singleton + + +@singleton +class Registry(object): + registryPath = None + required_keys = ["mnt-by", "source"] + single_keys = ["source", "descr", "single", "org", "policy", "status", "cidr", "max-length", "netname", "nic-hdl", "status", "abuse-mailbox", "as-block", "as-name", "as-set", "aut-num", "compression", "country", "dir-name", "domain", "fingerpr", "inet6num", "inetnum", + "key-cert", "method", "mntner", "organisation", "org-name", "owners", "owner", "person", "port", "ref", "registry", "role", "route", "route6", "route-set", "schema", "tinc-address", "tinc-file", "tinc-host", "tinc-keyset", "tinc-key"] + multi_keys = ["mnt-by", "remarks", "tech-c", "admin-c", "org", "nserver", "ds-rdata", "member-of", "abuse-mailbox", "abuse-c", "address", "country", "e-mail", "fax-no", "mbrs-by-ref", "members", "mnt-lower", "origin", "phone", "pingable", "www", "zone-c", "auth", + "certif", "contact", "default", "export", "geo-loc", "geoloc", "import", "key", "language", "member", "mnt-ref", "mp-default", "mp-export", "mp-group", "mp-import", "mp-members", "network-owner", "nick", "owner", "pgp-fingerprint", "primary-key", "subnet", "url"] + required_per_type = {"as-block": ["as-block", "policy"], "as-set": ["as-set"], "aut-num": ["aut-num", "as-name"], "dns": ["domain", "nserver"], "inet6num": ["inet6num", "cidr"], "inetnum": ["inetnum", "cidr"], "key-cert": ["key-cert", "method", "owner", "fingerpr", "certif"], "mntner": ["mntner"], + "organisation": ["organisation", "org-name"], "person": ["person", "nic-hdl"], "registry": ["registry", "url"], "role": ["role", "nic-hdl"], "route": ["route", "origin"], "route6": ["route6", "origin"], "route-set": ["route-set"], "schema": ["schema", "ref", "key"], "tinc-key": ["tinc-key", "tinc-host", "tinc-file"], "tinc-keyset": ["tinc-keyset", "member"]} + + def __init__(self, registryPath: str = None): + if registryPath != None: + self.registryPath = registryPath + + self.index = {} + # cache: to not need to make expensive actions, expired: if something changed in the index, will rerun expensive actions + self._cache = {"mntner-objects": {}, "expired": True} + + def _parse_from_content(self, objectType: str, objectFile: str): + previous_key = None + for line in self.index[objectType][objectFile]["_content"]: + # start = " "*20 or just"+" -> continuation of previous key/value + if line.startswith(" ") or line == "+\n": + if previous_key: + if previous_key in self.multi_keys: + self.index[objectType][objectFile][previous_key][-1] += "\n" + \ + line[20:].rstrip() + else: + self.index[objectType][objectFile][previous_key] += "\n" + \ + line[20:].rstrip() + else: + print( + f"ERROR: empty/invalid first line(s) in {objectType}/{objectFile}") + else: + # if line.startswith("source"): + # print(f"INFO: source found in: {dir}/{objectPath}") + _key = line.split(":")[0] + if _key == "source": + pass + if _key in self.multi_keys: + if not _key in self.index[objectType][objectFile]: + self.index[objectType][objectFile][_key] = [ + line[20:].rstrip()] + else: + self.index[objectType][objectFile][_key].append( + line[20:].rstrip()) + elif _key in self.single_keys: + if not _key in self.index[objectType][objectFile]: + self.index[objectType][objectFile][_key] = line[20:].rstrip( + ) + else: + print( + f"WARN: {objectType}/{objectFile} has multiple {_key}, which is has to be 'single'") + else: + print( + f"WARN: invalid key {_key} found in {objectType}/{objectFile}") + previous_key = _key + + for req_key in self.required_keys + self.required_per_type[objectType]: + if not req_key in self.index[objectType][objectFile]: + print( + f"WARN: required key {req_key} not found in {objectType}/{objectFile}") + + def _build_index(self, object: tuple = None): + # fail when registryPath isn't initialized yet + assert self.registryPath != None, f"registryPath not yet initialized" + # load everything + if not object: + print("INFO: building full registry index") + start_time = time.time() + # TODO: get list of types from data/schema/* or data/* + for dir in ["as-block", "as-set", "aut-num", "dns", "inet6num", "inetnum", "key-cert", "mntner", "organisation", "person", "registry", "role", "route", "route6", "route-set", "schema", "tinc-key", "tinc-keyset"]: + self.index[dir] = {} + try: + for objectFile in os.listdir(f"{self.registryPath}/data/{dir}/"): + self.index[dir][objectFile] = {} + with open(f"{self.registryPath}/data/{dir}/{objectFile}") as f: + self.index[dir][objectFile]["_content"] = f.readlines() + self._parse_from_content(dir, objectFile) + except FileNotFoundError: + print(f"WARN: directory for {dir} doesn't exist") + self._cache["expired"] = True + print( + f"INFO: building registry index done, took {time.time() - start_time}") + + # (re)load one specific object + else: + ... + self._cache["expired"] = True + + def get_object(self, objectType: str, objectFile: str) -> tuple[bool, dict or str]: + # check if index is not yet initialized + if self.index == {}: + print("INFO: requested get_object, but index not yet created") + self._build_index() + if objectType in self.index: + if objectFile in self.index[objectType]: + return True, self.index[objectType][objectFile] + return False, "not found" + + def get_all_by_mntner(self, mntner: str) -> tuple[bool, dict or str]: + + def _load_by_mntner(mntner: str) -> dict: + ret = {} + for objectType in self.index: + for objectFile in self.index[objectType]: + if mntner in self.index[objectType][objectFile]["mnt-by"]: + if objectType in ret: + ret[objectType][objectFile] = self.index[objectType][objectFile] + else: + ret[objectType] = { + objectFile: self.index[objectType][objectFile]} + return ret + + # check if index is not yet initialized + if self.index == {}: + print("INFO: requested get_object, but index not yet created") + self._build_index() + + if mntner in self._cache["mntner-objects"]: + if self._cache["expired"]: + # if the cache is expired: clear cache and reset "expired" + self._cache["mntner-objects"] = {} + self._cache["expired"] = False + + ret = _load_by_mntner(mntner) + self._cache["mntner-objects"][mntner] = ret + if ret == {}: + return False, f"no objects found for {mntner}" + return True, ret + + else: + return True, self._cache["mntner-objects"][mntner] + else: + ret = _load_by_mntner(mntner) + self._cache["mntner-objects"][mntner] = ret + if ret == {}: + return False, f"no objects found for {mntner}" + return True, ret + + def _save_object_to_file(self, objectType: str, objectFile: str): + with open(f"{self.registryPath}/data/{objectType}/{objectFile}", "w") as f: + f.writelines(self.index[objectType][objectFile]["_content"]) + + def store_object(self, objectType: str, objectFile: str, content: iter): + if objectType in self.index: + if objectFile in self.index[objectType]: + if type(content) == str: + self.index[objectType][objectFile]["_content"] = [ + f"{line}\n" for line in content.split("\n")] + elif type(content) in [list, tuple, iter]: + for line in content: + if type(line) != str: + raise ValueError( + f"content is {type(content)} instead of str or list of str") + self.index[objectType][objectFile]["_content"] = content + else: + raise ValueError( + f"content is {type(content)} instead of str or list of str") + else: + raise KeyError( + f"type {objectType}/{objectFile} doesn't exist in index") + else: + raise KeyError(f"type {objectType} doesn't exist in index") + + self._save_object_to_file(objectType, objectFile) + self._cache["expired"] = True + self._cache["mntner-objects"] = {} + + +if __name__ == "__main__": + reg = Registry("dn42-registry") + reg._build_index() + print(reg.get_all_by_mntner("LARE-MNT"))