add check for bgp cap

This commit is contained in:
lare 2022-12-03 13:31:47 +01:00
parent 14bb23690e
commit 85c2080dc1
3 changed files with 208 additions and 129 deletions

View file

@ -124,6 +124,122 @@ class PeeringManager:
config = Config()
peerings = PeeringManager(config["peerings"])
kverifyer = kioubit_verify.AuthVerifyer(config["domain"])
def check_peering_data(form):
new_peering = {}
# errors = 0
## check if all required (and enabled) options are specified
try:
new_peering["peer-asn"] = session["user-data"]["asn"]
new_peering["peer-wgkey"] = form["peer-wgkey"]
if "peer-endpoint-enabled" in form and form["peer-endpoint-enabled"] == "on":
new_peering["peer-endpoint"] = form["peer-endpoint"]
if new_peering["peer-endpoint"] == "":
raise ValueError("peer-endpoint")
else:
new_peering["peer-endpoint"] = None
if "peer-v6ll-enabled" in form and form["peer-v6ll-enabled"] == "on":
new_peering["peer-v6ll"] = form["peer-v6ll"]
if new_peering["peer-v6ll"] == "":
raise ValueError("peer-v6ll")
else:
new_peering["peer-v6ll"] = None
if "peer-v4-enabled" in form and form["peer-v4-enabled"] == "on":
new_peering["peer-v4"] = form["peer-v4"]
if new_peering["peer-v4"] == "":
raise ValueError("peer-v4")
else:
new_peering["peer-v4"] = None
if "peer-v6-enabled" in form and form["peer-v6-enabled"] == "on":
new_peering["peer-v6"] = form["peer-v6"]
if new_peering["peer-v6"] == "":
raise ValueError("peer-v6")
else:
new_peering["peer-v6"] = None
new_peering["bgp-mp"] = form["bgp-multi-protocol"] if "bgp-multi-protocol" in form else "off"
new_peering["bgp-enh"] = form["bgp-extended-next-hop"] if "bgp-extended-next-hop" in form else "off"
#new_peering[""] = form["peer-wgkey"]
except ValueError as e:
print(f"error: {e.args}")
return False, "at least one of the required/enabled fields was not filled out"
print(new_peering)
## check wireguard key
wg_key_invalid = False
if len(new_peering["peer-wgkey"]) != 44:
wg_key_invalid = True
try:
base64.b64decode(new_peering["peer-wgkey"])
except:
wg_key_invalid = True
if wg_key_invalid:
return False, "invalid wireguard Key"
## check endpoint
if new_peering["peer-endpoint"]:
if not new_peering["peer-endpoint"].split(":")[-1].isnumeric():
return False, "no port number in endpoint"
elif len(new_peering["peer-endpoint"].split(":")) < 2 and not "." in new_peering["peer-endpoint"]:
return False, "endpoint doesn't look like a ip address or fqdn"
## check if at least one ip is specified/enabled
try:
if not (new_peering["peer-v6ll"] or new_peering["peer-v4"] or new_peering["peer-v6"]):
return False, "at least one of the ip addresses must be enabled and specified"
except KeyError:
return False, "one of the values isn't valid"
## check if supplied ip addresses are valid
try:
if new_peering["peer-v6ll"]:
ipv6ll = ip_address(new_peering["peer-v6ll"])
if not ipv6ll.version == 6: raise ValueError()
if not ipv6ll.is_link_local: raise ValueError()
if new_peering["peer-v4"]:
ipv4 = ip_address(new_peering["peer-v4"])
if not ipv4.version == 4: raise ValueError()
if ipv4.is_link_local:
pass
elif ipv4.is_private:
if not (ipv4.compressed.startswith("172.2") or ipv4.compressed.startswith("10.")):
raise ValueError()
is_in_allowed = False
if session["user-data"]["allowed4"]:
for allowed4 in session["user-data"]["allowed4"].split(","):
if ipv4 in ip_network(allowed4):
is_in_allowed = True
if not is_in_allowed:
return False, "supplied ipv4 addr not in allowed ip range"
else: raise ValueError()
if new_peering["peer-v6"]:
ipv6 = ip_address(new_peering["peer-v6"])
if not ipv6.version == 6: raise ValueError()
if not ipv6.is_private: raise ValueError()
if ipv6.is_link_local: raise ValueError()
is_in_allowed = False
if session["user-data"]["allowed6"]:
for allowed6 in session["user-data"]["allowed6"].split(","):
if ipv6 in ip_network(allowed6):
is_in_allowed = True
if not is_in_allowed:
return False, "supplied ipv6 addr not in allowed ip range"
except ValueError:
return False, "invalid ip address(es) supplied"
# check bgp options
try:
if new_peering["bgp-mp"] == "off" and new_peering["bgp-enh"] == "on":
return False, "extended next hop requires multiprotocol bgp"
if new_peering["bgp-mp"] == "off":
if not (new_peering["peer-v4"] and (new_peering["peer-v6"] or new_peering["peer-v6ll"])):
return False, "ipv4 and ipv6 addresses required when not having MP-BGP"
except ValueError:
...
return True, new_peering
def auth_required():
def wrapper(f):
@wraps(f)
@ -136,7 +252,6 @@ def auth_required():
return wrapper
kverifyer = kioubit_verify.AuthVerifyer(config["domain"])
@app.route("/api/auth/kverify", methods=["GET", "POST"])
def kioubit_auth():
try:
@ -239,109 +354,11 @@ def peerings_new():
if not "node" in request.args or not request.args["node"]:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="no node specified, please click one of the buttons above")
new_peering = {}
# errors = 0
## check if all required (and enabled) options are specified
try:
new_peering["peer-asn"] = session["user-data"]["asn"]
new_peering["peer-wgkey"] = request.form["peer-wgkey"]
if request.form["peer-endpoint-enabled"] == "on":
new_peering["peer-endpoint"] = request.form["peer-endpoint"]
if new_peering["peer-endpoint"] == "":
raise ValueError("peer-endpoint")
else:
new_peering["peer-endpoint"] = None
if "peer-v6ll-enabled" in request.form and request.form["peer-v6ll-enabled"] == "on":
new_peering["peer-v6ll"] = request.form["peer-v6ll"]
if new_peering["peer-v6ll"] == "":
raise ValueError("peer-v6ll")
else:
new_peering["peer-v6ll"] = None
if "peer-v4-enabled" in request.form and request.form["peer-v4-enabled"] == "on":
new_peering["peer-v4"] = request.form["peer-v4"]
if new_peering["peer-v4"] == "":
raise ValueError("peer-v4")
else:
new_peering["peer-v4"] = None
if "peer-v6-enabled" in request.form and request.form["peer-v6-enabled"] == "on":
new_peering["peer-v6"] = request.form["peer-v6"]
if new_peering["peer-v6"] == "":
raise ValueError("peer-v6")
else:
new_peering["peer-v6"] = None
#new_peering[""] = request.form["peer-wgkey"]
except ValueError as e:
print(f"error: {e.args}")
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="at least one of the required/enabled fields was not filled out"), 400
peering_valid, peering_or_msg = check_peering_data(request.form)
print(new_peering)
## check wireguard key
wg_key_invalid = False
if len(new_peering["peer-wgkey"]) != 44:
wg_key_invalid = True
try:
base64.b64decode(new_peering["peer-wgkey"])
except:
wg_key_invalid = True
if wg_key_invalid:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="invalid wireguard Key"), 400
## check endpoint
if new_peering["peer-endpoint"]:
if not new_peering["peer-endpoint"].split(":")[-1].isnumeric():
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="no port number in endpoint"), 400
elif len(new_peering["peer-endpoint"].split(":")) < 2 and not "." in new_peering["peer-endpoint"]:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="endpoint doesn't look like a ip address or fqdn"), 400
## check if at least one ip is specified/enabled
try:
if not (new_peering["peer-v6ll"] or new_peering["peer-v4"] or new_peering["peer-v6"]):
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="at least one of the ip addresses must be enabled and specified"), 400
except KeyError:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="one of the values isn't valid"), 400
## check if supplied ip addresses are valid
try:
if new_peering["peer-v6ll"]:
ipv6ll = ip_address(new_peering["peer-v6ll"])
if not ipv6ll.version == 6: raise ValueError()
if not ipv6ll.is_link_local: raise ValueError()
if new_peering["peer-v4"]:
ipv4 = ip_address(new_peering["peer-v4"])
if not ipv4.version == 4: raise ValueError()
if ipv4.is_link_local:
pass
elif ipv4.is_private:
if not (ipv4.compressed.startswith("172.2") or ipv4.compressed.startswith("10.")):
raise ValueError()
is_in_allowed = False
if session["user-data"]["allowed4"]:
for allowed4 in session["user-data"]["allowed4"].split(","):
if ipv4 in ip_network(allowed4):
is_in_allowed = True
if not is_in_allowed:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="supplied ipv4 addr not in allowed ip range"), 400
else: raise ValueError()
if new_peering["peer-v6"]:
ipv6 = ip_address(new_peering["peer-v6"])
if not ipv6.version == 6: raise ValueError()
if not ipv6.is_private: raise ValueError()
if ipv6.is_link_local: raise ValueError()
is_in_allowed = False
if session["user-data"]["allowed6"]:
for allowed6 in session["user-data"]["allowed6"].split(","):
if ipv6 in ip_network(allowed6):
is_in_allowed = True
if not is_in_allowed:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="supplied ipv6 addr not in allowed ip range"), 400
except ValueError:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="invalid ip address(es) supplied"), 400
if not peerings.add_peering(session["user-data"]["mnt"], session["user-data"]["asn"], request.args["node"], new_peering["peer-wgkey"], new_peering["peer-endpoint"], new_peering["peer-v6ll"], new_peering["peer-v4"], new_peering["peer-v6"]):
if not peering_valid:
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg=peering_or_msg), 400
if not peerings.add_peering(session["user-data"]["mnt"], session["user-data"]["asn"], request.args["node"], peering_or_msg["peer-wgkey"], peering_or_msg["peer-endpoint"], peering_or_msg["peer-v6ll"], peering_or_msg["peer-v4"], peering_or_msg["peer-v6"]):
return render_template("peerings-new.html", session=session,config=config, peerings=peerings, msg="this ASN already has a peering with the requested node"), 400
return redirect(f"{config['base-dir']}peerings")