ScoDoc/app/auth/cas.py

252 lines
8.6 KiB
Python

# -*- coding: UTF-8 -*
"""
auth.cas.py
"""
import datetime
import flask
from flask import current_app, flash, url_for
from flask_login import current_user, login_user
from app import db
from app.auth import bp
from app.auth.models import User
from app.models.config import ScoDocSiteConfig
from app.scodoc import sco_excel
from app.scodoc.sco_exceptions import ScoValueError, AccessDenied
import app.scodoc.sco_utils as scu
# after_cas_login/after_cas_logout : routes appelées par redirect depuis le serveur CAS.
@bp.route("/after_cas_login")
def after_cas_login():
    """Finish a CAS login; the CAS server redirects the browser to this route.

    The CAS attributes are read from ``flask.session["CAS_ATTRIBUTES"]``.
    The CAS identifier is taken from the attribute named by the
    ``cas_attribute_id`` setting (prefixed with ``cas:``), falling back to
    the ``CAS_USERNAME`` session entry.

    When that identifier matches an active user who is allowed to log in
    through CAS and ``login_user`` succeeds, the login dates are recorded and
    the browser is redirected to the ScoDoc index. In every other case a
    failure message is flashed and the browser is sent back to the login page.
    """
    if ScoDocSiteConfig.is_cas_enabled() and ("CAS_ATTRIBUTES" in flask.session):
        cas_attributes = flask.session["CAS_ATTRIBUTES"]
        cas_id = cas_attributes.get(
            "cas:" + ScoDocSiteConfig.get("cas_attribute_id"),
            flask.session.get("CAS_USERNAME"),
        )
        if cas_id is None:
            attribute_name = ScoDocSiteConfig.get("cas_attribute_id")
            current_app.logger.info(
                f"CAS attribute '{attribute_name}' not found !\n"
                "(check your ScoDoc config)"
            )
        else:
            # Look the user up by CAS identifier.
            user: User = User.query.filter_by(cas_id=cas_id).first()
            if not (user and user.active):
                shown_name = user.user_name if user else ""
                current_app.logger.info(
                    f"CAS login denied for {shown_name} cas_id={cas_id} "
                    "(unknown or inactive)"
                )
            elif not user.cas_allow_login:
                current_app.logger.info(
                    f"CAS login denied for {user.user_name} (not allowed to use CAS)"
                )
            else:
                current_app.logger.info(f"CAS: login {user.user_name}")
                if login_user(user):
                    # Remember when this session was opened through CAS.
                    login_date = datetime.datetime.now().isoformat()
                    flask.session["scodoc_cas_login_date"] = login_date
                    user.cas_last_login = datetime.datetime.utcnow()
                    db.session.add(user)
                    db.session.commit()
                    return flask.redirect(url_for("scodoc.index"))
    # Failure: every path that did not return above ends here.
    flash("échec de l'authentification")
    return flask.redirect(url_for("auth.login"))
@bp.route("/after_cas_logout")
def after_cas_logout():
    """Finish a CAS logout; the CAS server redirects the browser to this route.

    Flashes a "logged out" message, logs the event and redirects to the
    ScoDoc index page.
    """
    flash("Vous êtes déconnecté")
    current_app.logger.info("after_cas_logout")
    index_url = url_for("scodoc.index")
    return flask.redirect(index_url)
def cas_error_callback(message):
    """Error hook registered as ``CAS_ERROR_CALLBACK``.

    Always raises ScoValueError, embedding ``message`` in the error text.
    """
    error_text = f"Erreur authentification CAS: {message}"
    raise ScoValueError(error_text)
