##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2024 Emmanuel Viennet.  All rights reserved.
# See LICENSE
##############################################################################

"""
  ScoDoc 9 API: user accounts, roles and permissions
"""

from flask import g, request
from flask_json import as_json
from flask_login import current_user, login_required

from app import db, log
from app.api import api_bp as bp, api_web_bp, API_CLIENT_ERROR
from app.api import api_permission_required as permission_required
from app.auth.models import User, Role, UserRole
from app.auth.models import is_valid_password
from app.decorators import scodoc
from app.models import Departement
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_utils import json_error

# Query-string spellings interpreted as "false" (compared case-insensitively,
# after stripping surrounding blanks). Any other value is "true".
_FALSE_FLAG_VALUES = frozenset({"", "0", "false", "f", "n", "no", "off"})


def _query_flag_to_bool(value: str) -> bool:
    """Convert a query-string flag such as "1", "0", "true" or "false" to a bool."""
    return str(value).strip().lower() not in _FALSE_FLAG_VALUES


def _get_json_object():
    """Return the JSON body of the current request if it is an object (dict).

    Returns None when the body is valid JSON of another type (list, string,
    number...), so that callers can answer with a client error instead of
    failing on `.get()`.
    """
    payload = request.get_json(force=True)  # may raise 400 Bad Request
    return payload if isinstance(payload, dict) else None


@bp.route("/user/<int:uid>")
@api_web_bp.route("/user/<int:uid>")
@login_required
@scodoc
@permission_required(Permission.UsersView)
@as_json
def user_info(uid: int):
    """Return information about one ScoDoc user account.

    Answers 404 when the account does not exist or, when a department is set
    on the request (`g.scodoc_dept`), when the current user has no UsersView
    permission on the account's department.
    """
    user: User = db.session.get(User, uid)
    if user is None:
        return json_error(404, "user not found")
    if g.scodoc_dept:
        allowed_depts = current_user.get_depts_with_permission(Permission.UsersView)
        # None in allowed_depts means the permission holds in every department.
        if (None not in allowed_depts) and (user.dept not in allowed_depts):
            # Same answer as for a missing account.
            return json_error(404, "user not found")

    return user.to_dict()


@bp.route("/users/query")
@api_web_bp.route("/users/query")
@login_required
@scodoc
@permission_required(Permission.ScoView)
@as_json
def users_info_query():
    """List users, filtered by department, active flag or start of name.

    /users/query?departement=dept_acronym&active=1&starts_with=<string:nom>

    Only the users "accessible" to the current user (according to its
    permissions) are returned.
    When accessed through the web API, the department of the URL is ignored:
    only the permissions of the current user are taken into account.

    QUERY
    -----
    active:<bool:active>
    departement:<string:departement_acronym>
    starts_with:<string:nom>
    """
    query = User.query
    active = request.args.get("active")
    if active is not None:
        # bool("0") is True: the flag text must be parsed, not merely tested
        # for emptiness.
        query = query.filter_by(active=_query_flag_to_bool(active))
    departement = request.args.get("departement")
    if departement is not None:
        # An empty acronym selects users attached to no department.
        query = query.filter_by(dept=departement or None)
    starts_with = request.args.get("starts_with")
    if starts_with is not None:
        # remove % and _ (LIKE wildcards) for security
        starts_with = starts_with.translate({ord(c): None for c in "%_"})
        query = query.filter(User.nom.ilike(starts_with + "%"))

    # Permission filter: keep users for which the current user holds a role
    # granting UsersView, either in the user's department or in all
    # departments (role with a NULL dept).
    query = (
        query.join(
            UserRole,
            (UserRole.dept == User.dept) | (UserRole.dept.is_(None)),
        )
        .filter(UserRole.user == current_user)
        .join(Role, UserRole.role_id == Role.id)
        .filter(Role.permissions.op("&")(Permission.UsersView) != 0)
    )

    query = query.order_by(User.user_name)
    return [user.to_dict() for user in query]


def _is_allowed_user_edit(args: dict) -> tuple[bool, str]:
    """Check that the current user may set the CAS-related fields found in args.

    Returns (True, "") when allowed, else (False, message).
    """
    if "cas_id" in args and not current_user.has_permission(
        Permission.UsersChangeCASId
    ):
        return False, "non autorise a changer cas_id"

    if not current_user.is_administrator():
        # Only administrators may change the CAS login switches.
        for field in ("cas_allow_login", "cas_allow_scodoc_login"):
            if field in args:
                return False, f"non autorise a changer {field}"
    return True, ""


