1
0
forked from ScoDoc/ScoDoc

Connexion au CAS (WIP)

This commit is contained in:
Emmanuel Viennet 2023-02-26 21:24:07 +01:00
parent e8f241d6af
commit d8a7f8affa
23 changed files with 796 additions and 71 deletions

View File

@ -17,15 +17,17 @@ from flask import current_app, g, request
from flask import Flask
from flask import abort, flash, has_request_context, jsonify
from flask import render_template
from flask.json import JSONEncoder
from flask.logging import default_handler
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_bootstrap import Bootstrap
from flask_caching import Cache
from flask_cas import CAS
from flask_login import LoginManager, current_user
from flask_mail import Mail
from flask_bootstrap import Bootstrap
from flask_migrate import Migrate
from flask_moment import Moment
from flask_caching import Cache
from flask_sqlalchemy import SQLAlchemy
from flask.json import JSONEncoder
from flask.logging import default_handler
from jinja2 import select_autoescape
import sqlalchemy
@ -132,7 +134,7 @@ class ScoDocJSONEncoder(JSONEncoder):
def render_raw_html(template_filename: str, **args) -> str:
"""Load and render an HTML file _without_ using Flask
Necessary for 503 error mesage, when DB is down and Flask may be broken.
Necessary for 503 error message, when DB is down and Flask may be broken.
"""
template_path = os.path.join(
current_app.config["SCODOC_DIR"],
@ -226,6 +228,7 @@ class ReverseProxied(object):
def create_app(config_class=DevConfig):
app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static")
CAS(app, url_prefix="/cas")
app.wsgi_app = ReverseProxied(app.wsgi_app)
app.json_encoder = ScoDocJSONEncoder
app.logger.setLevel(logging.INFO)
@ -378,6 +381,11 @@ def create_app(config_class=DevConfig):
sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample)
from app.auth.cas import set_cas_configuration
with app.app_context():
set_cas_configuration(app)
return app

View File

@ -6,3 +6,4 @@ from flask import Blueprint
bp = Blueprint("auth", __name__)
from app.auth import routes
from app.auth import cas

71
app/auth/cas.py Normal file
View File

@ -0,0 +1,71 @@
# -*- coding: UTF-8 -*
"""
auth.cas.py
"""
import datetime
import flask
from flask import current_app, flash, url_for
from flask_login import login_user
from app.auth import bp
from app.auth.models import User
from app.models.config import ScoDocSiteConfig
# after_cas_login/after_cas_logout : routes appelées par redirect depuis le serveur CAS.
@bp.route("/after_cas_login")
def after_cas_login():
"Called by CAS after CAS authentication"
# Ici on a les infos dans flask.session["CAS_ATTRIBUTES"]
if ScoDocSiteConfig.is_cas_enabled() and ("CAS_ATTRIBUTES" in flask.session):
# Lookup user:
cas_id = flask.session["CAS_ATTRIBUTES"].get(
"cas:" + ScoDocSiteConfig.get("cas_attribute_id")
)
if cas_id is not None:
user = User.query.filter_by(cas_id=cas_id).first()
if user and user.active:
if user.cas_allow_login:
current_app.logger.info(f"CAS: login {user.user_name}")
if login_user(user):
flask.session[
"scodoc_cas_login_date"
] = datetime.datetime.now().isoformat()
return flask.redirect(url_for("scodoc.index"))
else:
current_app.logger.info(
f"CAS login denied for {user.user_name} (not allowed to use CAS)"
)
else:
current_app.logger.info(
f"""CAS login denied for {user.user_name if user else ""} cas_id={cas_id} (unknown or inactive)"""
)
# Echec:
flash("échec de l'authentification")
return flask.redirect(url_for("auth.login"))
@bp.route("/after_cas_logout")
def after_cas_logout():
"Called by CAS after CAS logout"
flash("Vous êtes déconnecté")
current_app.logger.info("after_cas_logout")
return flask.redirect(url_for("scodoc.index"))
def set_cas_configuration(app: flask.app.Flask):
"""Force la configuration du module flask_cas à partir des paramètres de
la config de ScoDoc.
Appelé au démarrage et à chaque modif des paramètres.
"""
if ScoDocSiteConfig.is_cas_enabled():
app.config["CAS_SERVER"] = ScoDocSiteConfig.get("cas_server")
app.config["CAS_AFTER_LOGIN"] = "auth.after_cas_login"
app.config["CAS_AFTER_LOGOUT"] = "auth.after_cas_logout"
else:
app.config.pop("CAS_SERVER", None)
app.config.pop("CAS_AFTER_LOGIN", None)
app.config.pop("CAS_AFTER_LOGOUT", None)

View File

@ -8,9 +8,10 @@ import flask
from flask import g, redirect, request, url_for
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
import flask_login
from app import login
from app.scodoc.sco_utils import json_error
from app.auth.models import User
from app.scodoc.sco_utils import json_error
basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth()

View File

