##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
# See LICENSE
##############################################################################

"""
  ScoDoc 9 API: access to users
"""

from flask import g, request
from flask_json import as_json
from flask_login import current_user, login_required

from app import db
from app.api import api_bp as bp, api_web_bp, API_CLIENT_ERROR
from app.scodoc.sco_utils import json_error
from app.auth.models import User, Role, UserRole
from app.auth.models import is_valid_password
from app.decorators import scodoc, permission_required
from app.models import Departement
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc import sco_utils as scu


@bp.route("/user/<int:uid>")
@api_web_bp.route("/user/<int:uid>")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
@as_json
def user_info(uid: int):
    """Return information about one ScoDoc user account.

    Args:
        uid: primary key of the requested user.

    Returns:
        ``user.to_dict()``, or a 404 JSON error when the user does not exist.
        When ``g.scodoc_dept`` is set, a user whose department is not among
        the departments where the current user holds ``ScoUsersView`` is
        also reported as 404.
    """
    user: User = db.session.get(User, uid)
    if user is None:
        return json_error(404, "user not found")
    if g.scodoc_dept:
        allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersView)
        # None in allowed_depts means the permission is held for all departments.
        if (None not in allowed_depts) and (user.dept not in allowed_depts):
            # Same answer as for a missing user.
            return json_error(404, "user not found")

    return user.to_dict()


@bp.route("/users/query")
@api_web_bp.route("/users/query")
@login_required
@scodoc
@permission_required(Permission.ScoView)
@as_json
def users_info_query():
    """Users, filtered by department, active flag or beginning of the name.

        /users/query?departement=dept_acronym&active=1&starts_with=<string:nom>

    Only the "accessible" users (according to permissions) are returned.
    When accessed through the web API, the department of the URL is ignored:
    only the permissions of the current user are taken into account.

    Returns:
        A list of ``user.to_dict()`` values ordered by ``user_name``.
    """
    query = User.query
    active = request.args.get("active")
    if active is not None:
        # bool(str(value)) is True for every non-empty string (including "0"),
        # so parse the flag with the helper this module already uses for the
        # "active" field of JSON payloads.
        active = scu.to_bool(active)
        query = query.filter_by(active=active)
    departement = request.args.get("departement")
    if departement is not None:
        # An empty value selects users whose dept is NULL.
        query = query.filter_by(dept=departement or None)
    starts_with = request.args.get("starts_with")
    if starts_with is not None:
        # remove % and _ (LIKE wildcards) for security
        starts_with = starts_with.translate({ord(c): None for c in "%_"})
        query = query.filter(User.nom.ilike(starts_with + "%"))
    # Filter according to permissions: keep users whose dept matches the dept
    # of one of the current user's roles (or a role with no dept) when that
    # role carries the ScoUsersView permission bit.
    # "== None" is intentional: it builds a SQL expression, "is None" would not.
    query = (
        query.join(UserRole, (UserRole.dept == User.dept) | (UserRole.dept == None))
        .filter(UserRole.user == current_user)
        .join(Role, UserRole.role_id == Role.id)
        .filter(Role.permissions.op("&")(Permission.ScoUsersView) != 0)
    )

    query = query.order_by(User.user_name)
    return [user.to_dict() for user in query]