@bp.route("/user/create", methods=["POST"])
@api_web_bp.route("/user/create", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.UsersAdmin)
@as_json
def user_create():
    """Create a user.

    The request content type should be "application/json":
    {
        "active":bool (default True),
        "dept": str or null,
        "nom": str,
        "prenom": str,
        "user_name": str,
        ...
    }
    The department "@all" (or null) stands for "no specific department".
    """
    args = _get_json_object()
    if args is None:
        return json_error(API_CLIENT_ERROR, "user_create: invalid JSON payload")
    user_name = args.get("user_name")
    if not user_name:
        return json_error(404, "empty user_name")
    user = User.query.filter_by(user_name=user_name).first()
    if user:
        return json_error(404, f"user_create: user {user} already exists\n")
    dept = args.get("dept")
    if dept == "@all":
        dept = None
    allowed_depts = current_user.get_depts_with_permission(Permission.UsersAdmin)
    # None in allowed_depts means the permission holds in every department.
    if (None not in allowed_depts) and (dept not in allowed_depts):
        return json_error(403, "user_create: departement non autorise")
    if (dept is not None) and (
        Departement.query.filter_by(acronym=dept).first() is None
    ):
        return json_error(404, "user_create: departement inexistant")
    # Store the normalized department ("@all" replaced by None).
    args["dept"] = dept
    ok, msg = _is_allowed_user_edit(args)
    if not ok:
        return json_error(403, f"user_create: {msg}")
    user = User(user_name=user_name)
    user.from_dict(args, new_user=True)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/user/<int:uid>/edit", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.UsersAdmin)
@as_json
def user_edit(uid: int):
    """Modify a user.

    Editable fields:
    {
        "dept": str or null,
        "nom": str,
        "prenom": str,
        "active":bool
        ...
    }
    The current user must hold UsersAdmin in the user's current department
    and, when "dept" is given, in the destination department as well.
    """
    args = _get_json_object()
    if args is None:
        return json_error(API_CLIENT_ERROR, "user_edit: invalid JSON payload")
    user: User = User.query.get_or_404(uid)

    allowed_depts = current_user.get_depts_with_permission(Permission.UsersAdmin)
    # None in allowed_depts means the permission holds in every department.
    all_depts_allowed = None in allowed_depts

    # The origin department is checked on every edit, not only on a department
    # change: otherwise a department-level admin could modify users belonging
    # to any other department simply by omitting "dept".
    orig_dept = user.dept
    if (not all_depts_allowed) and (orig_dept not in allowed_depts):
        return json_error(403, "user_edit: departement non autorise")

    dept_given = "dept" in args
    dest_dept = orig_dept
    if dept_given:
        dest_dept = args["dept"]
        if dest_dept == "@all":
            dest_dept = None
        if (not all_depts_allowed) and (dest_dept not in allowed_depts):
            return json_error(403, "user_edit: departement non autorise")
        if (
            dest_dept != orig_dept
            and dest_dept is not None
            and Departement.query.filter_by(acronym=dest_dept).first() is None
        ):
            return json_error(404, "user_edit: departement inexistant")
        # Same normalization as in user_create, so that from_dict() below
        # never receives the "@all" placeholder.
        args["dept"] = dest_dept

    # All checks are done before the first modification of the user object.
    ok, msg = _is_allowed_user_edit(args)
    if not ok:
        return json_error(403, f"user_edit: {msg}")

    if dept_given:
        user.dept = dest_dept
    user.from_dict(args)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/user/<int:uid>/password", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/password", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.UsersAdmin)
