199 lines
9.9 KiB
Python
199 lines
9.9 KiB
Python
|
|
||
|
import os
|
||
|
import time
|
||
|
|
||
|
|
||
|
class Singleton(object):
|
||
|
def __new__(cls, *args, **kw):
|
||
|
if not hasattr(cls, '_instance'):
|
||
|
orig = super(Singleton, cls)
|
||
|
cls._instance = orig.__new__(cls, *args, **kw)
|
||
|
return cls._instance
|
||
|
|
||
|
|
||
|
def singleton(cls, *args, **kw):
|
||
|
instances = {}
|
||
|
|
||
|
def _singleton(*args, **kw):
|
||
|
if cls not in instances:
|
||
|
instances[cls] = cls(*args, **kw)
|
||
|
return instances[cls]
|
||
|
return _singleton
|
||
|
|
||
|
|
||
|
@singleton
|
||
|
class Registry(object):
|
||
|
registryPath = None
|
||
|
required_keys = ["mnt-by", "source"]
|
||
|
single_keys = ["source", "descr", "single", "org", "policy", "status", "cidr", "max-length", "netname", "nic-hdl", "status", "abuse-mailbox", "as-block", "as-name", "as-set", "aut-num", "compression", "country", "dir-name", "domain", "fingerpr", "inet6num", "inetnum",
|
||
|
"key-cert", "method", "mntner", "organisation", "org-name", "owners", "owner", "person", "port", "ref", "registry", "role", "route", "route6", "route-set", "schema", "tinc-address", "tinc-file", "tinc-host", "tinc-keyset", "tinc-key"]
|
||
|
multi_keys = ["mnt-by", "remarks", "tech-c", "admin-c", "org", "nserver", "ds-rdata", "member-of", "abuse-mailbox", "abuse-c", "address", "country", "e-mail", "fax-no", "mbrs-by-ref", "members", "mnt-lower", "origin", "phone", "pingable", "www", "zone-c", "auth",
|
||
|
"certif", "contact", "default", "export", "geo-loc", "geoloc", "import", "key", "language", "member", "mnt-ref", "mp-default", "mp-export", "mp-group", "mp-import", "mp-members", "network-owner", "nick", "owner", "pgp-fingerprint", "primary-key", "subnet", "url"]
|
||
|
required_per_type = {"as-block": ["as-block", "policy"], "as-set": ["as-set"], "aut-num": ["aut-num", "as-name"], "dns": ["domain", "nserver"], "inet6num": ["inet6num", "cidr"], "inetnum": ["inetnum", "cidr"], "key-cert": ["key-cert", "method", "owner", "fingerpr", "certif"], "mntner": ["mntner"],
|
||
|
"organisation": ["organisation", "org-name"], "person": ["person", "nic-hdl"], "registry": ["registry", "url"], "role": ["role", "nic-hdl"], "route": ["route", "origin"], "route6": ["route6", "origin"], "route-set": ["route-set"], "schema": ["schema", "ref", "key"], "tinc-key": ["tinc-key", "tinc-host", "tinc-file"], "tinc-keyset": ["tinc-keyset", "member"]}
|
||
|
|
||
|
def __init__(self, registryPath: str = None):
|
||
|
if registryPath != None:
|
||
|
self.registryPath = registryPath
|
||
|
|
||
|
self.index = {}
|
||
|
# cache: to not need to make expensive actions, expired: if something changed in the index, will rerun expensive actions
|
||
|
self._cache = {"mntner-objects": {}, "expired": True}
|
||
|
|
||
|
def _parse_from_content(self, objectType: str, objectFile: str):
|
||
|
previous_key = None
|
||
|
for line in self.index[objectType][objectFile]["_content"]:
|
||
|
# start = " "*20 or just"+" -> continuation of previous key/value
|
||
|
if line.startswith(" ") or line == "+\n":
|
||
|
if previous_key:
|
||
|
if previous_key in self.multi_keys:
|
||
|
self.index[objectType][objectFile][previous_key][-1] += "\n" + \
|
||
|
line[20:].rstrip()
|
||
|
else:
|
||
|
self.index[objectType][objectFile][previous_key] += "\n" + \
|
||
|
line[20:].rstrip()
|
||
|
else:
|
||
|
print(
|
||
|
f"ERROR: empty/invalid first line(s) in {objectType}/{objectFile}")
|
||
|
else:
|
||
|
# if line.startswith("source"):
|
||
|
# print(f"INFO: source found in: {dir}/{objectPath}")
|
||
|
_key = line.split(":")[0]
|
||
|
if _key == "source":
|
||
|
pass
|
||
|
if _key in self.multi_keys:
|
||
|
if not _key in self.index[objectType][objectFile]:
|
||
|
self.index[objectType][objectFile][_key] = [
|
||
|
line[20:].rstrip()]
|
||
|
else:
|
||
|
self.index[objectType][objectFile][_key].append(
|
||
|
line[20:].rstrip())
|
||
|
elif _key in self.single_keys:
|
||
|
if not _key in self.index[objectType][objectFile]:
|
||
|
self.index[objectType][objectFile][_key] = line[20:].rstrip(
|
||
|
)
|
||
|
else:
|
||
|
print(
|
||
|
f"WARN: {objectType}/{objectFile} has multiple {_key}, which is has to be 'single'")
|
||
|
else:
|
||
|
print(
|
||
|
f"WARN: invalid key {_key} found in {objectType}/{objectFile}")
|
||
|
previous_key = _key
|
||
|
|
||
|
for req_key in self.required_keys + self.required_per_type[objectType]:
|
||
|
if not req_key in self.index[objectType][objectFile]:
|
||
|
print(
|
||
|
f"WARN: required key {req_key} not found in {objectType}/{objectFile}")
|
||
|
|
||
|
def _build_index(self, object: tuple = None):
|
||
|
# fail when registryPath isn't initialized yet
|
||
|
assert self.registryPath != None, f"registryPath not yet initialized"
|
||
|
# load everything
|
||
|
if not object:
|
||
|
print("INFO: building full registry index")
|
||
|
start_time = time.time()
|
||
|
# TODO: get list of types from data/schema/* or data/*
|
||
|
for dir in ["as-block", "as-set", "aut-num", "dns", "inet6num", "inetnum", "key-cert", "mntner", "organisation", "person", "registry", "role", "route", "route6", "route-set", "schema", "tinc-key", "tinc-keyset"]:
|
||
|
self.index[dir] = {}
|
||
|
try:
|
||
|
for objectFile in os.listdir(f"{self.registryPath}/data/{dir}/"):
|
||
|
self.index[dir][objectFile] = {}
|
||
|
with open(f"{self.registryPath}/data/{dir}/{objectFile}") as f:
|
||
|
self.index[dir][objectFile]["_content"] = f.readlines()
|
||
|
self._parse_from_content(dir, objectFile)
|
||
|
except FileNotFoundError:
|
||
|
print(f"WARN: directory for {dir} doesn't exist")
|
||
|
self._cache["expired"] = True
|
||
|
print(
|
||
|
f"INFO: building registry index done, took {time.time() - start_time}")
|
||
|
|
||
|
# (re)load one specific object
|
||
|
else:
|
||
|
...
|
||
|
self._cache["expired"] = True
|
||
|
|
||
|
def get_object(self, objectType: str, objectFile: str) -> tuple[bool, dict or str]:
|
||
|
# check if index is not yet initialized
|
||
|
if self.index == {}:
|
||
|
print("INFO: requested get_object, but index not yet created")
|
||
|
self._build_index()
|
||
|
if objectType in self.index:
|
||
|
if objectFile in self.index[objectType]:
|
||
|
return True, self.index[objectType][objectFile]
|
||
|
return False, "not found"
|
||
|
|
||
|
def get_all_by_mntner(self, mntner: str) -> tuple[bool, dict or str]:
|
||
|
|
||
|
def _load_by_mntner(mntner: str) -> dict:
|
||
|
ret = {}
|
||
|
for objectType in self.index:
|
||
|
for objectFile in self.index[objectType]:
|
||
|
if mntner in self.index[objectType][objectFile]["mnt-by"]:
|
||
|
if objectType in ret:
|
||
|
ret[objectType][objectFile] = self.index[objectType][objectFile]
|
||
|
else:
|
||
|
ret[objectType] = {
|
||
|
objectFile: self.index[objectType][objectFile]}
|
||
|
return ret
|
||
|
|
||
|
# check if index is not yet initialized
|
||
|
if self.index == {}:
|
||
|
print("INFO: requested get_object, but index not yet created")
|
||
|
self._build_index()
|
||
|
|
||
|
if mntner in self._cache["mntner-objects"]:
|
||
|
if self._cache["expired"]:
|
||
|
# if the cache is expired: clear cache and reset "expired"
|
||
|
self._cache["mntner-objects"] = {}
|
||
|
self._cache["expired"] = False
|
||
|
|
||
|
ret = _load_by_mntner(mntner)
|
||
|
self._cache["mntner-objects"][mntner] = ret
|
||
|
if ret == {}:
|
||
|
return False, f"no objects found for {mntner}"
|
||
|
return True, ret
|
||
|
|
||
|
else:
|
||
|
return True, self._cache["mntner-objects"][mntner]
|
||
|
else:
|
||
|
ret = _load_by_mntner(mntner)
|
||
|
self._cache["mntner-objects"][mntner] = ret
|
||
|
if ret == {}:
|
||
|
return False, f"no objects found for {mntner}"
|
||
|
return True, ret
|
||
|
|
||
|
def _save_object_to_file(self, objectType: str, objectFile: str):
|
||
|
with open(f"{self.registryPath}/data/{objectType}/{objectFile}", "w") as f:
|
||
|
f.writelines(self.index[objectType][objectFile]["_content"])
|
||
|
|
||
|
def store_object(self, objectType: str, objectFile: str, content: iter):
|
||
|
if objectType in self.index:
|
||
|
if objectFile in self.index[objectType]:
|
||
|
if type(content) == str:
|
||
|
self.index[objectType][objectFile]["_content"] = [
|
||
|
f"{line}\n" for line in content.split("\n")]
|
||
|
elif type(content) in [list, tuple, iter]:
|
||
|
for line in content:
|
||
|
if type(line) != str:
|
||
|
raise ValueError(
|
||
|
f"content is {type(content)} instead of str or list of str")
|
||
|
self.index[objectType][objectFile]["_content"] = content
|
||
|
else:
|
||
|
raise ValueError(
|
||
|
f"content is {type(content)} instead of str or list of str")
|
||
|
else:
|
||
|
raise KeyError(
|
||
|
f"type {objectType}/{objectFile} doesn't exist in index")
|
||
|
else:
|
||
|
raise KeyError(f"type {objectType} doesn't exist in index")
|
||
|
|
||
|
self._save_object_to_file(objectType, objectFile)
|
||
|
self._cache["expired"] = True
|
||
|
self._cache["mntner-objects"] = {}
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
reg = Registry("dn42-registry")
|
||
|
reg._build_index()
|
||
|
print(reg.get_all_by_mntner("LARE-MNT"))
|