forked from ScoDoc/ScoDoc
195 lines
6.9 KiB
Python
195 lines
6.9 KiB
Python
import ssl
|
|
|
|
import flask
|
|
from xmltodict import parse
|
|
from flask import current_app
|
|
from .cas_urls import create_cas_login_url
|
|
from .cas_urls import create_cas_logout_url
|
|
from .cas_urls import create_cas_validate_url
|
|
|
|
|
|
try:
|
|
from urllib import urlopen # python 2
|
|
except ImportError:
|
|
from urllib.request import urlopen # python 3
|
|
from urllib.error import URLError
|
|
|
|
blueprint = flask.Blueprint("cas", __name__)
|
|
|
|
|
|
@blueprint.route("/login/")
def login():
    """
    Entry point of the CAS login round trip.

    The route is hit twice during a login:

    1. By the user, who is redirected to the CAS server login page.
    2. By the CAS server, which sends the user back here with a
       `ticket` query parameter once the login succeeded.

    A ticket found in the query string is stored in the session under the
    key configured as `CAS_TOKEN_SESSION_KEY`, then checked with
    `validate()`. On success the user is redirected, in order of
    preference, to the URL stored in the session under
    `CAS_AFTER_LOGIN_SESSION_URL`, to the `origin` query parameter, or to
    the endpoint configured as `CAS_AFTER_LOGIN`. On failure the stored
    ticket is dropped and the user is sent to the CAS login page again.
    """
    config = current_app.config
    session = flask.session
    query = flask.request.args

    token_key = config["CAS_TOKEN_SESSION_KEY"]

    # Default destination: the CAS login page, with this route as the
    # service URL the CAS server must call back.
    cas_server = config["CAS_SERVER"]
    cas_login_route = config["CAS_LOGIN_ROUTE"]
    service_url = flask.url_for(
        ".login",
        origin=session.get("CAS_AFTER_LOGIN_SESSION_URL"),
        _external=True,
    )
    redirect_url = create_cas_login_url(cas_server, cas_login_route, service_url)

    ticket = query.get("ticket")
    if ticket is not None:
        session[token_key] = ticket

    if token_key in session:
        if not validate(session[token_key]):
            # Rejected ticket: forget it so that a fresh one is requested.
            session.pop(token_key, None)
        elif "CAS_AFTER_LOGIN_SESSION_URL" in session:
            redirect_url = session.pop("CAS_AFTER_LOGIN_SESSION_URL")
        elif query.get("origin"):
            # NOTE(review): `origin` comes from the query string and is used
            # as redirect target without any check - confirm whether it
            # should be restricted to URLs of this application.
            redirect_url = query["origin"]
        else:
            redirect_url = flask.url_for(config["CAS_AFTER_LOGIN"])

    current_app.logger.debug("Redirecting to: {0}".format(redirect_url))

    return flask.redirect(redirect_url)
|
|
|
|
|
|
@blueprint.route("/logout/")
def logout():
    """
    Log the user out locally, then redirect to the CAS logout page.

    The username, attributes and ticket entries are removed from the
    session. When `CAS_AFTER_LOGOUT` is configured (not None), it is
    passed to the CAS logout URL as destination: used verbatim if it
    starts with "http", otherwise resolved as a Flask endpoint name.
    """
    config = current_app.config

    username_key = config["CAS_USERNAME_SESSION_KEY"]
    attributes_key = config["CAS_ATTRIBUTES_SESSION_KEY"]
    token_key = config["CAS_TOKEN_SESSION_KEY"]

    # Forget everything CAS related, including the ticket.
    for session_key in (username_key, attributes_key, token_key):
        flask.session.pop(session_key, None)

    after_logout = config["CAS_AFTER_LOGOUT"]

    # Optional trailing argument of create_cas_logout_url: the destination
    # URL, only passed when one is configured.
    destination = []
    if after_logout is not None:
        if after_logout.startswith("http"):
            # Already a URL: use it as is.
            dest_url = after_logout
        else:
            # Otherwise it names a Flask endpoint.
            dest_url = flask.url_for(after_logout, _external=True)
        destination.append(dest_url)

    redirect_url = create_cas_logout_url(
        config["CAS_SERVER"],
        config["CAS_LOGOUT_ROUTE"],
        *destination
    )

    current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
    return flask.redirect(redirect_url)
