ScoDoc-Lille/app/api/users.py

572 lines
18 KiB
Python

##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2024 Emmanuel Viennet. All rights reserved.
# See LICENSE
##############################################################################
"""
ScoDoc 9 API : accès aux utilisateurs
CATEGORY
--------
Utilisateurs
"""
from flask import g, request
from flask_json import as_json
from flask_login import current_user, login_required
from app import db, log
from app.api import api_bp as bp, api_web_bp, API_CLIENT_ERROR
from app.api import api_permission_required as permission_required
from app.auth.models import User, Role, UserRole
from app.auth.models import is_valid_password
from app.decorators import scodoc
from app.models import Departement
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_utils import json_error
@bp.route("/user/<int:uid>")
@api_web_bp.route("/user/<int:uid>")
@login_required
@scodoc
@permission_required(Permission.UsersView)
@as_json
def user_info(uid: int):
    """
    Information about one ScoDoc user account.

    Answers with a 404 error when no user has this id, or when the request
    comes through a departmental route and the account's department is not
    among those where the caller holds `UsersView`.

    SAMPLES
    -------
    /user/2
    """
    account: User = db.session.get(User, uid)
    if account is None:
        return json_error(404, "user not found")
    if not g.scodoc_dept:
        # No department attached to the request: no per-department filtering.
        return account.to_dict()
    visible_depts = current_user.get_depts_with_permission(Permission.UsersView)
    # A None entry lifts the department restriction; otherwise the account's
    # own department must be listed. Out-of-scope accounts are reported
    # exactly like missing ones.
    if not (None in visible_depts or account.dept in visible_depts):
        return json_error(404, "user not found")
    return account.to_dict()
@bp.route("/users/query")
@api_web_bp.route("/users/query")
@login_required
@scodoc
@permission_required(Permission.ScoView)
@as_json
def users_info_query():
    """Users, filtered by department, active flag or start of last name.

    Example:
    ```
    /users/query?departement=dept_acronym&active=1&starts_with=<string:nom>
    ```

    Only the users "reachable" by the caller (according to permissions) are
    returned.
    When accessed through the web API, the department in the URL is ignored;
    only the caller's permissions are taken into account.

    `active` is read as text: an empty value, `0`, `false`, `f`, `no`, `n`
    or `off` (case-insensitive) selects inactive accounts, any other value
    selects active ones. An empty `departement` selects the users whose
    department is unset.

    QUERY
    -----
    active: bool
    departement: string
    starts_with: string
    """
    false_words = ("", "0", "false", "f", "no", "n", "off")
    query = User.query

    active = request.args.get("active")
    if active is not None:
        # Query-string values are strings and bool("0") is True, so the flag
        # must be parsed explicitly for "active=0" to select inactive users.
        is_active = active.strip().lower() not in false_words
        query = query.filter_by(active=is_active)

    departement = request.args.get("departement")
    if departement is not None:
        # "" is mapped to None, i.e. users without a department.
        query = query.filter_by(dept=departement or None)

    starts_with = request.args.get("starts_with")
    if starts_with is not None:
        # Drop the LIKE wildcards so the caller cannot inject a pattern.
        starts_with = starts_with.translate({ord(c): None for c in "%_"})
        query = query.filter(User.nom.ilike(starts_with + "%"))

    # Permission filter: keep a user only if the caller owns a role carrying
    # the UsersView bit, attached either to that user's department or to no
    # department at all (NULL dept).
    query = (
        query.join(
            UserRole, (UserRole.dept == User.dept) | (UserRole.dept.is_(None))
        )
        .filter(UserRole.user == current_user)
        .join(Role, UserRole.role_id == Role.id)
        .filter(Role.permissions.op("&")(Permission.UsersView) != 0)
    )
    query = query.order_by(User.user_name)
    return [user.to_dict() for user in query]
