211 lines
7.7 KiB
Python
211 lines
7.7 KiB
Python
"""
Routes for CAS authentication

Modified for ScoDoc
"""
|
|
import re
|
|
import ssl
|
|
from urllib.error import URLError
|
|
from urllib.request import urlopen
|
|
|
|
import flask
|
|
from flask import current_app
|
|
from xmltodict import parse
|
|
|
|
from .cas_urls import create_cas_login_url
|
|
from .cas_urls import create_cas_logout_url
|
|
from .cas_urls import create_cas_validate_url
|
|
|
|
|
|
# Flask blueprint named "cas"; the login and logout routes of this module
# are attached to it through the @blueprint.route decorators.
blueprint = flask.Blueprint("cas", __name__)
|
|
|
|
|
|
@blueprint.route("/login/")
def login():
    """
    Entry point of the CAS login round trip.

    This route is hit twice during a login:

    1. by the user, who is redirected to the CAS login page (the service
       URL given to the CAS points back to this route);
    2. by the CAS, which sends the user back here with a `ticket` query
       parameter.

    The ticket is stored in the session under the key configured as
    `CAS_TOKEN_SESSION_KEY` and checked with `validate()`. When the ticket
    is valid the user is redirected, in order of precedence, to the URL
    stored in the session under "CAS_AFTER_LOGIN_SESSION_URL", to the
    `origin` query parameter, or to the endpoint named by the
    `CAS_AFTER_LOGIN` setting. When it is invalid the ticket is removed
    from the session and the user is sent to the CAS login page again.

    Returns a plain text message (no redirect) when "CAS_SERVER" is absent
    from the application configuration.
    """
    configure = current_app.config.get("CAS_CONFIGURATION_FUNCTION")
    if configure:
        # Optional hook: lets the application fill in its CAS settings.
        configure(current_app)

    config = current_app.config
    if "CAS_SERVER" not in config:
        current_app.logger.info("cas_login: no configuration")
        return "CAS configuration missing"

    session = flask.session
    query_args = flask.request.args
    token_key = config["CAS_TOKEN_SESSION_KEY"]

    # Default destination: the CAS login page, with this route as service.
    cas_server = config["CAS_SERVER"]
    login_route = config["CAS_LOGIN_ROUTE"]
    service_url = flask.url_for(
        ".login",
        origin=session.get("CAS_AFTER_LOGIN_SESSION_URL"),
        _external=True,
    )
    redirect_url = create_cas_login_url(cas_server, login_route, service_url)

    if "ticket" in query_args:
        session[token_key] = query_args["ticket"]

    if token_key in session:
        if not validate(session[token_key]):
            # Invalid ticket: forget it and go through the CAS again.
            session.pop(token_key, None)
        elif "CAS_AFTER_LOGIN_SESSION_URL" in session:
            redirect_url = session.pop("CAS_AFTER_LOGIN_SESSION_URL")
        elif query_args.get("origin"):
            # NOTE(review): `origin` comes from the query string and is used
            # as redirect target as is - confirm this cannot be abused as an
            # open redirect.
            redirect_url = query_args["origin"]
        else:
            redirect_url = flask.url_for(config["CAS_AFTER_LOGIN"])

    current_app.logger.debug(f"cas.login: redirecting to {redirect_url}")

    return flask.redirect(redirect_url)
|
|
|
|
|
|
@blueprint.route("/logout/")
def logout():
    """
    Log the user out.

    Removes the CAS username, attributes and token (under their configured
    session keys) as well as "CAS_EDT_ID" from the session, then redirects
    to the URL built by `create_cas_logout_url`.

    When the `CAS_AFTER_LOGOUT` setting is defined, it is passed to the
    logout URL as destination: used verbatim if it starts with "http",
    otherwise treated as a Flask endpoint name and turned into an external
    URL.
    """
    configure = current_app.config.get("CAS_CONFIGURATION_FUNCTION")
    if configure:
        # Optional hook: lets the application fill in its CAS settings.
        configure(current_app)

    config = current_app.config
    # All configured keys are looked up before the session is touched.
    keys_to_forget = (
        config["CAS_USERNAME_SESSION_KEY"],
        config["CAS_ATTRIBUTES_SESSION_KEY"],
        config["CAS_TOKEN_SESSION_KEY"],  # added by EV
        "CAS_EDT_ID",  # added by EV
    )
    for session_key in keys_to_forget:
        flask.session.pop(session_key, None)

    after_logout = config.get("CAS_AFTER_LOGOUT")
    if not after_logout:
        # NOTE(review): no route is given here (None), CAS_LOGOUT_ROUTE is
        # only used in the other branch - confirm this is intended.
        redirect_url = create_cas_logout_url(config["CAS_SERVER"], None)
    else:
        # A value starting with "http" is already a URL; anything else is
        # a Flask endpoint name.
        if after_logout.startswith("http"):
            dest_url = after_logout
        else:
            dest_url = flask.url_for(after_logout, _external=True)
        redirect_url = create_cas_logout_url(
            config["CAS_SERVER"],
            config["CAS_LOGOUT_ROUTE"],
            dest_url,
        )

    current_app.logger.debug(f"cas.logout: redirecting to {redirect_url}")
    return flask.redirect(redirect_url)
