add display of existing peerings+check if supplied ips are in allowed ip ranges

This commit is contained in:
lare 2022-11-27 00:00:01 +01:00
parent d48b754da6
commit 14bb23690e
5 changed files with 138 additions and 20 deletions

View file

@ -52,24 +52,22 @@ class Config (dict):
class PeeringManager:
def __init__(self, peering_dir):
self._peering_dir = peering_dir
def __init__(self, peerings_file):
self._peering_file = peerings_file
self._load_peerings()
def _load_peerings(self):
if not os.path.exists(self._peering_dir):
os.mkdir(self._peering_dir)
if not os.path.exists(f"{self._peering_dir}/peerings.json"):
with open(f"{self._peering_dir}/peerings.json", "x") as p:
if not os.path.exists(self._peering_file):
with open(self._peering_file, "x") as p:
json.dump({"mnter":{},"asn":{}}, p)
try:
with open(f"{self._peering_dir}/peerings.json","r") as p:
with open(self._peering_file,"r") as p:
self.peerings = json.load(p)
except json.decoder.JSONDecodeError:
with open(f"{self._peering_dir}/peerings.json", "w") as p:
with open(self._peering_file, "w") as p:
json.dump({"mnter":{},"asn":{}}, p)
with open(f"{self._peering_dir}/peerings.json","r") as p:
with open(self._peering_file,"r") as p:
self.peerings = json.load(p)
# self.peerings = {}
@ -86,7 +84,7 @@ class PeeringManager:
# with open(f"{self._peering_dir}/peerings.json","w") as p:
# json.dump(self._peerings, p, indent=4)
def _save_peerings(self):
with open(f"{self._peering_dir}/peerings.json", "w") as p:
with open(self._peering_file, "w") as p:
json.dump(self.peerings, p, indent=4)
def get_peerings_by_mnt(self, mnt):
@ -125,7 +123,7 @@ class PeeringManager:
config = Config()
peerings = PeeringManager(config["peering-dir"])
peerings = PeeringManager(config["peerings"])
def auth_required():
def wrapper(f):
@wraps(f)
@ -186,15 +184,15 @@ def login():
return render_template("login.html", session=session,config=config,return_addr=session["return_url"], msg=msg)
mnt = request.form["mnt"]
asn = request.form["asn"]
asn = asn[2:] if asn[:1].lower() == "as" else asn
asn = asn[2:] if asn[:2].lower() == "as" else asn
if "allowed4" in request.form:
allowed4 = request.form["allowed4"]
allowed4 = allowed4.split(",") if "," in allowed4 else allowed4
# allowed4 = allowed4.split(",") if "," in allowed4 else allowed4
else:
allowed4 = None
if "allowed6" in request.form:
allowed6 = request.form["allowed6"]
allowed6 = allowed6.split(",") if "," in allowed6 else allowed6
# allowed6 = allowed6.split(",") if "," in allowed6 else allowed6
else:
allowed6 = None
session["user-data"] = {'asn':asn,'allowed4': allowed4, 'allowed6': allowed6,'mnt':mnt, 'authtype': "debug"}
@ -213,6 +211,19 @@ def login():
def peerings_delete():
return f"{request.method} /peerings/delete?{str(request.args)}{str(request.form)}"
@app.route("/peerings/edit", methods=["GET","POST"])
@auth_required()
def peerings_edit():
print(session)
if request.method == "GET":
if "node" in request.args and request.args["node"] in config["nodes"]:
return render_template("peerings-new.html", config=config, selected_node=request.args["node"], peerings=peerings)
else:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings)
elif request.method == "POST":
return f"{request.method} /peerings/edit?{str(request.args)}{str(request.form)}"
@app.route("/peerings/new", methods=["GET","POST"])
@auth_required()
def peerings_new():
@ -301,16 +312,31 @@ def peerings_new():
if new_peering["peer-v4"]:
ipv4 = ip_address(new_peering["peer-v4"])
if not ipv4.version == 4: raise ValueError()
if ipv4.is_private:
if ipv4.is_link_local:
pass
elif ipv4.is_private:
if not (ipv4.compressed.startswith("172.2") or ipv4.compressed.startswith("10.")):
raise ValueError()
elif ipv4.is_link_local:
pass
is_in_allowed = False
if session["user-data"]["allowed4"]:
for allowed4 in session["user-data"]["allowed4"].split(","):
if ipv4 in ip_network(allowed4):
is_in_allowed = True
if not is_in_allowed:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="supplied ipv4 addr not in allowed ip range"), 400
else: raise ValueError()
if new_peering["peer-v6"]:
ipv6 = ip_address(new_peering["peer-v6"])
if not ipv6.version == 6: raise ValueError()
if not ipv6.is_private: raise ValueError()
if ipv6.is_link_local: raise ValueError()
is_in_allowed = False
if session["user-data"]["allowed6"]:
for allowed6 in session["user-data"]["allowed6"].split(","):
if ipv6 in ip_network(allowed6):
is_in_allowed = True
if not is_in_allowed:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="supplied ipv6 addr not in allowed ip range"), 400
except ValueError:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="invalid ip address(es) supplied"), 400