[web] allow admin to view/create/edit peerings of others

This commit is contained in:
lare 2024-09-01 01:02:11 +02:00
parent 41e2290365
commit 85b87e7ece
4 changed files with 54 additions and 33 deletions

View file

@ -1,14 +1,19 @@
#! /usr/bin/env python3
# flask
from flask import Flask, Response, redirect, render_template, request, session, abort
import werkzeug.exceptions as werkzeug_exceptions
# other utils
import json
import os
import base64
# potential errors on base64 decode: binascii.Error
import binascii
import logging
import random
from functools import wraps
from ipaddress import ip_address, ip_network, IPv4Network, IPv6Network
# self written modules
import kioubit_verify
from peering_manager import PeeringManager
@ -49,21 +54,21 @@ class Config (dict):
except json.decoder.JSONDecodeError:
raise SyntaxError(f"no valid JSON found in '{cf.name}'")
if not "flask-template-dir" in self._config:
if "flask-template-dir" not in self._config:
self._config["flask-template-dir"] = "../frontend"
if not "debug-mode" in self._config:
if "debug-mode" not in self._config:
self._config["debug-mode"] = False
if not "base-dir" in self._config:
if "base-dir" not in self._config:
self._config["base-dir"] = "/"
if not "peerings-data" in self._config:
if "peerings-data" not in self._config:
self._config["peering-data"] = "./peerings"
if not "nodes" in self._config:
if "nodes" not in self._config:
self,_config = {}
for node in self._config["nodes"]:
if not "capacity" in self._config["nodes"][node]:
if "capacity" not in self._config["nodes"][node]:
self._config["nodes"][node]["capacity"] = -1
logging.info(self._config)
@ -81,7 +86,8 @@ def check_peering_data(form):
# check if all required (and enabled) options are specified
try:
new_peering["peer-asn"] = session["user-data"]["asn"]
# force "peer-asn" to be the same as in the login data except if login is admin (admin get the peer-asn field enabled in the form)
new_peering["peer-asn"] = session["user-data"]["asn"] if "peer-asn" not in request.form or session["login"] != config["MNT"] else form["peer-asn"]
new_peering["peer-wgkey"] = form["peer-wgkey"]
if "peer-endpoint-enabled" in form and form["peer-endpoint-enabled"] == "on":
new_peering["peer-endpoint"] = form["peer-endpoint"]
@ -125,7 +131,7 @@ def check_peering_data(form):
wg_key_invalid = True
try:
base64.b64decode(new_peering["peer-wgkey"])
except:
except binascii.Error:
wg_key_invalid = True
if wg_key_invalid:
return False, "invalid wireguard Key"
@ -134,7 +140,7 @@ def check_peering_data(form):
if new_peering["peer-endpoint"]:
if not new_peering["peer-endpoint"].split(":")[-1].isnumeric():
return False, "no port number in endpoint"
elif len(new_peering["peer-endpoint"].split(":")) < 2 and not "." in new_peering["peer-endpoint"]:
elif len(new_peering["peer-endpoint"].split(":")) < 2 and "." not in new_peering["peer-endpoint"]:
return False, "endpoint doesn't look like a ip address or fqdn"
# check if at least one ip is specified/enabled
@ -148,13 +154,13 @@ def check_peering_data(form):
try:
if new_peering["peer-v6ll"]:
ipv6ll = ip_address(new_peering["peer-v6ll"])
if not ipv6ll.version == 6:
if ipv6ll.version != 6:
raise ValueError()
if not ipv6ll.is_link_local:
raise ValueError()
if new_peering["peer-v4"]:
ipv4 = ip_address(new_peering["peer-v4"])
if not ipv4.version == 4:
if ipv4.version != 4:
raise ValueError()
if ipv4.is_link_local:
pass
@ -177,7 +183,7 @@ def check_peering_data(form):
raise ValueError()
if new_peering["peer-v6"]:
ipv6 = ip_address(new_peering["peer-v6"])
if not ipv6.version == 6:
if ipv6.version != 6:
raise ValueError()
if not ipv6.is_private:
raise ValueError()
@ -201,9 +207,9 @@ def check_peering_data(form):
# check bgp options
try:
if new_peering["bgp-mp"] == False and new_peering["bgp-enh"] == True:
if new_peering["bgp-mp"] is False and new_peering["bgp-enh"] is True:
return False, "extended next hop requires multiprotocol bgp"
if new_peering["bgp-mp"] == False:
if new_peering["bgp-mp"] is False:
if not (new_peering["peer-v4"] and (new_peering["peer-v6"] or new_peering["peer-v6ll"])):
return False, "ipv4 and ipv6 addresses required when not having MP-BGP"
except ValueError:
@ -215,7 +221,7 @@ def auth_required():
def wrapper(f):
@wraps(f)
def decorated(*args, **kwargs):
if not "login" in session:
if "login" not in session:
request_url = f"{config['base-dir']}{request.full_path}".replace("//", "/")
return redirect(f"{config['base-dir']}login?return={request_url}")
else:
@ -293,7 +299,7 @@ def login():
allowed6 = None
session["user-data"] = {'asn': asn, 'allowed4': allowed4,
'allowed6': allowed6, 'mnt': mnt, 'authtype': "debug"}
session["login"] = mnt if not "login" in session else session["login"]
session["login"] = mnt if "login" not in session else session["login"]
return redirect(session["return_url"])
except ValueError:
msg = "at least one of the values provided is wrong/invalid"
@ -332,7 +338,7 @@ def peerings_delete():
def peerings_edit():
print(session)
if request.method == "GET":
if not "node" in request.args or not request.args["node"]:
if "node" not in request.args or not request.args["node"]:
return render_template("peerings-edit.html", session=session, config=config, peerings=peerings, msg="no peering selected, please click one of the buttons above")
mnt_peerings = peerings.get_peerings_by_mnt(session["user-data"]["mnt"])
# print(mnt_peerings)
@ -350,7 +356,7 @@ def peerings_edit():
elif request.method == "POST":
print(request.args)
print(request.form)
if not "node" in request.args or not request.args["node"]:
if "node" not in request.args or not request.args["node"]:
return render_template("peerings-edit.html", session=session, config=config, peerings=peerings, msg="no peering selected, please click one of the buttons above")
peering_valid, peering_or_msg = check_peering_data(request.form)
@ -384,7 +390,7 @@ def peerings_new():
elif request.method == "POST":
print(request.args)
print(request.form)
if not "node" in request.args or not request.args["node"]:
if "node" not in request.args or not request.args["node"]:
return render_template("peerings-new.html", session=session, config=config, peerings=peerings, msg="no node specified, please click one of the buttons above")
peering_valid, peering_or_msg = check_peering_data(request.form)
@ -416,6 +422,11 @@ def peerings_view():
# shouldn't get here
abort(405)
@app.route("/hidden", methods=["GET", "POST", "DELETE"])
@auth_required()
def hidden_page():
...
@app.route("/")
def index():

