ScoDoc-PE/flask_cas/routing.py

215 lines
7.9 KiB
Python
Raw Permalink Normal View History

"""
Routes for CAS authentication
Modified for ScoDoc
"""
import re
2023-02-26 23:27:40 +01:00
import ssl
from urllib.error import URLError
from urllib.request import urlopen
2023-02-26 23:27:40 +01:00
2023-02-26 21:24:07 +01:00
import flask
2023-12-17 12:45:32 +01:00
from flask import current_app, request
from xmltodict import parse
2023-02-26 21:24:07 +01:00
from .cas_urls import create_cas_login_url
from .cas_urls import create_cas_logout_url
from .cas_urls import create_cas_validate_url
# Flask blueprint holding the CAS routes declared in this module
# ("/login/" and "/logout/").
blueprint = flask.Blueprint("cas", __name__)
@blueprint.route("/login/")
def login():
    """
    Entry point of the CAS login flow.

    This route is reached twice: first by the user, who is redirected to the
    CAS login page, then by the CAS server, which comes back with a `ticket`
    query argument. The ticket is stored in the session under the key
    configured as `CAS_TOKEN_SESSION_KEY` and checked with `validate()`.

    After a successful validation the user is redirected, by order of
    priority, to the URL stored in the session under
    `CAS_AFTER_LOGIN_SESSION_URL`, to the `origin` query argument, or to the
    endpoint configured as `CAS_AFTER_LOGIN`. When validation fails, the
    stored ticket is dropped and the user is redirected to the CAS login
    page again.

    A plain error message is returned when `CAS_SERVER` is not configured.
    """
    configure = current_app.config.get("CAS_CONFIGURATION_FUNCTION")
    if configure:
        # Call the hook that sets the app configuration
        configure(current_app)

    settings = current_app.config
    if "CAS_SERVER" not in settings:
        current_app.logger.info("cas_login: no configuration")
        return "CAS configuration missing"

    token_key = settings["CAS_TOKEN_SESSION_KEY"]
    session = flask.session
    query = flask.request.args

    # Default destination: the CAS login page, with this route as service URL
    cas_server = settings["CAS_SERVER"]
    login_route = settings["CAS_LOGIN_ROUTE"]
    service_url = flask.url_for(
        ".login",
        origin=session.get("CAS_AFTER_LOGIN_SESSION_URL"),
        _external=True,
    )
    redirect_url = create_cas_login_url(cas_server, login_route, service_url)

    if "ticket" in query:
        session[token_key] = query["ticket"]

    if token_key in session:
        if not validate(session[token_key]):
            # Rejected ticket: forget it, keep redirecting to the CAS login
            session.pop(token_key, None)
        elif "CAS_AFTER_LOGIN_SESSION_URL" in session:
            redirect_url = session.pop("CAS_AFTER_LOGIN_SESSION_URL")
        elif query.get("origin"):
            redirect_url = query["origin"]
        else:
            redirect_url = flask.url_for(settings["CAS_AFTER_LOGIN"])

    current_app.logger.debug(f"cas.login: redirecting to {redirect_url}")
    return flask.redirect(redirect_url)
@blueprint.route("/logout/")
def logout():
    """
    Log the user out.

    Removes the CAS username, attributes, token and `CAS_EDT_ID` entries from
    the session, then redirects:

    - to the CAS logout URL when `CAS_SERVER` is configured (with a
      destination URL when both `CAS_AFTER_LOGOUT` and `CAS_LOGOUT_ROUTE`
      are set);
    - to the root URL of the application otherwise.
    """
    configure = current_app.config.get("CAS_CONFIGURATION_FUNCTION")
    if configure:
        # Call the hook that sets the app configuration
        configure(current_app)

    settings = current_app.config
    # Resolve every configured session key first, then clear them
    session_keys = [
        settings[name]
        for name in (
            "CAS_USERNAME_SESSION_KEY",
            "CAS_ATTRIBUTES_SESSION_KEY",
            "CAS_TOKEN_SESSION_KEY",
        )
    ]
    for session_key in session_keys:
        flask.session.pop(session_key, None)
    flask.session.pop("CAS_EDT_ID", None)

    after_logout = settings.get("CAS_AFTER_LOGOUT")
    logout_route = settings.get("CAS_LOGOUT_ROUTE")
    cas_server = settings.get("CAS_SERVER")

    if not cas_server:
        redirect_url = request.root_url
    elif after_logout and logout_route:
        # A value starting with "http" is used as the destination URL as is;
        # anything else is treated as a Flask endpoint name.
        if after_logout.startswith("http"):
            dest_url = after_logout
        else:
            dest_url = flask.url_for(after_logout, _external=True)
        redirect_url = create_cas_logout_url(cas_server, logout_route, dest_url)
    else:
        redirect_url = create_cas_logout_url(cas_server, None)

    current_app.logger.debug(f"cas.logout: redirecting to {redirect_url}")
    return flask.redirect(redirect_url)
