This commit is contained in:
Emmanuel Viennet 2023-02-27 10:23:59 +01:00
commit 5722ed6f93
26 changed files with 950 additions and 100 deletions

View File

@ -17,15 +17,17 @@ from flask import current_app, g, request
from flask import Flask from flask import Flask
from flask import abort, flash, has_request_context, jsonify from flask import abort, flash, has_request_context, jsonify
from flask import render_template from flask import render_template
from flask.json import JSONEncoder from flask_bootstrap import Bootstrap
from flask.logging import default_handler from flask_caching import Cache
from flask_sqlalchemy import SQLAlchemy from flask_cas import CAS
from flask_migrate import Migrate
from flask_login import LoginManager, current_user from flask_login import LoginManager, current_user
from flask_mail import Mail from flask_mail import Mail
from flask_bootstrap import Bootstrap from flask_migrate import Migrate
from flask_moment import Moment from flask_moment import Moment
from flask_caching import Cache from flask_sqlalchemy import SQLAlchemy
from flask.json import JSONEncoder
from flask.logging import default_handler
from jinja2 import select_autoescape from jinja2 import select_autoescape
import sqlalchemy import sqlalchemy
@ -132,7 +134,7 @@ class ScoDocJSONEncoder(JSONEncoder):
def render_raw_html(template_filename: str, **args) -> str: def render_raw_html(template_filename: str, **args) -> str:
"""Load and render an HTML file _without_ using Flask """Load and render an HTML file _without_ using Flask
Necessary for 503 error mesage, when DB is down and Flask may be broken. Necessary for 503 error message, when DB is down and Flask may be broken.
""" """
template_path = os.path.join( template_path = os.path.join(
current_app.config["SCODOC_DIR"], current_app.config["SCODOC_DIR"],
@ -226,14 +228,14 @@ class ReverseProxied(object):
def create_app(config_class=DevConfig): def create_app(config_class=DevConfig):
app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static") app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static")
CAS(app, url_prefix="/cas")
app.wsgi_app = ReverseProxied(app.wsgi_app) app.wsgi_app = ReverseProxied(app.wsgi_app)
app.json_encoder = ScoDocJSONEncoder app.json_encoder = ScoDocJSONEncoder
app.logger.setLevel(logging.INFO)
# Evite de logguer toutes les requetes dans notre log
logging.getLogger("werkzeug").disabled = True
app.config.from_object(config_class) app.config.from_object(config_class)
# Evite de logguer toutes les requetes dans notre log
logging.getLogger("werkzeug").disabled = True
app.logger.setLevel(app.config["LOG_LEVEL"])
# Vérifie/crée lien sym pour les URL statiques # Vérifie/crée lien sym pour les URL statiques
link_filename = f"{app.root_path}/static/links/{sco_version.SCOVERSION}" link_filename = f"{app.root_path}/static/links/{sco_version.SCOVERSION}"
@ -378,6 +380,11 @@ def create_app(config_class=DevConfig):
sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample) sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample)
from app.auth.cas import set_cas_configuration
with app.app_context():
set_cas_configuration(app)
return app return app

View File

@ -6,3 +6,4 @@ from flask import Blueprint
bp = Blueprint("auth", __name__) bp = Blueprint("auth", __name__)
from app.auth import routes from app.auth import routes
from app.auth import cas

88
app/auth/cas.py Normal file
View File

@ -0,0 +1,88 @@
# -*- coding: UTF-8 -*
"""
auth.cas.py
"""
import datetime
import flask
from flask import current_app, flash, url_for
from flask_login import login_user
from app.auth import bp
from app.auth.models import User
from app.models.config import ScoDocSiteConfig
from app.scodoc.sco_exceptions import ScoValueError
# after_cas_login/after_cas_logout : routes appelées par redirect depuis le serveur CAS.
@bp.route("/after_cas_login")
def after_cas_login():
"Called by CAS after CAS authentication"
# Ici on a les infos dans flask.session["CAS_ATTRIBUTES"]
if ScoDocSiteConfig.is_cas_enabled() and ("CAS_ATTRIBUTES" in flask.session):
# Lookup user:
cas_id = flask.session["CAS_ATTRIBUTES"].get(
"cas:" + ScoDocSiteConfig.get("cas_attribute_id")
)
if cas_id is not None:
user = User.query.filter_by(cas_id=cas_id).first()
if user and user.active:
if user.cas_allow_login:
current_app.logger.info(f"CAS: login {user.user_name}")
if login_user(user):
flask.session[
"scodoc_cas_login_date"
] = datetime.datetime.now().isoformat()
return flask.redirect(url_for("scodoc.index"))
else:
current_app.logger.info(
f"CAS login denied for {user.user_name} (not allowed to use CAS)"
)
else:
current_app.logger.info(
f"""CAS login denied for {user.user_name if user else ""} cas_id={cas_id} (unknown or inactive)"""
)
else:
current_app.logger.info(
f"""CAS attribute '{ScoDocSiteConfig.get("cas_attribute_id")}' not found !
(check your ScoDoc config)"""
)
# Echec:
flash("échec de l'authentification")
return flask.redirect(url_for("auth.login"))
@bp.route("/after_cas_logout")
def after_cas_logout():
"Called by CAS after CAS logout"
flash("Vous êtes déconnecté")
current_app.logger.info("after_cas_logout")
return flask.redirect(url_for("scodoc.index"))
def cas_error_callback(message):
"Called by CAS when an error occurs, with a message"
raise ScoValueError(f"Erreur authentification CAS: {message}")
def set_cas_configuration(app: flask.app.Flask = None):
"""Force la configuration du module flask_cas à partir des paramètres de
la config de ScoDoc.
Appelé au démarrage et à chaque modif des paramètres.
"""
app = app or current_app
if ScoDocSiteConfig.is_cas_enabled():
app.config["CAS_SERVER"] = ScoDocSiteConfig.get("cas_server")
app.config["CAS_AFTER_LOGIN"] = "auth.after_cas_login"
app.config["CAS_AFTER_LOGOUT"] = "auth.after_cas_logout"
app.config["CAS_ERROR_CALLBACK"] = cas_error_callback
app.config["CAS_SSL_VERIFY"] = ScoDocSiteConfig.get("cas_ssl_verify")
app.config["CAS_SSL_CERTIFICATE"] = ScoDocSiteConfig.get("cas_ssl_certificate")
else:
app.config.pop("CAS_SERVER", None)
app.config.pop("CAS_AFTER_LOGIN", None)
app.config.pop("CAS_AFTER_LOGOUT", None)
app.config.pop("CAS_SSL_VERIFY", None)
app.config.pop("CAS_SSL_CERTIFICATE", None)

View File

@ -8,9 +8,10 @@ import flask
from flask import g, redirect, request, url_for from flask import g, redirect, request, url_for
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
import flask_login import flask_login
from app import login from app import login
from app.scodoc.sco_utils import json_error
from app.auth.models import User from app.auth.models import User
from app.scodoc.sco_utils import json_error
basic_auth = HTTPBasicAuth() basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth() token_auth = HTTPTokenAuth()

View File