@bp.route("/user/create", methods=["POST"])
@api_web_bp.route("/user/create", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
@as_json
def user_create():
    """Create a user.

    The request content type should be "application/json":
    {
        "user_name": str,
        "dept": str or null,
        "nom": str,
        "prenom": str,
        "active":bool (default True)
    }

    The special dept value "@all" is stored as no department (None).

    Returns:
        ``user.to_dict()`` for the created user, or a JSON error: 404 for an
        empty or already used ``user_name`` or an unknown department, 403
        when the current user lacks ``ScoUsersAdmin`` on the department.
    """
    data = request.get_json(force=True)  # may raise 400 Bad Request
    user_name = data.get("user_name")
    if not user_name:
        return json_error(404, "empty user_name")
    user = User.query.filter_by(user_name=user_name).first()
    if user:
        return json_error(404, f"user_create: user {user} already exists\n")
    dept = data.get("dept")
    if dept == "@all":
        dept = None
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
    if (None not in allowed_depts) and (dept not in allowed_depts):
        return json_error(403, "user_create: departement non autorise")
    if (dept is not None) and (
        Departement.query.filter_by(acronym=dept).first() is None
    ):
        return json_error(404, "user_create: departement inexistant")
    nom = data.get("nom")
    prenom = data.get("prenom")
    active = scu.to_bool(data.get("active", True))
    user = User(user_name=user_name, active=active, dept=dept, nom=nom, prenom=prenom)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/user/<int:uid>/edit", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
@as_json
def user_edit(uid: int):
    """Modify a user.

    Editable fields:
    {
        "dept": str or null,
        "nom": str,
        "prenom": str,
        "active":bool
    }

    The special dept value "@all" is stored as no department (None).
    Fields absent from the payload keep their current value.

    Args:
        uid: primary key of the user to modify (404 if unknown).

    Returns:
        ``user.to_dict()`` after the update, or a JSON error: 403 when the
        current user lacks ``ScoUsersAdmin`` on the user's current department
        or on the requested one, 404 when the requested department does not
        exist.
    """
    data = request.get_json(force=True)  # may raise 400 Bad Request
    user: User = User.query.get_or_404(uid)
    # The current user must have the permission in the origin department
    # and in the destination one.
    orig_dept = user.dept
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
    all_depts_allowed = None in allowed_depts
    # Checked even when "dept" is absent from the payload: otherwise the other
    # fields of a user from an unauthorized department could be modified.
    if (not all_depts_allowed) and (orig_dept not in allowed_depts):
        return json_error(403, "user_edit: departement non autorise")
    # False is the "key not supplied" sentinel, since None is a valid dept.
    dest_dept = data.get("dept", False)
    if dest_dept is not False:
        if dest_dept == "@all":
            dest_dept = None
        if (not all_depts_allowed) and (dest_dept not in allowed_depts):
            return json_error(403, "user_edit: departement non autorise")
        if dest_dept != orig_dept:
            if (dest_dept is not None) and (
                Departement.query.filter_by(acronym=dest_dept).first() is None
            ):
                return json_error(404, "user_edit: departement inexistant")
            user.dept = dest_dept
    user.nom = data.get("nom", user.nom)
    user.prenom = data.get("prenom", user.prenom)
    user.active = scu.to_bool(data.get("active", user.active))

    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/user/<int:uid>/password", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/password", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
@as_json
def user_password(uid: int):
    """Change the password of a user.

    Editable fields:
    {
        "password": str
    }

    Args:
        uid: primary key of the user (404 if unknown).

    Returns:
        ``user.to_dict()`` after the change, or a JSON error: 404 when the
        password is missing or empty, ``API_CLIENT_ERROR`` when
        ``is_valid_password`` rejects it, 403 when the current user lacks
        ``ScoUsersAdmin`` on the user's department.
    """
    data = request.get_json(force=True)  # may raise 400 Bad Request
    user: User = User.query.get_or_404(uid)
    password = data.get("password")
    if not password:
        return json_error(404, "user_password: missing password")
    if not is_valid_password(password):
        return json_error(API_CLIENT_ERROR, "user_password: invalid password")
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
    if (None not in allowed_depts) and (user.dept not in allowed_depts):
        return json_error(403, "user_password: departement non autorise")
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@bp.route(
    "/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
    methods=["POST"],
)
@api_web_bp.route(
    "/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def user_role_add(uid: int, role_name: str, dept: str = None):
    """Add a role in the given dept to the user.

    Args:
        uid: primary key of the user (404 if unknown).
        role_name: name of an existing role (404 if unknown).
        dept: department acronym (404 if unknown); None when the route
            without a department is used.

    Returns:
        ``user.to_dict()`` after the change, or a 403 JSON error when the
        current user lacks ``ScoSuperAdmin`` on ``dept``.
    """
    user: User = User.query.get_or_404(uid)
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    if dept is not None:  # check that the department exists
        _ = Departement.query.filter_by(acronym=dept).first_or_404()
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
    if (None not in allowed_depts) and (dept not in allowed_depts):
        return json_error(403, "user_role_add: departement non autorise")
    user.add_role(role, dept)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@bp.route(
    "/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
    methods=["POST"],
)
@api_web_bp.route(
    "/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def user_role_remove(uid: int, role_name: str, dept: str = None):
    """Remove the role (in the given dept) from the user.

    Args:
        uid: primary key of the user (404 if unknown).
        role_name: name of an existing role (404 if unknown).
        dept: department acronym (404 if unknown); None when the route
            without a department is used.

    Returns:
        ``user.to_dict()`` (also when the user did not have the role), or a
        403 JSON error when the current user lacks ``ScoSuperAdmin`` on
        ``dept``.
    """
    user: User = User.query.get_or_404(uid)
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    if dept is not None:  # check that the department exists
        _ = Departement.query.filter_by(acronym=dept).first_or_404()
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
    if (None not in allowed_depts) and (dept not in allowed_depts):
        return json_error(403, "user_role_remove: departement non autorise")

    query = UserRole.query.filter(UserRole.role == role, UserRole.user == user)
    # NOTE(review): without a dept no department filter is applied, so the
    # first matching association is removed whatever its department.
    if dept is not None:
        query = query.filter(UserRole.dept == dept)
    user_role = query.first()
    if user_role:
        db.session.delete(user_role)
    db.session.add(user)
    db.session.commit()
    return user.to_dict()


@bp.route("/permissions")
@api_web_bp.route("/permissions")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
@as_json
def list_permissions():
    """List the names of the defined permissions."""
    return list(Permission.permission_by_name.keys())


@bp.route("/role/<string:role_name>")
@api_web_bp.route("/role/<string:role_name>")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
@as_json
def list_role(role_name: str):
    """Return one role as a dict (404 if no role has this name)."""
    return Role.query.filter_by(name=role_name).first_or_404().to_dict()


@bp.route("/roles")
@api_web_bp.route("/roles")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
@as_json
def list_roles():
    """Return all the defined roles, each as a dict."""
    return [role.to_dict() for role in Role.query]


@bp.route(
    "/role/<string:role_name>/add_permission/<string:perm_name>",
    methods=["POST"],
)
@api_web_bp.route(
    "/role/<string:role_name>/add_permission/<string:perm_name>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_permission_add(role_name: str, perm_name: str):
    """Add a permission to a role.

    Args:
        role_name: name of an existing role (404 if unknown).
        perm_name: permission name, resolved with ``Permission.get_by_name``.

    Returns:
        ``role.to_dict()`` after the change, or a 404 JSON error when the
        permission name is unknown.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    permission = Permission.get_by_name(perm_name)
    if permission is None:
        return json_error(404, "role_permission_add: permission inconnue")
    role.add_permission(permission)
    db.session.add(role)
    db.session.commit()
    return role.to_dict()


@bp.route(
    "/role/<string:role_name>/remove_permission/<string:perm_name>",
    methods=["POST"],
)
@api_web_bp.route(
    "/role/<string:role_name>/remove_permission/<string:perm_name>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_permission_remove(role_name: str, perm_name: str):
    """Remove a permission from a role.

    Args:
        role_name: name of an existing role (404 if unknown).
        perm_name: permission name, resolved with ``Permission.get_by_name``.

    Returns:
        ``role.to_dict()`` after the change, or a 404 JSON error when the
        permission name is unknown.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    permission = Permission.get_by_name(perm_name)
    if permission is None:
        return json_error(404, "role_permission_remove: permission inconnue")
    role.remove_permission(permission)
    db.session.add(role)
    db.session.commit()
    return role.to_dict()


@bp.route("/role/create/<string:role_name>", methods=["POST"])
@api_web_bp.route("/role/create/<string:role_name>", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_create(role_name: str):
    """Create a new role with permissions.

    {
        "permissions" : [ 'ScoView', ... ]
    }

    Args:
        role_name: name of the role to create.

    Returns:
        ``role.to_dict()`` for the created role, or a 404 JSON error when a
        role with this name already exists or when ``set_named_permissions``
        raises ``ScoValueError``.
    """
    role: Role = Role.query.filter_by(name=role_name).first()
    if role:
        return json_error(404, "role_create: role already exists")
    role = Role(name=role_name)
    data = request.get_json(force=True)  # may raise 400 Bad Request
    permissions = data.get("permissions")
    if permissions:
        try:
            role.set_named_permissions(permissions)
        except ScoValueError:
            return json_error(404, "role_create: invalid permissions")
    db.session.add(role)
    db.session.commit()
    return role.to_dict()


@bp.route("/role/<string:role_name>/edit", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_edit(role_name: str):
    """Edit a role. A new name and/or permissions can be specified.

    {
        "role_name" : name
        "permissions" : [ 'ScoView', ... ]
    }

    Args:
        role_name: current name of the role (404 if unknown).

    Returns:
        ``role.to_dict()`` after the change, or a 404 JSON error when
        ``set_named_permissions`` raises ``ScoValueError`` or when the new
        name is already used by another role.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    data = request.get_json(force=True)  # may raise 400 Bad Request
    # False is the "key not supplied" sentinel, so that an explicitly supplied
    # falsy value (e.g. an empty list) is still passed to set_named_permissions.
    permissions = data.get("permissions", False)
    if permissions is not False:
        try:
            role.set_named_permissions(permissions)
        except ScoValueError:
            return json_error(404, "role_edit: invalid permissions")
    new_name = data.get("role_name")
    if new_name and new_name != role.name:
        existing_role: Role = Role.query.filter_by(name=new_name).first()
        if existing_role:
            return json_error(404, "role_edit: role name already exists")
        role.name = new_name
    db.session.add(role)
    db.session.commit()
    return role.to_dict()


@bp.route("/role/<string:role_name>/delete", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/delete", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_delete(role_name: str):
    """Delete a role (404 if no role has this name).

    Returns:
        ``{"OK": True}`` once the deletion is committed.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    db.session.delete(role)
    db.session.commit()
    return {"OK": True}