def validate(ticket):
    """
    Validate a CAS ticket against the CAS server.

    The validation URL is requested over HTTP(S) and the XML answer is parsed.
    On success, the username is saved in the session under the key configured
    as `CAS_USERNAME_SESSION_KEY` and the attributes dictionary under the key
    configured as `CAS_ATTRIBUTES_SESSION_KEY`. When the site configuration
    defines `cas_edt_id_from_xml_regexp`, the first group matched in the raw
    XML is saved in the session under `CAS_EDT_ID`.

    Errors (unusable SSL certificate, unreachable server, unexpected answer)
    are logged and reported through the optional `CAS_ERROR_CALLBACK`
    configuration entry, called with a message string.

    Args:
        ticket: the ticket received from the CAS server.

    Returns:
        True if the CAS server reported an authentication success,
        False otherwise.
    """
    from xml.parsers.expat import ExpatError

    from app.models.config import ScoDocSiteConfig

    cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
    cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
    # Optional: may be None, so every call below is guarded
    cas_error_callback = current_app.config.get("CAS_ERROR_CALLBACK")

    current_app.logger.debug(f"validating token {ticket}")

    cas_validate_url = create_cas_validate_url(
        current_app.config["CAS_SERVER"],
        current_app.config["CAS_VALIDATE_ROUTE"],
        flask.url_for(
            ".login",
            origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
            _external=True,
        ),
        ticket,
    )

    current_app.logger.debug(f"Making GET request to {cas_validate_url}")

    xml_from_dict = {}
    is_valid = False

    if current_app.config.get("CAS_SSL_VERIFY"):
        # NOTE(review): a bare ssl.SSLContext() requires a valid certificate
        # here but does not appear to enable hostname checking
        # (check_hostname) - confirm whether this is intended.
        ssl_context = ssl.SSLContext()
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        ca_data = current_app.config.get("CAS_SSL_CERTIFICATE", "")
        try:
            ssl_context.load_verify_locations(cadata=ca_data)
        except (ssl.SSLError, ValueError):
            current_app.logger.error("CAS : error loading SSL cert.")
            if cas_error_callback:
                cas_error_callback("erreur chargement certificat SSL CAS (PEM)")
            return False
    else:
        ssl_context = None

    try:
        # Context manager: the HTTP response is closed even if reading fails
        with urlopen(cas_validate_url, context=ssl_context) as response:
            xmldump = response.read().strip().decode("utf8", "ignore")
        xml_from_dict = parse(xmldump)
        is_valid = "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"]
    except (ValueError, KeyError, TypeError, ExpatError):
        # Malformed XML, or no usable cas:serviceResponse element
        current_app.logger.error("CAS returned unexpected result")
        if cas_error_callback:
            cas_error_callback("réponse invalide du serveur CAS")
    except URLError:
        current_app.logger.error("CAS : error validating token: check SSL certificate")
        if cas_error_callback:
            cas_error_callback(
                "erreur connexion au serveur CAS: vérifiez le certificat SSL"
            )

    if is_valid:
        current_app.logger.debug("valid")
        xml_from_dict = xml_from_dict["cas:serviceResponse"][
            "cas:authenticationSuccess"
        ]
        username = xml_from_dict["cas:user"]
        attributes = xml_from_dict.get("cas:attributes", {})

        if attributes and "cas:memberOf" in attributes:
            member_of = attributes["cas:memberOf"]
            if isinstance(member_of, str):
                # "[a, b, c]" -> ["a", "b", "c"]
                attributes["cas:memberOf"] = [
                    group.strip(" ")
                    for group in member_of.lstrip("[").rstrip("]").split(",")
                ]

        # Extract auxiliary information (used for edt_id)
        exp = ScoDocSiteConfig.get("cas_edt_id_from_xml_regexp")
        if exp:
            m = re.search(exp, xmldump)
            if m and len(m.groups()) > 0:
                cas_edt_id = m.group(1)
                if cas_edt_id:
                    flask.session["CAS_EDT_ID"] = cas_edt_id

        flask.session[cas_username_session_key] = username
        flask.session[cas_attributes_session_key] = attributes
    else:
        current_app.logger.debug("invalid")

    return is_valid