# -*- coding: UTF-8 -*
"""
auth.cas.py

CAS (Central Authentication Service) integration for the ``auth`` blueprint:

- routes reached by redirect from the CAS server after login / logout;
- (re)configuration of the flask_cas settings from the ScoDoc site config;
- Excel export / import of the per-user CAS settings.
"""
import datetime

import flask
from flask import current_app, flash, url_for
from flask_login import current_user, login_user

from app import db
from app.auth import bp
from app.auth.models import User
from app.models.config import ScoDocSiteConfig
from app.scodoc import sco_excel
from app.scodoc.sco_exceptions import ScoValueError, AccessDenied
import app.scodoc.sco_utils as scu

# after_cas_login/after_cas_logout: routes called through a redirect issued
# by the CAS server.


@bp.route("/after_cas_login")
def after_cas_login():
    """Called by CAS after CAS authentication.

    Looks up the user whose ``cas_id`` matches the identifier found in the
    Flask session and logs them in if the account is active and allowed to
    use CAS.

    Returns:
        A redirect to ``scodoc.index`` on success; otherwise a redirect to
        ``auth.login`` after flashing a failure message.
    """
    # At this point the CAS data is available in flask.session["CAS_ATTRIBUTES"]
    if ScoDocSiteConfig.is_cas_enabled() and ("CAS_ATTRIBUTES" in flask.session):
        # Lookup user: use the configured CAS attribute, falling back on the
        # CAS username stored in the session.
        cas_id = flask.session["CAS_ATTRIBUTES"].get(
            "cas:" + ScoDocSiteConfig.get("cas_attribute_id"),
            flask.session.get("CAS_USERNAME"),
        )
        if cas_id is not None:
            user: User = User.query.filter_by(cas_id=str(cas_id)).first()
            if user and user.active:
                if user.cas_allow_login:
                    current_app.logger.info(f"CAS: login {user.user_name}")
                    if login_user(user):
                        flask.session["scodoc_cas_login_date"] = (
                            datetime.datetime.now().isoformat()
                        )
                        user.cas_last_login = datetime.datetime.utcnow()
                        if flask.session.get("CAS_EDT_ID"):
                            # Try to pick up the edt_id when present: this ID
                            # may be sent back by the CAS and extracted by
                            # ScoDoc through the `cas_edt_id_from_xml_regexp`
                            # expression (see flask_cas.routing).
                            edt_id = flask.session.get("CAS_EDT_ID")
                            current_app.logger.info(
                                f"""after_cas_login: storing edt_id for {
                                    user.user_name}: '{edt_id}'"""
                            )
                            user.edt_id = edt_id
                        db.session.add(user)
                        db.session.commit()
                        return flask.redirect(url_for("scodoc.index"))
                else:
                    current_app.logger.info(
                        f"CAS login denied for {user.user_name} (not allowed to use CAS)"
                    )
            else:
                current_app.logger.info(
                    f"""CAS login denied for {
                        user.user_name if user else ""
                        } cas_id={cas_id} (unknown or inactive)"""
                )
        else:
            attribute_name = ScoDocSiteConfig.get("cas_attribute_id")
            current_app.logger.info(
                f"CAS attribute '{attribute_name}' not found ! "
                "(check your ScoDoc config)"
            )

    # Failure:
    flash("échec de l'authentification")
    return flask.redirect(url_for("auth.login"))


@bp.route("/after_cas_logout")
def after_cas_logout():
    """Called by CAS after CAS logout.

    Flashes a confirmation message and redirects to ``scodoc.index``.
    """
    flash("Vous êtes déconnecté")
    current_app.logger.info("after_cas_logout")
    return flask.redirect(url_for("scodoc.index"))


def cas_error_callback(message):
    """Called by CAS when an error occurs, with a message.

    Raises:
        ScoValueError: always, wrapping ``message``.
    """
    raise ScoValueError(f"Erreur authentification CAS: {message}")