@ -53,12 +53,28 @@ class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
user_name = db.Column(db.String(64), index=True, unique=True)
"le login"
email = db.Column(db.String(120))
nom = db.Column(db.String(64))
prenom = db.Column(db.String(64))
dept = db.Column(db.String(SHORT_STR_LEN), index=True)
"acronyme du département de l'utilisateur"
active = db.Column(db.Boolean, default=True, index=True)
"si faux, compte utilisateur désactivé"
cas_id = db.Column(db.Text(), index=True, unique=True, nullable=True)
"uid sur le CAS (mail ou autre attribut, selon config.cas_attribute_id)"
cas_allow_login = db.Column(
db.Boolean, default=False, server_default="false", nullable=False
)
"Peut-on se logguer via le CAS ?"
cas_allow_scodoc_login = db.Column(
db.Boolean, default=True, server_default="false", nullable=False
)
"""(not yet implemented XXX)
si CAS activé, peut-on se logguer sur ScoDoc directement ?
(le rôle ScoSuperAdmin peut toujours)
"""
password_hash = db.Column(db.String(128))
password_scodoc7 = db.Column(db.String(42))
@ -184,6 +200,9 @@ class User(UserMixin, db.Model):
"dept": self.dept,
"id": self.id,
"active": self.active,
"cas_id": self.cas_id,
"cas_allow_login": self.cas_allow_login,
"cas_allow_scodoc_login": self.cas_allow_scodoc_login,
"status_txt": "actif" if self.active else "fermé",
"last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else None,
"nom": (self.nom or ""), # sco8
@ -206,7 +225,17 @@ class User(UserMixin, db.Model):
"""Set users' attributes from given dict values.
Roles must be encoded as "roles_string", like "Ens_RT, Secr_CJ"
"""
for field in ["nom", "prenom", "dept", "active", "email", "date_expiration"]:
for field in [
"nom",
"prenom",
"dept",
"active",
"email",
"date_expiration",
"cas_id",
"cas_allow_login",
"cas_allow_scodoc_login",
]:
if field in data:
setattr(self, field, data[field] or None)
if new_user:

View File

@ -3,6 +3,7 @@
auth.routes.py
"""
import flask
from flask import current_app, flash, render_template
from flask import redirect, url_for, request
from flask_login import login_user, logout_user, current_user
@ -20,6 +21,7 @@ from app.auth.models import Role
from app.auth.models import User
from app.auth.email import send_password_reset_email
from app.decorators import admin_required
from app.models.config import ScoDocSiteConfig
_ = lambda x: x # sans babel
_l = _
@ -30,6 +32,7 @@ def login():
"ScoDoc Login form"
if current_user.is_authenticated:
return redirect(url_for("scodoc.index"))
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(user_name=form.user_name.data).first()
@ -42,14 +45,22 @@ def login():
return form.redirect("scodoc.index")
message = request.args.get("message", "")
return render_template(
"auth/login.j2", title=_("Sign In"), form=form, message=message
"auth/login.j2",
title=_("Sign In"),
form=form,
message=message,
is_cas_enabled=ScoDocSiteConfig.is_cas_enabled(),
)
@bp.route("/logout")
def logout():
"Logout current user and redirect to home page"
def logout() -> flask.Response:
"Logout a scodoc user. If CAS session, logout from CAS. Redirect."
current_app.logger.info(f"logout user {current_user.user_name}")
logout_user()
if ScoDocSiteConfig.is_cas_enabled() and flask.session.get("scodoc_cas_login_date"):
flask.session.pop("scodoc_cas_login_date", None)
return redirect(url_for("cas.logout"))
return redirect(url_for("scodoc.index"))

View File

@ -0,0 +1,53 @@
# -*- mode: python -*-
# -*- coding: utf-8 -*-
##############################################################################
#
# ScoDoc
#
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Emmanuel Viennet emmanuel.viennet@viennet.net
#
##############################################################################
"""
Formulaires configuration Exports Apogée (codes)
"""
from flask_wtf import FlaskForm
from wtforms import BooleanField, SubmitField
from wtforms.fields.simple import StringField
class ConfigCASForm(FlaskForm):
"Formulaire paramétrage CAS"
cas_enable = BooleanField("activer le CAS")
cas_server = StringField(
label="URL du serveur CAS",
description="""url complète. Commence en général par <tt>https://</tt>.""",
)
cas_attribute_id = StringField(
label="Attribut CAS utilisé comme id",
description="""Le champs CAS qui sera considéré comme l'id unique des
comptes utilisateurs.""",
)
submit = SubmitField("Valider")
cancel = SubmitField("Annuler", render_kw={"formnovalidate": True})

View File