@as_json
def user_password(uid: int):
    """Change the password of a user.

    Editable fields:
    {
        "password": str
    }
    If the password is rejected by is_valid_password, answers with
    API_CLIENT_ERROR.
    """
    data = _get_json_object()
    if data is None:
        return json_error(API_CLIENT_ERROR, "user_password: invalid JSON payload")
    user: User = User.query.get_or_404(uid)
    password = data.get("password")
    if not password:
        return json_error(404, "user_password: missing password")
    if not is_valid_password(password):
        return json_error(API_CLIENT_ERROR, "user_password: invalid password")
    allowed_depts = current_user.get_depts_with_permission(Permission.UsersAdmin)
    # None in allowed_depts means the permission holds in every department.
    if (None not in allowed_depts) and (user.dept not in allowed_depts):
        return json_error(403, "user_password: departement non autorise")
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@bp.route(
    "/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
    methods=["POST"],
)
@api_web_bp.route(
    "/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def user_role_add(uid: int, role_name: str, dept: str = None):
    """Add a role in the given dept to the user.

    Without dept, the role is added with a None department.
    Answers 404 if the user, the role or the department does not exist.
    """
    user: User = User.query.get_or_404(uid)
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    if dept is not None:  # check that the department exists
        _ = Departement.query.filter_by(acronym=dept).first_or_404()
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
    # None in allowed_depts means the permission holds in every department.
    if (None not in allowed_depts) and (dept not in allowed_depts):
        return json_error(403, "user_role_add: departement non autorise")
    user.add_role(role, dept)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@bp.route(
    "/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
    methods=["POST"],
)
@api_web_bp.route(
    "/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def user_role_remove(uid: int, role_name: str, dept: str = None):
    """Remove the role (in the given dept) from the user.

    Nothing is deleted (and no error is reported) when the user does not
    have the role. Answers 404 if the user, the role or the department does
    not exist.
    """
    user: User = User.query.get_or_404(uid)
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    if dept is not None:  # check that the department exists
        _ = Departement.query.filter_by(acronym=dept).first_or_404()
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
    # None in allowed_depts means the permission holds in every department.
    if (None not in allowed_depts) and (dept not in allowed_depts):
        return json_error(403, "user_role_remove: departement non autorise")

    query = UserRole.query.filter(UserRole.role == role, UserRole.user == user)
    # NOTE(review): without dept, the query is not restricted to the
    # all-departments (NULL dept) association: the first matching one is
    # removed, whatever its department. Confirm that this is intended.
    if dept is not None:
        query = query.filter(UserRole.dept == dept)
    user_role = query.first()
    if user_role:
        db.session.delete(user_role)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/permissions")
@api_web_bp.route("/permissions")
@login_required
@scodoc
@permission_required(Permission.UsersView)
@as_json
def permissions_list():
    """List the names of the defined permissions."""
    return list(Permission.permission_by_name.keys())


@bp.route("/role/<string:role_name>")
@api_web_bp.route("/role/<string:role_name>")
@login_required
@scodoc
@permission_required(Permission.UsersView)
@as_json
def role_get(role_name: str):
    """Return one role, looked up by name (404 if unknown)."""
    return Role.query.filter_by(name=role_name).first_or_404().to_dict()


@bp.route("/roles")
@api_web_bp.route("/roles")
@login_required
@scodoc
@permission_required(Permission.UsersView)
@as_json
def roles_list():
    """Return all the defined roles."""
    return [role.to_dict() for role in Role.query]


@bp.route(
    "/role/<string:role_name>/add_permission/<string:perm_name>",
    methods=["POST"],
)
@api_web_bp.route(
    "/role/<string:role_name>/add_permission/<string:perm_name>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_permission_add(role_name: str, perm_name: str):
    """Add permission to role.

    Answers 404 if the role or the permission name is unknown.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    permission = Permission.get_by_name(perm_name)
    if permission is None:
        return json_error(404, "role_permission_add: permission inconnue")
    role.add_permission(permission)
    db.session.add(role)
    db.session.commit()
    log(f"role_permission_add({role_name}, {perm_name})")
    return role.to_dict()


@bp.route(
    "/role/<string:role_name>/remove_permission/<string:perm_name>",
    methods=["POST"],
)
@api_web_bp.route(
    "/role/<string:role_name>/remove_permission/<string:perm_name>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_permission_remove(role_name: str, perm_name: str):
    """Remove permission from role.

    Answers 404 if the role or the permission name is unknown.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    permission = Permission.get_by_name(perm_name)
    if permission is None:
        return json_error(404, "role_permission_remove: permission inconnue")
    role.remove_permission(permission)
    db.session.add(role)
    db.session.commit()
    log(f"role_permission_remove({role_name}, {perm_name})")
    return role.to_dict()


@bp.route("/role/create/<string:role_name>", methods=["POST"])
@api_web_bp.route("/role/create/<string:role_name>", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_create(role_name: str):
    """Create a new role with permissions.

    {
        "permissions" : [ 'ScoView', ... ]
    }
    """
    role: Role = Role.query.filter_by(name=role_name).first()
    if role:
        return json_error(404, "role_create: role already exists")
    role = Role(name=role_name)
    data = _get_json_object()
    if data is None:
        return json_error(API_CLIENT_ERROR, "role_create: invalid JSON payload")
    permissions = data.get("permissions")
    if permissions:
        try:
            role.set_named_permissions(permissions)
        except ScoValueError:
            return json_error(404, "role_create: invalid permissions")
    db.session.add(role)
    db.session.commit()
    return role.to_dict()


@bp.route("/role/<string:role_name>/edit", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_edit(role_name: str):
    """Edit a role. A new name and/or a list of permissions may be given.

    {
        "name" : name
        "permissions" : [ 'ScoView', ... ]
    }
    The new name is also accepted under the key "role_name", which takes
    precedence over "name" when both are present.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    data = _get_json_object()
    if data is None:
        return json_error(API_CLIENT_ERROR, "role_edit: invalid JSON payload")
    # False is the "key absent" marker: an explicitly given value, even an
    # empty list, is passed to set_named_permissions.
    permissions = data.get("permissions", False)
    if permissions is not False:
        try:
            role.set_named_permissions(permissions)
        except ScoValueError:
            return json_error(404, "role_edit: invalid permissions")
    # "role_name" is the key historically read by the code, "name" the one
    # documented above: both are honoured.
    new_name = data.get("role_name") or data.get("name")
    if new_name and new_name != role.name:
        existing_role: Role = Role.query.filter_by(name=new_name).first()
        if existing_role:
            return json_error(404, "role_edit: role name already exists")
        role.name = new_name
    db.session.add(role)
    db.session.commit()
    return role.to_dict()


@bp.route("/role/<string:role_name>/delete", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/delete", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_delete(role_name: str):
    """Delete a role (404 if unknown)."""
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    db.session.delete(role)
    db.session.commit()
    return {"OK": True}


# Disabled, unfinished endpoint (work in progress), kept for reference.
#
# @bp.route("/user/<int:uid>/edt")
# @api_web_bp.route("/user/<int:uid>/edt")
# @login_required
# @scodoc
# @permission_required(Permission.ScoView)
# @as_json
# def user_edt(uid: int):
#     """The timetable of the user.
#     If ok, a list of events. Otherwise, a string giving an error message.

#     show_modules_titles shows the full title of the module (default),
#     otherwise only its code.

#     Requires the permission ScoView + (UsersView or being logged in as the
#     requested user)
#     """
#     if g.scodoc_dept is None:  # non-departmental API route
#         if not current_user.has_permission(Permission.UsersView):
#             return scu.json_error(403, "accès non autorisé")
#     user: User = db.session.get(User, uid)
#     if user is None:
#         return json_error(404, "user not found")
#     # Check permission
#     if current_user.id != user.id:
#         if g.scodoc_dept:
#             allowed_depts = current_user.get_depts_with_permission(Permission.UsersView)
#             if (None not in allowed_depts) and (user.dept not in allowed_depts):
#                 return json_error(404, "user not found")

#     show_modules_titles = scu.to_bool(request.args.get("show_modules_titles", False))

#     # Look for the ics file
#     if not user.edt_id:
#         return json_error(404, "user not configured")
#     ics_filename = sco_edt_cal.get_ics_user_edt_filename(user.edt_id)
#     if not ics_filename:
#         return json_error(404, "no calendar for this user")

#     _, calendar = sco_edt_cal.load_calendar(ics_filename)

#     # TODO:
#     # - Build the mapping edt2modimpl: edt_id -> modimpl
#     #   to do so, consider all the formsemestres of the timetable period
#     #   (either the school year of the first event, or the current one,
#     #    or look for the min and max of the event dates)
#     # - Change the decoding of groups in convert_ics to get a mapping
#     #   of groups per semester (found through the modimpl of the event)

#     raise NotImplementedError()  # TODO XXX WIP

#     events_scodoc, _ = sco_edt_cal.convert_ics(
#         calendar,
#         edt2group=edt2group,
#         default_group=default_group,
#         edt2modimpl=edt2modimpl,
#     )
#     edt_dict = sco_edt_cal.translate_calendar(
#         events_scodoc, group_ids, show_modules_titles=show_modules_titles
#     )
#     return edt_dict