def set_cas_configuration(app: flask.app.Flask = None):
    """Force the flask_cas settings from the ScoDoc site configuration.

    Called at startup and every time the CAS parameters are modified.

    :param app: application whose ``config`` is updated; defaults to
        ``current_app`` when omitted.

    When CAS is enabled, the server, routes, callbacks and SSL settings are
    written into ``app.config``. When it is disabled, the server, after-login,
    after-logout and SSL keys are removed; the route keys and the error
    callback are left untouched.
    """
    app = app or current_app
    if ScoDocSiteConfig.is_cas_enabled():
        # Log through the application being configured, not current_app:
        # the caller may have passed a different application explicitly.
        app.logger.debug("CAS: set_cas_configuration")
        app.config["CAS_SERVER"] = ScoDocSiteConfig.get("cas_server")
        app.config["CAS_LOGIN_ROUTE"] = ScoDocSiteConfig.get("cas_login_route", "/cas")
        app.config["CAS_LOGOUT_ROUTE"] = ScoDocSiteConfig.get(
            "cas_logout_route", "/cas/logout"
        )
        app.config["CAS_VALIDATE_ROUTE"] = ScoDocSiteConfig.get(
            "cas_validate_route", "/cas/serviceValidate"
        )
        app.config["CAS_AFTER_LOGIN"] = "auth.after_cas_login"
        app.config["CAS_AFTER_LOGOUT"] = "auth.after_cas_logout"
        app.config["CAS_ERROR_CALLBACK"] = cas_error_callback
        app.config["CAS_SSL_VERIFY"] = ScoDocSiteConfig.get("cas_ssl_verify")
        app.config["CAS_SSL_CERTIFICATE"] = ScoDocSiteConfig.get("cas_ssl_certificate")
    else:
        for key in (
            "CAS_SERVER",
            "CAS_AFTER_LOGIN",
            "CAS_AFTER_LOGOUT",
            "CAS_SSL_VERIFY",
            "CAS_SSL_CERTIFICATE",
        ):
            app.config.pop(key, None)
# Column identifiers of the CAS users spreadsheet, in column order.
CAS_USER_INFO_IDS = (
    "user_name",
    "nom",
    "prenom",
    "email",
    "roles_string",
    "active",
    "dept",
    "cas_id",
    "cas_allow_login",
    "cas_allow_scodoc_login",
    "email_institutionnel",
)

# Help text attached to each column, keyed by column identifier so that a
# comment can never end up on the wrong column. Columns without an entry get
# an empty comment.
_CAS_USER_INFO_COMMENTS_BY_ID = {
    "user_name": (
        "user_name:\n"
        "L'identifiant (login).\n"
    ),
    "roles_string": (
        "Pour info: roles:\n"
        "chaînes séparées par _:\n"
        "1. Le rôle (Ens, Secr ou Admin)\n"
        "2. Le département (en majuscule)\n"
    ),
    "active": "Pour info: 0 si compte inactif",
    "dept": (
        "dept:\n"
        "Le département d'appartenance de l'utilisateur. Vide si l'utilisateur\n"
        "intervient dans plusieurs départements.\n"
    ),
    "cas_id": (
        "cas_id:\n"
        "identifiant de l'utilisateur sur CAS (requis pour CAS).\n"
    ),
    "cas_allow_login": (
        "cas_allow_login:\n"
        "autorise la connexion via CAS (optionnel, faux par défaut)\n"
    ),
    "cas_allow_scodoc_login": (
        "cas_allow_scodoc_login\n"
        "autorise connexion via ScoDoc même si CAS obligatoire (optionnel, faux par défaut)\n"
    ),
    "email_institutionnel": (
        "email_institutionnel\n"
        "optionnel, le mail officiel de l'utilisateur.\n"
        "Maximum 120 caractères."
    ),
}

