From 1dc6efe56009bf1db87cfe9d6672387e671cc4c5 Mon Sep 17 00:00:00 2001
From: lare <lare@lare.cc>
Date: Sun, 10 Mar 2024 21:35:34 +0100
Subject: [PATCH] initial

---
 __init__.py |   2 +
 __main__.py | 198 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 200 insertions(+)
 create mode 100644 __init__.py
 create mode 100644 __main__.py

diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..665869b
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,2 @@
+"""module for accessing (dn42)-registry data"""
+from .__main__ import *
\ No newline at end of file
diff --git a/__main__.py b/__main__.py
new file mode 100644
index 0000000..a92e3c7
--- /dev/null
+++ b/__main__.py
@@ -0,0 +1,198 @@
+
+import os
+import time
+
+
+class Singleton(object):
+    def __new__(cls, *args, **kw):
+        if not hasattr(cls, '_instance'):
+            orig = super(Singleton, cls)
+            cls._instance = orig.__new__(cls, *args, **kw)
+        return cls._instance
+
+
+def singleton(cls, *args, **kw):
+    instances = {}
+
+    def _singleton(*args, **kw):
+        if cls not in instances:
+            instances[cls] = cls(*args, **kw)
+        return instances[cls]
+    return _singleton
+
+
+@singleton
+class Registry(object):
+    registryPath = None
+    required_keys = ["mnt-by", "source"]
+    single_keys = ["source", "descr", "single", "org", "policy", "status", "cidr", "max-length", "netname", "nic-hdl", "status", "abuse-mailbox", "as-block", "as-name", "as-set", "aut-num", "compression", "country", "dir-name", "domain", "fingerpr", "inet6num", "inetnum",
+                   "key-cert", "method", "mntner", "organisation", "org-name", "owners", "owner", "person", "port", "ref", "registry", "role", "route", "route6", "route-set", "schema", "tinc-address", "tinc-file", "tinc-host", "tinc-keyset", "tinc-key"]
+    multi_keys = ["mnt-by", "remarks", "tech-c", "admin-c", "org", "nserver", "ds-rdata", "member-of", "abuse-mailbox", "abuse-c", "address", "country", "e-mail", "fax-no", "mbrs-by-ref", "members", "mnt-lower", "origin", "phone", "pingable", "www", "zone-c", "auth",
+                  "certif", "contact", "default", "export", "geo-loc", "geoloc", "import", "key", "language", "member", "mnt-ref", "mp-default", "mp-export", "mp-group", "mp-import", "mp-members", "network-owner", "nick", "owner", "pgp-fingerprint", "primary-key", "subnet", "url"]
+    required_per_type = {"as-block": ["as-block", "policy"], "as-set": ["as-set"], "aut-num": ["aut-num", "as-name"], "dns": ["domain", "nserver"], "inet6num": ["inet6num", "cidr"], "inetnum": ["inetnum", "cidr"], "key-cert": ["key-cert", "method", "owner", "fingerpr", "certif"], "mntner": ["mntner"],
+                         "organisation": ["organisation", "org-name"], "person": ["person", "nic-hdl"], "registry": ["registry", "url"], "role": ["role", "nic-hdl"], "route": ["route", "origin"], "route6": ["route6", "origin"], "route-set": ["route-set"], "schema": ["schema", "ref", "key"], "tinc-key": ["tinc-key", "tinc-host", "tinc-file"], "tinc-keyset": ["tinc-keyset", "member"]}
+
+    def __init__(self, registryPath: str = None):
+        if registryPath != None:
+            self.registryPath = registryPath
+
+        self.index = {}
+        # cache: to not need to make expensive actions, expired: if something changed in the index, will rerun expensive actions
+        self._cache = {"mntner-objects": {}, "expired": True}
+
+    def _parse_from_content(self, objectType: str, objectFile: str):
+        previous_key = None
+        for line in self.index[objectType][objectFile]["_content"]:
+            # start = " "*20 or just"+" -> continuation of previous key/value
+            if line.startswith("                    ") or line == "+\n":
+                if previous_key:
+                    if previous_key in self.multi_keys:
+                        self.index[objectType][objectFile][previous_key][-1] += "\n" + \
+                            line[20:].rstrip()
+                    else:
+                        self.index[objectType][objectFile][previous_key] += "\n" + \
+                            line[20:].rstrip()
+                else:
+                    print(
+                        f"ERROR: empty/invalid first line(s) in {objectType}/{objectFile}")
+            else:
+                # if line.startswith("source"):
+                #    print(f"INFO: source found in: {dir}/{objectPath}")
+                _key = line.split(":")[0]
+                if _key == "source":
+                    pass
+                if _key in self.multi_keys:
+                    if not _key in self.index[objectType][objectFile]:
+                        self.index[objectType][objectFile][_key] = [
+                            line[20:].rstrip()]
+                    else:
+                        self.index[objectType][objectFile][_key].append(
+                            line[20:].rstrip())
+                elif _key in self.single_keys:
+                    if not _key in self.index[objectType][objectFile]:
+                        self.index[objectType][objectFile][_key] = line[20:].rstrip(
+                        )
+                    else:
+                        print(
+                            f"WARN: {objectType}/{objectFile} has multiple {_key}, which is has to be 'single'")
+                else:
+                    print(
+                        f"WARN: invalid key {_key} found in {objectType}/{objectFile}")
+                previous_key = _key
+
+        for req_key in self.required_keys + self.required_per_type[objectType]:
+            if not req_key in self.index[objectType][objectFile]:
+                print(
+                    f"WARN: required key {req_key} not found in {objectType}/{objectFile}")
+
+    def _build_index(self, object: tuple = None):
+        # fail when registryPath isn't initialized yet
+        assert self.registryPath != None, f"registryPath not yet initialized"
+        # load everything
+        if not object:
+            print("INFO: building full registry index")
+            start_time = time.time()
+            # TODO: get list of types from data/schema/* or data/*
+            for dir in ["as-block", "as-set", "aut-num", "dns", "inet6num", "inetnum", "key-cert", "mntner", "organisation", "person", "registry", "role", "route", "route6", "route-set", "schema", "tinc-key", "tinc-keyset"]:
+                self.index[dir] = {}
+                try:
+                    for objectFile in os.listdir(f"{self.registryPath}/data/{dir}/"):
+                        self.index[dir][objectFile] = {}
+                        with open(f"{self.registryPath}/data/{dir}/{objectFile}") as f:
+                            self.index[dir][objectFile]["_content"] = f.readlines()
+                        self._parse_from_content(dir, objectFile)
+                except FileNotFoundError:
+                    print(f"WARN: directory for {dir} doesn't exist")
+            self._cache["expired"] = True
+            print(
+                f"INFO: building registry index done, took {time.time() - start_time}")
+
+        # (re)load one specific object
+        else:
+            ...
+            self._cache["expired"] = True
+
+    def get_object(self, objectType: str, objectFile: str) -> tuple[bool, dict or str]:
+        # check if index is not yet initialized
+        if self.index == {}:
+            print("INFO: requested get_object, but index not yet created")
+            self._build_index()
+        if objectType in self.index:
+            if objectFile in self.index[objectType]:
+                return True, self.index[objectType][objectFile]
+        return False, "not found"
+
+    def get_all_by_mntner(self, mntner: str) -> tuple[bool, dict or str]:
+
+        def _load_by_mntner(mntner: str) -> dict:
+            ret = {}
+            for objectType in self.index:
+                for objectFile in self.index[objectType]:
+                    if mntner in self.index[objectType][objectFile]["mnt-by"]:
+                        if objectType in ret:
+                            ret[objectType][objectFile] = self.index[objectType][objectFile]
+                        else:
+                            ret[objectType] = {
+                                objectFile: self.index[objectType][objectFile]}
+            return ret
+
+        # check if index is not yet initialized
+        if self.index == {}:
+            print("INFO: requested get_object, but index not yet created")
+            self._build_index()
+
+        if mntner in self._cache["mntner-objects"]:
+            if self._cache["expired"]:
+                # if the cache is expired: clear cache and reset "expired"
+                self._cache["mntner-objects"] = {}
+                self._cache["expired"] = False
+
+                ret = _load_by_mntner(mntner)
+                self._cache["mntner-objects"][mntner] = ret
+                if ret == {}:
+                    return False, f"no objects found for {mntner}"
+                return True, ret
+
+            else:
+                return True, self._cache["mntner-objects"][mntner]
+        else:
+            ret = _load_by_mntner(mntner)
+            self._cache["mntner-objects"][mntner] = ret
+            if ret == {}:
+                return False, f"no objects found for {mntner}"
+            return True, ret
+
+    def _save_object_to_file(self, objectType: str, objectFile: str):
+        with open(f"{self.registryPath}/data/{objectType}/{objectFile}", "w") as f:
+            f.writelines(self.index[objectType][objectFile]["_content"])
+
+    def store_object(self, objectType: str, objectFile: str, content: iter):
+        if objectType in self.index:
+            if objectFile in self.index[objectType]:
+                if type(content) == str:
+                    self.index[objectType][objectFile]["_content"] = [
+                        f"{line}\n" for line in content.split("\n")]
+                elif type(content) in [list, tuple, iter]:
+                    for line in content:
+                        if type(line) != str:
+                            raise ValueError(
+                                f"content is {type(content)} instead of str or list of str")
+                    self.index[objectType][objectFile]["_content"] = content
+                else:
+                    raise ValueError(
+                        f"content is {type(content)} instead of str or list of str")
+            else:
+                raise KeyError(
+                    f"type {objectType}/{objectFile} doesn't exist in index")
+        else:
+            raise KeyError(f"type {objectType} doesn't exist in index")
+
+        self._save_object_to_file(objectType, objectFile)
+        self._cache["expired"] = True
+        self._cache["mntner-objects"] = {}
+
+
+if __name__ == "__main__":
+    reg = Registry("dn42-registry")
+    reg._build_index()
+    print(reg.get_all_by_mntner("LARE-MNT"))