@ -87,6 +87,10 @@ class ScoDocSiteConfig(db.Model):
"enable_entreprises": bool,
"month_debut_annee_scolaire": int,
"month_debut_periode2": int,
# CAS
"cas_enable": bool,
"cas_server": str,
"cas_attribute_id": str,
}
def __init__(self, name, value):
@ -170,7 +174,7 @@ class ScoDocSiteConfig(db.Model):
(starting with empty string to represent "no bonus function").
"""
d = bonus_spo.get_bonus_class_dict()
class_list = [(name, d[name].displayed_name) for name in d.keys()]
class_list = [(name, d[name].displayed_name) for name in d]
class_list.sort(key=lambda x: x[1].replace(" du ", " de "))
return [("", "")] + class_list
@ -204,13 +208,31 @@ class ScoDocSiteConfig(db.Model):
db.session.add(cfg)
db.session.commit()
@classmethod
def is_cas_enabled(cls) -> bool:
"""True si on utilise le CAS"""
cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first()
return cfg is not None and cfg.value
@classmethod
def cas_enable(cls, enabled=True) -> bool:
"""Active (ou déactive) le CAS. True si changement."""
if enabled != ScoDocSiteConfig.is_cas_enabled():
cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first()
if cfg is None:
cfg = ScoDocSiteConfig(name="cas_enable", value="on" if enabled else "")
else:
cfg.value = "on" if enabled else ""
db.session.add(cfg)
db.session.commit()
return True
return False
@classmethod
def is_entreprises_enabled(cls) -> bool:
"""True si on doit activer le module entreprise"""
cfg = ScoDocSiteConfig.query.filter_by(name="enable_entreprises").first()
if (cfg is None) or not cfg.value:
return False
return True
return cfg is not None and cfg.value
@classmethod
def enable_entreprises(cls, enabled=True) -> bool:
@ -228,6 +250,26 @@ class ScoDocSiteConfig(db.Model):
return True
return False
@classmethod
def get(cls, name: str) -> str:
"Get configuration param; empty string if unset"
cfg = ScoDocSiteConfig.query.filter_by(name=name).first()
return (cfg.value or "") if cfg else ""
@classmethod
def set(cls, name: str, value: str) -> bool:
"Set parameter, returns True if change. Commit session."
if cls.get(name) != (value or ""):
cfg = ScoDocSiteConfig.query.filter_by(name=name).first()
if cfg is None:
cfg = ScoDocSiteConfig(name=name, value=str(value))
else:
cfg.value = str(value or "")
db.session.add(cfg)
db.session.commit()
return True
return False
@classmethod
def _get_int_field(cls, name: str, default=None) -> int:
"""Valeur d'un champs integer"""

View File