def _is_allowed_user_edit(args: dict) -> tuple[bool, str]:
    """Tell whether the current user may set the fields present in `args`.

    Returns `(True, "")` when allowed, otherwise `(False, reason)`.
    Setting `cas_id` requires the `UsersChangeCASId` permission; setting
    `cas_allow_login` or `cas_allow_scodoc_login` requires being
    administrator. Only the presence of the keys is checked, not their values.
    """
    if "cas_id" in args:
        if not current_user.has_permission(Permission.UsersChangeCASId):
            return False, "non autorise a changer cas_id"
    if current_user.is_administrator():
        return True, ""
    admin_only_fields = ("cas_allow_login", "cas_allow_scodoc_login")
    # First admin-only field found in the request, if any
    offending = next((name for name in admin_only_fields if name in args), None)
    if offending is not None:
        return False, f"non autorise a changer {offending}"
    return True, ""
@bp.route("/user/create", methods=["POST"])
@api_web_bp.route("/user/create", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.UsersAdmin)
@as_json
def user_create():
    """Create a user.

    The department `"@all"` is stored as no department (null). The caller
    must hold `UsersAdmin` for the requested department, and a non-null
    department must exist. Errors: 404 for an empty or already used
    `user_name` or an unknown department, 403 for a department or a field
    the caller is not allowed to set.

    DATA
    ----
    ```json
    {
        "active":bool (default True),
        "dept": str or null,
        "nom": str,
        "prenom": str,
        "user_name": str,
        ...
    }
    ```
    """
    args = request.get_json(force=True)  # may raise 400 Bad Request
    user_name = args.get("user_name")
    if not user_name:
        return json_error(404, "empty user_name")
    existing = User.query.filter_by(user_name=user_name).first()
    if existing:
        return json_error(404, f"user_create: user {existing} already exists\n")

    requested_dept = args.get("dept")
    dept = None if requested_dept == "@all" else requested_dept
    admin_depts = current_user.get_depts_with_permission(Permission.UsersAdmin)
    # A None entry in admin_depts lifts the department restriction.
    if not (None in admin_depts or dept in admin_depts):
        return json_error(403, "user_create: departement non autorise")
    if dept is not None:
        known_dept = Departement.query.filter_by(acronym=dept).first()
        if known_dept is None:
            return json_error(404, "user_create: departement inexistant")
    # Hand the normalised department (None instead of "@all") to from_dict
    args["dept"] = dept

    allowed, reason = _is_allowed_user_edit(args)
    if not allowed:
        return json_error(403, f"user_create: {reason}")

    new_user = User(user_name=user_name)
    new_user.from_dict(args, new_user=True)
    db.session.add(new_user)
    db.session.commit()
    return new_user.to_dict()
@bp.route("/user/<int:uid>/edit", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.UsersAdmin)
@as_json
def user_edit(uid: int):
    """Edit a user.

    The caller must hold `UsersAdmin` for the user's current department and,
    when `dept` is given, for the destination department as well
    (`"@all"` means no department). Errors: 404 for an unknown user or
    destination department, 403 for a department or a field the caller is
    not allowed to change.

    Editable fields:
    ```json
    {
        "dept": str or null,
        "nom": str,
        "prenom": str,
        "active":bool
        ...
    }
    ```
    """
    args = request.get_json(force=True)  # may raise 400 Bad Request
    user: User = User.query.get_or_404(uid)

    allowed_depts = current_user.get_depts_with_permission(Permission.UsersAdmin)
    # A None entry in allowed_depts lifts the department restriction.
    all_depts_allowed = None in allowed_depts

    # The caller needs admin rights on the user's current department, whether
    # or not the request moves the user to another department. Previously
    # this was only checked when "dept" was present in the payload.
    orig_dept = user.dept
    if not all_depts_allowed and orig_dept not in allowed_depts:
        return json_error(403, "user_edit: departement non autorise")

    # False marks "key absent": null is a meaningful value for dept.
    dest_dept = args.get("dept", False)
    dept_given = dest_dept is not False
    if dept_given:
        if dest_dept == "@all":
            dest_dept = None
        # ... and admin rights on the destination department too.
        if not all_depts_allowed and dest_dept not in allowed_depts:
            return json_error(403, "user_edit: departement non autorise")
        if (
            dest_dept != orig_dept
            and dest_dept is not None
            and Departement.query.filter_by(acronym=dest_dept).first() is None
        ):
            return json_error(404, "user_edit: departement inexistant")

    ok, msg = _is_allowed_user_edit(args)
    if not ok:
        return json_error(403, f"user_edit: {msg}")

    # The object is modified only once every check has passed.
    if dept_given:
        # Same normalisation as user_create, so that from_dict never
        # receives the "@all" alias.
        args["dept"] = dest_dept
        user.dept = dest_dept
    user.from_dict(args)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()