@ -53,12 +53,28 @@ class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
user_name = db.Column(db.String(64), index=True, unique=True) user_name = db.Column(db.String(64), index=True, unique=True)
"le login"
email = db.Column(db.String(120)) email = db.Column(db.String(120))
nom = db.Column(db.String(64)) nom = db.Column(db.String(64))
prenom = db.Column(db.String(64)) prenom = db.Column(db.String(64))
dept = db.Column(db.String(SHORT_STR_LEN), index=True) dept = db.Column(db.String(SHORT_STR_LEN), index=True)
"acronyme du département de l'utilisateur"
active = db.Column(db.Boolean, default=True, index=True) active = db.Column(db.Boolean, default=True, index=True)
"si faux, compte utilisateur désactivé"
cas_id = db.Column(db.Text(), index=True, unique=True, nullable=True)
"uid sur le CAS (mail ou autre attribut, selon config.cas_attribute_id)"
cas_allow_login = db.Column(
db.Boolean, default=False, server_default="false", nullable=False
)
"Peut-on se logguer via le CAS ?"
cas_allow_scodoc_login = db.Column(
db.Boolean, default=True, server_default="false", nullable=False
)
"""(not yet implemented XXX)
si CAS activé, peut-on se logguer sur ScoDoc directement ?
(le rôle ScoSuperAdmin peut toujours)
"""
password_hash = db.Column(db.String(128)) password_hash = db.Column(db.String(128))
password_scodoc7 = db.Column(db.String(42)) password_scodoc7 = db.Column(db.String(42))
@ -184,6 +200,9 @@ class User(UserMixin, db.Model):
"dept": self.dept, "dept": self.dept,
"id": self.id, "id": self.id,
"active": self.active, "active": self.active,
"cas_id": self.cas_id,
"cas_allow_login": self.cas_allow_login,
"cas_allow_scodoc_login": self.cas_allow_scodoc_login,
"status_txt": "actif" if self.active else "fermé", "status_txt": "actif" if self.active else "fermé",
"last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else None, "last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else None,
"nom": (self.nom or ""), # sco8 "nom": (self.nom or ""), # sco8
@ -206,7 +225,17 @@ class User(UserMixin, db.Model):
"""Set users' attributes from given dict values. """Set users' attributes from given dict values.
Roles must be encoded as "roles_string", like "Ens_RT, Secr_CJ" Roles must be encoded as "roles_string", like "Ens_RT, Secr_CJ"
""" """
for field in ["nom", "prenom", "dept", "active", "email", "date_expiration"]: for field in [
"nom",
"prenom",
"dept",
"active",
"email",
"date_expiration",
"cas_id",
"cas_allow_login",
"cas_allow_scodoc_login",
]:
if field in data: if field in data:
setattr(self, field, data[field] or None) setattr(self, field, data[field] or None)
if new_user: if new_user:

View File

@ -3,6 +3,7 @@
auth.routes.py auth.routes.py
""" """
import flask
from flask import current_app, flash, render_template from flask import current_app, flash, render_template
from flask import redirect, url_for, request from flask import redirect, url_for, request
from flask_login import login_user, logout_user, current_user from flask_login import login_user, logout_user, current_user
@ -20,6 +21,7 @@ from app.auth.models import Role
from app.auth.models import User from app.auth.models import User
from app.auth.email import send_password_reset_email from app.auth.email import send_password_reset_email
from app.decorators import admin_required from app.decorators import admin_required
from app.models.config import ScoDocSiteConfig
_ = lambda x: x # sans babel _ = lambda x: x # sans babel
_l = _ _l = _
@ -30,6 +32,7 @@ def login():
"ScoDoc Login form" "ScoDoc Login form"
if current_user.is_authenticated: if current_user.is_authenticated:
return redirect(url_for("scodoc.index")) return redirect(url_for("scodoc.index"))
form = LoginForm() form = LoginForm()
if form.validate_on_submit(): if form.validate_on_submit():
user = User.query.filter_by(user_name=form.user_name.data).first() user = User.query.filter_by(user_name=form.user_name.data).first()
@ -42,14 +45,22 @@ def login():
return form.redirect("scodoc.index") return form.redirect("scodoc.index")
message = request.args.get("message", "") message = request.args.get("message", "")
return render_template( return render_template(
"auth/login.j2", title=_("Sign In"), form=form, message=message "auth/login.j2",
title=_("Sign In"),
form=form,
message=message,
is_cas_enabled=ScoDocSiteConfig.is_cas_enabled(),
) )
@bp.route("/logout") @bp.route("/logout")
def logout(): def logout() -> flask.Response:
"Logout current user and redirect to home page" "Logout a scodoc user. If CAS session, logout from CAS. Redirect."
current_app.logger.info(f"logout user {current_user.user_name}")
logout_user() logout_user()
if ScoDocSiteConfig.is_cas_enabled() and flask.session.get("scodoc_cas_login_date"):
flask.session.pop("scodoc_cas_login_date", None)
return redirect(url_for("cas.logout"))
return redirect(url_for("scodoc.index")) return redirect(url_for("scodoc.index"))

View File

@ -0,0 +1,60 @@
# -*- mode: python -*-
# -*- coding: utf-8 -*-
##############################################################################
#
# ScoDoc
#
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Emmanuel Viennet emmanuel.viennet@viennet.net
#
##############################################################################
"""
Formulaires configuration Exports Apogée (codes)
"""
from flask_wtf import FlaskForm
from wtforms import BooleanField, SubmitField
from wtforms.fields.simple import FileField, StringField, TextAreaField
class ConfigCASForm(FlaskForm):
"Formulaire paramétrage CAS"
cas_enable = BooleanField("activer le CAS")
cas_server = StringField(
label="URL du serveur CAS",
description="""url complète. Commence en général par <tt>https://</tt>.""",
)
cas_attribute_id = StringField(
label="Attribut CAS utilisé comme id",
description="""Le champs CAS qui sera considéré comme l'id unique des
comptes utilisateurs.""",
)
cas_ssl_verify = BooleanField("Vérification du certificat SSL")
cas_ssl_certificate_file = FileField(
label="Certificat (PEM)",
description="""Le contenu du certificat PEM
(commence typiquement par <tt>-----BEGIN CERTIFICATE-----</tt>)""",
)
submit = SubmitField("Valider")
cancel = SubmitField("Annuler", render_kw={"formnovalidate": True})

View File