|
|
|
|
|
|
def validate(ticket):
    """
    Validate a CAS ticket against the CAS server.

    The validation URL is built from the `CAS_SERVER` and
    `CAS_VALIDATE_ROUTE` settings and fetched with a GET request. When
    `CAS_SSL_VERIFY` is set, the server certificate is checked against the
    PEM data found in `CAS_SSL_CERTIFICATE`.

    On success, the validated username is saved in the session under the
    key `CAS_USERNAME_SESSION_KEY` and the attributes dictionary under the
    key `CAS_ATTRIBUTES_SESSION_KEY`. A `cas:memberOf` attribute received
    as a single string ("[a, b]") is converted to a list of group names.

    On failure (unloadable certificate, unreachable server, malformed or
    unexpected answer), the error is logged and, if `CAS_ERROR_CALLBACK`
    is configured, reported through it with a user-facing message.

    :param ticket: the ticket received from the CAS server.
    :return: True if the CAS server confirmed the ticket, False otherwise.
    """
    # xmltodict reports malformed XML with ExpatError, which is not a
    # ValueError and must be caught explicitly.
    from xml.parsers.expat import ExpatError

    try:
        string_types = basestring  # noqa: F821 (python 2)
    except NameError:
        string_types = str  # python 3

    cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
    cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
    cas_error_callback = current_app.config.get("CAS_ERROR_CALLBACK")
    current_app.logger.debug("validating token {0}".format(ticket))

    cas_validate_url = create_cas_validate_url(
        current_app.config["CAS_SERVER"],
        current_app.config["CAS_VALIDATE_ROUTE"],
        flask.url_for(
            ".login",
            origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
            _external=True,
        ),
        ticket,
    )

    current_app.logger.debug("Making GET request to {0}".format(cas_validate_url))

    if current_app.config.get("CAS_SSL_VERIFY"):
        # NOTE(review): a bare SSLContext() leaves check_hostname disabled,
        # so only the certificate chain is verified - confirm this is the
        # intended policy.
        ssl_context = ssl.SSLContext()
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        ca_data = current_app.config.get("CAS_SSL_CERTIFICATE", "")
        try:
            ssl_context.load_verify_locations(cadata=ca_data)
        except (ssl.SSLError, ValueError, TypeError):
            # TypeError: the certificate setting is None (nothing to load).
            current_app.logger.error("CAS : error loading SSL cert.")
            if cas_error_callback:
                cas_error_callback("erreur chargement certificat SSL CAS (PEM)")
            return False
    else:
        ssl_context = None

    service_response = {}
    is_valid = False

    try:
        response = urlopen(cas_validate_url, context=ssl_context)
        try:
            xmldump = response.read().strip().decode("utf8", "ignore")
        finally:
            # Always release the connection, even if reading fails.
            response.close()
        # A missing or empty <cas:serviceResponse> is an invalid ticket,
        # not a crash.
        service_response = parse(xmldump).get("cas:serviceResponse") or {}
        is_valid = "cas:authenticationSuccess" in service_response
    except (ValueError, ExpatError):
        current_app.logger.error("CAS returned unexpected result")
        if cas_error_callback:
            cas_error_callback("réponse invalide du serveur CAS")
    except URLError:
        current_app.logger.error("CAS : error validating token: check SSL certificate")
        # The callback is optional: never call it unconditionally.
        if cas_error_callback:
            cas_error_callback(
                "erreur connexion au serveur CAS: vérifiez le certificat SSL"
            )

    if is_valid:
        current_app.logger.debug("valid")
        success = service_response["cas:authenticationSuccess"]
        username = success["cas:user"]
        attributes = success.get("cas:attributes", {})

        if attributes and "cas:memberOf" in attributes:
            member_of = attributes["cas:memberOf"]
            if isinstance(member_of, string_types):
                # "[g1, g2]" -> ["g1", "g2"]
                attributes["cas:memberOf"] = [
                    group.strip(" ")
                    for group in member_of.lstrip("[").rstrip("]").split(",")
                ]
        flask.session[cas_username_session_key] = username
        flask.session[cas_attributes_session_key] = attributes
    else:
        current_app.logger.debug("invalid")

    return is_valid
|