@ -225,6 +225,9 @@ _identiteEditor = ndb.EditableTable(
"nom",
"nom_usuel",
"prenom",
"cas_id",
"cas_allow_login",
"cas_allow_scodoc_login",
"civilite", # 'M", "F", or "X"
"date_naissance",
"lieu_naissance",

View File

@ -28,11 +28,10 @@
"""Fonctions sur les utilisateurs
"""
# Anciennement ZScoUsers.py, fonctions de gestion des données réécrite avec flask/SQLAlchemy
# Anciennement ZScoUsers.py, fonctions de gestion des données réécrites avec flask/SQLAlchemy
import re
from flask import url_for, g, request
from flask.templating import render_template
from flask_login import current_user
@ -42,23 +41,11 @@ from app.auth.models import Permission
from app.auth.models import User
from app.scodoc import html_sco_header
from app.scodoc import sco_etud
from app.scodoc import sco_excel
from app.scodoc import sco_preferences
from app.scodoc.gen_tables import GenTable
from app import log, cache
from app.scodoc.scolog import logdb
import app.scodoc.sco_utils as scu
from app import cache
from app.scodoc.sco_exceptions import (
AccessDenied,
ScoValueError,
)
# ---------------
# ---------------
from app.scodoc.sco_exceptions import ScoValueError
def index_html(all_depts=False, with_inactives=False, format="html"):
@ -70,19 +57,21 @@ def index_html(all_depts=False, with_inactives=False, format="html"):
if current_user.has_permission(Permission.ScoUsersAdmin, g.scodoc_dept):
H.append(
'<p><a href="{}" class="stdlink">Ajouter un utilisateur</a>'.format(
url_for("users.create_user_form", scodoc_dept=g.scodoc_dept)
)
f"""<p><a href="{url_for("users.create_user_form",
scodoc_dept=g.scodoc_dept)
}" class="stdlink">Ajouter un utilisateur</a>"""
)
if current_user.is_administrator():
H.append(
'&nbsp;&nbsp; <a href="{}" class="stdlink">Importer des utilisateurs</a></p>'.format(
url_for("users.import_users_form", scodoc_dept=g.scodoc_dept)
)
"""&nbsp;&nbsp; <a href="{url_for("users.import_users_form",
scodoc_dept=g.scodoc_dept)
}" class="stdlink">Importer des utilisateurs</a></p>"""
)
else:
H.append(
"&nbsp;&nbsp; Pour importer des utilisateurs en masse (via xlsx file) contactez votre administrateur scodoc."
"""&nbsp;&nbsp; Pour importer des utilisateurs en masse (via fichier xlsx)
contactez votre administrateur scodoc."""
)
if all_depts:
checked = "checked"
@ -93,11 +82,12 @@ def index_html(all_depts=False, with_inactives=False, format="html"):
else:
olds_checked = ""
H.append(
"""<p><form name="f" action="%s" method="get">
<input type="checkbox" name="all_depts" value="1" onchange="document.f.submit();" %s>Tous les départements</input>
<input type="checkbox" name="with_inactives" value="1" onchange="document.f.submit();" %s>Avec anciens utilisateurs</input>
f"""<p><form name="f" action="{request.base_url}" method="get">
<input type="checkbox" name="all_depts" value="1" onchange="document.f.submit();"
{checked}>Tous les départements</input>
<input type="checkbox" name="with_inactives" value="1" onchange="document.f.submit();"
{olds_checked}>Avec anciens utilisateurs</input>
</form></p>"""
% (request.base_url, checked, olds_checked)
)
L = list_users(
@ -189,9 +179,8 @@ def list_users(
},
caption=title,
page_title="title",
html_title="""<h2>%d utilisateurs %s</h2>
<p class="help">Cliquer sur un nom pour changer son mot de passe</p>"""
% (len(r), comm),
html_title=f"""<h2>{len(r)} utilisateurs {comm}</h2>
<p class="help">Cliquer sur un nom pour changer son mot de passe</p>""",
html_class="table_leftalign list_users",
html_with_td_classes=True,
html_sortable=True,
@ -273,6 +262,9 @@ def user_info(user_name_or_id=None, user: User = None):
return info
MSG_OPT = """<br>Attention: (vous pouvez forcer l'opération en cochant "<em>Ignorer les avertissements</em>" en bas de page)"""
def check_modif_user(
edit,
enforce_optionals=False,
@ -281,7 +273,8 @@ def check_modif_user(
prenom="",
email="",
dept="",
roles=[],
roles: list = None,
cas_id: str = None,
):
"""Vérifie que cet utilisateur peut être créé (edit=0) ou modifié (edit=1)
Cherche homonymes.
@ -290,32 +283,31 @@ def check_modif_user(
(si ok est faux, l'utilisateur peut quand même forcer la creation)
- msg: message warning à presenter à l'utilisateur
"""
MSG_OPT = """<br>Attention: (vous pouvez forcer l'opération en cochant "<em>Ignorer les avertissements</em>" en bas de page)"""
roles = roles or []
# ce login existe ?
user = _user_list(user_name)
user = User.query.filter_by(user_name=user_name).first()
if edit and not user: # safety net, le user_name ne devrait pas changer
return False, "identifiant %s inexistant" % user_name
return False, f"identifiant {user_name} inexistant"
if not edit and user:
return False, "identifiant %s déjà utilisé" % user_name
return False, f"identifiant {user_name} déjà utilisé"
if not user_name or not nom or not prenom:
return False, "champ requis vide"
if not re.match(r"^[a-zA-Z0-9@\\\-_\\\.]*$", user_name):
return (
False,
"identifiant '%s' invalide (pas d'accents ni de caractères spéciaux)"
% user_name,
f"identifiant '{user_name}' invalide (pas d'accents ni de caractères spéciaux)",
)
if enforce_optionals and len(user_name) > 64:
return False, "identifiant '%s' trop long (64 caractères)" % user_name
return False, f"identifiant '{user_name}' trop long (64 caractères)"
if enforce_optionals and len(nom) > 64:
return False, "nom '%s' trop long (64 caractères)" % nom + MSG_OPT
return False, f"nom '{nom}' trop long (64 caractères)" + MSG_OPT
if enforce_optionals and len(prenom) > 64:
return False, "prenom '%s' trop long (64 caractères)" % prenom + MSG_OPT
# check that tha same user_name has not already been described in this import
return False, f"prenom '{prenom}' trop long (64 caractères)" + MSG_OPT
# check that same user_name has not already been described in this import
if not email:
return False, "vous devriez indiquer le mail de l'utilisateur créé !"
if len(email) > 120:
return False, "email '%s' trop long (120 caractères)" % email
return False, f"email '{email}' trop long (120 caractères)"
if not re.fullmatch(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", email):
return False, "l'adresse mail semble incorrecte"
# check département
@ -324,19 +316,31 @@ def check_modif_user(
and dept
and Departement.query.filter_by(acronym=dept).first() is None
):
return False, "département '%s' inexistant" % dept + MSG_OPT
return False, f"département '{dept}' inexistant" + MSG_OPT
if enforce_optionals and not roles:
return False, "aucun rôle sélectionné, êtes vous sûr ?" + MSG_OPT
# Unicité du mail
users_with_this_mail = User.query.filter_by(email=email).all()
if edit: # modification
if email != user["email"] and len(users_with_this_mail) > 0:
if email != user.email and len(users_with_this_mail) > 0:
return False, "un autre utilisateur existe déjà avec cette adresse mail"
else: # création utilisateur
if len(users_with_this_mail) > 0:
return False, "un autre utilisateur existe déjà avec cette adresse mail"
# ok
# Unicité du cas_id
if cas_id:
cas_users = User.query.filter_by(cas_id=cas_id).all()
if edit:
if cas_users and (
len(cas_users) > 1 or cas_users[0].user_name != user_name
):
return (
False,
"un autre utilisateur existe déjà avec cet identifiant CAS",
)
elif cas_users:
return False, "un autre utilisateur existe déjà avec cet identifiant CAS"
# Des noms/prénoms semblables existent ?
nom = nom.lower().strip()
prenom = prenom.lower().strip()
@ -367,7 +371,7 @@ def user_edit(user_name, vals):
"""Edit the user specified by user_name
(ported from Zope to SQLAlchemy, hence strange !)
"""
u = User.query.filter_by(user_name=user_name).first()
u: User = User.query.filter_by(user_name=user_name).first()
if not u:
raise ScoValueError("Invalid user_name")
u.from_dict(vals)

View File

@ -9,6 +9,12 @@
{% endif %}
<h1>Connexion</h1>
{% if is_cas_enabled %}
<div class"cas_link">
<a href="{{ url_for('cas.login') }}" class="stdlink">Se connecter avec CAS</a>
</div>
{% endif %}
<div class="row">
<div class="col-md-4">
{{ wtf.quick_form(form) }}

View File

@ -7,6 +7,9 @@
<h2>Utilisateur: {{user.user_name}} ({{'actif' if user.active else 'fermé'}})</h2>
<p>
<b>Login :</b> {{user.user_name}}<br>
<b>CAS id:</b> {{user.cas_id or "(aucun)"}}
(CAS {{'autorisé' if user.cas_allow_login else 'interdit'}} pour cet utilisateur)
<br>
<b>Nom :</b> {{user.nom or ""}}<br>
<b>Prénom :</b> {{user.prenom or ""}}<br>
<b>Mail :</b> {{user.email}}<br>
@ -48,9 +51,15 @@
</ul>
{% if current_user.id == user.id %}
<p><b>Se déconnecter:
<a class="stdlink" href="{{url_for('auth.logout')}}">logout</a>
</b></p>
<div class="user_info_session">
{% if session_info %}
<p><b>CAS session started at </b>{{ session_info }}</p>
{% endif %}
<p><b>Se déconnecter:
<a class="stdlink" href="{{url_for('auth.logout')}}">logout</a>
</b>
</p>
</div>
{% endif %}
{# Liste des permissions #}

View File

@ -0,0 +1,18 @@
{% extends "base.j2" %}
{% import 'bootstrap/wtf.html' as wtf %}
{% block app_content %}
<h1>Configuration du Service d'Authentification Central (CAS)</h1>
<div class="help">
<p>Le CAS...</p>
</div>
<div class="row">
<div class="col-md-4">
{{ wtf.quick_form(form) }}
</div>
</div>
{% endblock %}

View File

@ -53,8 +53,9 @@
</p>
</section>
<h2>Utilisateurs</h2>
<h2>Utilisateurs et CAS</h2>
<section>
<p><a class="stdlink" href="{{url_for('scodoc.config_cas')}}">configuration du service CAS</a>
<p><a class="stdlink" href="{{url_for('auth.reset_standard_roles_permissions')}}">remettre
les permissions des rôles standards à leurs valeurs par défaut</a> (efface les modifications apportées)
</p>

View File

@ -62,6 +62,7 @@ from app.decorators import (
from app.forms.main import config_logos, config_main
from app.forms.main.create_dept import CreateDeptForm
from app.forms.main.config_apo import CodesDecisionsForm
from app.forms.main.config_cas import ConfigCASForm
from app import models
from app.models import Departement, Identite
from app.models import departements
@ -134,6 +135,33 @@ def toggle_dept_vis(dept_id):
return redirect(url_for("scodoc.index"))
@bp.route("/ScoDoc/config_cas", methods=["GET", "POST"])
@admin_required
def config_cas():
"""Form config CAS"""
form = ConfigCASForm()
if request.method == "POST" and form.cancel.data: # cancel button
return redirect(url_for("scodoc.index"))
if form.validate_on_submit():
if ScoDocSiteConfig.cas_enable(enabled=form.data["cas_enable"]):
flash("CAS " + ("activé" if form.data["cas_enable"] else "désactivé"))
if ScoDocSiteConfig.set("cas_server", form.data["cas_server"]):
flash("Serveur CAS enregistré")
if ScoDocSiteConfig.set("cas_attribute_id", form.data["cas_attribute_id"]):
flash("Serveur CAS enregistré")
return redirect(url_for("scodoc.configuration"))
elif request.method == "GET":
form.cas_enable.data = ScoDocSiteConfig.get("cas_enable")
form.cas_server.data = ScoDocSiteConfig.get("cas_server")
form.cas_attribute_id.data = ScoDocSiteConfig.get("cas_attribute_id")
return render_template(
"config_cas.j2",
form=form,
title="Configuration du Service d'Authentification Central (CAS)",
)
@bp.route("/ScoDoc/config_codes_decisions", methods=["GET", "POST"])
@admin_required
def config_codes_decisions():

View File

@ -56,6 +56,7 @@ from app.auth.models import UserRole
from app.auth.models import is_valid_password
from app.email import send_email
from app.models import Departement
from app.models.config import ScoDocSiteConfig
from app.decorators import (
scodoc,
@ -226,7 +227,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
if edit:
if not user_name:
raise ValueError("missing argument: user_name")
the_user = User.query.filter_by(user_name=user_name).first()
the_user: User = User.query.filter_by(user_name=user_name).first()
if not the_user:
raise ScoValueError("utilisateur inexistant")
initvalues = the_user.to_dict()
@ -367,6 +368,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
{"input_type": "hidden", "default": initvalues["user_name"]},
)
]
cas_enabled = ScoDocSiteConfig.is_cas_enabled()
descr += [
(
"email",
@ -376,11 +378,34 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
"explanation": "requis, doit fonctionner"
if not edit_only_roles
else "",
"size": 20,
"size": 36,
"allow_null": False,
"readonly": edit_only_roles,
},
)
),
(
"cas_id",
{
"title": "Identifiant CAS",
"input_type": "text",
"explanation": "id du compte utilisateur sur le CAS de l'établissement "
+ "(service CAS activé)"
if cas_enabled
else "(service CAS non activé)",
"size": 36,
"allow_null": True,
"readonly": not cas_enabled,
},
),
(
"cas_allow_login",
{
"title": "Autorise connexion via CAS",
"input_type": "boolcheckbox",
"explanation": "en test: seul le super-administrateur peut changer ce réglage",
"readonly": not current_user.is_administrator(),
},
),
]
if not edit: # options création utilisateur
descr += [
@ -438,7 +463,8 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
"d",
{
"input_type": "separator",
"title": f"""L'utilisateur appartient au département {the_user.dept or "(tous)"}""",
"title": f"""L'utilisateur appartient au département {
the_user.dept or "(tous)"}""",
},
)
)
@ -541,7 +567,6 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
if err_msg:
H.append(tf_error_message(f"""Erreur: {err_msg}"""))
return "\n".join(H) + "\n" + tf[1] + F
if not edit_only_roles:
ok_modif, msg = sco_users.check_modif_user(
edit,
@ -552,6 +577,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
email=vals["email"],
dept=vals.get("dept", auth_dept),
roles=vals["roles"],
cas_id=vals["cas_id"],
)
if not ok_modif:
H.append(tf_error_message(msg))
@ -815,12 +841,17 @@ def user_info_page(user_name=None):
if not user:
raise ScoValueError("invalid user_name")
session_info = None
if user.id == current_user.id:
session_info = flask.session.get("scodoc_cas_login_date")
return render_template(
"auth/user_info_page.j2",
user=user,
title=f"Utilisateur {user.user_name}",
Permission=Permission,
dept=dept,
session_info=session_info,
)

View File

@ -43,6 +43,10 @@ class Config:
# STATIC_URL_PATH = "/ScoDoc/static"
# static_folder = "stat"
# SERVER_NAME = os.environ.get("SERVER_NAME")
# XXX temporaire: utiliser SiteConfig
CAS_SERVER = os.environ.get("CAS_SERVER")
CAS_AFTER_LOGIN = os.environ.get("CAS_AFTER_LOGIN")
CAS_AFTER_LOGOUT = os.environ.get("CAS_AFTER_LOGOUT")
class ProdConfig(Config):

7
flask_cas/README.md Normal file
View File

@ -0,0 +1,7 @@
# Flask-CAS
Forked from <https://github.com/MasterRoshan/flask-cas-ng>
and adapted by Emmanuel Viennet, Feb. 2023.
- logout: clear `_CAS_TOKEN`.
- Use `url` instead of `service` parameter in logout URL.

106
flask_cas/__init__.py Normal file
View File

@ -0,0 +1,106 @@
"""
flask_cas.__init__
"""
import flask
from flask import current_app
# Find the stack on which we want to store the database connection.
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
# before that we need to use the _request_ctx_stack.
try:
from flask import _app_ctx_stack as stack
except ImportError:
from flask import _request_ctx_stack as stack
from . import routing
from functools import wraps
class CAS(object):
"""
Required Configs:
|Key |
|----------------|
|CAS_SERVER |
|CAS_AFTER_LOGIN |
Optional Configs:
|Key | Default |
|---------------------------|-----------------------|
|CAS_TOKEN_SESSION_KEY | _CAS_TOKEN |
|CAS_USERNAME_SESSION_KEY | CAS_USERNAME |
|CAS_ATTRIBUTES_SESSION_KEY | CAS_ATTRIBUTES |
|CAS_LOGIN_ROUTE | '/cas' |
|CAS_LOGOUT_ROUTE | '/cas/logout' |
|CAS_VALIDATE_ROUTE | '/cas/serviceValidate'|
|CAS_AFTER_LOGOUT | None |
"""
def __init__(self, app=None, url_prefix=None):
self._app = app
if app is not None:
self.init_app(app, url_prefix)
def init_app(self, app, url_prefix=None):
# Configuration defaults
app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN')
app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME')
app.config.setdefault('CAS_ATTRIBUTES_SESSION_KEY', 'CAS_ATTRIBUTES')
app.config.setdefault('CAS_LOGIN_ROUTE', '/cas')
app.config.setdefault('CAS_LOGOUT_ROUTE', '/cas/logout')
app.config.setdefault('CAS_VALIDATE_ROUTE', '/cas/serviceValidate')
# Requires CAS 2.0
app.config.setdefault('CAS_AFTER_LOGOUT', None)
# Register Blueprint
app.register_blueprint(routing.blueprint, url_prefix=url_prefix)
# Use the newstyle teardown_appcontext if it's available,
# otherwise fall back to the request context
if hasattr(app, 'teardown_appcontext'):
app.teardown_appcontext(self.teardown)
else:
app.teardown_request(self.teardown)
def teardown(self, exception):
ctx = stack.top
@property
def app(self):
return self._app or current_app
@property
def username(self):
return flask.session.get(
self.app.config['CAS_USERNAME_SESSION_KEY'], None)
@property
def attributes(self):
return flask.session.get(
self.app.config['CAS_ATTRIBUTES_SESSION_KEY'], None)
@property
def token(self):
return flask.session.get(
self.app.config['CAS_TOKEN_SESSION_KEY'], None)
def login():
return flask.redirect(flask.url_for('cas.login', _external=True))
def logout():
return flask.redirect(flask.url_for('cas.logout', _external=True))
def login_required(function):
@wraps(function)
def wrap(*args, **kwargs):
if 'CAS_USERNAME' not in flask.session:
flask.session['CAS_AFTER_LOGIN_SESSION_URL'] = (
flask.request.script_root +
flask.request.full_path
)
return login()
else:
return function(*args, **kwargs)
return wrap

125
flask_cas/cas_urls.py Normal file
View File

@ -0,0 +1,125 @@
"""
flask_cas.cas_urls
Functions for creating urls to access CAS.
"""
try:
from urllib import quote
from urllib import urlencode
from urlparse import urljoin
except ImportError:
from urllib.parse import quote
from urllib.parse import urljoin
from urllib.parse import urlencode
def create_url(base, path=None, *query):
"""Create a url.
Creates a url by combining base, path, and the query's list of
key/value pairs. Escaping is handled automatically. Any
key/value pair with a value that is None is ignored.
Keyword arguments:
base -- The left most part of the url (ex. http://localhost:5000).
path -- The path after the base (ex. /foo/bar).
query -- A list of key value pairs (ex. [('key', 'value')]).
Example usage:
>>> create_url(
... 'http://localhost:5000',
... 'foo/bar',
... ('key1', 'value'),
... ('key2', None), # Will not include None
... ('url', 'http://example.com'),
... )
'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com'
"""
url = base
# Add the path to the url if it's not None.
if path is not None:
url = urljoin(url, quote(path))
# Remove key/value pairs with None values.
query = filter(lambda pair: pair[1] is not None, query)
# Add the query string to the url
url = urljoin(url, "?{0}".format(urlencode(list(query))))
return url
def create_cas_login_url(cas_url, cas_route, service, renew=None, gateway=None):
"""Create a CAS login URL .
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas)
service -- (ex. http://localhost:5000/login)
renew -- "true" or "false"
gateway -- "true" or "false"
Example usage:
>>> create_cas_login_url(
... 'http://sso.pdx.edu',
... '/cas',
... 'http://localhost:5000',
... )
'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000'
"""
return create_url(
cas_url,
cas_route,
("service", service),
("renew", renew),
("gateway", gateway),
)
def create_cas_logout_url(cas_url, cas_route, service=None):
"""Create a CAS logout URL.
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas/logout)
url -- (ex. http://localhost:5000/login)
Example usage:
>>> create_cas_logout_url(
... 'http://sso.pdx.edu',
... '/cas/logout',
... 'http://localhost:5000',
... )
'http://sso.pdx.edu/cas/logout?service=http%3A%2F%2Flocalhost%3A5000'
"""
return create_url(
cas_url,
cas_route,
("url", service), # 'url' (and not 'service'): redirect here after CAS logout
)
def create_cas_validate_url(cas_url, cas_route, service, ticket, renew=None):
"""Create a CAS validate URL.
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas/serviceValidate)
service -- (ex. http://localhost:5000/login)
ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas')
renew -- "true" or "false"
Example usage:
>>> create_cas_validate_url(
... 'http://sso.pdx.edu',
... '/cas/serviceValidate',
... 'http://localhost:5000/login',
... 'ST-58274-x839euFek492ou832Eena7ee-cas'
... )
'http://sso.pdx.edu/cas/serviceValidate?service=http%3A%2F%2Flocalhost%3A5000%2Flogin&ticket=ST-58274-x839euFek492ou832Eena7ee-cas'
"""
return create_url(
cas_url,
cas_route,
("service", service),
("ticket", ticket),
("renew", renew),
)

165
flask_cas/routing.py Normal file
View File

@ -0,0 +1,165 @@
import flask
from xmltodict import parse
from flask import current_app
from .cas_urls import create_cas_login_url
from .cas_urls import create_cas_logout_url
from .cas_urls import create_cas_validate_url
try:
from urllib import urlopen
except ImportError:
from urllib.request import urlopen
blueprint = flask.Blueprint("cas", __name__)
@blueprint.route("/login/")
def login():
"""
This route has two purposes. First, it is used by the user
to login. Second, it is used by the CAS to respond with the
`ticket` after the user logs in successfully.
When the user accesses this url, they are redirected to the CAS
to login. If the login was successful, the CAS will respond to this
route with the ticket in the url. The ticket is then validated.
If validation was successful the logged in username is saved in
the user's session under the key `CAS_USERNAME_SESSION_KEY` and
the user's attributes are saved under the key
'CAS_USERNAME_ATTRIBUTE_KEY'
"""
cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
redirect_url = create_cas_login_url(
current_app.config["CAS_SERVER"],
current_app.config["CAS_LOGIN_ROUTE"],
flask.url_for(
".login",
origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
_external=True,
),
)
if "ticket" in flask.request.args:
flask.session[cas_token_session_key] = flask.request.args["ticket"]
if cas_token_session_key in flask.session:
if validate(flask.session[cas_token_session_key]):
if "CAS_AFTER_LOGIN_SESSION_URL" in flask.session:
redirect_url = flask.session.pop("CAS_AFTER_LOGIN_SESSION_URL")
elif flask.request.args.get("origin"):
redirect_url = flask.request.args["origin"]
else:
redirect_url = flask.url_for(current_app.config["CAS_AFTER_LOGIN"])
else:
flask.session.pop(cas_token_session_key, None)
current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
return flask.redirect(redirect_url)
@blueprint.route("/logout/")
def logout():
"""
When the user accesses this route they are logged out.
"""
cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
flask.session.pop(cas_username_session_key, None)
flask.session.pop(cas_attributes_session_key, None)
flask.session.pop(cas_token_session_key, None) # added by EV
cas_after_logout = current_app.config["CAS_AFTER_LOGOUT"]
if cas_after_logout is not None:
# If config starts with http, use it as dest URL.
# Else, build Flask URL
dest_url = (
cas_after_logout
if cas_after_logout.startswith("http")
else flask.url_for(cas_after_logout, _external=True)
)
redirect_url = create_cas_logout_url(
current_app.config["CAS_SERVER"],
current_app.config["CAS_LOGOUT_ROUTE"],
dest_url,
)
else:
redirect_url = create_cas_logout_url(
current_app.config["CAS_SERVER"], current_app.config["CAS_LOGOUT_ROUTE"]
)
current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
return flask.redirect(redirect_url)
def validate(ticket):
"""
Will attempt to validate the ticket. If validation fails, then False
is returned. If validation is successful, then True is returned
and the validated username is saved in the session under the
key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary
is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'.
"""
cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
current_app.logger.debug("validating token {0}".format(ticket))
cas_validate_url = create_cas_validate_url(
current_app.config["CAS_SERVER"],
current_app.config["CAS_VALIDATE_ROUTE"],
flask.url_for(
".login",
origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
_external=True,
),
ticket,
)
current_app.logger.debug("Making GET request to {0}".format(cas_validate_url))
xml_from_dict = {}
isValid = False
try:
xmldump = urlopen(cas_validate_url).read().strip().decode("utf8", "ignore")
xml_from_dict = parse(xmldump)
isValid = (
True
if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"]
else False
)
except ValueError:
current_app.logger.error("CAS returned unexpected result")
if isValid:
current_app.logger.debug("valid")
xml_from_dict = xml_from_dict["cas:serviceResponse"][
"cas:authenticationSuccess"
]
username = xml_from_dict["cas:user"]
attributes = xml_from_dict.get("cas:attributes", {})
if attributes and "cas:memberOf" in attributes:
if isinstance(attributes["cas:memberOf"], basestring):
attributes["cas:memberOf"] = (
attributes["cas:memberOf"].lstrip("[").rstrip("]").split(",")
)
for group_number in range(0, len(attributes["cas:memberOf"])):
attributes["cas:memberOf"][group_number] = (
attributes["cas:memberOf"][group_number].lstrip(" ").rstrip(" ")
)
flask.session[cas_username_session_key] = username
flask.session[cas_attributes_session_key] = attributes
else:
current_app.logger.debug("invalid")
return isValid

View File

@ -85,4 +85,5 @@ visitor==0.1.3
Werkzeug==2.1.1
wrapt==1.14.0
WTForms==3.0.1
xmltodict==0.13.0
zipp==3.8.0

View File

@ -10,6 +10,7 @@ SCONEWS = """
<ul>
<li>ScoDoc 9.4</li>
<ul>
<li>Connexion avec service CAS</li>
<li>Améliorations des tableaux récapitulatifs</li>
<li>Nouvelle interface de gestions des groupes (S. Lehmann)</li>
<li>Enrichissement des jurys BUT et des procès-verbaux associés.</li>