diff --git a/app/__init__.py b/app/__init__.py index 72cc1cb72..a1edfbd7b 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -17,15 +17,17 @@ from flask import current_app, g, request from flask import Flask from flask import abort, flash, has_request_context, jsonify from flask import render_template -from flask.json import JSONEncoder -from flask.logging import default_handler -from flask_sqlalchemy import SQLAlchemy -from flask_migrate import Migrate +from flask_bootstrap import Bootstrap +from flask_caching import Cache +from flask_cas import CAS from flask_login import LoginManager, current_user from flask_mail import Mail -from flask_bootstrap import Bootstrap +from flask_migrate import Migrate from flask_moment import Moment -from flask_caching import Cache +from flask_sqlalchemy import SQLAlchemy +from flask.json import JSONEncoder +from flask.logging import default_handler + from jinja2 import select_autoescape import sqlalchemy @@ -132,7 +134,7 @@ class ScoDocJSONEncoder(JSONEncoder): def render_raw_html(template_filename: str, **args) -> str: """Load and render an HTML file _without_ using Flask - Necessary for 503 error mesage, when DB is down and Flask may be broken. + Necessary for 503 error message, when DB is down and Flask may be broken. """ template_path = os.path.join( current_app.config["SCODOC_DIR"], @@ -226,6 +228,7 @@ class ReverseProxied(object): def create_app(config_class=DevConfig): app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static") + CAS(app, url_prefix="/cas") app.wsgi_app = ReverseProxied(app.wsgi_app) app.json_encoder = ScoDocJSONEncoder app.logger.setLevel(logging.INFO) @@ -378,6 +381,11 @@ def create_app(config_class=DevConfig): sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample) + from app.auth.cas import set_cas_configuration + + with app.app_context(): + set_cas_configuration(app) + return app diff --git a/app/auth/__init__.py b/app/auth/__init__.py index 4fb9575d9..7dcff2b3e 100644 --- a/app/auth/__init__.py +++ b/app/auth/__init__.py @@ -6,3 +6,4 @@ from flask import Blueprint bp = Blueprint("auth", __name__) from app.auth import routes +from app.auth import cas diff --git a/app/auth/cas.py b/app/auth/cas.py new file mode 100644 index 000000000..067ea3506 --- /dev/null +++ b/app/auth/cas.py @@ -0,0 +1,71 @@ +# -*- coding: UTF-8 -* +""" +auth.cas.py +""" +import datetime + +import flask +from flask import current_app, flash, url_for +from flask_login import login_user + +from app.auth import bp +from app.auth.models import User +from app.models.config import ScoDocSiteConfig + +# after_cas_login/after_cas_logout : routes appelées par redirect depuis le serveur CAS. + + +@bp.route("/after_cas_login") +def after_cas_login(): + "Called by CAS after CAS authentication" + # Ici on a les infos dans flask.session["CAS_ATTRIBUTES"] + if ScoDocSiteConfig.is_cas_enabled() and ("CAS_ATTRIBUTES" in flask.session): + # Lookup user: + cas_id = flask.session["CAS_ATTRIBUTES"].get( + "cas:" + ScoDocSiteConfig.get("cas_attribute_id") + ) + if cas_id is not None: + user = User.query.filter_by(cas_id=cas_id).first() + if user and user.active: + if user.cas_allow_login: + current_app.logger.info(f"CAS: login {user.user_name}") + if login_user(user): + flask.session[ + "scodoc_cas_login_date" + ] = datetime.datetime.now().isoformat() + return flask.redirect(url_for("scodoc.index")) + else: + current_app.logger.info( + f"CAS login denied for {user.user_name} (not allowed to use CAS)" + ) + else: + current_app.logger.info( + f"""CAS login denied for {user.user_name if user else ""} cas_id={cas_id} (unknown or inactive)""" + ) + + # Echec: + flash("échec de l'authentification") + return flask.redirect(url_for("auth.login")) + + +@bp.route("/after_cas_logout") +def after_cas_logout(): + "Called by CAS after CAS logout" + flash("Vous êtes déconnecté") + current_app.logger.info("after_cas_logout") + return flask.redirect(url_for("scodoc.index")) + + +def set_cas_configuration(app: flask.app.Flask): + """Force la configuration du module flask_cas à partir des paramètres de + la config de ScoDoc. + Appelé au démarrage et à chaque modif des paramètres. + """ + if ScoDocSiteConfig.is_cas_enabled(): + app.config["CAS_SERVER"] = ScoDocSiteConfig.get("cas_server") + app.config["CAS_AFTER_LOGIN"] = "auth.after_cas_login" + app.config["CAS_AFTER_LOGOUT"] = "auth.after_cas_logout" + else: + app.config.pop("CAS_SERVER", None) + app.config.pop("CAS_AFTER_LOGIN", None) + app.config.pop("CAS_AFTER_LOGOUT", None) diff --git a/app/auth/logic.py b/app/auth/logic.py index ba3f73a42..a1737a3b4 100644 --- a/app/auth/logic.py +++ b/app/auth/logic.py @@ -8,9 +8,10 @@ import flask from flask import g, redirect, request, url_for from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth import flask_login + from app import login -from app.scodoc.sco_utils import json_error from app.auth.models import User +from app.scodoc.sco_utils import json_error basic_auth = HTTPBasicAuth() token_auth = HTTPTokenAuth() diff --git a/app/auth/models.py b/app/auth/models.py index 274be8e7a..866f08ff8 100644 --- a/app/auth/models.py +++ b/app/auth/models.py @@ -53,12 +53,28 @@ class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) user_name = db.Column(db.String(64), index=True, unique=True) + "le login" email = db.Column(db.String(120)) nom = db.Column(db.String(64)) prenom = db.Column(db.String(64)) dept = db.Column(db.String(SHORT_STR_LEN), index=True) + "acronyme du département de l'utilisateur" active = db.Column(db.Boolean, default=True, index=True) + "si faux, compte utilisateur désactivé" + cas_id = db.Column(db.Text(), index=True, unique=True, nullable=True) + "uid sur le CAS (mail ou autre attribut, selon config.cas_attribute_id)" + cas_allow_login = db.Column( + db.Boolean, default=False, server_default="false", nullable=False + ) + "Peut-on se logguer via le CAS ?" + cas_allow_scodoc_login = db.Column( + db.Boolean, default=True, server_default="false", nullable=False + ) + """(not yet implemented XXX) + si CAS activé, peut-on se logguer sur ScoDoc directement ? + (le rôle ScoSuperAdmin peut toujours) + """ password_hash = db.Column(db.String(128)) password_scodoc7 = db.Column(db.String(42)) @@ -184,6 +200,9 @@ class User(UserMixin, db.Model): "dept": self.dept, "id": self.id, "active": self.active, + "cas_id": self.cas_id, + "cas_allow_login": self.cas_allow_login, + "cas_allow_scodoc_login": self.cas_allow_scodoc_login, "status_txt": "actif" if self.active else "fermé", "last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else None, "nom": (self.nom or ""), # sco8 @@ -206,7 +225,17 @@ class User(UserMixin, db.Model): """Set users' attributes from given dict values. Roles must be encoded as "roles_string", like "Ens_RT, Secr_CJ" """ - for field in ["nom", "prenom", "dept", "active", "email", "date_expiration"]: + for field in [ + "nom", + "prenom", + "dept", + "active", + "email", + "date_expiration", + "cas_id", + "cas_allow_login", + "cas_allow_scodoc_login", + ]: if field in data: setattr(self, field, data[field] or None) if new_user: diff --git a/app/auth/routes.py b/app/auth/routes.py index 2c1594bc8..b6a0df9a7 100644 --- a/app/auth/routes.py +++ b/app/auth/routes.py @@ -3,6 +3,7 @@ auth.routes.py """ +import flask from flask import current_app, flash, render_template from flask import redirect, url_for, request from flask_login import login_user, logout_user, current_user @@ -20,6 +21,7 @@ from app.auth.models import Role from app.auth.models import User from app.auth.email import send_password_reset_email from app.decorators import admin_required +from app.models.config import ScoDocSiteConfig _ = lambda x: x # sans babel _l = _ @@ -30,6 +32,7 @@ def login(): "ScoDoc Login form" if current_user.is_authenticated: return redirect(url_for("scodoc.index")) + form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(user_name=form.user_name.data).first() @@ -42,14 +45,22 @@ def login(): return form.redirect("scodoc.index") message = request.args.get("message", "") return render_template( - "auth/login.j2", title=_("Sign In"), form=form, message=message + "auth/login.j2", + title=_("Sign In"), + form=form, + message=message, + is_cas_enabled=ScoDocSiteConfig.is_cas_enabled(), ) @bp.route("/logout") -def logout(): - "Logout current user and redirect to home page" +def logout() -> flask.Response: + "Logout a scodoc user. If CAS session, logout from CAS. Redirect." + current_app.logger.info(f"logout user {current_user.user_name}") logout_user() + if ScoDocSiteConfig.is_cas_enabled() and flask.session.get("scodoc_cas_login_date"): + flask.session.pop("scodoc_cas_login_date", None) + return redirect(url_for("cas.logout")) return redirect(url_for("scodoc.index")) diff --git a/app/forms/main/config_cas.py b/app/forms/main/config_cas.py new file mode 100644 index 000000000..663e487b5 --- /dev/null +++ b/app/forms/main/config_cas.py @@ -0,0 +1,53 @@ +# -*- mode: python -*- +# -*- coding: utf-8 -*- + +############################################################################## +# +# ScoDoc +# +# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Emmanuel Viennet emmanuel.viennet@viennet.net +# +############################################################################## + +""" +Formulaires configuration Exports Apogée (codes) +""" + +from flask_wtf import FlaskForm +from wtforms import BooleanField, SubmitField +from wtforms.fields.simple import StringField + + +class ConfigCASForm(FlaskForm): + "Formulaire paramétrage CAS" + cas_enable = BooleanField("activer le CAS") + + cas_server = StringField( + label="URL du serveur CAS", + description="""url complète. Commence en général par https://.""", + ) + + cas_attribute_id = StringField( + label="Attribut CAS utilisé comme id", + description="""Le champs CAS qui sera considéré comme l'id unique des + comptes utilisateurs.""", + ) + + submit = SubmitField("Valider") + cancel = SubmitField("Annuler", render_kw={"formnovalidate": True}) diff --git a/app/models/config.py b/app/models/config.py index f4f09cc32..bbe0c0ae7 100644 --- a/app/models/config.py +++ b/app/models/config.py @@ -87,6 +87,10 @@ class ScoDocSiteConfig(db.Model): "enable_entreprises": bool, "month_debut_annee_scolaire": int, "month_debut_periode2": int, + # CAS + "cas_enable": bool, + "cas_server": str, + "cas_attribute_id": str, } def __init__(self, name, value): @@ -170,7 +174,7 @@ class ScoDocSiteConfig(db.Model): (starting with empty string to represent "no bonus function"). """ d = bonus_spo.get_bonus_class_dict() - class_list = [(name, d[name].displayed_name) for name in d.keys()] + class_list = [(name, d[name].displayed_name) for name in d] class_list.sort(key=lambda x: x[1].replace(" du ", " de ")) return [("", "")] + class_list @@ -204,13 +208,31 @@ class ScoDocSiteConfig(db.Model): db.session.add(cfg) db.session.commit() + @classmethod + def is_cas_enabled(cls) -> bool: + """True si on utilise le CAS""" + cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first() + return cfg is not None and cfg.value + + @classmethod + def cas_enable(cls, enabled=True) -> bool: + """Active (ou déactive) le CAS. True si changement.""" + if enabled != ScoDocSiteConfig.is_cas_enabled(): + cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first() + if cfg is None: + cfg = ScoDocSiteConfig(name="cas_enable", value="on" if enabled else "") + else: + cfg.value = "on" if enabled else "" + db.session.add(cfg) + db.session.commit() + return True + return False + @classmethod def is_entreprises_enabled(cls) -> bool: """True si on doit activer le module entreprise""" cfg = ScoDocSiteConfig.query.filter_by(name="enable_entreprises").first() - if (cfg is None) or not cfg.value: - return False - return True + return cfg is not None and cfg.value @classmethod def enable_entreprises(cls, enabled=True) -> bool: @@ -228,6 +250,26 @@ class ScoDocSiteConfig(db.Model): return True return False + @classmethod + def get(cls, name: str) -> str: + "Get configuration param; empty string if unset" + cfg = ScoDocSiteConfig.query.filter_by(name=name).first() + return (cfg.value or "") if cfg else "" + + @classmethod + def set(cls, name: str, value: str) -> bool: + "Set parameter, returns True if change. Commit session." + if cls.get(name) != (value or ""): + cfg = ScoDocSiteConfig.query.filter_by(name=name).first() + if cfg is None: + cfg = ScoDocSiteConfig(name=name, value=str(value)) + else: + cfg.value = str(value or "") + db.session.add(cfg) + db.session.commit() + return True + return False + @classmethod def _get_int_field(cls, name: str, default=None) -> int: """Valeur d'un champs integer""" diff --git a/app/scodoc/sco_etud.py b/app/scodoc/sco_etud.py index 6d9bc49f4..a4240d4be 100644 --- a/app/scodoc/sco_etud.py +++ b/app/scodoc/sco_etud.py @@ -225,6 +225,9 @@ _identiteEditor = ndb.EditableTable( "nom", "nom_usuel", "prenom", + "cas_id", + "cas_allow_login", + "cas_allow_scodoc_login", "civilite", # 'M", "F", or "X" "date_naissance", "lieu_naissance", diff --git a/app/scodoc/sco_users.py b/app/scodoc/sco_users.py index d3db5b8e7..53a298bb6 100644 --- a/app/scodoc/sco_users.py +++ b/app/scodoc/sco_users.py @@ -28,11 +28,10 @@ """Fonctions sur les utilisateurs """ -# Anciennement ZScoUsers.py, fonctions de gestion des données réécrite avec flask/SQLAlchemy +# Anciennement ZScoUsers.py, fonctions de gestion des données réécrites avec flask/SQLAlchemy import re from flask import url_for, g, request -from flask.templating import render_template from flask_login import current_user @@ -42,23 +41,11 @@ from app.auth.models import Permission from app.auth.models import User from app.scodoc import html_sco_header -from app.scodoc import sco_etud -from app.scodoc import sco_excel from app.scodoc import sco_preferences from app.scodoc.gen_tables import GenTable -from app import log, cache -from app.scodoc.scolog import logdb -import app.scodoc.sco_utils as scu +from app import cache -from app.scodoc.sco_exceptions import ( - AccessDenied, - ScoValueError, -) - - -# --------------- - -# --------------- +from app.scodoc.sco_exceptions import ScoValueError def index_html(all_depts=False, with_inactives=False, format="html"): @@ -70,19 +57,21 @@ def index_html(all_depts=False, with_inactives=False, format="html"): if current_user.has_permission(Permission.ScoUsersAdmin, g.scodoc_dept): H.append( - '

Ajouter un utilisateur'.format( - url_for("users.create_user_form", scodoc_dept=g.scodoc_dept) - ) + f"""

Ajouter un utilisateur""" ) if current_user.is_administrator(): H.append( - '   Importer des utilisateurs

'.format( - url_for("users.import_users_form", scodoc_dept=g.scodoc_dept) - ) + """   Importer des utilisateurs

""" ) + else: H.append( - "   Pour importer des utilisateurs en masse (via xlsx file) contactez votre administrateur scodoc." + """   Pour importer des utilisateurs en masse (via fichier xlsx) + contactez votre administrateur scodoc.""" ) if all_depts: checked = "checked" @@ -93,11 +82,12 @@ def index_html(all_depts=False, with_inactives=False, format="html"): else: olds_checked = "" H.append( - """

- Tous les départements - Avec anciens utilisateurs + f"""

+ Tous les départements + Avec anciens utilisateurs

""" - % (request.base_url, checked, olds_checked) ) L = list_users( @@ -189,9 +179,8 @@ def list_users( }, caption=title, page_title="title", - html_title="""

%d utilisateurs %s

-

Cliquer sur un nom pour changer son mot de passe

""" - % (len(r), comm), + html_title=f"""

{len(r)} utilisateurs {comm}

+

Cliquer sur un nom pour changer son mot de passe

""", html_class="table_leftalign list_users", html_with_td_classes=True, html_sortable=True, @@ -273,6 +262,9 @@ def user_info(user_name_or_id=None, user: User = None): return info +MSG_OPT = """
Attention: (vous pouvez forcer l'opération en cochant "Ignorer les avertissements" en bas de page)""" + + def check_modif_user( edit, enforce_optionals=False, @@ -281,7 +273,8 @@ def check_modif_user( prenom="", email="", dept="", - roles=[], + roles: list = None, + cas_id: str = None, ): """Vérifie que cet utilisateur peut être créé (edit=0) ou modifié (edit=1) Cherche homonymes. @@ -290,32 +283,31 @@ def check_modif_user( (si ok est faux, l'utilisateur peut quand même forcer la creation) - msg: message warning à presenter à l'utilisateur """ - MSG_OPT = """
Attention: (vous pouvez forcer l'opération en cochant "Ignorer les avertissements" en bas de page)""" + roles = roles or [] # ce login existe ? - user = _user_list(user_name) + user = User.query.filter_by(user_name=user_name).first() if edit and not user: # safety net, le user_name ne devrait pas changer - return False, "identifiant %s inexistant" % user_name + return False, f"identifiant {user_name} inexistant" if not edit and user: - return False, "identifiant %s déjà utilisé" % user_name + return False, f"identifiant {user_name} déjà utilisé" if not user_name or not nom or not prenom: return False, "champ requis vide" if not re.match(r"^[a-zA-Z0-9@\\\-_\\\.]*$", user_name): return ( False, - "identifiant '%s' invalide (pas d'accents ni de caractères spéciaux)" - % user_name, + f"identifiant '{user_name}' invalide (pas d'accents ni de caractères spéciaux)", ) if enforce_optionals and len(user_name) > 64: - return False, "identifiant '%s' trop long (64 caractères)" % user_name + return False, f"identifiant '{user_name}' trop long (64 caractères)" if enforce_optionals and len(nom) > 64: - return False, "nom '%s' trop long (64 caractères)" % nom + MSG_OPT + return False, f"nom '{nom}' trop long (64 caractères)" + MSG_OPT if enforce_optionals and len(prenom) > 64: - return False, "prenom '%s' trop long (64 caractères)" % prenom + MSG_OPT - # check that tha same user_name has not already been described in this import + return False, f"prenom '{prenom}' trop long (64 caractères)" + MSG_OPT + # check that same user_name has not already been described in this import if not email: return False, "vous devriez indiquer le mail de l'utilisateur créé !" if len(email) > 120: - return False, "email '%s' trop long (120 caractères)" % email + return False, f"email '{email}' trop long (120 caractères)" if not re.fullmatch(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", email): return False, "l'adresse mail semble incorrecte" # check département @@ -324,19 +316,31 @@ def check_modif_user( and dept and Departement.query.filter_by(acronym=dept).first() is None ): - return False, "département '%s' inexistant" % dept + MSG_OPT + return False, f"département '{dept}' inexistant" + MSG_OPT if enforce_optionals and not roles: return False, "aucun rôle sélectionné, êtes vous sûr ?" + MSG_OPT # Unicité du mail users_with_this_mail = User.query.filter_by(email=email).all() if edit: # modification - if email != user["email"] and len(users_with_this_mail) > 0: + if email != user.email and len(users_with_this_mail) > 0: return False, "un autre utilisateur existe déjà avec cette adresse mail" else: # création utilisateur if len(users_with_this_mail) > 0: return False, "un autre utilisateur existe déjà avec cette adresse mail" - # ok + # Unicité du cas_id + if cas_id: + cas_users = User.query.filter_by(cas_id=cas_id).all() + if edit: + if cas_users and ( + len(cas_users) > 1 or cas_users[0].user_name != user_name + ): + return ( + False, + "un autre utilisateur existe déjà avec cet identifiant CAS", + ) + elif cas_users: + return False, "un autre utilisateur existe déjà avec cet identifiant CAS" # Des noms/prénoms semblables existent ? nom = nom.lower().strip() prenom = prenom.lower().strip() @@ -367,7 +371,7 @@ def user_edit(user_name, vals): """Edit the user specified by user_name (ported from Zope to SQLAlchemy, hence strange !) """ - u = User.query.filter_by(user_name=user_name).first() + u: User = User.query.filter_by(user_name=user_name).first() if not u: raise ScoValueError("Invalid user_name") u.from_dict(vals) diff --git a/app/templates/auth/login.j2 b/app/templates/auth/login.j2 index 2877f6725..8f9f4a8cf 100644 --- a/app/templates/auth/login.j2 +++ b/app/templates/auth/login.j2 @@ -9,6 +9,12 @@ {% endif %}

Connexion

+ +{% if is_cas_enabled %} +
+Se connecter avec CAS +
+{% endif %}
{{ wtf.quick_form(form) }} diff --git a/app/templates/auth/user_info_page.j2 b/app/templates/auth/user_info_page.j2 index 23e8c27b2..d5284579a 100644 --- a/app/templates/auth/user_info_page.j2 +++ b/app/templates/auth/user_info_page.j2 @@ -7,6 +7,9 @@

Utilisateur: {{user.user_name}} ({{'actif' if user.active else 'fermé'}})

Login : {{user.user_name}}
+ CAS id: {{user.cas_id or "(aucun)"}} + (CAS {{'autorisé' if user.cas_allow_login else 'interdit'}} pour cet utilisateur) +
Nom : {{user.nom or ""}}
Prénom : {{user.prenom or ""}}
Mail : {{user.email}}
@@ -48,9 +51,15 @@ {% if current_user.id == user.id %} -

Se déconnecter: - logout -

+ {% endif %} {# Liste des permissions #} diff --git a/app/templates/config_cas.j2 b/app/templates/config_cas.j2 new file mode 100644 index 000000000..2cccd0de4 --- /dev/null +++ b/app/templates/config_cas.j2 @@ -0,0 +1,18 @@ +{% extends "base.j2" %} +{% import 'bootstrap/wtf.html' as wtf %} + +{% block app_content %} +

Configuration du Service d'Authentification Central (CAS)

+ + +
+

Le CAS...

+
+
+
+ {{ wtf.quick_form(form) }} +
+
+ + +{% endblock %} \ No newline at end of file diff --git a/app/templates/configuration.j2 b/app/templates/configuration.j2 index 05567ef3f..94726c62b 100644 --- a/app/templates/configuration.j2 +++ b/app/templates/configuration.j2 @@ -53,8 +53,9 @@

-

Utilisateurs

+

Utilisateurs et CAS

+

configuration du service CAS

remettre les permissions des rôles standards à leurs valeurs par défaut (efface les modifications apportées)

diff --git a/app/views/scodoc.py b/app/views/scodoc.py index 83055b504..6a9c5e022 100644 --- a/app/views/scodoc.py +++ b/app/views/scodoc.py @@ -62,6 +62,7 @@ from app.decorators import ( from app.forms.main import config_logos, config_main from app.forms.main.create_dept import CreateDeptForm from app.forms.main.config_apo import CodesDecisionsForm +from app.forms.main.config_cas import ConfigCASForm from app import models from app.models import Departement, Identite from app.models import departements @@ -134,6 +135,33 @@ def toggle_dept_vis(dept_id): return redirect(url_for("scodoc.index")) +@bp.route("/ScoDoc/config_cas", methods=["GET", "POST"]) +@admin_required +def config_cas(): + """Form config CAS""" + form = ConfigCASForm() + if request.method == "POST" and form.cancel.data: # cancel button + return redirect(url_for("scodoc.index")) + if form.validate_on_submit(): + if ScoDocSiteConfig.cas_enable(enabled=form.data["cas_enable"]): + flash("CAS " + ("activé" if form.data["cas_enable"] else "désactivé")) + if ScoDocSiteConfig.set("cas_server", form.data["cas_server"]): + flash("Serveur CAS enregistré") + if ScoDocSiteConfig.set("cas_attribute_id", form.data["cas_attribute_id"]): + flash("Serveur CAS enregistré") + + return redirect(url_for("scodoc.configuration")) + elif request.method == "GET": + form.cas_enable.data = ScoDocSiteConfig.get("cas_enable") + form.cas_server.data = ScoDocSiteConfig.get("cas_server") + form.cas_attribute_id.data = ScoDocSiteConfig.get("cas_attribute_id") + return render_template( + "config_cas.j2", + form=form, + title="Configuration du Service d'Authentification Central (CAS)", + ) + + @bp.route("/ScoDoc/config_codes_decisions", methods=["GET", "POST"]) @admin_required def config_codes_decisions(): diff --git a/app/views/users.py b/app/views/users.py index f6d0bcc71..7efc08e1a 100644 --- a/app/views/users.py +++ b/app/views/users.py @@ -56,6 +56,7 @@ from app.auth.models import UserRole from app.auth.models import is_valid_password from app.email import send_email from app.models import Departement +from app.models.config import ScoDocSiteConfig from app.decorators import ( scodoc, @@ -226,7 +227,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True): if edit: if not user_name: raise ValueError("missing argument: user_name") - the_user = User.query.filter_by(user_name=user_name).first() + the_user: User = User.query.filter_by(user_name=user_name).first() if not the_user: raise ScoValueError("utilisateur inexistant") initvalues = the_user.to_dict() @@ -367,6 +368,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True): {"input_type": "hidden", "default": initvalues["user_name"]}, ) ] + cas_enabled = ScoDocSiteConfig.is_cas_enabled() descr += [ ( "email", @@ -376,11 +378,34 @@ def create_user_form(user_name=None, edit=0, all_roles=True): "explanation": "requis, doit fonctionner" if not edit_only_roles else "", - "size": 20, + "size": 36, "allow_null": False, "readonly": edit_only_roles, }, - ) + ), + ( + "cas_id", + { + "title": "Identifiant CAS", + "input_type": "text", + "explanation": "id du compte utilisateur sur le CAS de l'établissement " + + "(service CAS activé)" + if cas_enabled + else "(service CAS non activé)", + "size": 36, + "allow_null": True, + "readonly": not cas_enabled, + }, + ), + ( + "cas_allow_login", + { + "title": "Autorise connexion via CAS", + "input_type": "boolcheckbox", + "explanation": "en test: seul le super-administrateur peut changer ce réglage", + "readonly": not current_user.is_administrator(), + }, + ), ] if not edit: # options création utilisateur descr += [ @@ -438,7 +463,8 @@ def create_user_form(user_name=None, edit=0, all_roles=True): "d", { "input_type": "separator", - "title": f"""L'utilisateur appartient au département {the_user.dept or "(tous)"}""", + "title": f"""L'utilisateur appartient au département { + the_user.dept or "(tous)"}""", }, ) ) @@ -541,7 +567,6 @@ def create_user_form(user_name=None, edit=0, all_roles=True): if err_msg: H.append(tf_error_message(f"""Erreur: {err_msg}""")) return "\n".join(H) + "\n" + tf[1] + F - if not edit_only_roles: ok_modif, msg = sco_users.check_modif_user( edit, @@ -552,6 +577,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True): email=vals["email"], dept=vals.get("dept", auth_dept), roles=vals["roles"], + cas_id=vals["cas_id"], ) if not ok_modif: H.append(tf_error_message(msg)) @@ -815,12 +841,17 @@ def user_info_page(user_name=None): if not user: raise ScoValueError("invalid user_name") + session_info = None + if user.id == current_user.id: + session_info = flask.session.get("scodoc_cas_login_date") + return render_template( "auth/user_info_page.j2", user=user, title=f"Utilisateur {user.user_name}", Permission=Permission, dept=dept, + session_info=session_info, ) diff --git a/config.py b/config.py index 740860a61..584a6843b 100755 --- a/config.py +++ b/config.py @@ -43,6 +43,10 @@ class Config: # STATIC_URL_PATH = "/ScoDoc/static" # static_folder = "stat" # SERVER_NAME = os.environ.get("SERVER_NAME") + # XXX temporaire: utiliser SiteConfig + CAS_SERVER = os.environ.get("CAS_SERVER") + CAS_AFTER_LOGIN = os.environ.get("CAS_AFTER_LOGIN") + CAS_AFTER_LOGOUT = os.environ.get("CAS_AFTER_LOGOUT") class ProdConfig(Config): diff --git a/flask_cas/README.md b/flask_cas/README.md new file mode 100644 index 000000000..d71e8b507 --- /dev/null +++ b/flask_cas/README.md @@ -0,0 +1,7 @@ +# Flask-CAS + +Forked from +and adapted by Emmanuel Viennet, Feb. 2023. + +- logout: clear `_CAS_TOKEN`. +- Use `url` instead of `service` parameter in logout URL. diff --git a/flask_cas/__init__.py b/flask_cas/__init__.py new file mode 100644 index 000000000..76e02c252 --- /dev/null +++ b/flask_cas/__init__.py @@ -0,0 +1,106 @@ +""" +flask_cas.__init__ +""" + +import flask +from flask import current_app + +# Find the stack on which we want to store the database connection. +# Starting with Flask 0.9, the _app_ctx_stack is the correct one, +# before that we need to use the _request_ctx_stack. +try: + from flask import _app_ctx_stack as stack +except ImportError: + from flask import _request_ctx_stack as stack + +from . import routing + +from functools import wraps + +class CAS(object): + """ + Required Configs: + + |Key | + |----------------| + |CAS_SERVER | + |CAS_AFTER_LOGIN | + + Optional Configs: + + |Key | Default | + |---------------------------|-----------------------| + |CAS_TOKEN_SESSION_KEY | _CAS_TOKEN | + |CAS_USERNAME_SESSION_KEY | CAS_USERNAME | + |CAS_ATTRIBUTES_SESSION_KEY | CAS_ATTRIBUTES | + |CAS_LOGIN_ROUTE | '/cas' | + |CAS_LOGOUT_ROUTE | '/cas/logout' | + |CAS_VALIDATE_ROUTE | '/cas/serviceValidate'| + |CAS_AFTER_LOGOUT | None | + """ + + def __init__(self, app=None, url_prefix=None): + self._app = app + if app is not None: + self.init_app(app, url_prefix) + + def init_app(self, app, url_prefix=None): + # Configuration defaults + app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN') + app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME') + app.config.setdefault('CAS_ATTRIBUTES_SESSION_KEY', 'CAS_ATTRIBUTES') + app.config.setdefault('CAS_LOGIN_ROUTE', '/cas') + app.config.setdefault('CAS_LOGOUT_ROUTE', '/cas/logout') + app.config.setdefault('CAS_VALIDATE_ROUTE', '/cas/serviceValidate') + # Requires CAS 2.0 + app.config.setdefault('CAS_AFTER_LOGOUT', None) + # Register Blueprint + app.register_blueprint(routing.blueprint, url_prefix=url_prefix) + + # Use the newstyle teardown_appcontext if it's available, + # otherwise fall back to the request context + if hasattr(app, 'teardown_appcontext'): + app.teardown_appcontext(self.teardown) + else: + app.teardown_request(self.teardown) + + def teardown(self, exception): + ctx = stack.top + + @property + def app(self): + return self._app or current_app + + @property + def username(self): + return flask.session.get( + self.app.config['CAS_USERNAME_SESSION_KEY'], None) + + @property + def attributes(self): + return flask.session.get( + self.app.config['CAS_ATTRIBUTES_SESSION_KEY'], None) + + @property + def token(self): + return flask.session.get( + self.app.config['CAS_TOKEN_SESSION_KEY'], None) + +def login(): + return flask.redirect(flask.url_for('cas.login', _external=True)) + +def logout(): + return flask.redirect(flask.url_for('cas.logout', _external=True)) + +def login_required(function): + @wraps(function) + def wrap(*args, **kwargs): + if 'CAS_USERNAME' not in flask.session: + flask.session['CAS_AFTER_LOGIN_SESSION_URL'] = ( + flask.request.script_root + + flask.request.full_path + ) + return login() + else: + return function(*args, **kwargs) + return wrap diff --git a/flask_cas/cas_urls.py b/flask_cas/cas_urls.py new file mode 100644 index 000000000..276ef89c7 --- /dev/null +++ b/flask_cas/cas_urls.py @@ -0,0 +1,125 @@ +""" +flask_cas.cas_urls + +Functions for creating urls to access CAS. +""" + +try: + from urllib import quote + from urllib import urlencode + from urlparse import urljoin +except ImportError: + from urllib.parse import quote + from urllib.parse import urljoin + from urllib.parse import urlencode + + +def create_url(base, path=None, *query): + """Create a url. + + Creates a url by combining base, path, and the query's list of + key/value pairs. Escaping is handled automatically. Any + key/value pair with a value that is None is ignored. + + Keyword arguments: + base -- The left most part of the url (ex. http://localhost:5000). + path -- The path after the base (ex. /foo/bar). + query -- A list of key value pairs (ex. [('key', 'value')]). + + Example usage: + >>> create_url( + ... 'http://localhost:5000', + ... 'foo/bar', + ... ('key1', 'value'), + ... ('key2', None), # Will not include None + ... ('url', 'http://example.com'), + ... ) + 'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com' + """ + url = base + # Add the path to the url if it's not None. + if path is not None: + url = urljoin(url, quote(path)) + # Remove key/value pairs with None values. + query = filter(lambda pair: pair[1] is not None, query) + # Add the query string to the url + url = urljoin(url, "?{0}".format(urlencode(list(query)))) + return url + + +def create_cas_login_url(cas_url, cas_route, service, renew=None, gateway=None): + """Create a CAS login URL . + + Keyword arguments: + cas_url -- The url to the CAS (ex. http://sso.pdx.edu) + cas_route -- The route where the CAS lives on server (ex. /cas) + service -- (ex. http://localhost:5000/login) + renew -- "true" or "false" + gateway -- "true" or "false" + + Example usage: + >>> create_cas_login_url( + ... 'http://sso.pdx.edu', + ... '/cas', + ... 'http://localhost:5000', + ... ) + 'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000' + """ + return create_url( + cas_url, + cas_route, + ("service", service), + ("renew", renew), + ("gateway", gateway), + ) + + +def create_cas_logout_url(cas_url, cas_route, service=None): + """Create a CAS logout URL. + + Keyword arguments: + cas_url -- The url to the CAS (ex. http://sso.pdx.edu) + cas_route -- The route where the CAS lives on server (ex. /cas/logout) + url -- (ex. http://localhost:5000/login) + + Example usage: + >>> create_cas_logout_url( + ... 'http://sso.pdx.edu', + ... '/cas/logout', + ... 'http://localhost:5000', + ... ) + 'http://sso.pdx.edu/cas/logout?service=http%3A%2F%2Flocalhost%3A5000' + """ + return create_url( + cas_url, + cas_route, + ("url", service), # 'url' (and not 'service'): redirect here after CAS logout + ) + + +def create_cas_validate_url(cas_url, cas_route, service, ticket, renew=None): + """Create a CAS validate URL. + + Keyword arguments: + cas_url -- The url to the CAS (ex. http://sso.pdx.edu) + cas_route -- The route where the CAS lives on server (ex. /cas/serviceValidate) + service -- (ex. http://localhost:5000/login) + ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas') + renew -- "true" or "false" + + Example usage: + >>> create_cas_validate_url( + ... 'http://sso.pdx.edu', + ... '/cas/serviceValidate', + ... 'http://localhost:5000/login', + ... 'ST-58274-x839euFek492ou832Eena7ee-cas' + ... ) + 'http://sso.pdx.edu/cas/serviceValidate?service=http%3A%2F%2Flocalhost%3A5000%2Flogin&ticket=ST-58274-x839euFek492ou832Eena7ee-cas' + """ + return create_url( + cas_url, + cas_route, + ("service", service), + ("ticket", ticket), + ("renew", renew), + ) diff --git a/flask_cas/routing.py b/flask_cas/routing.py new file mode 100644 index 000000000..e241dfcc2 --- /dev/null +++ b/flask_cas/routing.py @@ -0,0 +1,165 @@ +import flask +from xmltodict import parse +from flask import current_app +from .cas_urls import create_cas_login_url +from .cas_urls import create_cas_logout_url +from .cas_urls import create_cas_validate_url + + +try: + from urllib import urlopen +except ImportError: + from urllib.request import urlopen + +blueprint = flask.Blueprint("cas", __name__) + + +@blueprint.route("/login/") +def login(): + """ + This route has two purposes. First, it is used by the user + to login. Second, it is used by the CAS to respond with the + `ticket` after the user logs in successfully. + + When the user accesses this url, they are redirected to the CAS + to login. If the login was successful, the CAS will respond to this + route with the ticket in the url. The ticket is then validated. + If validation was successful the logged in username is saved in + the user's session under the key `CAS_USERNAME_SESSION_KEY` and + the user's attributes are saved under the key + 'CAS_USERNAME_ATTRIBUTE_KEY' + """ + + cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"] + + redirect_url = create_cas_login_url( + current_app.config["CAS_SERVER"], + current_app.config["CAS_LOGIN_ROUTE"], + flask.url_for( + ".login", + origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"), + _external=True, + ), + ) + + if "ticket" in flask.request.args: + flask.session[cas_token_session_key] = flask.request.args["ticket"] + + if cas_token_session_key in flask.session: + + if validate(flask.session[cas_token_session_key]): + if "CAS_AFTER_LOGIN_SESSION_URL" in flask.session: + redirect_url = flask.session.pop("CAS_AFTER_LOGIN_SESSION_URL") + elif flask.request.args.get("origin"): + redirect_url = flask.request.args["origin"] + else: + redirect_url = flask.url_for(current_app.config["CAS_AFTER_LOGIN"]) + else: + flask.session.pop(cas_token_session_key, None) + + current_app.logger.debug("Redirecting to: {0}".format(redirect_url)) + + return flask.redirect(redirect_url) + + +@blueprint.route("/logout/") +def logout(): + """ + When the user accesses this route they are logged out. + """ + + cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"] + cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"] + cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"] + + flask.session.pop(cas_username_session_key, None) + flask.session.pop(cas_attributes_session_key, None) + flask.session.pop(cas_token_session_key, None) # added by EV + + cas_after_logout = current_app.config["CAS_AFTER_LOGOUT"] + if cas_after_logout is not None: + # If config starts with http, use it as dest URL. + # Else, build Flask URL + dest_url = ( + cas_after_logout + if cas_after_logout.startswith("http") + else flask.url_for(cas_after_logout, _external=True) + ) + redirect_url = create_cas_logout_url( + current_app.config["CAS_SERVER"], + current_app.config["CAS_LOGOUT_ROUTE"], + dest_url, + ) + else: + redirect_url = create_cas_logout_url( + current_app.config["CAS_SERVER"], current_app.config["CAS_LOGOUT_ROUTE"] + ) + + current_app.logger.debug("Redirecting to: {0}".format(redirect_url)) + return flask.redirect(redirect_url) + + +def validate(ticket): + """ + Will attempt to validate the ticket. If validation fails, then False + is returned. If validation is successful, then True is returned + and the validated username is saved in the session under the + key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary + is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'. + """ + + cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"] + cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"] + + current_app.logger.debug("validating token {0}".format(ticket)) + + cas_validate_url = create_cas_validate_url( + current_app.config["CAS_SERVER"], + current_app.config["CAS_VALIDATE_ROUTE"], + flask.url_for( + ".login", + origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"), + _external=True, + ), + ticket, + ) + + current_app.logger.debug("Making GET request to {0}".format(cas_validate_url)) + + xml_from_dict = {} + isValid = False + + try: + xmldump = urlopen(cas_validate_url).read().strip().decode("utf8", "ignore") + xml_from_dict = parse(xmldump) + isValid = ( + True + if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"] + else False + ) + except ValueError: + current_app.logger.error("CAS returned unexpected result") + + if isValid: + current_app.logger.debug("valid") + xml_from_dict = xml_from_dict["cas:serviceResponse"][ + "cas:authenticationSuccess" + ] + username = xml_from_dict["cas:user"] + attributes = xml_from_dict.get("cas:attributes", {}) + + if attributes and "cas:memberOf" in attributes: + if isinstance(attributes["cas:memberOf"], basestring): + attributes["cas:memberOf"] = ( + attributes["cas:memberOf"].lstrip("[").rstrip("]").split(",") + ) + for group_number in range(0, len(attributes["cas:memberOf"])): + attributes["cas:memberOf"][group_number] = ( + attributes["cas:memberOf"][group_number].lstrip(" ").rstrip(" ") + ) + flask.session[cas_username_session_key] = username + flask.session[cas_attributes_session_key] = attributes + else: + current_app.logger.debug("invalid") + + return isValid diff --git a/requirements-3.9.txt b/requirements-3.9.txt index 9032183ad..99dddf507 100755 --- a/requirements-3.9.txt +++ b/requirements-3.9.txt @@ -85,4 +85,5 @@ visitor==0.1.3 Werkzeug==2.1.1 wrapt==1.14.0 WTForms==3.0.1 +xmltodict==0.13.0 zipp==3.8.0 diff --git a/sco_version.py b/sco_version.py index 4a20b3b64..e99111a47 100644 --- a/sco_version.py +++ b/sco_version.py @@ -10,6 +10,7 @@ SCONEWS = """
  • ScoDoc 9.4
    • +
    • Connexion avec service CAS
    • Améliorations des tableaux récapitulatifs
    • Nouvelle interface de gestions des groupes (S. Lehmann)
    • Enrichissement des jurys BUT et des procès-verbaux associés.