def set_cas_configuration(app: flask.app.Flask = None):
    """Force the flask_cas configuration from the ScoDoc site settings.

    Called at startup and each time the settings are modified.

    Args:
        app: application whose ``config`` is updated; defaults to
            ``current_app`` when omitted.
    """
    app = app or current_app
    if ScoDocSiteConfig.is_cas_enabled():
        # Log through the application actually being configured rather than
        # through current_app, so an explicitly passed `app` is honoured.
        app.logger.debug("CAS: set_cas_configuration")
        app.config["CAS_SERVER"] = ScoDocSiteConfig.get("cas_server")
        app.config["CAS_LOGIN_ROUTE"] = ScoDocSiteConfig.get("cas_login_route", "/cas")
        app.config["CAS_LOGOUT_ROUTE"] = ScoDocSiteConfig.get(
            "cas_logout_route", "/cas/logout"
        )
        app.config["CAS_VALIDATE_ROUTE"] = ScoDocSiteConfig.get(
            "cas_validate_route", "/cas/serviceValidate"
        )
        app.config["CAS_AFTER_LOGIN"] = "auth.after_cas_login"
        app.config["CAS_AFTER_LOGOUT"] = "auth.after_cas_logout"
        app.config["CAS_ERROR_CALLBACK"] = cas_error_callback
        app.config["CAS_SSL_VERIFY"] = ScoDocSiteConfig.get("cas_ssl_verify")
        app.config["CAS_SSL_CERTIFICATE"] = ScoDocSiteConfig.get("cas_ssl_certificate")
    else:
        # CAS disabled: remove the settings that were installed above.
        app.config.pop("CAS_SERVER", None)
        app.config.pop("CAS_AFTER_LOGIN", None)
        app.config.pop("CAS_AFTER_LOGOUT", None)
        app.config.pop("CAS_SSL_VERIFY", None)
        app.config.pop("CAS_SSL_CERTIFICATE", None)


# Column identifiers of the CAS users Excel sheet (export and import).
CAS_USER_INFO_IDS = (
    "user_name",
    "nom",
    "prenom",
    "email",
    "roles_string",
    "active",
    "dept",
    "cas_id",
    "cas_allow_login",
    "cas_allow_scodoc_login",
    "email_institutionnel",
)

# Comments for the columns above, in the SAME order as CAS_USER_INFO_IDS
# (one entry per column; "" means no comment).
# NOTE(review): assumes sco_excel.excel_simple_table pairs comments with
# titles by position - confirm.
CAS_USER_INFO_COMMENTS = (
    # user_name
    """user_name:
    L'identifiant (login).
    """,
    # nom
    "",
    # prenom
    "",
    # email
    "",
    # roles_string
    """Pour info: roles:
    chaînes séparées par _:
    1. Le rôle (Ens, Secr ou Admin)
    2. Le département (en majuscule)
    """,
    # active
    "Pour info: 0 si compte inactif",
    # dept
    """dept:
    Le département d'appartenance de l'utilisateur.
    Vide si l'utilisateur intervient dans plusieurs départements.
    """,
    # cas_id
    """cas_id:
    identifiant de l'utilisateur sur CAS (requis pour CAS).
    """,
    # cas_allow_login
    """cas_allow_login:
    autorise la connexion via CAS (optionnel, faux par défaut)
    """,
    # cas_allow_scodoc_login
    """cas_allow_scodoc_login
    autorise connexion via ScoDoc même si CAS obligatoire
    (optionnel, faux par défaut)
    """,
    # email_institutionnel
    """email_institutionnel
    optionnel, le mail officiel de l'utilisateur.
    Maximum 120 caractères.""",
)