@ -4,7 +4,7 @@
""" """
from flask import flash from flask import flash
from app import db, log from app import current_app, db, log
from app.comp import bonus_spo from app.comp import bonus_spo
from app.scodoc import sco_utils as scu from app.scodoc import sco_utils as scu
@ -87,6 +87,10 @@ class ScoDocSiteConfig(db.Model):
"enable_entreprises": bool, "enable_entreprises": bool,
"month_debut_annee_scolaire": int, "month_debut_annee_scolaire": int,
"month_debut_periode2": int, "month_debut_periode2": int,
# CAS
"cas_enable": bool,
"cas_server": str,
"cas_attribute_id": str,
} }
def __init__(self, name, value): def __init__(self, name, value):
@ -170,7 +174,7 @@ class ScoDocSiteConfig(db.Model):
(starting with empty string to represent "no bonus function"). (starting with empty string to represent "no bonus function").
""" """
d = bonus_spo.get_bonus_class_dict() d = bonus_spo.get_bonus_class_dict()
class_list = [(name, d[name].displayed_name) for name in d.keys()] class_list = [(name, d[name].displayed_name) for name in d]
class_list.sort(key=lambda x: x[1].replace(" du ", " de ")) class_list.sort(key=lambda x: x[1].replace(" du ", " de "))
return [("", "")] + class_list return [("", "")] + class_list
@ -204,13 +208,31 @@ class ScoDocSiteConfig(db.Model):
db.session.add(cfg) db.session.add(cfg)
db.session.commit() db.session.commit()
@classmethod
def is_cas_enabled(cls) -> bool:
"""True si on utilise le CAS"""
cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first()
return cfg is not None and cfg.value
@classmethod
def cas_enable(cls, enabled=True) -> bool:
"""Active (ou déactive) le CAS. True si changement."""
if enabled != ScoDocSiteConfig.is_cas_enabled():
cfg = ScoDocSiteConfig.query.filter_by(name="cas_enable").first()
if cfg is None:
cfg = ScoDocSiteConfig(name="cas_enable", value="on" if enabled else "")
else:
cfg.value = "on" if enabled else ""
db.session.add(cfg)
db.session.commit()
return True
return False
@classmethod @classmethod
def is_entreprises_enabled(cls) -> bool: def is_entreprises_enabled(cls) -> bool:
"""True si on doit activer le module entreprise""" """True si on doit activer le module entreprise"""
cfg = ScoDocSiteConfig.query.filter_by(name="enable_entreprises").first() cfg = ScoDocSiteConfig.query.filter_by(name="enable_entreprises").first()
if (cfg is None) or not cfg.value: return cfg is not None and cfg.value
return False
return True
@classmethod @classmethod
def enable_entreprises(cls, enabled=True) -> bool: def enable_entreprises(cls, enabled=True) -> bool:
@ -228,6 +250,29 @@ class ScoDocSiteConfig(db.Model):
return True return True
return False return False
@classmethod
def get(cls, name: str) -> str:
"Get configuration param; empty string if unset"
cfg = ScoDocSiteConfig.query.filter_by(name=name).first()
return (cfg.value or "") if cfg else ""
@classmethod
def set(cls, name: str, value: str) -> bool:
"Set parameter, returns True if change. Commit session."
if cls.get(name) != (value or ""):
cfg = ScoDocSiteConfig.query.filter_by(name=name).first()
if cfg is None:
cfg = ScoDocSiteConfig(name=name, value=str(value))
else:
cfg.value = str(value or "")
current_app.logger.info(
f"""ScoDocSiteConfig: recording {cfg.name}='{cfg.value[:32]}...'"""
)
db.session.add(cfg)
db.session.commit()
return True
return False
@classmethod @classmethod
def _get_int_field(cls, name: str, default=None) -> int: def _get_int_field(cls, name: str, default=None) -> int:
"""Valeur d'un champs integer""" """Valeur d'un champs integer"""

View File

