166 lines
5.8 KiB
Python
166 lines
5.8 KiB
Python
|
import flask
|
||
|
from xmltodict import parse
|
||
|
from flask import current_app
|
||
|
from .cas_urls import create_cas_login_url
|
||
|
from .cas_urls import create_cas_logout_url
|
||
|
from .cas_urls import create_cas_validate_url
|
||
|
|
||
|
|
||
|
try:
|
||
|
from urllib import urlopen
|
||
|
except ImportError:
|
||
|
from urllib.request import urlopen
|
||
|
|
||
|
blueprint = flask.Blueprint("cas", __name__)
|
||
|
|
||
|
|
||
|
@blueprint.route("/login/")
|
||
|
def login():
|
||
|
"""
|
||
|
This route has two purposes. First, it is used by the user
|
||
|
to login. Second, it is used by the CAS to respond with the
|
||
|
`ticket` after the user logs in successfully.
|
||
|
|
||
|
When the user accesses this url, they are redirected to the CAS
|
||
|
to login. If the login was successful, the CAS will respond to this
|
||
|
route with the ticket in the url. The ticket is then validated.
|
||
|
If validation was successful the logged in username is saved in
|
||
|
the user's session under the key `CAS_USERNAME_SESSION_KEY` and
|
||
|
the user's attributes are saved under the key
|
||
|
'CAS_USERNAME_ATTRIBUTE_KEY'
|
||
|
"""
|
||
|
|
||
|
cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
|
||
|
|
||
|
redirect_url = create_cas_login_url(
|
||
|
current_app.config["CAS_SERVER"],
|
||
|
current_app.config["CAS_LOGIN_ROUTE"],
|
||
|
flask.url_for(
|
||
|
".login",
|
||
|
origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
|
||
|
_external=True,
|
||
|
),
|
||
|
)
|
||
|
|
||
|
if "ticket" in flask.request.args:
|
||
|
flask.session[cas_token_session_key] = flask.request.args["ticket"]
|
||
|
|
||
|
if cas_token_session_key in flask.session:
|
||
|
|
||
|
if validate(flask.session[cas_token_session_key]):
|
||
|
if "CAS_AFTER_LOGIN_SESSION_URL" in flask.session:
|
||
|
redirect_url = flask.session.pop("CAS_AFTER_LOGIN_SESSION_URL")
|
||
|
elif flask.request.args.get("origin"):
|
||
|
redirect_url = flask.request.args["origin"]
|
||
|
else:
|
||
|
redirect_url = flask.url_for(current_app.config["CAS_AFTER_LOGIN"])
|
||
|
else:
|
||
|
flask.session.pop(cas_token_session_key, None)
|
||
|
|
||
|
current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
|
||
|
|
||
|
return flask.redirect(redirect_url)
|
||
|
|
||
|
|
||
|
@blueprint.route("/logout/")
|
||
|
def logout():
|
||
|
"""
|
||
|
When the user accesses this route they are logged out.
|
||
|
"""
|
||
|
|
||
|
cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
|
||
|
cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
|
||
|
cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
|
||
|
|
||
|
flask.session.pop(cas_username_session_key, None)
|
||
|
flask.session.pop(cas_attributes_session_key, None)
|
||
|
flask.session.pop(cas_token_session_key, None) # added by EV
|
||
|
|
||
|
cas_after_logout = current_app.config["CAS_AFTER_LOGOUT"]
|
||
|
if cas_after_logout is not None:
|
||
|
# If config starts with http, use it as dest URL.
|
||
|
# Else, build Flask URL
|
||
|
dest_url = (
|
||
|
cas_after_logout
|
||
|
if cas_after_logout.startswith("http")
|
||
|
else flask.url_for(cas_after_logout, _external=True)
|
||
|
)
|
||
|
redirect_url = create_cas_logout_url(
|
||
|
current_app.config["CAS_SERVER"],
|
||
|
current_app.config["CAS_LOGOUT_ROUTE"],
|
||
|
dest_url,
|
||
|
)
|
||
|
else:
|
||
|
redirect_url = create_cas_logout_url(
|
||
|
current_app.config["CAS_SERVER"], current_app.config["CAS_LOGOUT_ROUTE"]
|
||
|
)
|
||
|
|
||
|
current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
|
||
|
return flask.redirect(redirect_url)
|
||
|
|
||
|
|
||
|
def validate(ticket):
|
||
|
"""
|
||
|
Will attempt to validate the ticket. If validation fails, then False
|
||
|
is returned. If validation is successful, then True is returned
|
||
|
and the validated username is saved in the session under the
|
||
|
key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary
|
||
|
is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'.
|
||
|
"""
|
||
|
|
||
|
cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
|
||
|
cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
|
||
|
|
||
|
current_app.logger.debug("validating token {0}".format(ticket))
|
||
|
|
||
|
cas_validate_url = create_cas_validate_url(
|
||
|
current_app.config["CAS_SERVER"],
|
||
|
current_app.config["CAS_VALIDATE_ROUTE"],
|
||
|
flask.url_for(
|
||
|
".login",
|
||
|
origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
|
||
|
_external=True,
|
||
|
),
|
||
|
ticket,
|
||
|
)
|
||
|
|
||
|
current_app.logger.debug("Making GET request to {0}".format(cas_validate_url))
|
||
|
|
||
|
xml_from_dict = {}
|
||
|
isValid = False
|
||
|
|
||
|
try:
|
||
|
xmldump = urlopen(cas_validate_url).read().strip().decode("utf8", "ignore")
|
||
|
xml_from_dict = parse(xmldump)
|
||
|
isValid = (
|
||
|
True
|
||
|
if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"]
|
||
|
else False
|
||
|
)
|
||
|
except ValueError:
|
||
|
current_app.logger.error("CAS returned unexpected result")
|
||
|
|
||
|
if isValid:
|
||
|
current_app.logger.debug("valid")
|
||
|
xml_from_dict = xml_from_dict["cas:serviceResponse"][
|
||
|
"cas:authenticationSuccess"
|
||
|
]
|
||
|
username = xml_from_dict["cas:user"]
|
||
|
attributes = xml_from_dict.get("cas:attributes", {})
|
||
|
|
||
|
if attributes and "cas:memberOf" in attributes:
|
||
|
if isinstance(attributes["cas:memberOf"], basestring):
|
||
|
attributes["cas:memberOf"] = (
|
||
|
attributes["cas:memberOf"].lstrip("[").rstrip("]").split(",")
|
||
|
)
|
||
|
for group_number in range(0, len(attributes["cas:memberOf"])):
|
||
|
attributes["cas:memberOf"][group_number] = (
|
||
|
attributes["cas:memberOf"][group_number].lstrip(" ").rstrip(" ")
|
||
|
)
|
||
|
flask.session[cas_username_session_key] = username
|
||
|
flask.session[cas_attributes_session_key] = attributes
|
||
|
else:
|
||
|
current_app.logger.debug("invalid")
|
||
|
|
||
|
return isValid
|