View file

@ -97,13 +97,13 @@ class PeeringManager:
__new = {}
for asn in self.peerings["asn"]:
for peering in self.peerings["asn"][asn]:
if not peering["node"] in __new:
if peering["node"] not in __new:
__new[peering["node"]] = 0
__new[peering["node"]] += 1
self._amounts = __new
def amount_by_node(self, node_name: str):
if self._amounts == None:
if self._amounts is None:
self._update_amounts()
try:
return self._amounts[node_name]
@ -131,12 +131,22 @@ class PeeringManager:
# print(self.peerings)
try:
out = []
for asn in self.peerings["mnter"][mnt]:
try:
for peering in self.peerings["asn"][asn]:
out.append(peering)
except KeyError as e:
pass
# if admin is logged in: return all
if mnt == self.__config["MNT"]:
for mntner in self.peerings["mntner"]:
for asn in self.peerings["mnter"][mntner]:
try:
for peering in self.peerings["asn"][asn]:
out.append(peering)
except KeyError:
pass
else:
for asn in self.peerings["mnter"][mnt]:
try:
for peering in self.peerings["asn"][asn]:
out.append(peering)
except KeyError:
pass
return out
except KeyError:
return {}
@ -144,14 +154,14 @@ class PeeringManager:
def add_peering(self, asn, node, mnt, wg_key, endpoint=None, ipv6ll=None, ipv4=None, ipv6=None, bgp_mp=True, bgp_enh=True):
# check if this MNT already has a/this asn
try:
if not asn in self.peerings["mnter"][mnt]:
if asn not in self.peerings["mnter"][mnt]:
# ... and add it if it hasn't
self.peerings[mnt].append(asn)
except KeyError:
# ... and cerate it if it doesn't have any yet
self.peerings["mnter"][mnt] = [asn]
try:
if not asn in self.peerings["asn"]:
if asn not in self.peerings["asn"]:
self.peerings["asn"][asn] = []
except KeyError:
self.peerings["asn"][asn] = []
@ -175,7 +185,7 @@ class PeeringManager:
def update_peering(self, asn, node, mnt, wg_key, endpoint=None, ipv6ll=None, ipv4=None, ipv6=None, bgp_mp=True, bgp_enh=True):
# check if this MNT already has a/this asn
try:
if not asn in self.peerings["mnter"][mnt]:
if asn not in self.peerings["mnter"][mnt]:
# ... and add it if it hasn't
self.peerings[mnt].append(asn)
except KeyError:
@ -183,7 +193,7 @@ class PeeringManager:
self.peerings["mnter"][mnt] = [asn]
try:
# there are no peerings for this asn -> can't edit nothing...
if not asn in self.peerings["asn"]:
if asn not in self.peerings["asn"]:
return False, 404
except KeyError:
# this should only happen if "asn" not in self.peerings

View file

@ -190,7 +190,7 @@
<tr>
<td><label for="peer-asn">Your ASN</label></td>
<td></td>
<td><input type="text" name="peer-asn" id="peer-asn" disabled="disabled" value="{{session['user-data']['asn']}}"></td>
<td><input type="text" name="peer-asn" id="peer-asn" {% if session["login"] != config["MNT"] %}disabled="disabled"{%endif%%} value="{{session['user-data']['asn']}}"></td>
</tr>
<tr>
<td><h4>Wireguard</h4></td>

View file

@ -187,7 +187,7 @@
<tr>
<td><label for="peer-asn">Your ASN</label></td>
<td></td>
<td><input type="text" name="peer-asn" id="peer-asn" disabled="disabled" value="{{session['user-data']['asn']}}"></td>
<td><input type="text" name="peer-asn" id="peer-asn" {% if session["login"] != config["MNT"] %}disabled="disabled"{%endif%%} value="{{session['user-data']['asn']}}"></td>
</tr>
<tr>
<td><h4>Wireguard</h4></td>