# One comment per entry of CAS_USER_INFO_IDS, in the same order.
CAS_USER_INFO_COMMENTS = tuple(
    _CAS_USER_INFO_COMMENTS_BY_ID.get(column_id, "")
    for column_id in CAS_USER_INFO_IDS
)
def cas_users_generate_excel_sample() -> bytes:
    """Build an Excel document suitable for importing users' CAS information.

    The sheet has one column per entry of CAS_USER_INFO_IDS (bold titles,
    with the matching CAS_USER_INFO_COMMENTS) and one row per existing user,
    ordered by user name.
    """
    header_style = sco_excel.excel_make_style(bold=True)
    # Every user is exported, whatever the department or status.
    all_users = User.query.order_by(User.user_name)
    table_rows = [
        [fields.get(column_id) for column_id in CAS_USER_INFO_IDS]
        for fields in (account.to_dict() for account in all_users)
    ]
    return sco_excel.excel_simple_table(
        lines=table_rows,
        titles=CAS_USER_INFO_IDS,
        titles_styles=[header_style] * len(CAS_USER_INFO_IDS),
        sheet_name="Utilisateurs ScoDoc",
        comments=CAS_USER_INFO_COMMENTS,
    )
def cas_users_import_excel_file(datafile) -> int:
    """Import users' CAS configuration from an Excel file.

    May change cas_id, cas_allow_login, cas_allow_scodoc_login and active.

    :param datafile: stream to be imported
    :return: number of modified user accounts
    :raises AccessDenied: if the current user is not an administrator
    """
    # Imported here rather than at module level, as in the original code
    # (presumably to avoid an import cycle -- TODO confirm).
    from app.scodoc import sco_import_users

    if not current_user.is_administrator():
        raise AccessDenied(f"invalid user ({current_user}) must be SuperAdmin")
    # f-string: the log line must name the user who ran the import.
    current_app.logger.info(f"cas_users_import_excel_file by {current_user}")
    users_infos = sco_import_users.read_users_excel_file(
        datafile, titles=CAS_USER_INFO_IDS
    )
    return cas_users_import_data(users_infos=users_infos)
def cas_users_import_data(users_infos: list[dict]) -> int:
    """Import CAS configuration for existing users.

    ``users_infos`` is a list of dicts; only these fields are used, any
    other field is ignored:
      - user_name: the key; the user MUST already exist
      - cas_id: the CAS identifier to record (blank or None clears it)
      - cas_allow_login
      - cas_allow_scodoc_login
      - active

    :return: number of modified accounts
    :raises ScoValueError: if a user does not exist, if a cas_id is already
        used by another user, or if recording the modifications fails.
        The database session is rolled back before raising.
    """
    nb_modif = 0
    users = []
    for info in users_infos:
        user: User = User.query.filter_by(user_name=info["user_name"]).first()
        if not user:
            db.session.rollback()  # in case an auto-flush already emitted changes
            raise ScoValueError(f"""Utilisateur '{info["user_name"]}' inexistant""")
        modif = False
        # A missing value (None) is handled like a blank one.
        new_cas_id = (info["cas_id"] or "").strip()
        if new_cas_id != (user.cas_id or ""):
            # Uniqueness only matters for a real identifier: clearing the
            # cas_id must not be rejected as a duplicate.
            if new_cas_id:
                other = User.query.filter_by(cas_id=new_cas_id).first()
                if other and other.id != user.id:
                    db.session.rollback()  # in case an auto-flush already emitted changes
                    raise ScoValueError(f"cas_id {new_cas_id} dupliqué")
            user.cas_id = new_cas_id or None
            modif = True
        for field in ("cas_allow_login", "cas_allow_scodoc_login"):
            val = scu.to_bool(info[field])
            if val != getattr(user, field):
                setattr(user, field, val)
                modif = True
        val = scu.to_bool(info["active"])
        if val != (user.active or False):
            user.active = val
            modif = True
        if modif:
            nb_modif += 1
            # Keep track of the modified accounts so they are recorded below.
            users.append(user)
    # Record modifications
    for user in users:
        try:
            db.session.add(user)
        except Exception as exc:
            db.session.rollback()
            raise ScoValueError(
                "Erreur (1) durant l'importation des modifications"
            ) from exc
    try:
        db.session.commit()
    except Exception as exc:
        db.session.rollback()
        raise ScoValueError(
            "Erreur (2) durant l'importation des modifications"
        ) from exc
    return nb_modif