From d8a7f8affa19f93878f8ac9d1ca56070e46a1342 Mon Sep 17 00:00:00 2001
From: Emmanuel Viennet
Date: Sun, 26 Feb 2023 21:24:07 +0100
Subject: [PATCH] Connexion au CAS (WIP)
---
app/__init__.py | 22 ++--
app/auth/__init__.py | 1 +
app/auth/cas.py | 71 ++++++++++++
app/auth/logic.py | 3 +-
app/auth/models.py | 31 ++++-
app/auth/routes.py | 17 ++-
app/forms/main/config_cas.py | 53 +++++++++
app/models/config.py | 50 +++++++-
app/scodoc/sco_etud.py | 3 +
app/scodoc/sco_users.py | 96 ++++++++--------
app/templates/auth/login.j2 | 6 +
app/templates/auth/user_info_page.j2 | 15 ++-
app/templates/config_cas.j2 | 18 +++
app/templates/configuration.j2 | 3 +-
app/views/scodoc.py | 28 +++++
app/views/users.py | 41 ++++++-
config.py | 4 +
flask_cas/README.md | 7 ++
flask_cas/__init__.py | 106 +++++++++++++++++
flask_cas/cas_urls.py | 125 ++++++++++++++++++++
flask_cas/routing.py | 165 +++++++++++++++++++++++++++
requirements-3.9.txt | 1 +
sco_version.py | 1 +
23 files changed, 796 insertions(+), 71 deletions(-)
create mode 100644 app/auth/cas.py
create mode 100644 app/forms/main/config_cas.py
create mode 100644 app/templates/config_cas.j2
create mode 100644 flask_cas/README.md
create mode 100644 flask_cas/__init__.py
create mode 100644 flask_cas/cas_urls.py
create mode 100644 flask_cas/routing.py
diff --git a/app/__init__.py b/app/__init__.py
index 72cc1cb7..a1edfbd7 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -17,15 +17,17 @@ from flask import current_app, g, request
from flask import Flask
from flask import abort, flash, has_request_context, jsonify
from flask import render_template
-from flask.json import JSONEncoder
-from flask.logging import default_handler
-from flask_sqlalchemy import SQLAlchemy
-from flask_migrate import Migrate
+from flask_bootstrap import Bootstrap
+from flask_caching import Cache
+from flask_cas import CAS
from flask_login import LoginManager, current_user
from flask_mail import Mail
-from flask_bootstrap import Bootstrap
+from flask_migrate import Migrate
from flask_moment import Moment
-from flask_caching import Cache
+from flask_sqlalchemy import SQLAlchemy
+from flask.json import JSONEncoder
+from flask.logging import default_handler
+
from jinja2 import select_autoescape
import sqlalchemy
@@ -132,7 +134,7 @@ class ScoDocJSONEncoder(JSONEncoder):
def render_raw_html(template_filename: str, **args) -> str:
"""Load and render an HTML file _without_ using Flask
- Necessary for 503 error mesage, when DB is down and Flask may be broken.
+ Necessary for 503 error message, when DB is down and Flask may be broken.
"""
template_path = os.path.join(
current_app.config["SCODOC_DIR"],
@@ -226,6 +228,7 @@ class ReverseProxied(object):
def create_app(config_class=DevConfig):
app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static")
+ CAS(app, url_prefix="/cas")
app.wsgi_app = ReverseProxied(app.wsgi_app)
app.json_encoder = ScoDocJSONEncoder
app.logger.setLevel(logging.INFO)
@@ -378,6 +381,11 @@ def create_app(config_class=DevConfig):
sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample)
+ from app.auth.cas import set_cas_configuration
+
+ with app.app_context():
+ set_cas_configuration(app)
+
return app
diff --git a/app/auth/__init__.py b/app/auth/__init__.py
index 4fb9575d..7dcff2b3 100644
--- a/app/auth/__init__.py
+++ b/app/auth/__init__.py
@@ -6,3 +6,4 @@ from flask import Blueprint
bp = Blueprint("auth", __name__)
from app.auth import routes
+from app.auth import cas
diff --git a/app/auth/cas.py b/app/auth/cas.py
new file mode 100644
index 00000000..067ea350
--- /dev/null
+++ b/app/auth/cas.py
@@ -0,0 +1,71 @@
+# -*- coding: UTF-8 -*
+"""
+auth.cas.py
+"""
+import datetime
+
+import flask
+from flask import current_app, flash, url_for
+from flask_login import login_user
+
+from app.auth import bp
+from app.auth.models import User
+from app.models.config import ScoDocSiteConfig
+
+# after_cas_login/after_cas_logout : routes appelées par redirect depuis le serveur CAS.
+
+
+@bp.route("/after_cas_login")
+def after_cas_login():
+ "Called by CAS after CAS authentication"
+ # Ici on a les infos dans flask.session["CAS_ATTRIBUTES"]
+ if ScoDocSiteConfig.is_cas_enabled() and ("CAS_ATTRIBUTES" in flask.session):
+ # Lookup user:
+ cas_id = flask.session["CAS_ATTRIBUTES"].get(
+ "cas:" + ScoDocSiteConfig.get("cas_attribute_id")
+ )
+ if cas_id is not None:
+ user = User.query.filter_by(cas_id=cas_id).first()
+ if user and user.active:
+ if user.cas_allow_login:
+ current_app.logger.info(f"CAS: login {user.user_name}")
+ if login_user(user):
+ flask.session[
+ "scodoc_cas_login_date"
+ ] = datetime.datetime.now().isoformat()
+ return flask.redirect(url_for("scodoc.index"))
+ else:
+ current_app.logger.info(
+ f"CAS login denied for {user.user_name} (not allowed to use CAS)"
+ )
+ else:
+ current_app.logger.info(
+ f"""CAS login denied for {user.user_name if user else ""} cas_id={cas_id} (unknown or inactive)"""
+ )
+
+ # Echec:
+ flash("échec de l'authentification")
+ return flask.redirect(url_for("auth.login"))
+
+
+@bp.route("/after_cas_logout")
+def after_cas_logout():
+ "Called by CAS after CAS logout"
+ flash("Vous êtes déconnecté")
+ current_app.logger.info("after_cas_logout")
+ return flask.redirect(url_for("scodoc.index"))
+
+
+def set_cas_configuration(app: flask.app.Flask):
+ """Force la configuration du module flask_cas à partir des paramètres de
+ la config de ScoDoc.
+ Appelé au démarrage et à chaque modif des paramètres.
+ """
+ if ScoDocSiteConfig.is_cas_enabled():
+ app.config["CAS_SERVER"] = ScoDocSiteConfig.get("cas_server")
+ app.config["CAS_AFTER_LOGIN"] = "auth.after_cas_login"
+ app.config["CAS_AFTER_LOGOUT"] = "auth.after_cas_logout"
+ else:
+ app.config.pop("CAS_SERVER", None)
+ app.config.pop("CAS_AFTER_LOGIN", None)
+ app.config.pop("CAS_AFTER_LOGOUT", None)
diff --git a/app/auth/logic.py b/app/auth/logic.py
index ba3f73a4..a1737a3b 100644
--- a/app/auth/logic.py
+++ b/app/auth/logic.py
@@ -8,9 +8,10 @@ import flask
from flask import g, redirect, request, url_for
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
import flask_login
+
from app import login
-from app.scodoc.sco_utils import json_error
from app.auth.models import User
+from app.scodoc.sco_utils import json_error
basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth()
diff --git a/app/auth/models.py b/app/auth/models.py
index 274be8e7..866f08ff 100644
--- a/app/auth/models.py
+++ b/app/auth/models.py
@@ -53,12 +53,28 @@ class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
user_name = db.Column(db.String(64), index=True, unique=True)
+ "le login"
email = db.Column(db.String(120))
nom = db.Column(db.String(64))
prenom = db.Column(db.String(64))
dept = db.Column(db.String(SHORT_STR_LEN), index=True)
+ "acronyme du département de l'utilisateur"
active = db.Column(db.Boolean, default=True, index=True)
+ "si faux, compte utilisateur désactivé"
+ cas_id = db.Column(db.Text(), index=True, unique=True, nullable=True)
+ "uid sur le CAS (mail ou autre attribut, selon config.cas_attribute_id)"
+ cas_allow_login = db.Column(
+ db.Boolean, default=False, server_default="false", nullable=False
+ )
+ "Peut-on se logguer via le CAS ?"
+ cas_allow_scodoc_login = db.Column(
+ db.Boolean, default=True, server_default="false", nullable=False
+ )
+ """(not yet implemented XXX)
+ si CAS activé, peut-on se logguer sur ScoDoc directement ?
+ (le rôle ScoSuperAdmin peut toujours)
+ """
password_hash = db.Column(db.String(128))
password_scodoc7 = db.Column(db.String(42))
@@ -184,6 +200,9 @@ class User(UserMixin, db.Model):
"dept": self.dept,
"id": self.id,
"active": self.active,
+ "cas_id": self.cas_id,
+ "cas_allow_login": self.cas_allow_login,
+ "cas_allow_scodoc_login": self.cas_allow_scodoc_login,
"status_txt": "actif" if self.active else "fermé",
"last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else None,
"nom": (self.nom or ""), # sco8
@@ -206,7 +225,17 @@ class User(UserMixin, db.Model):
"""Set users' attributes from given dict values.
Roles must be encoded as "roles_string", like "Ens_RT, Secr_CJ"
"""
- for field in ["nom", "prenom", "dept", "active", "email", "date_expiration"]:
+ for field in [
+ "nom",
+ "prenom",
+ "dept",
+ "active",
+ "email",
+ "date_expiration",
+ "cas_id",
+ "cas_allow_login",
+ "cas_allow_scodoc_login",
+ ]:
if field in data:
setattr(self, field, data[field] or None)
if new_user:
diff --git a/app/auth/routes.py b/app/auth/routes.py
index 2c1594bc..b6a0df9a 100644
--- a/app/auth/routes.py
+++ b/app/auth/routes.py
@@ -3,6 +3,7 @@
auth.routes.py
"""
+import flask
from flask import current_app, flash, render_template
from flask import redirect, url_for, request
from flask_login import login_user, logout_user, current_user
@@ -20,6 +21,7 @@ from app.auth.models import Role
from app.auth.models import User
from app.auth.email import send_password_reset_email
from app.decorators import admin_required
+from app.models.config import ScoDocSiteConfig
_ = lambda x: x # sans babel
_l = _
@@ -30,6 +32,7 @@ def login():
"ScoDoc Login form"
if current_user.is_authenticated:
return redirect(url_for("scodoc.index"))
+
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(user_name=form.user_name.data).first()
@@ -42,14 +45,22 @@ def login():
return form.redirect("scodoc.index")
message = request.args.get("message", "")
return render_template(
- "auth/login.j2", title=_("Sign In"), form=form, message=message
+ "auth/login.j2",
+ title=_("Sign In"),
+ form=form,
+ message=message,
+ is_cas_enabled=ScoDocSiteConfig.is_cas_enabled(),
)
@bp.route("/logout")
-def logout():
- "Logout current user and redirect to home page"
+def logout() -> flask.Response:
+ "Logout a scodoc user. If CAS session, logout from CAS. Redirect."
+ current_app.logger.info(f"logout user {current_user.user_name}")
logout_user()
+ if ScoDocSiteConfig.is_cas_enabled() and flask.session.get("scodoc_cas_login_date"):
+ flask.session.pop("scodoc_cas_login_date", None)
+ return redirect(url_for("cas.logout"))
return redirect(url_for("scodoc.index"))
diff --git a/app/forms/main/config_cas.py b/app/forms/main/config_cas.py
new file mode 100644
index 00000000..663e487b
--- /dev/null
+++ b/app/forms/main/config_cas.py
@@ -0,0 +1,53 @@
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+##############################################################################
+#
+# ScoDoc
+#
+# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Emmanuel Viennet emmanuel.viennet@viennet.net
+#
+##############################################################################
+
+"""
+Formulaires configuration Exports Apogée (codes)
+"""
+
+from flask_wtf import FlaskForm
+from wtforms import BooleanField, SubmitField
+from wtforms.fields.simple import StringField
+
+
+class ConfigCASForm(FlaskForm):
+ "Formulaire paramétrage CAS"
+ cas_enable = BooleanField("activer le CAS")
+
+ cas_server = StringField(
+ label="URL du serveur CAS",
+ description="""url complète. Commence en général par https://.""",
+ )
+
+ cas_attribute_id = StringField(
+ label="Attribut CAS utilisé comme id",
+ description="""Le champs CAS qui sera considéré comme l'id unique des
+ comptes utilisateurs.""",
+ )
+
+ submit = SubmitField("Valider")
+ cancel = SubmitField("Annuler", render_kw={"formnovalidate": True})
diff --git a/app/models/config.py b/app/models/config.py
index f4f09cc3..bbe0c0ae 100644
--- a/app/models/config.py
+++ b/app/models/config.py
@@ -87,6 +87,10 @@ class ScoDocSiteConfig(db.Model):
"enable_entreprises": bool,
"month_debut_annee_scolaire": int,
"month_debut_periode2": int,
+ # CAS
+ "cas_enable": bool,
+ "cas_server": str,
+ "cas_attribute_id": str,
}
def __init__(self, name, value):
@@ -170,7 +174,7 @@ class ScoDocSiteConfig(db.Model):
(starting with empty string to represent "no bonus function").
"""
d = bonus_spo.get_bonus_class_dict()
- class_list = [(name, d[name].displayed_name) for name in d.keys()]
+ class_list = [(name, d[name].displayed_name) for name in d]
class_list.sort(key=lambda x: x[1].replace(" du ", " de "))
return [("", "")] + class_list
@@ -204,13 +208,31 @@ class ScoDocSiteConfig(db.Model):
db.session.add(cfg)
db.session.commit()
+ @classmethod
+ def is_cas_enabled(cls) -> bool:
+ """True si on utilise le CAS"""
+ cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first()
+ return cfg is not None and cfg.value
+
+ @classmethod
+ def cas_enable(cls, enabled=True) -> bool:
+ """Active (ou déactive) le CAS. True si changement."""
+ if enabled != ScoDocSiteConfig.is_cas_enabled():
+ cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first()
+ if cfg is None:
+ cfg = ScoDocSiteConfig(name="cas_enable", value="on" if enabled else "")
+ else:
+ cfg.value = "on" if enabled else ""
+ db.session.add(cfg)
+ db.session.commit()
+ return True
+ return False
+
@classmethod
def is_entreprises_enabled(cls) -> bool:
"""True si on doit activer le module entreprise"""
cfg = ScoDocSiteConfig.query.filter_by(name="enable_entreprises").first()
- if (cfg is None) or not cfg.value:
- return False
- return True
+ return cfg is not None and cfg.value
@classmethod
def enable_entreprises(cls, enabled=True) -> bool:
@@ -228,6 +250,26 @@ class ScoDocSiteConfig(db.Model):
return True
return False
+ @classmethod
+ def get(cls, name: str) -> str:
+ "Get configuration param; empty string if unset"
+ cfg = ScoDocSiteConfig.query.filter_by(name=name).first()
+ return (cfg.value or "") if cfg else ""
+
+ @classmethod
+ def set(cls, name: str, value: str) -> bool:
+ "Set parameter, returns True if change. Commit session."
+ if cls.get(name) != (value or ""):
+ cfg = ScoDocSiteConfig.query.filter_by(name=name).first()
+ if cfg is None:
+ cfg = ScoDocSiteConfig(name=name, value=str(value))
+ else:
+ cfg.value = str(value or "")
+ db.session.add(cfg)
+ db.session.commit()
+ return True
+ return False
+
@classmethod
def _get_int_field(cls, name: str, default=None) -> int:
"""Valeur d'un champs integer"""
diff --git a/app/scodoc/sco_etud.py b/app/scodoc/sco_etud.py
index 6d9bc49f..a4240d4b 100644
--- a/app/scodoc/sco_etud.py
+++ b/app/scodoc/sco_etud.py
@@ -225,6 +225,9 @@ _identiteEditor = ndb.EditableTable(
"nom",
"nom_usuel",
"prenom",
+ "cas_id",
+ "cas_allow_login",
+ "cas_allow_scodoc_login",
"civilite", # 'M", "F", or "X"
"date_naissance",
"lieu_naissance",
diff --git a/app/scodoc/sco_users.py b/app/scodoc/sco_users.py
index d3db5b8e..53a298bb 100644
--- a/app/scodoc/sco_users.py
+++ b/app/scodoc/sco_users.py
@@ -28,11 +28,10 @@
"""Fonctions sur les utilisateurs
"""
-# Anciennement ZScoUsers.py, fonctions de gestion des données réécrite avec flask/SQLAlchemy
+# Anciennement ZScoUsers.py, fonctions de gestion des données réécrites avec flask/SQLAlchemy
import re
from flask import url_for, g, request
-from flask.templating import render_template
from flask_login import current_user
@@ -42,23 +41,11 @@ from app.auth.models import Permission
from app.auth.models import User
from app.scodoc import html_sco_header
-from app.scodoc import sco_etud
-from app.scodoc import sco_excel
from app.scodoc import sco_preferences
from app.scodoc.gen_tables import GenTable
-from app import log, cache
-from app.scodoc.scolog import logdb
-import app.scodoc.sco_utils as scu
+from app import cache
-from app.scodoc.sco_exceptions import (
- AccessDenied,
- ScoValueError,
-)
-
-
-# ---------------
-
-# ---------------
+from app.scodoc.sco_exceptions import ScoValueError
def index_html(all_depts=False, with_inactives=False, format="html"):
@@ -70,19 +57,21 @@ def index_html(all_depts=False, with_inactives=False, format="html"):
if current_user.has_permission(Permission.ScoUsersAdmin, g.scodoc_dept):
H.append(
- 'Ajouter un utilisateur'.format(
- url_for("users.create_user_form", scodoc_dept=g.scodoc_dept)
- )
+ f"""
Ajouter un utilisateur"""
)
if current_user.is_administrator():
H.append(
- ' Importer des utilisateurs
'.format(
- url_for("users.import_users_form", scodoc_dept=g.scodoc_dept)
- )
+ """ Importer des utilisateurs
"""
)
+
else:
H.append(
- " Pour importer des utilisateurs en masse (via xlsx file) contactez votre administrateur scodoc."
+ """ Pour importer des utilisateurs en masse (via fichier xlsx)
+ contactez votre administrateur scodoc."""
)
if all_depts:
checked = "checked"
@@ -93,11 +82,12 @@ def index_html(all_depts=False, with_inactives=False, format="html"):
else:
olds_checked = ""
H.append(
- """"""
- % (request.base_url, checked, olds_checked)
)
L = list_users(
@@ -189,9 +179,8 @@ def list_users(
},
caption=title,
page_title="title",
- html_title="""%d utilisateurs %s
- Cliquer sur un nom pour changer son mot de passe
"""
- % (len(r), comm),
+ html_title=f"""{len(r)} utilisateurs {comm}
+ Cliquer sur un nom pour changer son mot de passe
""",
html_class="table_leftalign list_users",
html_with_td_classes=True,
html_sortable=True,
@@ -273,6 +262,9 @@ def user_info(user_name_or_id=None, user: User = None):
return info
+MSG_OPT = """
Attention: (vous pouvez forcer l'opération en cochant "Ignorer les avertissements" en bas de page)"""
+
+
def check_modif_user(
edit,
enforce_optionals=False,
@@ -281,7 +273,8 @@ def check_modif_user(
prenom="",
email="",
dept="",
- roles=[],
+ roles: list = None,
+ cas_id: str = None,
):
"""Vérifie que cet utilisateur peut être créé (edit=0) ou modifié (edit=1)
Cherche homonymes.
@@ -290,32 +283,31 @@ def check_modif_user(
(si ok est faux, l'utilisateur peut quand même forcer la creation)
- msg: message warning à presenter à l'utilisateur
"""
- MSG_OPT = """
Attention: (vous pouvez forcer l'opération en cochant "Ignorer les avertissements" en bas de page)"""
+ roles = roles or []
# ce login existe ?
- user = _user_list(user_name)
+ user = User.query.filter_by(user_name=user_name).first()
if edit and not user: # safety net, le user_name ne devrait pas changer
- return False, "identifiant %s inexistant" % user_name
+ return False, f"identifiant {user_name} inexistant"
if not edit and user:
- return False, "identifiant %s déjà utilisé" % user_name
+ return False, f"identifiant {user_name} déjà utilisé"
if not user_name or not nom or not prenom:
return False, "champ requis vide"
if not re.match(r"^[a-zA-Z0-9@\\\-_\\\.]*$", user_name):
return (
False,
- "identifiant '%s' invalide (pas d'accents ni de caractères spéciaux)"
- % user_name,
+ f"identifiant '{user_name}' invalide (pas d'accents ni de caractères spéciaux)",
)
if enforce_optionals and len(user_name) > 64:
- return False, "identifiant '%s' trop long (64 caractères)" % user_name
+ return False, f"identifiant '{user_name}' trop long (64 caractères)"
if enforce_optionals and len(nom) > 64:
- return False, "nom '%s' trop long (64 caractères)" % nom + MSG_OPT
+ return False, f"nom '{nom}' trop long (64 caractères)" + MSG_OPT
if enforce_optionals and len(prenom) > 64:
- return False, "prenom '%s' trop long (64 caractères)" % prenom + MSG_OPT
- # check that tha same user_name has not already been described in this import
+ return False, f"prenom '{prenom}' trop long (64 caractères)" + MSG_OPT
+ # check that same user_name has not already been described in this import
if not email:
return False, "vous devriez indiquer le mail de l'utilisateur créé !"
if len(email) > 120:
- return False, "email '%s' trop long (120 caractères)" % email
+ return False, f"email '{email}' trop long (120 caractères)"
if not re.fullmatch(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", email):
return False, "l'adresse mail semble incorrecte"
# check département
@@ -324,19 +316,31 @@ def check_modif_user(
and dept
and Departement.query.filter_by(acronym=dept).first() is None
):
- return False, "département '%s' inexistant" % dept + MSG_OPT
+ return False, f"département '{dept}' inexistant" + MSG_OPT
if enforce_optionals and not roles:
return False, "aucun rôle sélectionné, êtes vous sûr ?" + MSG_OPT
# Unicité du mail
users_with_this_mail = User.query.filter_by(email=email).all()
if edit: # modification
- if email != user["email"] and len(users_with_this_mail) > 0:
+ if email != user.email and len(users_with_this_mail) > 0:
return False, "un autre utilisateur existe déjà avec cette adresse mail"
else: # création utilisateur
if len(users_with_this_mail) > 0:
return False, "un autre utilisateur existe déjà avec cette adresse mail"
- # ok
+ # Unicité du cas_id
+ if cas_id:
+ cas_users = User.query.filter_by(cas_id=cas_id).all()
+ if edit:
+ if cas_users and (
+ len(cas_users) > 1 or cas_users[0].user_name != user_name
+ ):
+ return (
+ False,
+ "un autre utilisateur existe déjà avec cet identifiant CAS",
+ )
+ elif cas_users:
+ return False, "un autre utilisateur existe déjà avec cet identifiant CAS"
# Des noms/prénoms semblables existent ?
nom = nom.lower().strip()
prenom = prenom.lower().strip()
@@ -367,7 +371,7 @@ def user_edit(user_name, vals):
"""Edit the user specified by user_name
(ported from Zope to SQLAlchemy, hence strange !)
"""
- u = User.query.filter_by(user_name=user_name).first()
+ u: User = User.query.filter_by(user_name=user_name).first()
if not u:
raise ScoValueError("Invalid user_name")
u.from_dict(vals)
diff --git a/app/templates/auth/login.j2 b/app/templates/auth/login.j2
index 2877f672..8f9f4a8c 100644
--- a/app/templates/auth/login.j2
+++ b/app/templates/auth/login.j2
@@ -9,6 +9,12 @@
{% endif %}
Connexion
+
+{% if is_cas_enabled %}
+
+{% endif %}
{{ wtf.quick_form(form) }}
diff --git a/app/templates/auth/user_info_page.j2 b/app/templates/auth/user_info_page.j2
index 23e8c27b..d5284579 100644
--- a/app/templates/auth/user_info_page.j2
+++ b/app/templates/auth/user_info_page.j2
@@ -7,6 +7,9 @@
Utilisateur: {{user.user_name}} ({{'actif' if user.active else 'fermé'}})
Login : {{user.user_name}}
+ CAS id: {{user.cas_id or "(aucun)"}}
+ (CAS {{'autorisé' if user.cas_allow_login else 'interdit'}} pour cet utilisateur)
+
Nom : {{user.nom or ""}}
Prénom : {{user.prenom or ""}}
Mail : {{user.email}}
@@ -48,9 +51,15 @@
{% if current_user.id == user.id %}
-
Se déconnecter:
- logout
-
+
+ {% if session_info %}
+
CAS session started at {{ session_info }}
+ {% endif %}
+
Se déconnecter:
+ logout
+
+
+
{% endif %}
{# Liste des permissions #}
diff --git a/app/templates/config_cas.j2 b/app/templates/config_cas.j2
new file mode 100644
index 00000000..2cccd0de
--- /dev/null
+++ b/app/templates/config_cas.j2
@@ -0,0 +1,18 @@
+{% extends "base.j2" %}
+{% import 'bootstrap/wtf.html' as wtf %}
+
+{% block app_content %}
+
Configuration du Service d'Authentification Central (CAS)
+
+
+
+
+
+ {{ wtf.quick_form(form) }}
+
+
+
+
+{% endblock %}
\ No newline at end of file
diff --git a/app/templates/configuration.j2 b/app/templates/configuration.j2
index 05567ef3..94726c62 100644
--- a/app/templates/configuration.j2
+++ b/app/templates/configuration.j2
@@ -53,8 +53,9 @@
-
Utilisateurs
+
Utilisateurs et CAS
+ configuration du service CAS
remettre
les permissions des rôles standards à leurs valeurs par défaut (efface les modifications apportées)
diff --git a/app/views/scodoc.py b/app/views/scodoc.py
index 83055b50..6a9c5e02 100644
--- a/app/views/scodoc.py
+++ b/app/views/scodoc.py
@@ -62,6 +62,7 @@ from app.decorators import (
from app.forms.main import config_logos, config_main
from app.forms.main.create_dept import CreateDeptForm
from app.forms.main.config_apo import CodesDecisionsForm
+from app.forms.main.config_cas import ConfigCASForm
from app import models
from app.models import Departement, Identite
from app.models import departements
@@ -134,6 +135,33 @@ def toggle_dept_vis(dept_id):
return redirect(url_for("scodoc.index"))
+@bp.route("/ScoDoc/config_cas", methods=["GET", "POST"])
+@admin_required
+def config_cas():
+ """Form config CAS"""
+ form = ConfigCASForm()
+ if request.method == "POST" and form.cancel.data: # cancel button
+ return redirect(url_for("scodoc.index"))
+ if form.validate_on_submit():
+ if ScoDocSiteConfig.cas_enable(enabled=form.data["cas_enable"]):
+ flash("CAS " + ("activé" if form.data["cas_enable"] else "désactivé"))
+ if ScoDocSiteConfig.set("cas_server", form.data["cas_server"]):
+ flash("Serveur CAS enregistré")
+ if ScoDocSiteConfig.set("cas_attribute_id", form.data["cas_attribute_id"]):
+ flash("Serveur CAS enregistré")
+
+ return redirect(url_for("scodoc.configuration"))
+ elif request.method == "GET":
+ form.cas_enable.data = ScoDocSiteConfig.get("cas_enable")
+ form.cas_server.data = ScoDocSiteConfig.get("cas_server")
+ form.cas_attribute_id.data = ScoDocSiteConfig.get("cas_attribute_id")
+ return render_template(
+ "config_cas.j2",
+ form=form,
+ title="Configuration du Service d'Authentification Central (CAS)",
+ )
+
+
@bp.route("/ScoDoc/config_codes_decisions", methods=["GET", "POST"])
@admin_required
def config_codes_decisions():
diff --git a/app/views/users.py b/app/views/users.py
index f6d0bcc7..7efc08e1 100644
--- a/app/views/users.py
+++ b/app/views/users.py
@@ -56,6 +56,7 @@ from app.auth.models import UserRole
from app.auth.models import is_valid_password
from app.email import send_email
from app.models import Departement
+from app.models.config import ScoDocSiteConfig
from app.decorators import (
scodoc,
@@ -226,7 +227,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
if edit:
if not user_name:
raise ValueError("missing argument: user_name")
- the_user = User.query.filter_by(user_name=user_name).first()
+ the_user: User = User.query.filter_by(user_name=user_name).first()
if not the_user:
raise ScoValueError("utilisateur inexistant")
initvalues = the_user.to_dict()
@@ -367,6 +368,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
{"input_type": "hidden", "default": initvalues["user_name"]},
)
]
+ cas_enabled = ScoDocSiteConfig.is_cas_enabled()
descr += [
(
"email",
@@ -376,11 +378,34 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
"explanation": "requis, doit fonctionner"
if not edit_only_roles
else "",
- "size": 20,
+ "size": 36,
"allow_null": False,
"readonly": edit_only_roles,
},
- )
+ ),
+ (
+ "cas_id",
+ {
+ "title": "Identifiant CAS",
+ "input_type": "text",
+ "explanation": "id du compte utilisateur sur le CAS de l'établissement "
+ + "(service CAS activé)"
+ if cas_enabled
+ else "(service CAS non activé)",
+ "size": 36,
+ "allow_null": True,
+ "readonly": not cas_enabled,
+ },
+ ),
+ (
+ "cas_allow_login",
+ {
+ "title": "Autorise connexion via CAS",
+ "input_type": "boolcheckbox",
+ "explanation": "en test: seul le super-administrateur peut changer ce réglage",
+ "readonly": not current_user.is_administrator(),
+ },
+ ),
]
if not edit: # options création utilisateur
descr += [
@@ -438,7 +463,8 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
"d",
{
"input_type": "separator",
- "title": f"""L'utilisateur appartient au département {the_user.dept or "(tous)"}""",
+ "title": f"""L'utilisateur appartient au département {
+ the_user.dept or "(tous)"}""",
},
)
)
@@ -541,7 +567,6 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
if err_msg:
H.append(tf_error_message(f"""Erreur: {err_msg}"""))
return "\n".join(H) + "\n" + tf[1] + F
-
if not edit_only_roles:
ok_modif, msg = sco_users.check_modif_user(
edit,
@@ -552,6 +577,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
email=vals["email"],
dept=vals.get("dept", auth_dept),
roles=vals["roles"],
+ cas_id=vals["cas_id"],
)
if not ok_modif:
H.append(tf_error_message(msg))
@@ -815,12 +841,17 @@ def user_info_page(user_name=None):
if not user:
raise ScoValueError("invalid user_name")
+ session_info = None
+ if user.id == current_user.id:
+ session_info = flask.session.get("scodoc_cas_login_date")
+
return render_template(
"auth/user_info_page.j2",
user=user,
title=f"Utilisateur {user.user_name}",
Permission=Permission,
dept=dept,
+ session_info=session_info,
)
diff --git a/config.py b/config.py
index 740860a6..584a6843 100755
--- a/config.py
+++ b/config.py
@@ -43,6 +43,10 @@ class Config:
# STATIC_URL_PATH = "/ScoDoc/static"
# static_folder = "stat"
# SERVER_NAME = os.environ.get("SERVER_NAME")
+ # XXX temporaire: utiliser SiteConfig
+ CAS_SERVER = os.environ.get("CAS_SERVER")
+ CAS_AFTER_LOGIN = os.environ.get("CAS_AFTER_LOGIN")
+ CAS_AFTER_LOGOUT = os.environ.get("CAS_AFTER_LOGOUT")
class ProdConfig(Config):
diff --git a/flask_cas/README.md b/flask_cas/README.md
new file mode 100644
index 00000000..d71e8b50
--- /dev/null
+++ b/flask_cas/README.md
@@ -0,0 +1,7 @@
+# Flask-CAS
+
+Forked from
+and adapted by Emmanuel Viennet, Feb. 2023.
+
+- logout: clear `_CAS_TOKEN`.
+- Use `url` instead of `service` parameter in logout URL.
diff --git a/flask_cas/__init__.py b/flask_cas/__init__.py
new file mode 100644
index 00000000..76e02c25
--- /dev/null
+++ b/flask_cas/__init__.py
@@ -0,0 +1,106 @@
+"""
+flask_cas.__init__
+"""
+
+import flask
+from flask import current_app
+
+# Find the stack on which we want to store the database connection.
+# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
+# before that we need to use the _request_ctx_stack.
+try:
+ from flask import _app_ctx_stack as stack
+except ImportError:
+ from flask import _request_ctx_stack as stack
+
+from . import routing
+
+from functools import wraps
+
+class CAS(object):
+ """
+ Required Configs:
+
+ |Key |
+ |----------------|
+ |CAS_SERVER |
+ |CAS_AFTER_LOGIN |
+
+ Optional Configs:
+
+ |Key | Default |
+ |---------------------------|-----------------------|
+ |CAS_TOKEN_SESSION_KEY | _CAS_TOKEN |
+ |CAS_USERNAME_SESSION_KEY | CAS_USERNAME |
+ |CAS_ATTRIBUTES_SESSION_KEY | CAS_ATTRIBUTES |
+ |CAS_LOGIN_ROUTE | '/cas' |
+ |CAS_LOGOUT_ROUTE | '/cas/logout' |
+ |CAS_VALIDATE_ROUTE | '/cas/serviceValidate'|
+ |CAS_AFTER_LOGOUT | None |
+ """
+
+ def __init__(self, app=None, url_prefix=None):
+ self._app = app
+ if app is not None:
+ self.init_app(app, url_prefix)
+
+ def init_app(self, app, url_prefix=None):
+ # Configuration defaults
+ app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN')
+ app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME')
+ app.config.setdefault('CAS_ATTRIBUTES_SESSION_KEY', 'CAS_ATTRIBUTES')
+ app.config.setdefault('CAS_LOGIN_ROUTE', '/cas')
+ app.config.setdefault('CAS_LOGOUT_ROUTE', '/cas/logout')
+ app.config.setdefault('CAS_VALIDATE_ROUTE', '/cas/serviceValidate')
+ # Requires CAS 2.0
+ app.config.setdefault('CAS_AFTER_LOGOUT', None)
+ # Register Blueprint
+ app.register_blueprint(routing.blueprint, url_prefix=url_prefix)
+
+ # Use the newstyle teardown_appcontext if it's available,
+ # otherwise fall back to the request context
+ if hasattr(app, 'teardown_appcontext'):
+ app.teardown_appcontext(self.teardown)
+ else:
+ app.teardown_request(self.teardown)
+
+ def teardown(self, exception):
+ ctx = stack.top
+
+ @property
+ def app(self):
+ return self._app or current_app
+
+ @property
+ def username(self):
+ return flask.session.get(
+ self.app.config['CAS_USERNAME_SESSION_KEY'], None)
+
+ @property
+ def attributes(self):
+ return flask.session.get(
+ self.app.config['CAS_ATTRIBUTES_SESSION_KEY'], None)
+
+ @property
+ def token(self):
+ return flask.session.get(
+ self.app.config['CAS_TOKEN_SESSION_KEY'], None)
+
+def login():
+ return flask.redirect(flask.url_for('cas.login', _external=True))
+
+def logout():
+ return flask.redirect(flask.url_for('cas.logout', _external=True))
+
+def login_required(function):
+ @wraps(function)
+ def wrap(*args, **kwargs):
+ if 'CAS_USERNAME' not in flask.session:
+ flask.session['CAS_AFTER_LOGIN_SESSION_URL'] = (
+ flask.request.script_root +
+ flask.request.full_path
+ )
+ return login()
+ else:
+ return function(*args, **kwargs)
+ return wrap
diff --git a/flask_cas/cas_urls.py b/flask_cas/cas_urls.py
new file mode 100644
index 00000000..276ef89c
--- /dev/null
+++ b/flask_cas/cas_urls.py
@@ -0,0 +1,125 @@
+"""
+flask_cas.cas_urls
+
+Functions for creating urls to access CAS.
+"""
+
+try:
+ from urllib import quote
+ from urllib import urlencode
+ from urlparse import urljoin
+except ImportError:
+ from urllib.parse import quote
+ from urllib.parse import urljoin
+ from urllib.parse import urlencode
+
+
+def create_url(base, path=None, *query):
+ """Create a url.
+
+ Creates a url by combining base, path, and the query's list of
+ key/value pairs. Escaping is handled automatically. Any
+ key/value pair with a value that is None is ignored.
+
+ Keyword arguments:
+ base -- The left most part of the url (ex. http://localhost:5000).
+ path -- The path after the base (ex. /foo/bar).
+ query -- A list of key value pairs (ex. [('key', 'value')]).
+
+ Example usage:
+ >>> create_url(
+ ... 'http://localhost:5000',
+ ... 'foo/bar',
+ ... ('key1', 'value'),
+ ... ('key2', None), # Will not include None
+ ... ('url', 'http://example.com'),
+ ... )
+ 'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com'
+ """
+ url = base
+ # Add the path to the url if it's not None.
+ if path is not None:
+ url = urljoin(url, quote(path))
+ # Remove key/value pairs with None values.
+ query = filter(lambda pair: pair[1] is not None, query)
+ # Add the query string to the url
+ url = urljoin(url, "?{0}".format(urlencode(list(query))))
+ return url
+
+
+def create_cas_login_url(cas_url, cas_route, service, renew=None, gateway=None):
+ """Create a CAS login URL .
+
+ Keyword arguments:
+ cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
+ cas_route -- The route where the CAS lives on server (ex. /cas)
+ service -- (ex. http://localhost:5000/login)
+ renew -- "true" or "false"
+ gateway -- "true" or "false"
+
+ Example usage:
+ >>> create_cas_login_url(
+ ... 'http://sso.pdx.edu',
+ ... '/cas',
+ ... 'http://localhost:5000',
+ ... )
+ 'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000'
+ """
+ return create_url(
+ cas_url,
+ cas_route,
+ ("service", service),
+ ("renew", renew),
+ ("gateway", gateway),
+ )
+
+
+def create_cas_logout_url(cas_url, cas_route, service=None):
+ """Create a CAS logout URL.
+
+ Keyword arguments:
+ cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
+ cas_route -- The route where the CAS lives on server (ex. /cas/logout)
+ url -- (ex. http://localhost:5000/login)
+
+ Example usage:
+ >>> create_cas_logout_url(
+ ... 'http://sso.pdx.edu',
+ ... '/cas/logout',
+ ... 'http://localhost:5000',
+ ... )
+ 'http://sso.pdx.edu/cas/logout?service=http%3A%2F%2Flocalhost%3A5000'
+ """
+ return create_url(
+ cas_url,
+ cas_route,
+ ("url", service), # 'url' (and not 'service'): redirect here after CAS logout
+ )
+
+
+def create_cas_validate_url(cas_url, cas_route, service, ticket, renew=None):
+ """Create a CAS validate URL.
+
+ Keyword arguments:
+ cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
+ cas_route -- The route where the CAS lives on server (ex. /cas/serviceValidate)
+ service -- (ex. http://localhost:5000/login)
+ ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas')
+ renew -- "true" or "false"
+
+ Example usage:
+ >>> create_cas_validate_url(
+ ... 'http://sso.pdx.edu',
+ ... '/cas/serviceValidate',
+ ... 'http://localhost:5000/login',
+ ... 'ST-58274-x839euFek492ou832Eena7ee-cas'
+ ... )
+ 'http://sso.pdx.edu/cas/serviceValidate?service=http%3A%2F%2Flocalhost%3A5000%2Flogin&ticket=ST-58274-x839euFek492ou832Eena7ee-cas'
+ """
+ return create_url(
+ cas_url,
+ cas_route,
+ ("service", service),
+ ("ticket", ticket),
+ ("renew", renew),
+ )
diff --git a/flask_cas/routing.py b/flask_cas/routing.py
new file mode 100644
index 00000000..e241dfcc
--- /dev/null
+++ b/flask_cas/routing.py
@@ -0,0 +1,165 @@
+import flask
+from xmltodict import parse
+from flask import current_app
+from .cas_urls import create_cas_login_url
+from .cas_urls import create_cas_logout_url
+from .cas_urls import create_cas_validate_url
+
+
+try:
+ from urllib import urlopen
+except ImportError:
+ from urllib.request import urlopen
+
+blueprint = flask.Blueprint("cas", __name__)
+
+
+@blueprint.route("/login/")
+def login():
+ """
+ This route has two purposes. First, it is used by the user
+ to login. Second, it is used by the CAS to respond with the
+ `ticket` after the user logs in successfully.
+
+ When the user accesses this url, they are redirected to the CAS
+ to login. If the login was successful, the CAS will respond to this
+ route with the ticket in the url. The ticket is then validated.
+ If validation was successful the logged in username is saved in
+ the user's session under the key `CAS_USERNAME_SESSION_KEY` and
+ the user's attributes are saved under the key
+ 'CAS_USERNAME_ATTRIBUTE_KEY'
+ """
+
+ cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
+
+ redirect_url = create_cas_login_url(
+ current_app.config["CAS_SERVER"],
+ current_app.config["CAS_LOGIN_ROUTE"],
+ flask.url_for(
+ ".login",
+ origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
+ _external=True,
+ ),
+ )
+
+ if "ticket" in flask.request.args:
+ flask.session[cas_token_session_key] = flask.request.args["ticket"]
+
+ if cas_token_session_key in flask.session:
+
+ if validate(flask.session[cas_token_session_key]):
+ if "CAS_AFTER_LOGIN_SESSION_URL" in flask.session:
+ redirect_url = flask.session.pop("CAS_AFTER_LOGIN_SESSION_URL")
+ elif flask.request.args.get("origin"):
+ redirect_url = flask.request.args["origin"]
+ else:
+ redirect_url = flask.url_for(current_app.config["CAS_AFTER_LOGIN"])
+ else:
+ flask.session.pop(cas_token_session_key, None)
+
+ current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
+
+ return flask.redirect(redirect_url)
+
+
+@blueprint.route("/logout/")
+def logout():
+ """
+ When the user accesses this route they are logged out.
+ """
+
+ cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
+ cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
+ cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
+
+ flask.session.pop(cas_username_session_key, None)
+ flask.session.pop(cas_attributes_session_key, None)
+ flask.session.pop(cas_token_session_key, None) # added by EV
+
+ cas_after_logout = current_app.config["CAS_AFTER_LOGOUT"]
+ if cas_after_logout is not None:
+ # If config starts with http, use it as dest URL.
+ # Else, build Flask URL
+ dest_url = (
+ cas_after_logout
+ if cas_after_logout.startswith("http")
+ else flask.url_for(cas_after_logout, _external=True)
+ )
+ redirect_url = create_cas_logout_url(
+ current_app.config["CAS_SERVER"],
+ current_app.config["CAS_LOGOUT_ROUTE"],
+ dest_url,
+ )
+ else:
+ redirect_url = create_cas_logout_url(
+ current_app.config["CAS_SERVER"], current_app.config["CAS_LOGOUT_ROUTE"]
+ )
+
+ current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
+ return flask.redirect(redirect_url)
+
+
+def validate(ticket):
+ """
+ Will attempt to validate the ticket. If validation fails, then False
+ is returned. If validation is successful, then True is returned
+ and the validated username is saved in the session under the
+ key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary
+ is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'.
+ """
+
+ cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
+ cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
+
+ current_app.logger.debug("validating token {0}".format(ticket))
+
+ cas_validate_url = create_cas_validate_url(
+ current_app.config["CAS_SERVER"],
+ current_app.config["CAS_VALIDATE_ROUTE"],
+ flask.url_for(
+ ".login",
+ origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
+ _external=True,
+ ),
+ ticket,
+ )
+
+ current_app.logger.debug("Making GET request to {0}".format(cas_validate_url))
+
+ xml_from_dict = {}
+ isValid = False
+
+ try:
+ xmldump = urlopen(cas_validate_url).read().strip().decode("utf8", "ignore")
+ xml_from_dict = parse(xmldump)
+ isValid = (
+ True
+ if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"]
+ else False
+ )
+ except ValueError:
+ current_app.logger.error("CAS returned unexpected result")
+
+ if isValid:
+ current_app.logger.debug("valid")
+ xml_from_dict = xml_from_dict["cas:serviceResponse"][
+ "cas:authenticationSuccess"
+ ]
+ username = xml_from_dict["cas:user"]
+ attributes = xml_from_dict.get("cas:attributes", {})
+
+ if attributes and "cas:memberOf" in attributes:
+ if isinstance(attributes["cas:memberOf"], basestring):
+ attributes["cas:memberOf"] = (
+ attributes["cas:memberOf"].lstrip("[").rstrip("]").split(",")
+ )
+ for group_number in range(0, len(attributes["cas:memberOf"])):
+ attributes["cas:memberOf"][group_number] = (
+ attributes["cas:memberOf"][group_number].lstrip(" ").rstrip(" ")
+ )
+ flask.session[cas_username_session_key] = username
+ flask.session[cas_attributes_session_key] = attributes
+ else:
+ current_app.logger.debug("invalid")
+
+ return isValid
diff --git a/requirements-3.9.txt b/requirements-3.9.txt
index 9032183a..99dddf50 100755
--- a/requirements-3.9.txt
+++ b/requirements-3.9.txt
@@ -85,4 +85,5 @@ visitor==0.1.3
Werkzeug==2.1.1
wrapt==1.14.0
WTForms==3.0.1
+xmltodict==0.13.0
zipp==3.8.0
diff --git a/sco_version.py b/sco_version.py
index 4a20b3b6..e99111a4 100644
--- a/sco_version.py
+++ b/sco_version.py
@@ -10,6 +10,7 @@ SCONEWS = """
- ScoDoc 9.4
+ - Connexion avec service CAS
- Améliorations des tableaux récapitulatifs
- Nouvelle interface de gestions des groupes (S. Lehmann)
- Enrichissement des jurys BUT et des procès-verbaux associés.