def cas_users_generate_excel_sample() -> bytes:
    """Generate an Excel document suitable to import users CAS information.

    The sheet has one column per entry of ``CAS_USER_INFO_IDS`` and one row
    per existing user, ordered by ``user_name``.

    Returns:
        The value returned by ``sco_excel.excel_simple_table``.
    """
    style = sco_excel.excel_make_style(bold=True)
    titles = CAS_USER_INFO_IDS
    titles_styles = [style] * len(titles)
    # Every user is exported (the query applies no filter).
    rows = []
    for user in User.query.order_by(User.user_name):
        u_dict = user.to_dict()
        rows.append([u_dict.get(k) for k in CAS_USER_INFO_IDS])
    return sco_excel.excel_simple_table(
        lines=rows,
        titles=titles,
        titles_styles=titles_styles,
        sheet_name="Utilisateurs ScoDoc",
        comments=CAS_USER_INFO_COMMENTS,
    )


def cas_users_import_excel_file(datafile) -> int:
    """Import users CAS configuration from an Excel file.

    May change cas_id, cas_allow_login, cas_allow_scodoc_login and active.

    Args:
        datafile: stream to be imported.

    Returns:
        Number of modified user accounts.

    Raises:
        AccessDenied: if the current user is not an administrator.
    """
    # Local import, as in the original code.
    # NOTE(review): presumably avoids a circular import - confirm.
    from app.scodoc import sco_import_users

    if not current_user.is_administrator():
        raise AccessDenied(f"invalid user ({current_user}) must be SuperAdmin")
    # f-string: the acting user must actually appear in the log line.
    current_app.logger.info(f"cas_users_import_excel_file by {current_user}")
    users_infos = sco_import_users.read_users_excel_file(
        datafile, titles=CAS_USER_INFO_IDS
    )

    return cas_users_import_data(users_infos=users_infos)


def cas_users_import_data(users_infos: list[dict]) -> int:
    """Import CAS configuration information for existing users.

    ``users_infos`` is a list of dicts; only these keys are used:

    - user_name: the key; the user MUST already exist
    - cas_id: the CAS ID to record (blank clears it)
    - cas_allow_login
    - cas_allow_scodoc_login
    - active

    Any other field is ignored.

    Returns:
        Number of modified accounts.

    Raises:
        ScoValueError: unknown user, ``cas_id`` already used by another
            user, or database error while recording the modifications.
    """
    nb_modif = 0
    users = []  # users actually modified, recorded below
    for info in users_infos:
        user: User = User.query.filter_by(user_name=info["user_name"]).first()
        if not user:
            db.session.rollback()  # in case of auto-flush
            raise ScoValueError(f"""Utilisateur '{info["user_name"]}' inexistant""")

        modif = False
        new_cas_id = info["cas_id"].strip()
        if new_cas_id != (user.cas_id or ""):
            # Check unicity. A blank value clears the cas_id (stored as
            # None), so there is nothing to compare against in that case.
            if new_cas_id:
                other = User.query.filter_by(cas_id=new_cas_id).first()
                if other and other.id != user.id:
                    db.session.rollback()  # in case of auto-flush
                    raise ScoValueError(f"cas_id {new_cas_id} dupliqué")
            user.cas_id = new_cas_id or None
            modif = True
        val = scu.to_bool(info["cas_allow_login"])
        if val != user.cas_allow_login:
            user.cas_allow_login = val
            modif = True
        val = scu.to_bool(info["cas_allow_scodoc_login"])
        if val != user.cas_allow_scodoc_login:
            user.cas_allow_scodoc_login = val
            modif = True
        val = scu.to_bool(info["active"])
        if val != (user.active or False):
            user.active = val
            modif = True
        if modif:
            nb_modif += 1
            users.append(user)

    # Record modifications
    for modified_user in users:
        try:
            db.session.add(modified_user)
        except Exception as exc:
            db.session.rollback()
            raise ScoValueError(
                "Erreur (1) durant l'importation des modifications"
            ) from exc
    try:
        db.session.commit()
    except Exception as exc:
        db.session.rollback()
        raise ScoValueError(
            "Erreur (2) durant l'importation des modifications"
        ) from exc
    return nb_modif