ScoDoc/flask_cas/routing.py

188 lines
6.5 KiB
Python
Raw Normal View History

2023-02-26 23:27:40 +01:00
import ssl
2023-02-26 21:24:07 +01:00
import flask
from xmltodict import parse
from flask import current_app
from .cas_urls import create_cas_login_url
from .cas_urls import create_cas_logout_url
from .cas_urls import create_cas_validate_url
try:
2023-02-26 23:27:40 +01:00
from urllib import urlopen # python 2
2023-02-26 21:24:07 +01:00
except ImportError:
2023-02-26 23:27:40 +01:00
from urllib.request import urlopen # python 3
from urllib.error import URLError
2023-02-26 21:24:07 +01:00
blueprint = flask.Blueprint("cas", __name__)
@blueprint.route("/login/")
def login():
"""
This route has two purposes. First, it is used by the user
to login. Second, it is used by the CAS to respond with the
`ticket` after the user logs in successfully.
When the user accesses this url, they are redirected to the CAS
to login. If the login was successful, the CAS will respond to this
route with the ticket in the url. The ticket is then validated.
If validation was successful the logged in username is saved in
the user's session under the key `CAS_USERNAME_SESSION_KEY` and
the user's attributes are saved under the key
'CAS_USERNAME_ATTRIBUTE_KEY'
"""
cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
redirect_url = create_cas_login_url(
current_app.config["CAS_SERVER"],
current_app.config["CAS_LOGIN_ROUTE"],
flask.url_for(
".login",
origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
_external=True,
),
)
if "ticket" in flask.request.args:
flask.session[cas_token_session_key] = flask.request.args["ticket"]
if cas_token_session_key in flask.session:
if validate(flask.session[cas_token_session_key]):
if "CAS_AFTER_LOGIN_SESSION_URL" in flask.session:
redirect_url = flask.session.pop("CAS_AFTER_LOGIN_SESSION_URL")
elif flask.request.args.get("origin"):
redirect_url = flask.request.args["origin"]
else:
redirect_url = flask.url_for(current_app.config["CAS_AFTER_LOGIN"])
else:
flask.session.pop(cas_token_session_key, None)
current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
return flask.redirect(redirect_url)
@blueprint.route("/logout/")
def logout():
"""
When the user accesses this route they are logged out.
"""
cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
flask.session.pop(cas_username_session_key, None)
flask.session.pop(cas_attributes_session_key, None)
flask.session.pop(cas_token_session_key, None) # added by EV
cas_after_logout = current_app.config["CAS_AFTER_LOGOUT"]
if cas_after_logout is not None:
# If config starts with http, use it as dest URL.
# Else, build Flask URL
dest_url = (
cas_after_logout
if cas_after_logout.startswith("http")
else flask.url_for(cas_after_logout, _external=True)
)
redirect_url = create_cas_logout_url(
current_app.config["CAS_SERVER"],
current_app.config["CAS_LOGOUT_ROUTE"],
dest_url,
)
else:
redirect_url = create_cas_logout_url(
current_app.config["CAS_SERVER"], current_app.config["CAS_LOGOUT_ROUTE"]
)
current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
return flask.redirect(redirect_url)
def validate(ticket):
"""
Will attempt to validate the ticket. If validation fails, then False
is returned. If validation is successful, then True is returned
and the validated username is saved in the session under the
key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary
is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'.
"""
cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
current_app.logger.debug("validating token {0}".format(ticket))
cas_validate_url = create_cas_validate_url(
current_app.config["CAS_SERVER"],
current_app.config["CAS_VALIDATE_ROUTE"],
flask.url_for(
".login",
origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
_external=True,
),
ticket,
)
current_app.logger.debug("Making GET request to {0}".format(cas_validate_url))
xml_from_dict = {}
isValid = False
2023-02-26 23:27:40 +01:00
if current_app.config.get("CAS_SSL_VERIFY"):
ssl_context = ssl.SSLContext()
ssl_context.verify_mode = ssl.CERT_REQUIRED
ca_data = current_app.config.get("CAS_SSL_CERTIFICATE", "")
try:
ssl_context.load_verify_locations(cadata=ca_data)
except (ssl.SSLError, ValueError):
current_app.logger.error("CAS : error loading SSL cert.")
return False
else:
ssl_context = None
2023-02-26 21:24:07 +01:00
try:
2023-02-26 23:27:40 +01:00
xmldump = (
urlopen(cas_validate_url, context=ssl_context)
.read()
.strip()
.decode("utf8", "ignore")
)
2023-02-26 21:24:07 +01:00
xml_from_dict = parse(xmldump)
isValid = (
True
if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"]
else False
)
except ValueError:
current_app.logger.error("CAS returned unexpected result")
2023-02-26 23:27:40 +01:00
except URLError:
current_app.logger.error("CAS : error validating token: check SSL certificate")
2023-02-26 21:24:07 +01:00
if isValid:
current_app.logger.debug("valid")
xml_from_dict = xml_from_dict["cas:serviceResponse"][
"cas:authenticationSuccess"
]
username = xml_from_dict["cas:user"]
attributes = xml_from_dict.get("cas:attributes", {})
if attributes and "cas:memberOf" in attributes:
if isinstance(attributes["cas:memberOf"], basestring):
attributes["cas:memberOf"] = (
attributes["cas:memberOf"].lstrip("[").rstrip("]").split(",")
)
for group_number in range(0, len(attributes["cas:memberOf"])):
attributes["cas:memberOf"][group_number] = (
attributes["cas:memberOf"][group_number].lstrip(" ").rstrip(" ")
)
flask.session[cas_username_session_key] = username
flask.session[cas_attributes_session_key] = attributes
else:
current_app.logger.debug("invalid")
return isValid