|
|
|
|
|
|
def validate(ticket):
    """
    Validate a CAS ticket against the CAS server.

    The validation URL is built from the `CAS_SERVER` and
    `CAS_VALIDATE_ROUTE` settings, with the ".login" route as service. The
    XML answer is fetched and parsed; the ticket is valid when the
    "cas:serviceResponse" element contains "cas:authenticationSuccess".

    On success the session receives:

    - the "cas:user" value under the key configured as
      `CAS_USERNAME_SESSION_KEY`;
    - the "cas:attributes" content under the key configured as
      `CAS_ATTRIBUTES_SESSION_KEY` (a string "cas:memberOf" attribute is
      split into a list of group names);
    - optionally "CAS_EDT_ID", the first group captured by the regular
      expression stored in the ScoDoc setting "cas_edt_id_from_xml_regexp"
      when it matches the raw XML answer.

    Failures (SSL certificate loading, connection error, malformed or
    unexpected answer) are logged and reported through the optional
    `CAS_ERROR_CALLBACK` setting, a callable taking a message string.

    Args:
        ticket: the ticket received from the CAS server.

    Returns:
        True if the CAS server confirmed the ticket, False otherwise.
    """
    # Raised by the expat parser for malformed XML (imported locally,
    # next to the other function-level import of this function).
    from xml.parsers.expat import ExpatError

    # Function-level import kept from the original code
    # (presumably to avoid a circular import - TODO confirm).
    from app.models.config import ScoDocSiteConfig

    cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
    cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
    cas_error_callback = current_app.config.get("CAS_ERROR_CALLBACK")
    current_app.logger.debug(f"validating token {ticket}")

    cas_validate_url = create_cas_validate_url(
        current_app.config["CAS_SERVER"],
        current_app.config["CAS_VALIDATE_ROUTE"],
        flask.url_for(
            ".login",
            origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
            _external=True,
        ),
        ticket,
    )

    current_app.logger.debug(f"Making GET request to {cas_validate_url}")

    xml_from_dict = {}
    is_valid = False

    if current_app.config.get("CAS_SSL_VERIFY"):
        # NOTE(review): ssl.SSLContext() without a protocol argument is
        # deprecated since Python 3.10 and leaves hostname checking off;
        # ssl.create_default_context(cadata=...) would be stricter -
        # confirm the deployed certificates match their host names first.
        ssl_context = ssl.SSLContext()
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        ca_data = current_app.config.get("CAS_SSL_CERTIFICATE", "")
        try:
            ssl_context.load_verify_locations(cadata=ca_data)
        except (ssl.SSLError, ValueError):
            current_app.logger.error("CAS : error loading SSL cert.")
            if cas_error_callback:
                cas_error_callback("erreur chargement certificat SSL CAS (PEM)")
            return False
    else:
        ssl_context = None

    try:
        # The context manager closes the HTTP response once it is read.
        with urlopen(cas_validate_url, context=ssl_context) as response:
            xmldump = response.read().strip().decode("utf8", "ignore")
        xml_from_dict = parse(xmldump)
        is_valid = "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"]
    except (ValueError, ExpatError, KeyError, TypeError):
        # ExpatError: answer is not well-formed XML.
        # KeyError / TypeError: no (or empty) "cas:serviceResponse" element.
        current_app.logger.error("CAS returned unexpected result")
        if cas_error_callback:
            cas_error_callback("réponse invalide du serveur CAS")
    except URLError:
        current_app.logger.error("CAS : error validating token: check SSL certificate")
        # The callback is optional: guard it like in the other branches.
        if cas_error_callback:
            cas_error_callback(
                "erreur connexion au serveur CAS: vérifiez le certificat SSL"
            )

    if is_valid:
        current_app.logger.debug("valid")
        xml_from_dict = xml_from_dict["cas:serviceResponse"][
            "cas:authenticationSuccess"
        ]
        username = xml_from_dict["cas:user"]
        attributes = xml_from_dict.get("cas:attributes", {})

        if attributes and "cas:memberOf" in attributes:
            member_of = attributes["cas:memberOf"]
            if isinstance(member_of, str):
                # "[a, b, c]" -> ["a", "b", "c"]
                attributes["cas:memberOf"] = [
                    group.strip(" ")
                    for group in member_of.lstrip("[").rstrip("]").split(",")
                ]

        # Extract auxiliary information (used for edt_id)
        exp = ScoDocSiteConfig.get("cas_edt_id_from_xml_regexp")
        if exp:
            m = re.search(exp, xmldump)
            if m and len(m.groups()) > 0:
                cas_edt_id = m.group(1)
                if cas_edt_id:
                    flask.session["CAS_EDT_ID"] = cas_edt_id

        flask.session[cas_username_session_key] = username
        flask.session[cas_attributes_session_key] = attributes
    else:
        current_app.logger.debug("invalid")

    return is_valid
|