@bp.route("/user/<int:uid>/password", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/password", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.UsersAdmin)
@as_json
def user_password(uid: int):
    """Change the password of a user.

    A password rejected by `is_valid_password` gives an `API_CLIENT_ERROR`
    error, a missing one a 404 error. The caller must hold `UsersAdmin` for
    the user's department (403 otherwise).

    DATA
    ----
    ```json
    {
        "password": str
    }
    ```

    SAMPLES
    -------
    /user/3/password;{""password"" : ""rePlaCemeNT456averylongandcomplicated""}
    """
    payload = request.get_json(force=True)  # may raise 400 Bad Request
    target: User = User.query.get_or_404(uid)

    new_password = payload.get("password")
    if not new_password:
        return json_error(404, "user_password: missing password")
    if not is_valid_password(new_password):
        return json_error(API_CLIENT_ERROR, "user_password: invalid password")

    admin_depts = current_user.get_depts_with_permission(Permission.UsersAdmin)
    # A None entry in admin_depts lifts the department restriction.
    may_administrate = (None in admin_depts) or (target.dept in admin_depts)
    if not may_administrate:
        return json_error(403, "user_password: departement non autorise")

    target.set_password(new_password)
    db.session.add(target)
    db.session.commit()
    return target.to_dict()
@bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@bp.route(
    "/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
    methods=["POST"],
)
@api_web_bp.route(
    "/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def user_role_add(uid: int, role_name: str, dept: str = None):
    """Give a role to the user in the given department.

    Without `dept` in the URL the role is added with a null department.
    Errors: 404 for an unknown user, role or department, 403 when the caller
    does not hold `ScoSuperAdmin` for that department.
    """
    target: User = User.query.get_or_404(uid)
    granted_role: Role = Role.query.filter_by(name=role_name).first_or_404()
    if dept is not None:
        # Existence check only: aborts with 404 on an unknown acronym.
        Departement.query.filter_by(acronym=dept).first_or_404()

    admin_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
    # A None entry in admin_depts lifts the department restriction.
    if not (None in admin_depts or dept in admin_depts):
        return json_error(403, "user_role_add: departement non autorise")

    target.add_role(granted_role, dept)
    db.session.add(target)
    db.session.commit()
    return target.to_dict()
@bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@bp.route(
    "/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
    methods=["POST"],
)
@api_web_bp.route(
    "/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def user_role_remove(uid: int, role_name: str, dept: str = None):
    """Remove the role (in the given department) from this user.

    Without `dept` in the URL, the first association found between the user
    and the role is removed, whatever its department. Nothing is changed when
    no association matches. Errors: 404 for an unknown user, role or
    department, 403 when the caller does not hold `ScoSuperAdmin` for that
    department.
    """
    target: User = User.query.get_or_404(uid)
    revoked_role: Role = Role.query.filter_by(name=role_name).first_or_404()
    dept_given = dept is not None
    if dept_given:
        # Existence check only: aborts with 404 on an unknown acronym.
        Departement.query.filter_by(acronym=dept).first_or_404()

    admin_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
    # A None entry in admin_depts lifts the department restriction.
    if not (None in admin_depts or dept in admin_depts):
        return json_error(403, "user_role_remove: departement non autorise")

    associations = UserRole.query.filter(
        UserRole.role == revoked_role, UserRole.user == target
    )
    if dept_given:
        associations = associations.filter(UserRole.dept == dept)
    association = associations.first()
    if association:
        db.session.delete(association)
        db.session.add(target)
        db.session.commit()
    return target.to_dict()
@bp.route("/permissions")
@api_web_bp.route("/permissions")
@login_required
@scodoc
@permission_required(Permission.UsersView)
@as_json
def permissions_list():
    """List of the names of the defined permissions.

    SAMPLES
    -------
    /permissions
    """
    permission_names = Permission.permission_by_name.keys()
    return list(permission_names)
@bp.route("/role/<string:role_name>")
@api_web_bp.route("/role/<string:role_name>")
@login_required
@scodoc
@permission_required(Permission.UsersView)
@as_json
def role_get(role_name: str):
    """One role, looked up by name (404 if there is none).

    SAMPLES
    -------
    /role/Ens
    """
    wanted: Role = Role.query.filter_by(name=role_name).first_or_404()
    return wanted.to_dict()
@bp.route("/roles")
@api_web_bp.route("/roles")
@login_required
@scodoc
@permission_required(Permission.UsersView)
@as_json
def roles_list():
    """All the defined roles.

    SAMPLES
    -------
    /roles
    """
    defined_roles = Role.query
    return [a_role.to_dict() for a_role in defined_roles]
@bp.route(
    "/role/<string:role_name>/add_permission/<string:perm_name>",
    methods=["POST"],
)
@api_web_bp.route(
    "/role/<string:role_name>/add_permission/<string:perm_name>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_permission_add(role_name: str, perm_name: str):
    """Add a permission to a role.

    404 error when the role or the permission name is unknown. The change is
    committed, then logged.
    """
    target_role: Role = Role.query.filter_by(name=role_name).first_or_404()
    added = Permission.get_by_name(perm_name)
    if added is None:
        return json_error(404, "role_permission_add: permission inconnue")
    target_role.add_permission(added)
    db.session.add(target_role)
    db.session.commit()
    log(f"role_permission_add({role_name}, {perm_name})")
    return target_role.to_dict()
@bp.route(
    "/role/<string:role_name>/remove_permission/<string:perm_name>",
    methods=["POST"],
)
@api_web_bp.route(
    "/role/<string:role_name>/remove_permission/<string:perm_name>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_permission_remove(role_name: str, perm_name: str):
    """Remove a permission from a role.

    404 error when the role or the permission name is unknown. The change is
    committed, then logged.
    """
    target_role: Role = Role.query.filter_by(name=role_name).first_or_404()
    removed = Permission.get_by_name(perm_name)
    if removed is None:
        return json_error(404, "role_permission_remove: permission inconnue")
    target_role.remove_permission(removed)
    db.session.add(target_role)
    db.session.commit()
    log(f"role_permission_remove({role_name}, {perm_name})")
    return target_role.to_dict()
@bp.route("/role/create/<string:role_name>", methods=["POST"])
@api_web_bp.route("/role/create/<string:role_name>", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_create(role_name: str):
    """Create a new role with the given permissions.

    404 error when a role with this name already exists or when the
    permissions are rejected (`ScoValueError`). A missing or empty
    `permissions` entry leaves the new role's permissions untouched.

    DATA
    ----
    ```json
    {
        "permissions" : [ 'ScoView', ... ]
    }
    ```

    SAMPLES
    -------
    /role/create/customRole;{""permissions"": [""ScoView"", ""UsersView""]}
    """
    already_there = Role.query.filter_by(name=role_name).first()
    if already_there:
        return json_error(404, "role_create: role already exists")
    new_role = Role(name=role_name)
    payload = request.get_json(force=True)  # may raise 400 Bad Request
    perm_names = payload.get("permissions")
    if perm_names:
        try:
            new_role.set_named_permissions(perm_names)
        except ScoValueError:
            return json_error(404, "role_create: invalid permissions")
    db.session.add(new_role)
    db.session.commit()
    return new_role.to_dict()
@bp.route("/role/<string:role_name>/edit", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_edit(role_name: str):
    """Edit a role. A new name and/or permissions may be given.

    The new name is read from the `role_name` key of the payload. Errors:
    404 for an unknown role, for permissions rejected with `ScoValueError`,
    or when the new name is already used by another role.

    DATA
    ----
    ```json
    {
        "role_name" : name
        "permissions" : [ 'ScoView', ... ]
    }
    ```
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    data = request.get_json(force=True)  # may raise 400 Bad Request
    # False marks "key absent", so that an explicit empty list is still applied.
    permissions = data.get("permissions", False)
    if permissions is not False:
        try:
            role.set_named_permissions(permissions)
        except ScoValueError:
            # Message used to read "role_create: ..." (copy-paste slip)
            return json_error(404, "role_edit: invalid permissions")
    new_name = data.get("role_name")
    if new_name and new_name != role.name:
        existing_role: Role = Role.query.filter_by(name=new_name).first()
        if existing_role:
            return json_error(404, "role_edit: role name already exists")
        role.name = new_name
    db.session.add(role)
    db.session.commit()
    return role.to_dict()
@bp.route("/role/<string:role_name>/delete", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/delete", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_delete(role_name: str):
    """Delete a role (404 if no role has this name).

    SAMPLES
    -------
    /role/customRole/delete
    """
    doomed: Role = Role.query.filter_by(name=role_name).first_or_404()
    db.session.delete(doomed)
    db.session.commit()
    return dict(OK=True)
# @bp.route("/user/<int:uid>/edt")
# @api_web_bp.route("/user/<int:uid>/edt")
# @login_required
# @scodoc
# @permission_required(Permission.ScoView)
# @as_json
# def user_edt(uid: int):
# """L'emploi du temps de l'utilisateur.
# Si ok, une liste d'évènements. Sinon, une chaine indiquant un message d'erreur.
# show_modules_titles affiche le titre complet du module (défaut), sinon juste le code.
# Il faut la permission ScoView + (UsersView ou bien être connecté comme l'utilisateur demandé)
# """
# if g.scodoc_dept is None: # route API non départementale
# if not current_user.has_permission(Permission.UsersView):
# return scu.json_error(403, "accès non autorisé")
# user: User = db.session.get(User, uid)
# if user is None:
# return json_error(404, "user not found")
# # Check permission
# if current_user.id != user.id:
# if g.scodoc_dept:
# allowed_depts = current_user.get_depts_with_permission(Permission.UsersView)
# if (None not in allowed_depts) and (user.dept not in allowed_depts):
# return json_error(404, "user not found")
# show_modules_titles = scu.to_bool(request.args.get("show_modules_titles", False))
# # Cherche ics
# if not user.edt_id:
# return json_error(404, "user not configured")
# ics_filename = sco_edt_cal.get_ics_user_edt_filename(user.edt_id)
# if not ics_filename:
# return json_error(404, "no calendar for this user")
# _, calendar = sco_edt_cal.load_calendar(ics_filename)
# # TODO:
# # - Construire mapping edt2modimpl: edt_id -> modimpl
# # pour cela, considérer tous les formsemestres de la période de l'edt
# # (soit on considère l'année scolaire du 1er event, ou celle courante,
# # soit on cherche min, max des dates des events)
# # - Modifier décodage des groupes dans convert_ics pour avoir un mapping
# # de groupe par semestre (retrouvé grâce au modimpl associé à l'event)
# raise NotImplementedError() # TODO XXX WIP
# events_scodoc, _ = sco_edt_cal.convert_ics(
# calendar,
# edt2group=edt2group,
# default_group=default_group,
# edt2modimpl=edt2modimpl,
# )
# edt_dict = sco_edt_cal.translate_calendar(
# events_scodoc, group_ids, show_modules_titles=show_modules_titles
# )
# return edt_dict