@ -225,6 +225,9 @@ _identiteEditor = ndb.EditableTable(
"nom", "nom",
"nom_usuel", "nom_usuel",
"prenom", "prenom",
"cas_id",
"cas_allow_login",
"cas_allow_scodoc_login",
"civilite", # 'M", "F", or "X" "civilite", # 'M", "F", or "X"
"date_naissance", "date_naissance",
"lieu_naissance", "lieu_naissance",

View File

@ -28,11 +28,10 @@
"""Fonctions sur les utilisateurs """Fonctions sur les utilisateurs
""" """
# Anciennement ZScoUsers.py, fonctions de gestion des données réécrite avec flask/SQLAlchemy # Anciennement ZScoUsers.py, fonctions de gestion des données réécrites avec flask/SQLAlchemy
import re import re
from flask import url_for, g, request from flask import url_for, g, request
from flask.templating import render_template
from flask_login import current_user from flask_login import current_user
@ -40,25 +39,13 @@ from app import db, Departement
from app.auth.models import Permission from app.auth.models import Permission
from app.auth.models import User from app.auth.models import User
from app.models import ScoDocSiteConfig
from app.scodoc import html_sco_header from app.scodoc import html_sco_header
from app.scodoc import sco_etud
from app.scodoc import sco_excel
from app.scodoc import sco_preferences from app.scodoc import sco_preferences
from app.scodoc.gen_tables import GenTable from app.scodoc.gen_tables import GenTable
from app import log, cache from app import cache
from app.scodoc.scolog import logdb
import app.scodoc.sco_utils as scu
from app.scodoc.sco_exceptions import ( from app.scodoc.sco_exceptions import ScoValueError
AccessDenied,
ScoValueError,
)
# ---------------
# ---------------
def index_html(all_depts=False, with_inactives=False, format="html"): def index_html(all_depts=False, with_inactives=False, format="html"):
@ -70,19 +57,21 @@ def index_html(all_depts=False, with_inactives=False, format="html"):
if current_user.has_permission(Permission.ScoUsersAdmin, g.scodoc_dept): if current_user.has_permission(Permission.ScoUsersAdmin, g.scodoc_dept):
H.append( H.append(
'<p><a href="{}" class="stdlink">Ajouter un utilisateur</a>'.format( f"""<p><a href="{url_for("users.create_user_form",
url_for("users.create_user_form", scodoc_dept=g.scodoc_dept) scodoc_dept=g.scodoc_dept)
) }" class="stdlink">Ajouter un utilisateur</a>"""
) )
if current_user.is_administrator(): if current_user.is_administrator():
H.append( H.append(
'&nbsp;&nbsp; <a href="{}" class="stdlink">Importer des utilisateurs</a></p>'.format( """&nbsp;&nbsp; <a href="{url_for("users.import_users_form",
url_for("users.import_users_form", scodoc_dept=g.scodoc_dept) scodoc_dept=g.scodoc_dept)
) }" class="stdlink">Importer des utilisateurs</a></p>"""
) )
else: else:
H.append( H.append(
"&nbsp;&nbsp; Pour importer des utilisateurs en masse (via xlsx file) contactez votre administrateur scodoc." """&nbsp;&nbsp; Pour importer des utilisateurs en masse (via fichier xlsx)
contactez votre administrateur scodoc."""
) )
if all_depts: if all_depts:
checked = "checked" checked = "checked"
@ -93,11 +82,12 @@ def index_html(all_depts=False, with_inactives=False, format="html"):
else: else:
olds_checked = "" olds_checked = ""
H.append( H.append(
"""<p><form name="f" action="%s" method="get"> f"""<p><form name="f" action="{request.base_url}" method="get">
<input type="checkbox" name="all_depts" value="1" onchange="document.f.submit();" %s>Tous les départements</input> <input type="checkbox" name="all_depts" value="1" onchange="document.f.submit();"
<input type="checkbox" name="with_inactives" value="1" onchange="document.f.submit();" %s>Avec anciens utilisateurs</input> {checked}>Tous les départements</input>
<input type="checkbox" name="with_inactives" value="1" onchange="document.f.submit();"
{olds_checked}>Avec anciens utilisateurs</input>
</form></p>""" </form></p>"""
% (request.base_url, checked, olds_checked)
) )
L = list_users( L = list_users(
@ -127,7 +117,7 @@ def list_users(
if dept and not all_depts: if dept and not all_depts:
users = get_user_list(dept=dept, with_inactives=with_inactives) users = get_user_list(dept=dept, with_inactives=with_inactives)
comm = "dept. %s" % dept comm = f"dept. {dept}"
else: else:
users = get_user_list(with_inactives=with_inactives) users = get_user_list(with_inactives=with_inactives)
comm = "tous" comm = "tous"
@ -135,13 +125,13 @@ def list_users(
comm += ", avec anciens" comm += ", avec anciens"
comm = "(" + comm + ")" comm = "(" + comm + ")"
# -- Add some information and links: # -- Add some information and links:
r = [] rows = []
for u in users: for u in users:
# Can current user modify this user ? # Can current user modify this user ?
can_modify = can_handle_passwd(u, allow_admindepts=True) can_modify = can_handle_passwd(u, allow_admindepts=True)
d = u.to_dict() d = u.to_dict()
r.append(d) rows.append(d)
# Add links # Add links
if with_links and can_modify: if with_links and can_modify:
target = url_for( target = url_for(
@ -168,11 +158,15 @@ def list_users(
"status_txt", "status_txt",
] ]
# Seul l'admin peut voir les dates de dernière connexion # Seul l'admin peut voir les dates de dernière connexion
# et les infos CAS
if current_user.is_administrator(): if current_user.is_administrator():
columns_ids.append("last_seen") columns_ids.append("last_seen")
if ScoDocSiteConfig.is_cas_enabled():
columns_ids += ["cas_id", "cas_allow_login", "cas_allow_scodoc_login"]
title = "Utilisateurs définis dans ScoDoc" title = "Utilisateurs définis dans ScoDoc"
tab = GenTable( tab = GenTable(
rows=r, rows=rows,
columns_ids=columns_ids, columns_ids=columns_ids,
titles={ titles={
"user_name": "Login", "user_name": "Login",
@ -186,12 +180,14 @@ def list_users(
"last_seen": "Dernière cnx.", "last_seen": "Dernière cnx.",
"passwd_temp": "Temp.", "passwd_temp": "Temp.",
"status_txt": "Etat", "status_txt": "Etat",
"cas_id": "Id CAS",
"cas_allow_login": "CAS autorisé",
"cas_allow_scodoc_login": "Cnx sans CAS",
}, },
caption=title, caption=title,
page_title="title", page_title="title",
html_title="""<h2>%d utilisateurs %s</h2> html_title=f"""<h2>{len(rows)} utilisateurs {comm}</h2>
<p class="help">Cliquer sur un nom pour changer son mot de passe</p>""" <p class="help">Cliquer sur un nom pour changer son mot de passe</p>""",
% (len(r), comm),
html_class="table_leftalign list_users", html_class="table_leftalign list_users",
html_with_td_classes=True, html_with_td_classes=True,
html_sortable=True, html_sortable=True,
@ -217,15 +213,6 @@ def get_user_list(dept=None, with_inactives=False):
return q.order_by(User.nom, User.user_name).all() return q.order_by(User.nom, User.user_name).all()
def _user_list(user_name):
"return user as a dict"
u = User.query.filter_by(user_name=user_name).first()
if u:
return u.to_dict()
else:
return None
@cache.memoize(timeout=50) # seconds @cache.memoize(timeout=50) # seconds
def user_info(user_name_or_id=None, user: User = None): def user_info(user_name_or_id=None, user: User = None):
"""Dict avec infos sur l'utilisateur (qui peut ne pas etre dans notre base). """Dict avec infos sur l'utilisateur (qui peut ne pas etre dans notre base).
@ -273,6 +260,9 @@ def user_info(user_name_or_id=None, user: User = None):
return info return info
MSG_OPT = """<br>Attention: (vous pouvez forcer l'opération en cochant "<em>Ignorer les avertissements</em>" en bas de page)"""
def check_modif_user( def check_modif_user(
edit, edit,
enforce_optionals=False, enforce_optionals=False,
@ -281,7 +271,8 @@ def check_modif_user(
prenom="", prenom="",
email="", email="",
dept="", dept="",
roles=[], roles: list = None,
cas_id: str = None,
): ):
"""Vérifie que cet utilisateur peut être créé (edit=0) ou modifié (edit=1) """Vérifie que cet utilisateur peut être créé (edit=0) ou modifié (edit=1)
Cherche homonymes. Cherche homonymes.
@ -290,32 +281,31 @@ def check_modif_user(
(si ok est faux, l'utilisateur peut quand même forcer la creation) (si ok est faux, l'utilisateur peut quand même forcer la creation)
- msg: message warning à presenter à l'utilisateur - msg: message warning à presenter à l'utilisateur
""" """
MSG_OPT = """<br>Attention: (vous pouvez forcer l'opération en cochant "<em>Ignorer les avertissements</em>" en bas de page)""" roles = roles or []
# ce login existe ? # ce login existe ?
user = _user_list(user_name) user = User.query.filter_by(user_name=user_name).first()
if edit and not user: # safety net, le user_name ne devrait pas changer if edit and not user: # safety net, le user_name ne devrait pas changer
return False, "identifiant %s inexistant" % user_name return False, f"identifiant {user_name} inexistant"
if not edit and user: if not edit and user:
return False, "identifiant %s déjà utilisé" % user_name return False, f"identifiant {user_name} déjà utilisé"
if not user_name or not nom or not prenom: if not user_name or not nom or not prenom:
return False, "champ requis vide" return False, "champ requis vide"
if not re.match(r"^[a-zA-Z0-9@\\\-_\\\.]*$", user_name): if not re.match(r"^[a-zA-Z0-9@\\\-_\\\.]*$", user_name):
return ( return (
False, False,
"identifiant '%s' invalide (pas d'accents ni de caractères spéciaux)" f"identifiant '{user_name}' invalide (pas d'accents ni de caractères spéciaux)",
% user_name,
) )
if enforce_optionals and len(user_name) > 64: if enforce_optionals and len(user_name) > 64:
return False, "identifiant '%s' trop long (64 caractères)" % user_name return False, f"identifiant '{user_name}' trop long (64 caractères)"
if enforce_optionals and len(nom) > 64: if enforce_optionals and len(nom) > 64:
return False, "nom '%s' trop long (64 caractères)" % nom + MSG_OPT return False, f"nom '{nom}' trop long (64 caractères)" + MSG_OPT
if enforce_optionals and len(prenom) > 64: if enforce_optionals and len(prenom) > 64:
return False, "prenom '%s' trop long (64 caractères)" % prenom + MSG_OPT return False, f"prenom '{prenom}' trop long (64 caractères)" + MSG_OPT
# check that tha same user_name has not already been described in this import # check that same user_name has not already been described in this import
if not email: if not email:
return False, "vous devriez indiquer le mail de l'utilisateur créé !" return False, "vous devriez indiquer le mail de l'utilisateur créé !"
if len(email) > 120: if len(email) > 120:
return False, "email '%s' trop long (120 caractères)" % email return False, f"email '{email}' trop long (120 caractères)"
if not re.fullmatch(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", email): if not re.fullmatch(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", email):
return False, "l'adresse mail semble incorrecte" return False, "l'adresse mail semble incorrecte"
# check département # check département
@ -324,19 +314,31 @@ def check_modif_user(
and dept and dept
and Departement.query.filter_by(acronym=dept).first() is None and Departement.query.filter_by(acronym=dept).first() is None
): ):
return False, "département '%s' inexistant" % dept + MSG_OPT return False, f"département '{dept}' inexistant" + MSG_OPT
if enforce_optionals and not roles: if enforce_optionals and not roles:
return False, "aucun rôle sélectionné, êtes vous sûr ?" + MSG_OPT return False, "aucun rôle sélectionné, êtes vous sûr ?" + MSG_OPT
# Unicité du mail # Unicité du mail
users_with_this_mail = User.query.filter_by(email=email).all() users_with_this_mail = User.query.filter_by(email=email).all()
if edit: # modification if edit: # modification
if email != user["email"] and len(users_with_this_mail) > 0: if email != user.email and len(users_with_this_mail) > 0:
return False, "un autre utilisateur existe déjà avec cette adresse mail" return False, "un autre utilisateur existe déjà avec cette adresse mail"
else: # création utilisateur else: # création utilisateur
if len(users_with_this_mail) > 0: if len(users_with_this_mail) > 0:
return False, "un autre utilisateur existe déjà avec cette adresse mail" return False, "un autre utilisateur existe déjà avec cette adresse mail"
# ok # Unicité du cas_id
if cas_id:
cas_users = User.query.filter_by(cas_id=cas_id).all()
if edit:
if cas_users and (
len(cas_users) > 1 or cas_users[0].user_name != user_name
):
return (
False,
"un autre utilisateur existe déjà avec cet identifiant CAS",
)
elif cas_users:
return False, "un autre utilisateur existe déjà avec cet identifiant CAS"
# Des noms/prénoms semblables existent ? # Des noms/prénoms semblables existent ?
nom = nom.lower().strip() nom = nom.lower().strip()
prenom = prenom.lower().strip() prenom = prenom.lower().strip()
@ -367,7 +369,7 @@ def user_edit(user_name, vals):
"""Edit the user specified by user_name """Edit the user specified by user_name
(ported from Zope to SQLAlchemy, hence strange !) (ported from Zope to SQLAlchemy, hence strange !)
""" """
u = User.query.filter_by(user_name=user_name).first() u: User = User.query.filter_by(user_name=user_name).first()
if not u: if not u:
raise ScoValueError("Invalid user_name") raise ScoValueError("Invalid user_name")
u.from_dict(vals) u.from_dict(vals)

View File

@ -3551,14 +3551,17 @@ table.table_listegroupe tr td {
padding-right: 0.5em; padding-right: 0.5em;
} }
table.list_users td.roles {
width: 22em;
}
table.list_users td.date_modif_passwd { table.list_users td.date_modif_passwd {
white-space: nowrap; white-space: nowrap;
} }
table.list_users td.roles_string,
table.list_users th.roles_string {
word-wrap: break-word;
overflow-wrap: break-word;
}
table.formsemestre_description tr.table_row_ue td { table.formsemestre_description tr.table_row_ue td {
font-weight: bold; font-weight: bold;
} }
@ -4541,3 +4544,9 @@ table.formation_table_recap td.heures_td,
table.formation_table_recap td.heures_tp { table.formation_table_recap td.heures_tp {
text-align: right; text-align: right;
} }
div.cas_etat_certif_ssl {
margin-top: 12px;
font-style: italic;
border: 1px dashed black;
}

View File

@ -9,6 +9,12 @@
{% endif %} {% endif %}
<h1>Connexion</h1> <h1>Connexion</h1>
{% if is_cas_enabled %}
<div class"cas_link">
<a href="{{ url_for('cas.login') }}" class="stdlink">Se connecter avec CAS</a>
</div>
{% endif %}
<div class="row"> <div class="row">
<div class="col-md-4"> <div class="col-md-4">
{{ wtf.quick_form(form) }} {{ wtf.quick_form(form) }}

View File

@ -7,6 +7,9 @@
<h2>Utilisateur: {{user.user_name}} ({{'actif' if user.active else 'fermé'}})</h2> <h2>Utilisateur: {{user.user_name}} ({{'actif' if user.active else 'fermé'}})</h2>
<p> <p>
<b>Login :</b> {{user.user_name}}<br> <b>Login :</b> {{user.user_name}}<br>
<b>CAS id:</b> {{user.cas_id or "(aucun)"}}
(CAS {{'autorisé' if user.cas_allow_login else 'interdit'}} pour cet utilisateur)
<br>
<b>Nom :</b> {{user.nom or ""}}<br> <b>Nom :</b> {{user.nom or ""}}<br>
<b>Prénom :</b> {{user.prenom or ""}}<br> <b>Prénom :</b> {{user.prenom or ""}}<br>
<b>Mail :</b> {{user.email}}<br> <b>Mail :</b> {{user.email}}<br>
@ -48,9 +51,15 @@
</ul> </ul>
{% if current_user.id == user.id %} {% if current_user.id == user.id %}
<div class="user_info_session">
{% if session_info %}
<p><b>CAS session started at </b>{{ session_info }}</p>
{% endif %}
<p><b>Se déconnecter: <p><b>Se déconnecter:
<a class="stdlink" href="{{url_for('auth.logout')}}">logout</a> <a class="stdlink" href="{{url_for('auth.logout')}}">logout</a>
</b></p> </b>
</p>
</div>
{% endif %} {% endif %}
{# Liste des permissions #} {# Liste des permissions #}

View File

@ -0,0 +1,25 @@
{% extends "base.j2" %}
{% import 'bootstrap/wtf.html' as wtf %}
{% block app_content %}
<h1>Configuration du Service d'Authentification Central (CAS)</h1>
<div class="help">
<p>Le CAS permet d'utiliser un service SSO pour connecter les utilisateurs.</p>
</div>
<div class="row">
<div class="col-md-8">
{{ wtf.quick_form(form) }}
<div class="cas_etat_certif_ssl">Certificat SSL
{% if cas_ssl_certificate_loaded %}
chargé.
{% else %}
non chargé.
{% endif %}
</div>
</div>
</div>
{% endblock %}

View File

@ -53,8 +53,9 @@
</p> </p>
</section> </section>
<h2>Utilisateurs</h2> <h2>Utilisateurs et CAS</h2>
<section> <section>
<p><a class="stdlink" href="{{url_for('scodoc.config_cas')}}">configuration du service CAS</a>
<p><a class="stdlink" href="{{url_for('auth.reset_standard_roles_permissions')}}">remettre <p><a class="stdlink" href="{{url_for('auth.reset_standard_roles_permissions')}}">remettre
les permissions des rôles standards à leurs valeurs par défaut</a> (efface les modifications apportées) les permissions des rôles standards à leurs valeurs par défaut</a> (efface les modifications apportées)
</p> </p>

View File

@ -54,6 +54,7 @@ from werkzeug.exceptions import BadRequest, NotFound
from app import db from app import db
from app.auth.models import User from app.auth.models import User
from app.auth.cas import set_cas_configuration
from app.decorators import ( from app.decorators import (
admin_required, admin_required,
scodoc7func, scodoc7func,
@ -62,6 +63,7 @@ from app.decorators import (
from app.forms.main import config_logos, config_main from app.forms.main import config_logos, config_main
from app.forms.main.create_dept import CreateDeptForm from app.forms.main.create_dept import CreateDeptForm
from app.forms.main.config_apo import CodesDecisionsForm from app.forms.main.config_apo import CodesDecisionsForm
from app.forms.main.config_cas import ConfigCASForm
from app import models from app import models
from app.models import Departement, Identite from app.models import Departement, Identite
from app.models import departements from app.models import departements
@ -73,7 +75,7 @@ from app.scodoc import sco_find_etud
from app.scodoc import sco_logos from app.scodoc import sco_logos
from app.scodoc import sco_utils as scu from app.scodoc import sco_utils as scu
from app.scodoc.sco_exceptions import AccessDenied from app.scodoc.sco_exceptions import AccessDenied, ScoValueError
from app.scodoc.sco_permissions import Permission from app.scodoc.sco_permissions import Permission
from app.views import scodoc_bp as bp from app.views import scodoc_bp as bp
import sco_version import sco_version
@ -134,6 +136,46 @@ def toggle_dept_vis(dept_id):
return redirect(url_for("scodoc.index")) return redirect(url_for("scodoc.index"))
@bp.route("/ScoDoc/config_cas", methods=["GET", "POST"])
@admin_required
def config_cas():
"""Form config CAS"""
form = ConfigCASForm()
if request.method == "POST" and form.cancel.data: # cancel button
return redirect(url_for("scodoc.index"))
if form.validate_on_submit():
if ScoDocSiteConfig.cas_enable(enabled=form.data["cas_enable"]):
flash("CAS " + ("activé" if form.data["cas_enable"] else "désactivé"))
if ScoDocSiteConfig.set("cas_server", form.data["cas_server"]):
flash("Serveur CAS enregistré")
if ScoDocSiteConfig.set("cas_attribute_id", form.data["cas_attribute_id"]):
flash("Serveur CAS enregistré")
if ScoDocSiteConfig.set("cas_ssl_verify", form.data["cas_ssl_verify"]):
flash("Vérification SSL modifiée")
if form.cas_ssl_certificate_file.data:
data = request.files[form.cas_ssl_certificate_file.name].read()
try:
data_str = data.decode("ascii")
except UnicodeDecodeError as exc:
raise ScoValueError("Fichier certificat invalide (non ASCII)") from exc
if ScoDocSiteConfig.set("cas_ssl_certificate", data_str):
flash("Certificat SSL enregistré")
set_cas_configuration()
return redirect(url_for("scodoc.configuration"))
elif request.method == "GET":
form.cas_enable.data = ScoDocSiteConfig.get("cas_enable")
form.cas_server.data = ScoDocSiteConfig.get("cas_server")
form.cas_attribute_id.data = ScoDocSiteConfig.get("cas_attribute_id")
form.cas_ssl_verify.data = ScoDocSiteConfig.get("cas_ssl_verify")
return render_template(
"config_cas.j2",
form=form,
title="Configuration du Service d'Authentification Central (CAS)",
cas_ssl_certificate_loaded=ScoDocSiteConfig.get("cas_ssl_certificate"),
)
@bp.route("/ScoDoc/config_codes_decisions", methods=["GET", "POST"]) @bp.route("/ScoDoc/config_codes_decisions", methods=["GET", "POST"])
@admin_required @admin_required
def config_codes_decisions(): def config_codes_decisions():

View File

@ -56,6 +56,7 @@ from app.auth.models import UserRole
from app.auth.models import is_valid_password from app.auth.models import is_valid_password
from app.email import send_email from app.email import send_email
from app.models import Departement from app.models import Departement
from app.models.config import ScoDocSiteConfig
from app.decorators import ( from app.decorators import (
scodoc, scodoc,
@ -226,7 +227,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
if edit: if edit:
if not user_name: if not user_name:
raise ValueError("missing argument: user_name") raise ValueError("missing argument: user_name")
the_user = User.query.filter_by(user_name=user_name).first() the_user: User = User.query.filter_by(user_name=user_name).first()
if not the_user: if not the_user:
raise ScoValueError("utilisateur inexistant") raise ScoValueError("utilisateur inexistant")
initvalues = the_user.to_dict() initvalues = the_user.to_dict()
@ -367,6 +368,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
{"input_type": "hidden", "default": initvalues["user_name"]}, {"input_type": "hidden", "default": initvalues["user_name"]},
) )
] ]
cas_enabled = ScoDocSiteConfig.is_cas_enabled()
descr += [ descr += [
( (
"email", "email",
@ -376,11 +378,34 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
"explanation": "requis, doit fonctionner" "explanation": "requis, doit fonctionner"
if not edit_only_roles if not edit_only_roles
else "", else "",
"size": 20, "size": 36,
"allow_null": False, "allow_null": False,
"readonly": edit_only_roles, "readonly": edit_only_roles,
}, },
) ),
(
"cas_id",
{
"title": "Identifiant CAS",
"input_type": "text",
"explanation": "id du compte utilisateur sur le CAS de l'établissement "
+ "(service CAS activé)"
if cas_enabled
else "(service CAS non activé)",
"size": 36,
"allow_null": True,
"readonly": not cas_enabled,
},
),
(
"cas_allow_login",
{
"title": "Autorise connexion via CAS",
"input_type": "boolcheckbox",
"explanation": "en test: seul le super-administrateur peut changer ce réglage",
"readonly": not current_user.is_administrator(),
},
),
] ]
if not edit: # options création utilisateur if not edit: # options création utilisateur
descr += [ descr += [
@ -438,7 +463,8 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
"d", "d",
{ {
"input_type": "separator", "input_type": "separator",
"title": f"""L'utilisateur appartient au département {the_user.dept or "(tous)"}""", "title": f"""L'utilisateur appartient au département {
the_user.dept or "(tous)"}""",
}, },
) )
) )
@ -541,7 +567,6 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
if err_msg: if err_msg:
H.append(tf_error_message(f"""Erreur: {err_msg}""")) H.append(tf_error_message(f"""Erreur: {err_msg}"""))
return "\n".join(H) + "\n" + tf[1] + F return "\n".join(H) + "\n" + tf[1] + F
if not edit_only_roles: if not edit_only_roles:
ok_modif, msg = sco_users.check_modif_user( ok_modif, msg = sco_users.check_modif_user(
edit, edit,
@ -552,6 +577,7 @@ def create_user_form(user_name=None, edit=0, all_roles=True):
email=vals["email"], email=vals["email"],
dept=vals.get("dept", auth_dept), dept=vals.get("dept", auth_dept),
roles=vals["roles"], roles=vals["roles"],
cas_id=vals["cas_id"],
) )
if not ok_modif: if not ok_modif:
H.append(tf_error_message(msg)) H.append(tf_error_message(msg))
@ -815,12 +841,17 @@ def user_info_page(user_name=None):
if not user: if not user:
raise ScoValueError("invalid user_name") raise ScoValueError("invalid user_name")
session_info = None
if user.id == current_user.id:
session_info = flask.session.get("scodoc_cas_login_date")
return render_template( return render_template(
"auth/user_info_page.j2", "auth/user_info_page.j2",
user=user, user=user,
title=f"Utilisateur {user.user_name}", title=f"Utilisateur {user.user_name}",
Permission=Permission, Permission=Permission,
dept=dept, dept=dept,
session_info=session_info,
) )

View File

@ -1,7 +1,7 @@
# -*- coding: UTF-8 -* # -*- coding: UTF-8 -*
import os import os
import uuid import logging
from dotenv import load_dotenv from dotenv import load_dotenv
BASEDIR = os.path.abspath(os.path.dirname(__file__)) BASEDIR = os.path.abspath(os.path.dirname(__file__))
@ -16,6 +16,7 @@ class Config:
SECRET_KEY = os.environ.get("SECRET_KEY") or "90e01e75831e4276a4c70d29564b425f" SECRET_KEY = os.environ.get("SECRET_KEY") or "90e01e75831e4276a4c70d29564b425f"
SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_TRACK_MODIFICATIONS = False
LOG_TO_STDOUT = os.environ.get("LOG_TO_STDOUT") LOG_TO_STDOUT = os.environ.get("LOG_TO_STDOUT")
LOG_LEVEL = getattr(logging, os.environ.get("LOG_LEVEL", "INFO"), "INFO")
MAIL_SERVER = os.environ.get("MAIL_SERVER", "localhost") MAIL_SERVER = os.environ.get("MAIL_SERVER", "localhost")
MAIL_PORT = int(os.environ.get("MAIL_PORT", 25)) MAIL_PORT = int(os.environ.get("MAIL_PORT", 25))
MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS") is not None MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS") is not None
@ -40,9 +41,6 @@ class Config:
# Pour conserver l'ordre des objets dans les JSON: # Pour conserver l'ordre des objets dans les JSON:
# e.g. l'ordre des UE dans les bulletins # e.g. l'ordre des UE dans les bulletins
JSON_SORT_KEYS = False JSON_SORT_KEYS = False
# STATIC_URL_PATH = "/ScoDoc/static"
# static_folder = "stat"
# SERVER_NAME = os.environ.get("SERVER_NAME")
class ProdConfig(Config): class ProdConfig(Config):

7
flask_cas/README.md Normal file
View File

@ -0,0 +1,7 @@
# Flask-CAS
Forked from <https://github.com/MasterRoshan/flask-cas-ng>
and adapted by Emmanuel Viennet, Feb. 2023.
- logout: clear `_CAS_TOKEN`.
- Use `url` instead of `service` parameter in logout URL.

106
flask_cas/__init__.py Normal file
View File

@ -0,0 +1,106 @@
"""
flask_cas.__init__
"""
import flask
from flask import current_app
# Find the stack on which we want to store the database connection.
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
# before that we need to use the _request_ctx_stack.
try:
from flask import _app_ctx_stack as stack
except ImportError:
from flask import _request_ctx_stack as stack
from . import routing
from functools import wraps
class CAS(object):
"""
Required Configs:
|Key |
|----------------|
|CAS_SERVER |
|CAS_AFTER_LOGIN |
Optional Configs:
|Key | Default |
|---------------------------|-----------------------|
|CAS_TOKEN_SESSION_KEY | _CAS_TOKEN |
|CAS_USERNAME_SESSION_KEY | CAS_USERNAME |
|CAS_ATTRIBUTES_SESSION_KEY | CAS_ATTRIBUTES |
|CAS_LOGIN_ROUTE | '/cas' |
|CAS_LOGOUT_ROUTE | '/cas/logout' |
|CAS_VALIDATE_ROUTE | '/cas/serviceValidate'|
|CAS_AFTER_LOGOUT | None |
"""
def __init__(self, app=None, url_prefix=None):
self._app = app
if app is not None:
self.init_app(app, url_prefix)
def init_app(self, app, url_prefix=None):
# Configuration defaults
app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN')
app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME')
app.config.setdefault('CAS_ATTRIBUTES_SESSION_KEY', 'CAS_ATTRIBUTES')
app.config.setdefault('CAS_LOGIN_ROUTE', '/cas')
app.config.setdefault('CAS_LOGOUT_ROUTE', '/cas/logout')
app.config.setdefault('CAS_VALIDATE_ROUTE', '/cas/serviceValidate')
# Requires CAS 2.0
app.config.setdefault('CAS_AFTER_LOGOUT', None)
# Register Blueprint
app.register_blueprint(routing.blueprint, url_prefix=url_prefix)
# Use the newstyle teardown_appcontext if it's available,
# otherwise fall back to the request context
if hasattr(app, 'teardown_appcontext'):
app.teardown_appcontext(self.teardown)
else:
app.teardown_request(self.teardown)
def teardown(self, exception):
ctx = stack.top
@property
def app(self):
return self._app or current_app
@property
def username(self):
return flask.session.get(
self.app.config['CAS_USERNAME_SESSION_KEY'], None)
@property
def attributes(self):
return flask.session.get(
self.app.config['CAS_ATTRIBUTES_SESSION_KEY'], None)
@property
def token(self):
return flask.session.get(
self.app.config['CAS_TOKEN_SESSION_KEY'], None)
def login():
return flask.redirect(flask.url_for('cas.login', _external=True))
def logout():
return flask.redirect(flask.url_for('cas.logout', _external=True))
def login_required(function):
@wraps(function)
def wrap(*args, **kwargs):
if 'CAS_USERNAME' not in flask.session:
flask.session['CAS_AFTER_LOGIN_SESSION_URL'] = (
flask.request.script_root +
flask.request.full_path
)
return login()
else:
return function(*args, **kwargs)
return wrap

125
flask_cas/cas_urls.py Normal file
View File

@ -0,0 +1,125 @@
"""
flask_cas.cas_urls
Functions for creating urls to access CAS.
"""
try:
from urllib import quote
from urllib import urlencode
from urlparse import urljoin
except ImportError:
from urllib.parse import quote
from urllib.parse import urljoin
from urllib.parse import urlencode
def create_url(base, path=None, *query):
"""Create a url.
Creates a url by combining base, path, and the query's list of
key/value pairs. Escaping is handled automatically. Any
key/value pair with a value that is None is ignored.
Keyword arguments:
base -- The left most part of the url (ex. http://localhost:5000).
path -- The path after the base (ex. /foo/bar).
query -- A list of key value pairs (ex. [('key', 'value')]).
Example usage:
>>> create_url(
... 'http://localhost:5000',
... 'foo/bar',
... ('key1', 'value'),
... ('key2', None), # Will not include None
... ('url', 'http://example.com'),
... )
'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com'
"""
url = base
# Add the path to the url if it's not None.
if path is not None:
url = urljoin(url, quote(path))
# Remove key/value pairs with None values.
query = filter(lambda pair: pair[1] is not None, query)
# Add the query string to the url
url = urljoin(url, "?{0}".format(urlencode(list(query))))
return url
def create_cas_login_url(cas_url, cas_route, service, renew=None, gateway=None):
"""Create a CAS login URL .
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas)
service -- (ex. http://localhost:5000/login)
renew -- "true" or "false"
gateway -- "true" or "false"
Example usage:
>>> create_cas_login_url(
... 'http://sso.pdx.edu',
... '/cas',
... 'http://localhost:5000',
... )
'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000'
"""
return create_url(
cas_url,
cas_route,
("service", service),
("renew", renew),
("gateway", gateway),
)
def create_cas_logout_url(cas_url, cas_route, service=None):
"""Create a CAS logout URL.
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas/logout)
url -- (ex. http://localhost:5000/login)
Example usage:
>>> create_cas_logout_url(
... 'http://sso.pdx.edu',
... '/cas/logout',
... 'http://localhost:5000',
... )
'http://sso.pdx.edu/cas/logout?service=http%3A%2F%2Flocalhost%3A5000'
"""
return create_url(
cas_url,
cas_route,
("url", service), # 'url' (and not 'service'): redirect here after CAS logout
)
def create_cas_validate_url(cas_url, cas_route, service, ticket, renew=None):
"""Create a CAS validate URL.
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas/serviceValidate)
service -- (ex. http://localhost:5000/login)
ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas')
renew -- "true" or "false"
Example usage:
>>> create_cas_validate_url(
... 'http://sso.pdx.edu',
... '/cas/serviceValidate',
... 'http://localhost:5000/login',
... 'ST-58274-x839euFek492ou832Eena7ee-cas'
... )
'http://sso.pdx.edu/cas/serviceValidate?service=http%3A%2F%2Flocalhost%3A5000%2Flogin&ticket=ST-58274-x839euFek492ou832Eena7ee-cas'
"""
return create_url(
cas_url,
cas_route,
("service", service),
("ticket", ticket),
("renew", renew),
)

194
flask_cas/routing.py Normal file
View File

@ -0,0 +1,194 @@
import ssl
import flask
from xmltodict import parse
from flask import current_app
from .cas_urls import create_cas_login_url
from .cas_urls import create_cas_logout_url
from .cas_urls import create_cas_validate_url
try:
from urllib import urlopen # python 2
except ImportError:
from urllib.request import urlopen # python 3
from urllib.error import URLError
blueprint = flask.Blueprint("cas", __name__)
@blueprint.route("/login/")
def login():
"""
This route has two purposes. First, it is used by the user
to login. Second, it is used by the CAS to respond with the
`ticket` after the user logs in successfully.
When the user accesses this url, they are redirected to the CAS
to login. If the login was successful, the CAS will respond to this
route with the ticket in the url. The ticket is then validated.
If validation was successful the logged in username is saved in
the user's session under the key `CAS_USERNAME_SESSION_KEY` and
the user's attributes are saved under the key
'CAS_USERNAME_ATTRIBUTE_KEY'
"""
cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
redirect_url = create_cas_login_url(
current_app.config["CAS_SERVER"],
current_app.config["CAS_LOGIN_ROUTE"],
flask.url_for(
".login",
origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
_external=True,
),
)
if "ticket" in flask.request.args:
flask.session[cas_token_session_key] = flask.request.args["ticket"]
if cas_token_session_key in flask.session:
if validate(flask.session[cas_token_session_key]):
if "CAS_AFTER_LOGIN_SESSION_URL" in flask.session:
redirect_url = flask.session.pop("CAS_AFTER_LOGIN_SESSION_URL")
elif flask.request.args.get("origin"):
redirect_url = flask.request.args["origin"]
else:
redirect_url = flask.url_for(current_app.config["CAS_AFTER_LOGIN"])
else:
flask.session.pop(cas_token_session_key, None)
current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
return flask.redirect(redirect_url)
@blueprint.route("/logout/")
def logout():
"""
When the user accesses this route they are logged out.
"""
cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
cas_token_session_key = current_app.config["CAS_TOKEN_SESSION_KEY"]
flask.session.pop(cas_username_session_key, None)
flask.session.pop(cas_attributes_session_key, None)
flask.session.pop(cas_token_session_key, None) # added by EV
cas_after_logout = current_app.config["CAS_AFTER_LOGOUT"]
if cas_after_logout is not None:
# If config starts with http, use it as dest URL.
# Else, build Flask URL
dest_url = (
cas_after_logout
if cas_after_logout.startswith("http")
else flask.url_for(cas_after_logout, _external=True)
)
redirect_url = create_cas_logout_url(
current_app.config["CAS_SERVER"],
current_app.config["CAS_LOGOUT_ROUTE"],
dest_url,
)
else:
redirect_url = create_cas_logout_url(
current_app.config["CAS_SERVER"], current_app.config["CAS_LOGOUT_ROUTE"]
)
current_app.logger.debug("Redirecting to: {0}".format(redirect_url))
return flask.redirect(redirect_url)
def validate(ticket):
"""
Will attempt to validate the ticket. If validation fails, then False
is returned. If validation is successful, then True is returned
and the validated username is saved in the session under the
key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary
is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'.
"""
cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
cas_error_callback = current_app.config.get("CAS_ERROR_CALLBACK")
current_app.logger.debug("validating token {0}".format(ticket))
cas_validate_url = create_cas_validate_url(
current_app.config["CAS_SERVER"],
current_app.config["CAS_VALIDATE_ROUTE"],
flask.url_for(
".login",
origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
_external=True,
),
ticket,
)
current_app.logger.debug("Making GET request to {0}".format(cas_validate_url))
xml_from_dict = {}
isValid = False
if current_app.config.get("CAS_SSL_VERIFY"):
ssl_context = ssl.SSLContext()
ssl_context.verify_mode = ssl.CERT_REQUIRED
ca_data = current_app.config.get("CAS_SSL_CERTIFICATE", "")
try:
ssl_context.load_verify_locations(cadata=ca_data)
except (ssl.SSLError, ValueError):
current_app.logger.error("CAS : error loading SSL cert.")
if cas_error_callback:
cas_error_callback("erreur chargement certificat SSL CAS (PEM)")
return False
else:
ssl_context = None
try:
xmldump = (
urlopen(cas_validate_url, context=ssl_context)
.read()
.strip()
.decode("utf8", "ignore")
)
xml_from_dict = parse(xmldump)
isValid = (
True
if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"]
else False
)
except ValueError:
current_app.logger.error("CAS returned unexpected result")
if cas_error_callback:
cas_error_callback("réponse invalide du serveur CAS")
except URLError:
current_app.logger.error("CAS : error validating token: check SSL certificate")
cas_error_callback(
"erreur connexion au serveur CAS: vérifiez le certificat SSL"
)
if isValid:
current_app.logger.debug("valid")
xml_from_dict = xml_from_dict["cas:serviceResponse"][
"cas:authenticationSuccess"
]
username = xml_from_dict["cas:user"]
attributes = xml_from_dict.get("cas:attributes", {})
if attributes and "cas:memberOf" in attributes:
if isinstance(attributes["cas:memberOf"], basestring):
attributes["cas:memberOf"] = (
attributes["cas:memberOf"].lstrip("[").rstrip("]").split(",")
)
for group_number in range(0, len(attributes["cas:memberOf"])):
attributes["cas:memberOf"][group_number] = (
attributes["cas:memberOf"][group_number].lstrip(" ").rstrip(" ")
)
flask.session[cas_username_session_key] = username
flask.session[cas_attributes_session_key] = attributes
else:
current_app.logger.debug("invalid")
return isValid

View File

@ -0,0 +1,47 @@
"""CAS
Revision ID: 4c19fcb42636
Revises: d8288b7f0a3e
Create Date: 2023-02-26 20:58:30.113631
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "4c19fcb42636"
down_revision = "d8288b7f0a3e"
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("user", sa.Column("cas_id", sa.Text(), nullable=True))
op.add_column(
"user",
sa.Column(
"cas_allow_login", sa.Boolean(), server_default="false", nullable=False
),
)
op.add_column(
"user",
sa.Column(
"cas_allow_scodoc_login",
sa.Boolean(),
server_default="false",
nullable=False,
),
)
op.create_index(op.f("ix_user_cas_id"), "user", ["cas_id"], unique=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_user_cas_id"), table_name="user")
op.drop_column("user", "cas_allow_scodoc_login")
op.drop_column("user", "cas_allow_login")
op.drop_column("user", "cas_id")
# ### end Alembic commands ###

View File

@ -85,4 +85,5 @@ visitor==0.1.3
Werkzeug==2.1.1 Werkzeug==2.1.1
wrapt==1.14.0 wrapt==1.14.0
WTForms==3.0.1 WTForms==3.0.1
xmltodict==0.13.0
zipp==3.8.0 zipp==3.8.0

View File

@ -10,6 +10,7 @@ SCONEWS = """
<ul> <ul>
<li>ScoDoc 9.4</li> <li>ScoDoc 9.4</li>
<ul> <ul>
<li>Connexion avec service CAS</li>
<li>Améliorations des tableaux récapitulatifs</li> <li>Améliorations des tableaux récapitulatifs</li>
<li>Nouvelle interface de gestions des groupes (S. Lehmann)</li> <li>Nouvelle interface de gestions des groupes (S. Lehmann)</li>
<li>Enrichissement des jurys BUT et des procès-verbaux associés.</li> <li>Enrichissement des jurys BUT et des procès-verbaux associés.</li>

View File

@ -101,6 +101,7 @@ def make_shell_context():
"res_sem": res_sem, "res_sem": res_sem,
"ResultatsSemestreBUT": ResultatsSemestreBUT, "ResultatsSemestreBUT": ResultatsSemestreBUT,
"Role": Role, "Role": Role,
"ScoDocSiteConfig": models.ScoDocSiteConfig,
"scolar": scolar, "scolar": scolar,
"ScolarAutorisationInscription": ScolarAutorisationInscription, "ScolarAutorisationInscription": ScolarAutorisationInscription,
"ScolarFormSemestreValidation": ScolarFormSemestreValidation, "ScolarFormSemestreValidation": ScolarFormSemestreValidation,