diff --git a/app/__init__.py b/app/__init__.py index 72cc1cb7..856dd8ea 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -17,15 +17,17 @@ from flask import current_app, g, request from flask import Flask from flask import abort, flash, has_request_context, jsonify from flask import render_template -from flask.json import JSONEncoder -from flask.logging import default_handler -from flask_sqlalchemy import SQLAlchemy -from flask_migrate import Migrate +from flask_bootstrap import Bootstrap +from flask_caching import Cache +from flask_cas import CAS from flask_login import LoginManager, current_user from flask_mail import Mail -from flask_bootstrap import Bootstrap +from flask_migrate import Migrate from flask_moment import Moment -from flask_caching import Cache +from flask_sqlalchemy import SQLAlchemy +from flask.json import JSONEncoder +from flask.logging import default_handler + from jinja2 import select_autoescape import sqlalchemy @@ -132,7 +134,7 @@ class ScoDocJSONEncoder(JSONEncoder): def render_raw_html(template_filename: str, **args) -> str: """Load and render an HTML file _without_ using Flask - Necessary for 503 error mesage, when DB is down and Flask may be broken. + Necessary for 503 error message, when DB is down and Flask may be broken. """ template_path = os.path.join( current_app.config["SCODOC_DIR"], @@ -226,14 +228,14 @@ class ReverseProxied(object): def create_app(config_class=DevConfig): app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static") + CAS(app, url_prefix="/cas") app.wsgi_app = ReverseProxied(app.wsgi_app) app.json_encoder = ScoDocJSONEncoder - app.logger.setLevel(logging.INFO) - - # Evite de logguer toutes les requetes dans notre log - logging.getLogger("werkzeug").disabled = True app.config.from_object(config_class) + # Evite de logguer toutes les requetes dans notre log + logging.getLogger("werkzeug").disabled = True + app.logger.setLevel(app.config["LOG_LEVEL"]) # Vérifie/crée lien sym pour les URL statiques link_filename = f"{app.root_path}/static/links/{sco_version.SCOVERSION}" @@ -378,6 +380,11 @@ def create_app(config_class=DevConfig): sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample) + from app.auth.cas import set_cas_configuration + + with app.app_context(): + set_cas_configuration(app) + return app diff --git a/app/auth/__init__.py b/app/auth/__init__.py index 4fb9575d..7dcff2b3 100644 --- a/app/auth/__init__.py +++ b/app/auth/__init__.py @@ -6,3 +6,4 @@ from flask import Blueprint bp = Blueprint("auth", __name__) from app.auth import routes +from app.auth import cas diff --git a/app/auth/cas.py b/app/auth/cas.py new file mode 100644 index 00000000..4b478eaf --- /dev/null +++ b/app/auth/cas.py @@ -0,0 +1,88 @@ +# -*- coding: UTF-8 -* +""" +auth.cas.py +""" +import datetime + +import flask +from flask import current_app, flash, url_for +from flask_login import login_user + +from app.auth import bp +from app.auth.models import User +from app.models.config import ScoDocSiteConfig +from app.scodoc.sco_exceptions import ScoValueError + +# after_cas_login/after_cas_logout : routes appelées par redirect depuis le serveur CAS. + + +@bp.route("/after_cas_login") +def after_cas_login(): + "Called by CAS after CAS authentication" + # Ici on a les infos dans flask.session["CAS_ATTRIBUTES"] + if ScoDocSiteConfig.is_cas_enabled() and ("CAS_ATTRIBUTES" in flask.session): + # Lookup user: + cas_id = flask.session["CAS_ATTRIBUTES"].get( + "cas:" + ScoDocSiteConfig.get("cas_attribute_id") + ) + if cas_id is not None: + user = User.query.filter_by(cas_id=cas_id).first() + if user and user.active: + if user.cas_allow_login: + current_app.logger.info(f"CAS: login {user.user_name}") + if login_user(user): + flask.session[ + "scodoc_cas_login_date" + ] = datetime.datetime.now().isoformat() + return flask.redirect(url_for("scodoc.index")) + else: + current_app.logger.info( + f"CAS login denied for {user.user_name} (not allowed to use CAS)" + ) + else: + current_app.logger.info( + f"""CAS login denied for {user.user_name if user else ""} cas_id={cas_id} (unknown or inactive)""" + ) + else: + current_app.logger.info( + f"""CAS attribute '{ScoDocSiteConfig.get("cas_attribute_id")}' not found ! + (check your ScoDoc config)""" + ) + + # Echec: + flash("échec de l'authentification") + return flask.redirect(url_for("auth.login")) + + +@bp.route("/after_cas_logout") +def after_cas_logout(): + "Called by CAS after CAS logout" + flash("Vous êtes déconnecté") + current_app.logger.info("after_cas_logout") + return flask.redirect(url_for("scodoc.index")) + + +def cas_error_callback(message): + "Called by CAS when an error occurs, with a message" + raise ScoValueError(f"Erreur authentification CAS: {message}") + + +def set_cas_configuration(app: flask.app.Flask = None): + """Force la configuration du module flask_cas à partir des paramètres de + la config de ScoDoc. + Appelé au démarrage et à chaque modif des paramètres. + """ + app = app or current_app + if ScoDocSiteConfig.is_cas_enabled(): + app.config["CAS_SERVER"] = ScoDocSiteConfig.get("cas_server") + app.config["CAS_AFTER_LOGIN"] = "auth.after_cas_login" + app.config["CAS_AFTER_LOGOUT"] = "auth.after_cas_logout" + app.config["CAS_ERROR_CALLBACK"] = cas_error_callback + app.config["CAS_SSL_VERIFY"] = ScoDocSiteConfig.get("cas_ssl_verify") + app.config["CAS_SSL_CERTIFICATE"] = ScoDocSiteConfig.get("cas_ssl_certificate") + else: + app.config.pop("CAS_SERVER", None) + app.config.pop("CAS_AFTER_LOGIN", None) + app.config.pop("CAS_AFTER_LOGOUT", None) + app.config.pop("CAS_SSL_VERIFY", None) + app.config.pop("CAS_SSL_CERTIFICATE", None) diff --git a/app/auth/logic.py b/app/auth/logic.py index ba3f73a4..a1737a3b 100644 --- a/app/auth/logic.py +++ b/app/auth/logic.py @@ -8,9 +8,10 @@ import flask from flask import g, redirect, request, url_for from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth import flask_login + from app import login -from app.scodoc.sco_utils import json_error from app.auth.models import User +from app.scodoc.sco_utils import json_error basic_auth = HTTPBasicAuth() token_auth = HTTPTokenAuth() diff --git a/app/auth/models.py b/app/auth/models.py index 274be8e7..c7ef016d 100644 --- a/app/auth/models.py +++ b/app/auth/models.py @@ -53,12 +53,28 @@ class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) user_name = db.Column(db.String(64), index=True, unique=True) + "le login" email = db.Column(db.String(120)) nom = db.Column(db.String(64)) prenom = db.Column(db.String(64)) dept = db.Column(db.String(SHORT_STR_LEN), index=True) + "acronyme du département de l'utilisateur" active = db.Column(db.Boolean, default=True, index=True) + "si faux, compte utilisateur désactivé" + cas_id = db.Column(db.Text(), index=True, unique=True, nullable=True) + "uid sur le CAS (mail ou autre attribut, selon config.cas_attribute_id)" + cas_allow_login = db.Column( + db.Boolean, default=False, server_default="false", nullable=False + ) + "Peut-on se logguer via le CAS ?" + cas_allow_scodoc_login = db.Column( + db.Boolean, default=True, server_default="false", nullable=False + ) + """(not yet implemented XXX) + si CAS activé, peut-on se logguer sur ScoDoc directement ? + (le rôle ScoSuperAdmin peut toujours) + """ password_hash = db.Column(db.String(128)) password_scodoc7 = db.Column(db.String(42)) @@ -184,6 +200,9 @@ class User(UserMixin, db.Model): "dept": self.dept, "id": self.id, "active": self.active, + "cas_id": self.cas_id, + "cas_allow_login": self.cas_allow_login, + "cas_allow_scodoc_login": self.cas_allow_scodoc_login, "status_txt": "actif" if self.active else "fermé", "last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else None, "nom": (self.nom or ""), # sco8 @@ -206,7 +225,17 @@ class User(UserMixin, db.Model): """Set users' attributes from given dict values. Roles must be encoded as "roles_string", like "Ens_RT, Secr_CJ" """ - for field in ["nom", "prenom", "dept", "active", "email", "date_expiration"]: + for field in [ + "nom", + "prenom", + "dept", + "active", + "email", + "date_expiration", + "cas_id", + "cas_allow_login", + "cas_allow_scodoc_login", + ]: if field in data: setattr(self, field, data[field] or None) if new_user: @@ -310,7 +339,7 @@ class User(UserMixin, db.Model): """string repr. of user's roles (with depts) e.g. "Ens_RT, Ens_Info, Secr_CJ" """ - return ",".join( + return ", ".join( f"{r.role.name or ''}_{r.dept or ''}" for r in self.user_roles if r is not None diff --git a/app/auth/routes.py b/app/auth/routes.py index 2c1594bc..b6a0df9a 100644 --- a/app/auth/routes.py +++ b/app/auth/routes.py @@ -3,6 +3,7 @@ auth.routes.py """ +import flask from flask import current_app, flash, render_template from flask import redirect, url_for, request from flask_login import login_user, logout_user, current_user @@ -20,6 +21,7 @@ from app.auth.models import Role from app.auth.models import User from app.auth.email import send_password_reset_email from app.decorators import admin_required +from app.models.config import ScoDocSiteConfig _ = lambda x: x # sans babel _l = _ @@ -30,6 +32,7 @@ def login(): "ScoDoc Login form" if current_user.is_authenticated: return redirect(url_for("scodoc.index")) + form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(user_name=form.user_name.data).first() @@ -42,14 +45,22 @@ def login(): return form.redirect("scodoc.index") message = request.args.get("message", "") return render_template( - "auth/login.j2", title=_("Sign In"), form=form, message=message + "auth/login.j2", + title=_("Sign In"), + form=form, + message=message, + is_cas_enabled=ScoDocSiteConfig.is_cas_enabled(), ) @bp.route("/logout") -def logout(): - "Logout current user and redirect to home page" +def logout() -> flask.Response: + "Logout a scodoc user. If CAS session, logout from CAS. Redirect." + current_app.logger.info(f"logout user {current_user.user_name}") logout_user() + if ScoDocSiteConfig.is_cas_enabled() and flask.session.get("scodoc_cas_login_date"): + flask.session.pop("scodoc_cas_login_date", None) + return redirect(url_for("cas.logout")) return redirect(url_for("scodoc.index")) diff --git a/app/forms/main/config_cas.py b/app/forms/main/config_cas.py new file mode 100644 index 00000000..217b274f --- /dev/null +++ b/app/forms/main/config_cas.py @@ -0,0 +1,60 @@ +# -*- mode: python -*- +# -*- coding: utf-8 -*- + +############################################################################## +# +# ScoDoc +# +# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Emmanuel Viennet emmanuel.viennet@viennet.net +# +############################################################################## + +""" +Formulaires configuration Exports Apogée (codes) +""" + +from flask_wtf import FlaskForm +from wtforms import BooleanField, SubmitField +from wtforms.fields.simple import FileField, StringField, TextAreaField + + +class ConfigCASForm(FlaskForm): + "Formulaire paramétrage CAS" + cas_enable = BooleanField("activer le CAS") + + cas_server = StringField( + label="URL du serveur CAS", + description="""url complète. Commence en général par https://.""", + ) + + cas_attribute_id = StringField( + label="Attribut CAS utilisé comme id", + description="""Le champs CAS qui sera considéré comme l'id unique des + comptes utilisateurs.""", + ) + + cas_ssl_verify = BooleanField("Vérification du certificat SSL") + cas_ssl_certificate_file = FileField( + label="Certificat (PEM)", + description="""Le contenu du certificat PEM + (commence typiquement par -----BEGIN CERTIFICATE-----)""", + ) + + submit = SubmitField("Valider") + cancel = SubmitField("Annuler", render_kw={"formnovalidate": True}) diff --git a/app/models/config.py b/app/models/config.py index f4f09cc3..13110630 100644 --- a/app/models/config.py +++ b/app/models/config.py @@ -4,7 +4,7 @@ """ from flask import flash -from app import db, log +from app import current_app, db, log from app.comp import bonus_spo from app.scodoc import sco_utils as scu @@ -87,6 +87,10 @@ class ScoDocSiteConfig(db.Model): "enable_entreprises": bool, "month_debut_annee_scolaire": int, "month_debut_periode2": int, + # CAS + "cas_enable": bool, + "cas_server": str, + "cas_attribute_id": str, } def __init__(self, name, value): @@ -170,7 +174,7 @@ class ScoDocSiteConfig(db.Model): (starting with empty string to represent "no bonus function"). """ d = bonus_spo.get_bonus_class_dict() - class_list = [(name, d[name].displayed_name) for name in d.keys()] + class_list = [(name, d[name].displayed_name) for name in d] class_list.sort(key=lambda x: x[1].replace(" du ", " de ")) return [("", "")] + class_list @@ -204,13 +208,31 @@ class ScoDocSiteConfig(db.Model): db.session.add(cfg) db.session.commit() + @classmethod + def is_cas_enabled(cls) -> bool: + """True si on utilise le CAS""" + cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first() + return cfg is not None and cfg.value + + @classmethod + def cas_enable(cls, enabled=True) -> bool: + """Active (ou déactive) le CAS. True si changement.""" + if enabled != ScoDocSiteConfig.is_cas_enabled(): + cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first() + if cfg is None: + cfg = ScoDocSiteConfig(name="cas_enable", value="on" if enabled else "") + else: + cfg.value = "on" if enabled else "" + db.session.add(cfg) + db.session.commit() + return True + return False + @classmethod def is_entreprises_enabled(cls) -> bool: """True si on doit activer le module entreprise""" cfg = ScoDocSiteConfig.query.filter_by(name="enable_entreprises").first() - if (cfg is None) or not cfg.value: - return False - return True + return cfg is not None and cfg.value @classmethod def enable_entreprises(cls, enabled=True) -> bool: @@ -228,6 +250,29 @@ class ScoDocSiteConfig(db.Model): return True return False + @classmethod + def get(cls, name: str) -> str: + "Get configuration param; empty string if unset" + cfg = ScoDocSiteConfig.query.filter_by(name=name).first() + return (cfg.value or "") if cfg else "" + + @classmethod + def set(cls, name: str, value: str) -> bool: + "Set parameter, returns True if change. Commit session." + if cls.get(name) != (value or ""): + cfg = ScoDocSiteConfig.query.filter_by(name=name).first() + if cfg is None: + cfg = ScoDocSiteConfig(name=name, value=str(value)) + else: + cfg.value = str(value or "") + current_app.logger.info( + f"""ScoDocSiteConfig: recording {cfg.name}='{cfg.value[:32]}...'""" + ) + db.session.add(cfg) + db.session.commit() + return True + return False + @classmethod def _get_int_field(cls, name: str, default=None) -> int: """Valeur d'un champs integer""" diff --git a/app/scodoc/sco_etud.py b/app/scodoc/sco_etud.py index 6d9bc49f..a4240d4b 100644 --- a/app/scodoc/sco_etud.py +++ b/app/scodoc/sco_etud.py @@ -225,6 +225,9 @@ _identiteEditor = ndb.EditableTable( "nom", "nom_usuel", "prenom", + "cas_id", + "cas_allow_login", + "cas_allow_scodoc_login", "civilite", # 'M", "F", or "X" "date_naissance", "lieu_naissance", diff --git a/app/scodoc/sco_users.py b/app/scodoc/sco_users.py index d3db5b8e..f9bd5692 100644 --- a/app/scodoc/sco_users.py +++ b/app/scodoc/sco_users.py @@ -28,11 +28,10 @@ """Fonctions sur les utilisateurs """ -# Anciennement ZScoUsers.py, fonctions de gestion des données réécrite avec flask/SQLAlchemy +# Anciennement ZScoUsers.py, fonctions de gestion des données réécrites avec flask/SQLAlchemy import re from flask import url_for, g, request -from flask.templating import render_template from flask_login import current_user @@ -40,25 +39,13 @@ from app import db, Departement from app.auth.models import Permission from app.auth.models import User - +from app.models import ScoDocSiteConfig from app.scodoc import html_sco_header -from app.scodoc import sco_etud -from app.scodoc import sco_excel from app.scodoc import sco_preferences from app.scodoc.gen_tables import GenTable -from app import log, cache -from app.scodoc.scolog import logdb -import app.scodoc.sco_utils as scu +from app import cache -from app.scodoc.sco_exceptions import ( - AccessDenied, - ScoValueError, -) - - -# --------------- - -# --------------- +from app.scodoc.sco_exceptions import ScoValueError def index_html(all_depts=False, with_inactives=False, format="html"): @@ -70,19 +57,21 @@ def index_html(all_depts=False, with_inactives=False, format="html"): if current_user.has_permission(Permission.ScoUsersAdmin, g.scodoc_dept): H.append( - '

Ajouter un utilisateur'.format( - url_for("users.create_user_form", scodoc_dept=g.scodoc_dept) - ) + f"""

Ajouter un utilisateur""" ) if current_user.is_administrator(): H.append( - '   Importer des utilisateurs

'.format( - url_for("users.import_users_form", scodoc_dept=g.scodoc_dept) - ) + """   Importer des utilisateurs

""" ) + else: H.append( - "   Pour importer des utilisateurs en masse (via xlsx file) contactez votre administrateur scodoc." + """   Pour importer des utilisateurs en masse (via fichier xlsx) + contactez votre administrateur scodoc.""" ) if all_depts: checked = "checked" @@ -93,11 +82,12 @@ def index_html(all_depts=False, with_inactives=False, format="html"): else: olds_checked = "" H.append( - """

- Tous les départements - Avec anciens utilisateurs + f"""

+ Tous les départements + Avec anciens utilisateurs

""" - % (request.base_url, checked, olds_checked) ) L = list_users( @@ -127,7 +117,7 @@ def list_users( if dept and not all_depts: users = get_user_list(dept=dept, with_inactives=with_inactives) - comm = "dept. %s" % dept + comm = f"dept. {dept}" else: users = get_user_list(with_inactives=with_inactives) comm = "tous" @@ -135,13 +125,13 @@ def list_users( comm += ", avec anciens" comm = "(" + comm + ")" # -- Add some information and links: - r = [] + rows = [] for u in users: # Can current user modify this user ? can_modify = can_handle_passwd(u, allow_admindepts=True) d = u.to_dict() - r.append(d) + rows.append(d) # Add links if with_links and can_modify: target = url_for( @@ -168,11 +158,15 @@ def list_users( "status_txt", ] # Seul l'admin peut voir les dates de dernière connexion + # et les infos CAS if current_user.is_administrator(): columns_ids.append("last_seen") + if ScoDocSiteConfig.is_cas_enabled(): + columns_ids += ["cas_id", "cas_allow_login", "cas_allow_scodoc_login"] + title = "Utilisateurs définis dans ScoDoc" tab = GenTable( - rows=r, + rows=rows, columns_ids=columns_ids, titles={ "user_name": "Login", @@ -186,12 +180,14 @@ def list_users( "last_seen": "Dernière cnx.", "passwd_temp": "Temp.", "status_txt": "Etat", + "cas_id": "Id CAS", + "cas_allow_login": "CAS autorisé", + "cas_allow_scodoc_login": "Cnx sans CAS", }, caption=title, page_title="title", - html_title="""

%d utilisateurs %s

-

Cliquer sur un nom pour changer son mot de passe

""" - % (len(r), comm), + html_title=f"""

{len(rows)} utilisateurs {comm}

+

Cliquer sur un nom pour changer son mot de passe

""", html_class="table_leftalign list_users", html_with_td_classes=True, html_sortable=True, @@ -217,15 +213,6 @@ def get_user_list(dept=None, with_inactives=False): return q.order_by(User.nom, User.user_name).all() -def _user_list(user_name): - "return user as a dict" - u = User.query.filter_by(user_name=user_name).first() - if u: - return u.to_dict() - else: - return None - - @cache.memoize(timeout=50) # seconds def user_info(user_name_or_id=None, user: User = None): """Dict avec infos sur l'utilisateur (qui peut ne pas etre dans notre base). @@ -273,6 +260,9 @@ def user_info(user_name_or_id=None, user: User = None): return info +MSG_OPT = """
Attention: (vous pouvez forcer l'opération en cochant "Ignorer les avertissements" en bas de page)""" + + def check_modif_user( edit, enforce_optionals=False, @@ -281,7 +271,8 @@ def check_modif_user( prenom="", email="", dept="", - roles=[], + roles: list = None, + cas_id: str = None, ): """Vérifie que cet utilisateur peut être créé (edit=0) ou modifié (edit=1) Cherche homonymes. @@ -290,32 +281,31 @@ def check_modif_user( (si ok est faux, l'utilisateur peut quand même forcer la creation) - msg: message warning à presenter à l'utilisateur """ - MSG_OPT = """
Attention: (vous pouvez forcer l'opération en cochant "Ignorer les avertissements" en bas de page)""" + roles = roles or [] # ce login existe ? - user = _user_list(user_name) + user = User.query.filter_by(user_name=user_name).first() if edit and not user: # safety net, le user_name ne devrait pas changer - return False, "identifiant %s inexistant" % user_name + return False, f"identifiant {user_name} inexistant" if not edit and user: - return False, "identifiant %s déjà utilisé" % user_name + return False, f"identifiant {user_name} déjà utilisé" if not user_name or not nom or not prenom: return False, "champ requis vide" if not re.match(r"^[a-zA-Z0-9@\\\-_\\\.]*$", user_name): return ( False, - "identifiant '%s' invalide (pas d'accents ni de caractères spéciaux)" - % user_name, + f"identifiant '{user_name}' invalide (pas d'accents ni de caractères spéciaux)", ) if enforce_optionals and len(user_name) > 64: - return False, "identifiant '%s' trop long (64 caractères)" % user_name + return False, f"identifiant '{user_name}' trop long (64 caractères)" if enforce_optionals and len(nom) > 64: - return False, "nom '%s' trop long (64 caractères)" % nom + MSG_OPT + return False, f"nom '{nom}' trop long (64 caractères)" + MSG_OPT if enforce_optionals and len(prenom) > 64: - return False, "prenom '%s' trop long (64 caractères)" % prenom + MSG_OPT - # check that tha same user_name has not already been described in this import + return False, f"prenom '{prenom}' trop long (64 caractères)" + MSG_OPT + # check that same user_name has not already been described in this import if not email: return False, "vous devriez indiquer le mail de l'utilisateur créé !" if len(email) > 120: - return False, "email '%s' trop long (120 caractères)" % email + return False, f"email '{email}' trop long (120 caractères)" if not re.fullmatch(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", email): return False, "l'adresse mail semble incorrecte" # check département @@ -324,19 +314,31 @@ def check_modif_user( and dept and Departement.query.filter_by(acronym=dept).first() is None ): - return False, "département '%s' inexistant" % dept + MSG_OPT + return False, f"département '{dept}' inexistant" + MSG_OPT if enforce_optionals and not roles: return False, "aucun rôle sélectionné, êtes vous sûr ?" + MSG_OPT # Unicité du mail users_with_this_mail = User.query.filter_by(email=email).all() if edit: # modification - if email != user["email"] and len(users_with_this_mail) > 0: + if email != user.email and len(users_with_this_mail) > 0: return False, "un autre utilisateur existe déjà avec cette adresse mail" else: # création utilisateur if len(users_with_this_mail) > 0: return False, "un autre utilisateur existe déjà avec cette adresse mail" - # ok + # Unicité du cas_id + if cas_id: + cas_users = User.query.filter_by(cas_id=cas_id).all() + if edit: + if cas_users and ( + len(cas_users) > 1 or cas_users[0].user_name != user_name + ): + return ( + False, + "un autre utilisateur existe déjà avec cet identifiant CAS", + ) + elif cas_users: + return False, "un autre utilisateur existe déjà avec cet identifiant CAS" # Des noms/prénoms semblables existent ? nom = nom.lower().strip() prenom = prenom.lower().strip() @@ -367,7 +369,7 @@ def user_edit(user_name, vals): """Edit the user specified by user_name (ported from Zope to SQLAlchemy, hence strange !) """ - u = User.query.filter_by(user_name=user_name).first() + u: User = User.query.filter_by(user_name=user_name).first() if not u: raise ScoValueError("Invalid user_name") u.from_dict(vals) diff --git a/app/static/css/scodoc.css b/app/static/css/scodoc.css index 5aa3c70a..687fa156 100644 --- a/app/static/css/scodoc.css +++ b/app/static/css/scodoc.css @@ -3551,14 +3551,17 @@ table.table_listegroupe tr td { padding-right: 0.5em; } -table.list_users td.roles { - width: 22em; -} - table.list_users td.date_modif_passwd { white-space: nowrap; } +table.list_users td.roles_string, +table.list_users th.roles_string { + word-wrap: break-word; + overflow-wrap: break-word; +} + + table.formsemestre_description tr.table_row_ue td { font-weight: bold; } @@ -4540,4 +4543,10 @@ table.formation_table_recap td.heures_cours, table.formation_table_recap td.heures_td, table.formation_table_recap td.heures_tp { text-align: right; +} + +div.cas_etat_certif_ssl { + margin-top: 12px; + font-style: italic; + border: 1px dashed black; } \ No newline at end of file diff --git a/app/templates/auth/login.j2 b/app/templates/auth/login.j2 index 2877f672..8f9f4a8c 100644 --- a/app/templates/auth/login.j2 +++ b/app/templates/auth/login.j2 @@ -9,6 +9,12 @@ {% endif %}

Connexion

+ +{% if is_cas_enabled %} +
+Se connecter avec CAS +
+{% endif %}
{{ wtf.quick_form(form) }} diff --git a/app/templates/auth/user_info_page.j2 b/app/templates/auth/user_info_page.j2 index 23e8c27b..d5284579 100644 --- a/app/templates/auth/user_info_page.j2 +++ b/app/templates/auth/user_info_page.j2 @@ -7,6 +7,9 @@

Utilisateur: {{user.user_name}} ({{'actif' if user.active else 'fermé'}})

Login : {{user.user_name}}
+ CAS id: {{user.cas_id or "(aucun)"}} + (CAS {{'autorisé' if user.cas_allow_login else 'interdit'}} pour cet utilisateur) +
Nom : {{user.nom or ""}}
Prénom : {{user.prenom or ""}}
Mail : {{user.email}}
@@ -48,9 +51,15 @@ {% if current_user.id == user.id %} -

Se déconnecter: - logout -

+ {% endif %} {# Liste des permissions #} diff --git a/app/templates/config_cas.j2 b/app/templates/config_cas.j2 new file mode 100644 index 00000000..430f1ffe --- /dev/null +++ b/app/templates/config_cas.j2 @@ -0,0 +1,25 @@ +{% extends "base.j2" %} +{% import 'bootstrap/wtf.html' as wtf %} + +{% block app_content %} +

Configuration du Service d'Authentification Central (CAS)

+ + +
+

Le CAS permet d'utiliser un service SSO pour connecter les utilisateurs.

+
+
+
+ {{ wtf.quick_form(form) }} +
Certificat SSL + {% if cas_ssl_certificate_loaded %} + chargé. + {% else %} + non chargé. + {% endif %} +
+
+
+ + +{% endblock %} \ No newline at end of file diff --git a/app/templates/configuration.j2 b/app/templates/configuration.j2 index 05567ef3..94726c62 100644 --- a/app/templates/configuration.j2 +++ b/app/templates/configuration.j2 @@ -53,8 +53,9 @@

-

Utilisateurs

+

Utilisateurs et CAS

+

configuration du service CAS

remettre les permissions des rôles standards à leurs valeurs par défaut (efface les modifications apportées)

diff --git a/app/views/scodoc.py b/app/views/scodoc.py index 83055b50..2aa27968 100644 --- a/app/views/scodoc.py +++ b/app/views/scodoc.py @@ -54,6 +54,7 @@ from werkzeug.exceptions import BadRequest, NotFound from app import db from app.auth.models import User +from app.auth.cas import set_cas_configuration from app.decorators import ( admin_required, scodoc7func, @@ -62,6 +63,7 @@ from app.decorators import ( from app.forms.main import config_logos, config_main from app.forms.main.create_dept import CreateDeptForm from app.forms.main.config_apo import CodesDecisionsForm +from app.forms.main.config_cas import ConfigCASForm from app import models from app.models import Departement, Identite from app.models import departements @@ -73,7 +75,7 @@ from app.scodoc import sco_find_etud from app.scodoc import sco_logos from app.scodoc import sco_utils as scu -from app.scodoc.sco_exceptions import AccessDenied +from app.scodoc.sco_exceptions import AccessDenied, ScoValueError from app.scodoc.sco_permissions import Permission from app.views import scodoc_bp as bp import sco_version @@ -134,6 +136,46 @@ def toggle_dept_vis(dept_id): return redirect(url_for("scodoc.index")) +@bp.route("/ScoDoc/config_cas", methods=["GET", "POST"]) +@admin_required +def config_cas(): + """Form config CAS""" + form = ConfigCASForm() + if request.method == "POST" and form.cancel.data: # cancel button + return redirect(url_for("scodoc.index")) + if form.validate_on_submit(): + if ScoDocSiteConfig.cas_enable(enabled=form.data["cas_enable"]): + flash("CAS " + ("activé" if form.data["cas_enable"] else "désactivé")) + if ScoDocSiteConfig.set("cas_server", form.data["cas_server"]): + flash("Serveur CAS enregistré") + if ScoDocSiteConfig.set("cas_attribute_id", form.data["cas_attribute_id"]): + flash("Serveur CAS enregistré") + if ScoDocSiteConfig.set("cas_ssl_verify", form.data["cas_ssl_verify"]): + flash("Vérification SSL modifiée") + if form.cas_ssl_certificate_file.data: + data = request.files[form.cas_ssl_certificate_file.name].read() + try: + data_str = data.decode("ascii") + except UnicodeDecodeError as exc: + raise ScoValueError("Fichier certificat invalide (non ASCII)") from exc + if ScoDocSiteConfig.set("cas_ssl_certificate", data_str): + flash("Certificat SSL enregistré") + set_cas_configuration() + return redirect(url_for("scodoc.configuration")) + + elif request.method == "GET": + form.cas_enable.data = ScoDocSiteConfig.get("cas_enable") + form.cas_server.data = ScoDocSiteConfig.get("cas_server") + form.cas_attribute_id.data = ScoDocSiteConfig.get("cas_attribute_id") + form.cas_ssl_verify.data = ScoDocSiteConfig.get("cas_ssl_verify") + return render_template( + "config_cas.j2", + form=form, + title="Configuration du Service d'Authentification Central (CAS)", + cas_ssl_certificate_loaded=ScoDocSiteConfig.get("cas_ssl_certificate"), + ) + + @bp.route("/ScoDoc/config_codes_decisions", methods=["GET", "POST"]) @admin_required def config_codes_decisions(): diff --git a/app/views/users.py b/app/views/users.py index f6d0bcc7..7efc08e1 100644 --- a/app/views/users.py +++ b/app/views/users.py @@ -56,6 +56,7 @@ from app.auth.models import UserRole from app.auth.models import is_valid_password from app.email import send_email from app.models import Departement +from app.models.config import ScoDocSiteConfig from app.decorators import ( scodoc, @@ -226,7 +227,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True): if edit: if not user_name: raise ValueError("missing argument: user_name") - the_user = User.query.filter_by(user_name=user_name).first() + the_user: User = User.query.filter_by(user_name=user_name).first() if not the_user: raise ScoValueError("utilisateur inexistant") initvalues = the_user.to_dict() @@ -367,6 +368,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True): {"input_type": "hidden", "default": initvalues["user_name"]}, ) ] + cas_enabled = ScoDocSiteConfig.is_cas_enabled() descr += [ ( "email", @@ -376,11 +378,34 @@ def create_user_form(user_name=None, edit=0, all_roles=True): "explanation": "requis, doit fonctionner" if not edit_only_roles else "", - "size": 20, + "size": 36, "allow_null": False, "readonly": edit_only_roles, }, - ) + ), + ( + "cas_id", + { + "title": "Identifiant CAS", + "input_type": "text", + "explanation": "id du compte utilisateur sur le CAS de l'établissement " + + "(service CAS activé)" + if cas_enabled + else "(service CAS non activé)", + "size": 36, + "allow_null": True, + "readonly": not cas_enabled, + }, + ), + ( + "cas_allow_login", + { + "title": "Autorise connexion via CAS", + "input_type": "boolcheckbox", + "explanation": "en test: seul le super-administrateur peut changer ce réglage", + "readonly": not current_user.is_administrator(), + }, + ), ] if not edit: # options création utilisateur descr += [ @@ -438,7 +463,8 @@ def create_user_form(user_name=None, edit=0, all_roles=True): "d", { "input_type": "separator", - "title": f"""L'utilisateur appartient au département {the_user.dept or "(tous)"}""", + "title": f"""L'utilisateur appartient au département { + the_user.dept or "(tous)"}""", }, ) ) @@ -541,7 +567,6 @@ def create_user_form(user_name=None, edit=0, all_roles=True): if err_msg: H.append(tf_error_message(f"""Erreur: {err_msg}""")) return "\n".join(H) + "\n" + tf[1] + F - if not edit_only_roles: ok_modif, msg = sco_users.check_modif_user( edit, @@ -552,6 +577,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True): email=vals["email"], dept=vals.get("dept", auth_dept), roles=vals["roles"], + cas_id=vals["cas_id"], ) if not ok_modif: H.append(tf_error_message(msg)) @@ -815,12 +841,17 @@ def user_info_page(user_name=None): if not user: raise ScoValueError("invalid user_name") + session_info = None + if user.id == current_user.id: + session_info = flask.session.get("scodoc_cas_login_date") + return render_template( "auth/user_info_page.j2", user=user, title=f"Utilisateur {user.user_name}", Permission=Permission, dept=dept, + session_info=session_info, ) diff --git a/config.py b/config.py index 740860a6..ed456b0b 100755 --- a/config.py +++ b/config.py @@ -1,7 +1,7 @@ # -*- coding: UTF-8 -* import os -import uuid +import logging from dotenv import load_dotenv BASEDIR = os.path.abspath(os.path.dirname(__file__)) @@ -16,6 +16,7 @@ class Config: SECRET_KEY = os.environ.get("SECRET_KEY") or "90e01e75831e4276a4c70d29564b425f" SQLALCHEMY_TRACK_MODIFICATIONS = False LOG_TO_STDOUT = os.environ.get("LOG_TO_STDOUT") + LOG_LEVEL = getattr(logging, os.environ.get("LOG_LEVEL", "INFO"), "INFO") MAIL_SERVER = os.environ.get("MAIL_SERVER", "localhost") MAIL_PORT = int(os.environ.get("MAIL_PORT", 25)) MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS") is not None @@ -40,9 +41,6 @@ class Config: # Pour conserver l'ordre des objets dans les JSON: # e.g. l'ordre des UE dans les bulletins JSON_SORT_KEYS = False - # STATIC_URL_PATH = "/ScoDoc/static" - # static_folder = "stat" - # SERVER_NAME = os.environ.get("SERVER_NAME") class ProdConfig(Config): diff --git a/flask_cas/README.md b/flask_cas/README.md new file mode 100644 index 00000000..d71e8b50 --- /dev/null +++ b/flask_cas/README.md @@ -0,0 +1,7 @@ +# Flask-CAS + +Forked from +and adapted by Emmanuel Viennet, Feb. 2023. + +- logout: clear `_CAS_TOKEN`. +- Use `url` instead of `service` parameter in logout URL. diff --git a/flask_cas/__init__.py b/flask_cas/__init__.py new file mode 100644 index 00000000..76e02c25 --- /dev/null +++ b/flask_cas/__init__.py @@ -0,0 +1,106 @@ +""" +flask_cas.__init__ +""" + +import flask +from flask import current_app + +# Find the stack on which we want to store the database connection. +# Starting with Flask 0.9, the _app_ctx_stack is the correct one, +# before that we need to use the _request_ctx_stack. +try: + from flask import _app_ctx_stack as stack +except ImportError: + from flask import _request_ctx_stack as stack + +from . import routing + +from functools import wraps + +class CAS(object): + """ + Required Configs: + + |Key | + |----------------| + |CAS_SERVER | + |CAS_AFTER_LOGIN | + + Optional Configs: + + |Key | Default | + |---------------------------|-----------------------| + |CAS_TOKEN_SESSION_KEY | _CAS_TOKEN | + |CAS_USERNAME_SESSION_KEY | CAS_USERNAME | + |CAS_ATTRIBUTES_SESSION_KEY | CAS_ATTRIBUTES | + |CAS_LOGIN_ROUTE | '/cas' | + |CAS_LOGOUT_ROUTE | '/cas/logout' | + |CAS_VALIDATE_ROUTE | '/cas/serviceValidate'| + |CAS_AFTER_LOGOUT | None | + """ + + def __init__(self, app=None, url_prefix=None): + self._app = app + if app is not None: + self.init_app(app, url_prefix) + + def init_app(self, app, url_prefix=None): + # Configuration defaults + app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN') + app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME') + app.config.setdefault('CAS_ATTRIBUTES_SESSION_KEY', 'CAS_ATTRIBUTES') + app.config.setdefault('CAS_LOGIN_ROUTE', '/cas') + app.config.setdefault('CAS_LOGOUT_ROUTE', '/cas/logout') + app.config.setdefault('CAS_VALIDATE_ROUTE', '/cas/serviceValidate') + # Requires CAS 2.0 + app.config.setdefault('CAS_AFTER_LOGOUT', None) + # Register Blueprint + app.register_blueprint(routing.blueprint, url_prefix=url_prefix) + + # Use the newstyle teardown_appcontext if it's available, + # otherwise fall back to the request context + if hasattr(app, 'teardown_appcontext'): + app.teardown_appcontext(self.teardown) + else: + app.teardown_request(self.teardown) + + def teardown(self, exception): + ctx = stack.top + + @property + def app(self): + return self._app or current_app + + @property + def username(self): + return flask.session.get( + self.app.config['CAS_USERNAME_SESSION_KEY'], None) + + @property + def attributes(self): + return flask.session.get( + self.app.config['CAS_ATTRIBUTES_SESSION_KEY'], None) + + @property + def token(self): + return flask.session.get( + self.app.config['CAS_TOKEN_SESSION_KEY'], None) + +def login(): + return flask.redirect(flask.url_for('cas.login', _external=True)) + +def logout(): + return flask.redirect(flask.url_for('cas.logout', _external=True)) + +def login_required(function): + @wraps(function) + def wrap(*args, **kwargs): + if 'CAS_USERNAME' not in flask.session: + flask.session['CAS_AFTER_LOGIN_SESSION_URL'] = ( + flask.request.script_root + + flask.request.full_path + ) + return login() + else: + return function(*args, **kwargs) + return wrap diff --git a/flask_cas/cas_urls.py b/flask_cas/cas_urls.py new file mode 100644 index 00000000..276ef89c --- /dev/null +++ b/flask_cas/cas_urls.py @@ -0,0 +1,125 @@ +""" +flask_cas.cas_urls + +Functions for creating urls to access CAS. +""" + +try: + from urllib import quote + from urllib import urlencode + from urlparse import urljoin +except ImportError: + from urllib.parse import quote + from urllib.parse import urljoin + from urllib.parse import urlencode + + +def create_url(base, path=None, *query): + """Create a url. + + Creates a url by combining base, path, and the query's list of + key/value pairs. Escaping is handled automatically. Any + key/value pair with a value that is None is ignored. + + Keyword arguments: + base -- The left most part of the url (ex. http://localhost:5000). + path -- The path after the base (ex. /foo/bar). + query -- A list of key value pairs (ex. [('key', 'value')]). + + Example usage: + >>> create_url( + ... 'http://localhost:5000', + ... 'foo/bar', + ... ('key1', 'value'), + ... ('key2', None), # Will not include None + ... ('url', 'http://example.com'), + ... ) + 'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com' + """ + url = base + # Add the path to the url if it's not None. + if path is not None: + url = urljoin(url, quote(path)) + # Remove key/value pairs with None values. + query = filter(lambda pair: pair[1] is not None, query) + # Add the query string to the url + url = urljoin(url, "?{0}".format(urlencode(list(query)))) + return url + + +def create_cas_login_url(cas_url, cas_route, service, renew=None, gateway=None): + """Create a CAS login URL . + + Keyword arguments: + cas_url -- The url to the CAS (ex. http://sso.pdx.edu) + cas_route -- The route where the CAS lives on server (ex. /cas) + service -- (ex. http://localhost:5000/login) + renew -- "true" or "false" + gateway -- "true" or "false" + + Example usage: + >>> create_cas_login_url( + ... 'http://sso.pdx.edu', + ... '/cas', + ... 'http://localhost:5000', + ... ) + 'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000' + """ + return create_url( + cas_url, + cas_route, + ("service", service), + ("renew", renew), + ("gateway", gateway), + ) + + +def create_cas_logout_url(cas_url, cas_route, service=None): + """Create a CAS logout URL. + + Keyword arguments: + cas_url -- The url to the CAS (ex. http://sso.pdx.edu) + cas_route -- The route where the CAS lives on server (ex. /cas/logout) + url -- (ex. http://localhost:5000/login) + + Example usage: + >>> create_cas_logout_url( + ... 'http://sso.pdx.edu', + ... '/cas/logout', + ... 'http://localhost:5000', + ... ) + 'http://sso.pdx.edu/cas/logout?service=http%3A%2F%2Flocalhost%3A5000' + """ + return create_url( + cas_url, + cas_route, + ("url", service), # 'url' (and not 'service'): redirect here after CAS logout + ) + + +def create_cas_validate_url(cas_url, cas_route, service, ticket, renew=None): + """Create a CAS validate URL. + + Keyword arguments: + cas_url -- The url to the CAS (ex. http://sso.pdx.edu) + cas_route -- The route where the CAS lives on server (ex. /cas/serviceValidate) + service -- (ex. http://localhost:5000/login) + ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas') + renew -- "true" or "false" + + Example usage: + >>> create_cas_validate_url( + ... 'http://sso.pdx.edu', + ... '/cas/serviceValidate', + ... 'http://localhost:5000/login', + ... 'ST-58274-x839euFek492ou832Eena7ee-cas' + ... ) + 'http://sso.pdx.edu/cas/serviceValidate?service=http%3A%2F%2Flocalhost%3A5000%2Flogin&ticket=ST-58274-x839euFek492ou832Eena7ee-cas' + """ + return create_url( + cas_url, + cas_route, + ("service", service), + ("ticket", ticket), + ("renew", renew), + ) diff --git a/flask_cas/routing.py b/flask_cas/routing.py new file mode 100644 index 00000000..40112057 --- /dev/null +++ b/flask_cas/routing.py @@ -0,0 +1,194 @@ +import ssl + +import flask +from xmltodict import parse +from flask import current_app +from .cas_urls import create_cas_login_url +from .cas_urls import create_cas_logout_url +from .cas_urls import create_cas_validate_url + + +try: + from urllib import urlopen # python 2 +except ImportError: + from urllib.request import urlopen # python 3 +from urllib.error import URLError + +blueprint = flask.Blueprint("cas", __name__) + + +@blueprint.route("/login/") +def login(): + """ + This route has two purposes. First, it is used by the user + to login. Second, it is used by the CAS to respond with the + `ticket` after the user logs in successfully. + + When the user accesses this url, they are redirected to the CAS + to login. If the login was successful, the CAS will respond to this + route with the ticket in the url. The ticket is then validated. + If validation was successful the logged in username is saved in + the user's session under the key `CAS_USERNAME_SESSION_KEY` and + the user's attributes are saved under the key + 'CAS_USERNAME_ATTRIBUTE_KEY' + """ + + cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"] + + redirect_url = create_cas_login_url( + current_app.config["CAS_SERVER"], + current_app.config["CAS_LOGIN_ROUTE"], + flask.url_for( + ".login", + origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"), + _external=True, + ), + ) + + if "ticket" in flask.request.args: + flask.session[cas_token_session_key] = flask.request.args["ticket"] + + if cas_token_session_key in flask.session: + + if validate(flask.session[cas_token_session_key]): + if "CAS_AFTER_LOGIN_SESSION_URL" in flask.session: + redirect_url = flask.session.pop("CAS_AFTER_LOGIN_SESSION_URL") + elif flask.request.args.get("origin"): + redirect_url = flask.request.args["origin"] + else: + redirect_url = flask.url_for(current_app.config["CAS_AFTER_LOGIN"]) + else: + flask.session.pop(cas_token_session_key, None) + + current_app.logger.debug("Redirecting to: {0}".format(redirect_url)) + + return flask.redirect(redirect_url) + + +@blueprint.route("/logout/") +def logout(): + """ + When the user accesses this route they are logged out. + """ + + cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"] + cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"] + cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"] + + flask.session.pop(cas_username_session_key, None) + flask.session.pop(cas_attributes_session_key, None) + flask.session.pop(cas_token_session_key, None) # added by EV + + cas_after_logout = current_app.config["CAS_AFTER_LOGOUT"] + if cas_after_logout is not None: + # If config starts with http, use it as dest URL. + # Else, build Flask URL + dest_url = ( + cas_after_logout + if cas_after_logout.startswith("http") + else flask.url_for(cas_after_logout, _external=True) + ) + redirect_url = create_cas_logout_url( + current_app.config["CAS_SERVER"], + current_app.config["CAS_LOGOUT_ROUTE"], + dest_url, + ) + else: + redirect_url = create_cas_logout_url( + current_app.config["CAS_SERVER"], current_app.config["CAS_LOGOUT_ROUTE"] + ) + + current_app.logger.debug("Redirecting to: {0}".format(redirect_url)) + return flask.redirect(redirect_url) + + +def validate(ticket): + """ + Will attempt to validate the ticket. If validation fails, then False + is returned. If validation is successful, then True is returned + and the validated username is saved in the session under the + key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary + is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'. + """ + + cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"] + cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"] + cas_error_callback = current_app.config.get("CAS_ERROR_CALLBACK") + current_app.logger.debug("validating token {0}".format(ticket)) + + cas_validate_url = create_cas_validate_url( + current_app.config["CAS_SERVER"], + current_app.config["CAS_VALIDATE_ROUTE"], + flask.url_for( + ".login", + origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"), + _external=True, + ), + ticket, + ) + + current_app.logger.debug("Making GET request to {0}".format(cas_validate_url)) + + xml_from_dict = {} + isValid = False + + if current_app.config.get("CAS_SSL_VERIFY"): + ssl_context = ssl.SSLContext() + ssl_context.verify_mode = ssl.CERT_REQUIRED + ca_data = current_app.config.get("CAS_SSL_CERTIFICATE", "") + try: + ssl_context.load_verify_locations(cadata=ca_data) + except (ssl.SSLError, ValueError): + current_app.logger.error("CAS : error loading SSL cert.") + if cas_error_callback: + cas_error_callback("erreur chargement certificat SSL CAS (PEM)") + return False + else: + ssl_context = None + + try: + xmldump = ( + urlopen(cas_validate_url, context=ssl_context) + .read() + .strip() + .decode("utf8", "ignore") + ) + xml_from_dict = parse(xmldump) + isValid = ( + True + if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"] + else False + ) + except ValueError: + current_app.logger.error("CAS returned unexpected result") + if cas_error_callback: + cas_error_callback("réponse invalide du serveur CAS") + except URLError: + current_app.logger.error("CAS : error validating token: check SSL certificate") + cas_error_callback( + "erreur connexion au serveur CAS: vérifiez le certificat SSL" + ) + + if isValid: + current_app.logger.debug("valid") + xml_from_dict = xml_from_dict["cas:serviceResponse"][ + "cas:authenticationSuccess" + ] + username = xml_from_dict["cas:user"] + attributes = xml_from_dict.get("cas:attributes", {}) + + if attributes and "cas:memberOf" in attributes: + if isinstance(attributes["cas:memberOf"], basestring): + attributes["cas:memberOf"] = ( + attributes["cas:memberOf"].lstrip("[").rstrip("]").split(",") + ) + for group_number in range(0, len(attributes["cas:memberOf"])): + attributes["cas:memberOf"][group_number] = ( + attributes["cas:memberOf"][group_number].lstrip(" ").rstrip(" ") + ) + flask.session[cas_username_session_key] = username + flask.session[cas_attributes_session_key] = attributes + else: + current_app.logger.debug("invalid") + + return isValid diff --git a/migrations/versions/4c19fcb42636_cas.py b/migrations/versions/4c19fcb42636_cas.py new file mode 100644 index 00000000..e43c9266 --- /dev/null +++ b/migrations/versions/4c19fcb42636_cas.py @@ -0,0 +1,47 @@ +"""CAS + +Revision ID: 4c19fcb42636 +Revises: d8288b7f0a3e +Create Date: 2023-02-26 20:58:30.113631 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "4c19fcb42636" +down_revision = "d8288b7f0a3e" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("user", sa.Column("cas_id", sa.Text(), nullable=True)) + op.add_column( + "user", + sa.Column( + "cas_allow_login", sa.Boolean(), server_default="false", nullable=False + ), + ) + op.add_column( + "user", + sa.Column( + "cas_allow_scodoc_login", + sa.Boolean(), + server_default="false", + nullable=False, + ), + ) + op.create_index(op.f("ix_user_cas_id"), "user", ["cas_id"], unique=True) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f("ix_user_cas_id"), table_name="user") + op.drop_column("user", "cas_allow_scodoc_login") + op.drop_column("user", "cas_allow_login") + op.drop_column("user", "cas_id") + # ### end Alembic commands ### diff --git a/requirements-3.9.txt b/requirements-3.9.txt index 9032183a..99dddf50 100755 --- a/requirements-3.9.txt +++ b/requirements-3.9.txt @@ -85,4 +85,5 @@ visitor==0.1.3 Werkzeug==2.1.1 wrapt==1.14.0 WTForms==3.0.1 +xmltodict==0.13.0 zipp==3.8.0 diff --git a/sco_version.py b/sco_version.py index 83965dec..06b4709d 100644 --- a/sco_version.py +++ b/sco_version.py @@ -10,6 +10,7 @@ SCONEWS = """
  • ScoDoc 9.4
    • +
    • Connexion avec service CAS
    • Améliorations des tableaux récapitulatifs
    • Nouvelle interface de gestions des groupes (S. Lehmann)
    • Enrichissement des jurys BUT et des procès-verbaux associés.
    • diff --git a/scodoc.py b/scodoc.py index ec197aa7..2f152b29 100755 --- a/scodoc.py +++ b/scodoc.py @@ -101,6 +101,7 @@ def make_shell_context(): "res_sem": res_sem, "ResultatsSemestreBUT": ResultatsSemestreBUT, "Role": Role, + "ScoDocSiteConfig": models.ScoDocSiteConfig, "scolar": scolar, "ScolarAutorisationInscription": ScolarAutorisationInscription, "ScolarFormSemestreValidation": ScolarFormSemestreValidation,