444 lines
15 KiB
Python
444 lines
15 KiB
Python
##############################################################################
|
|
# ScoDoc
|
|
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
|
|
# See LICENSE
|
|
##############################################################################
|
|
|
|
"""
|
|
ScoDoc 9 API : accès aux utilisateurs
|
|
"""
|
|
import datetime
|
|
|
|
from flask import g, request
|
|
from flask_json import as_json
|
|
from flask_login import current_user, login_required
|
|
|
|
from app import db, log
|
|
from app.api import api_bp as bp, api_web_bp, API_CLIENT_ERROR
|
|
from app.scodoc.sco_utils import json_error
|
|
from app.auth.models import User, Role, UserRole
|
|
from app.auth.models import is_valid_password
|
|
from app.decorators import scodoc, permission_required
|
|
from app.models import Departement
|
|
from app.scodoc.sco_exceptions import ScoValueError
|
|
from app.scodoc.sco_permissions import Permission
|
|
from app.scodoc import sco_utils as scu
|
|
|
|
|
|
@bp.route("/user/<int:uid>")
|
|
@api_web_bp.route("/user/<int:uid>")
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.UsersView)
|
|
@as_json
|
|
def user_info(uid: int):
|
|
"""
|
|
Info sur un compte utilisateur scodoc
|
|
"""
|
|
user: User = db.session.get(User, uid)
|
|
if user is None:
|
|
return json_error(404, "user not found")
|
|
if g.scodoc_dept:
|
|
allowed_depts = current_user.get_depts_with_permission(Permission.UsersView)
|
|
if (None not in allowed_depts) and (user.dept not in allowed_depts):
|
|
return json_error(404, "user not found")
|
|
|
|
return user.to_dict()
|
|
|
|
|
|
@bp.route("/users/query")
|
|
@api_web_bp.route("/users/query")
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.ScoView)
|
|
@as_json
|
|
def users_info_query():
|
|
"""Utilisateurs, filtrés par dept, active ou début nom
|
|
/users/query?departement=dept_acronym&active=1&starts_with=<string:nom>
|
|
|
|
Seuls les utilisateurs "accessibles" (selon les permissions) sont retournés.
|
|
Si accès via API web, le département de l'URL est ignoré, seules
|
|
les permissions de l'utilisateur sont prises en compte.
|
|
"""
|
|
query = User.query
|
|
active = request.args.get("active")
|
|
if active is not None:
|
|
active = bool(str(active))
|
|
query = query.filter_by(active=active)
|
|
departement = request.args.get("departement")
|
|
if departement is not None:
|
|
query = query.filter_by(dept=departement or None)
|
|
starts_with = request.args.get("starts_with")
|
|
if starts_with is not None:
|
|
# remove % and _ for security
|
|
starts_with = starts_with.translate({ord(c): None for c in "%_"})
|
|
query = query.filter(User.nom.ilike(starts_with + "%"))
|
|
# Filtre selon permissions:
|
|
query = (
|
|
query.join(UserRole, (UserRole.dept == User.dept) | (UserRole.dept == None))
|
|
.filter(UserRole.user == current_user)
|
|
.join(Role, UserRole.role_id == Role.id)
|
|
.filter(Role.permissions.op("&")(Permission.UsersView) != 0)
|
|
)
|
|
|
|
query = query.order_by(User.user_name)
|
|
return [user.to_dict() for user in query]
|
|
|
|
|
|
def _is_allowed_user_edit(args: dict) -> tuple[bool, str]:
|
|
"Vrai si on peut"
|
|
if "cas_id" in args and not current_user.has_permission(
|
|
Permission.UsersChangeCASId
|
|
):
|
|
return False, "non autorise a changer cas_id"
|
|
|
|
if not current_user.is_administrator():
|
|
for field in ("cas_allow_login", "cas_allow_scodoc_login"):
|
|
if field in args:
|
|
return False, f"non autorise a changer {field}"
|
|
return True, ""
|
|
|
|
|
|
@bp.route("/user/create", methods=["POST"])
|
|
@api_web_bp.route("/user/create", methods=["POST"])
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.UsersAdmin)
|
|
@as_json
|
|
def user_create():
|
|
"""Création d'un utilisateur
|
|
The request content type should be "application/json":
|
|
{
|
|
"active":bool (default True),
|
|
"dept": str or null,
|
|
"nom": str,
|
|
"prenom": str,
|
|
"user_name": str,
|
|
...
|
|
}
|
|
"""
|
|
args = request.get_json(force=True) # may raise 400 Bad Request
|
|
user_name = args.get("user_name")
|
|
if not user_name:
|
|
return json_error(404, "empty user_name")
|
|
user = User.query.filter_by(user_name=user_name).first()
|
|
if user:
|
|
return json_error(404, f"user_create: user {user} already exists\n")
|
|
dept = args.get("dept")
|
|
if dept == "@all":
|
|
dept = None
|
|
allowed_depts = current_user.get_depts_with_permission(Permission.UsersAdmin)
|
|
if (None not in allowed_depts) and (dept not in allowed_depts):
|
|
return json_error(403, "user_create: departement non autorise")
|
|
if (dept is not None) and (
|
|
Departement.query.filter_by(acronym=dept).first() is None
|
|
):
|
|
return json_error(404, "user_create: departement inexistant")
|
|
args["dept"] = dept
|
|
ok, msg = _is_allowed_user_edit(args)
|
|
if not ok:
|
|
return json_error(403, f"user_create: {msg}")
|
|
user = User(user_name=user_name)
|
|
user.from_dict(args, new_user=True)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return user.to_dict()
|
|
|
|
|
|
@bp.route("/user/<int:uid>/edit", methods=["POST"])
|
|
@api_web_bp.route("/user/<int:uid>/edit", methods=["POST"])
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.UsersAdmin)
|
|
@as_json
|
|
def user_edit(uid: int):
|
|
"""Modification d'un utilisateur
|
|
Champs modifiables:
|
|
{
|
|
"dept": str or null,
|
|
"nom": str,
|
|
"prenom": str,
|
|
"active":bool
|
|
...
|
|
}
|
|
"""
|
|
args = request.get_json(force=True) # may raise 400 Bad Request
|
|
user: User = User.query.get_or_404(uid)
|
|
# L'utilisateur doit avoir le droit dans le département de départ et celui d'arrivée
|
|
orig_dept = user.dept
|
|
dest_dept = args.get("dept", False)
|
|
if dest_dept is not False:
|
|
if dest_dept == "@all":
|
|
dest_dept = None
|
|
allowed_depts = current_user.get_depts_with_permission(Permission.UsersAdmin)
|
|
if (None not in allowed_depts) and (
|
|
(orig_dept not in allowed_depts) or (dest_dept not in allowed_depts)
|
|
):
|
|
return json_error(403, "user_edit: departement non autorise")
|
|
if dest_dept != orig_dept:
|
|
if (dest_dept is not None) and (
|
|
Departement.query.filter_by(acronym=dest_dept).first() is None
|
|
):
|
|
return json_error(404, "user_edit: departement inexistant")
|
|
user.dept = dest_dept
|
|
|
|
ok, msg = _is_allowed_user_edit(args)
|
|
if not ok:
|
|
return json_error(403, f"user_edit: {msg}")
|
|
|
|
user.from_dict(args)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return user.to_dict()
|
|
|
|
|
|
@bp.route("/user/<int:uid>/password", methods=["POST"])
|
|
@api_web_bp.route("/user/<int:uid>/password", methods=["POST"])
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.UsersAdmin)
|
|
@as_json
|
|
def user_password(uid: int):
|
|
"""Modification du mot de passe d'un utilisateur
|
|
Champs modifiables:
|
|
{
|
|
"password": str
|
|
}
|
|
Si le mot de passe ne convient pas, erreur 400.
|
|
"""
|
|
data = request.get_json(force=True) # may raise 400 Bad Request
|
|
user: User = User.query.get_or_404(uid)
|
|
password = data.get("password")
|
|
if not password:
|
|
return json_error(404, "user_password: missing password")
|
|
if not is_valid_password(password):
|
|
return json_error(API_CLIENT_ERROR, "user_password: invalid password")
|
|
allowed_depts = current_user.get_depts_with_permission(Permission.UsersAdmin)
|
|
if (None not in allowed_depts) and ((user.dept not in allowed_depts)):
|
|
return json_error(403, "user_password: departement non autorise")
|
|
user.set_password(password)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return user.to_dict()
|
|
|
|
|
|
@bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
|
|
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
|
|
@bp.route(
|
|
"/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
|
|
methods=["POST"],
|
|
)
|
|
@api_web_bp.route(
|
|
"/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
|
|
methods=["POST"],
|
|
)
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.ScoSuperAdmin)
|
|
@as_json
|
|
def user_role_add(uid: int, role_name: str, dept: str = None):
|
|
"""Add a role in the given dept to the user"""
|
|
user: User = User.query.get_or_404(uid)
|
|
role: Role = Role.query.filter_by(name=role_name).first_or_404()
|
|
if dept is not None: # check
|
|
_ = Departement.query.filter_by(acronym=dept).first_or_404()
|
|
allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
|
|
if (None not in allowed_depts) and (dept not in allowed_depts):
|
|
return json_error(403, "user_role_add: departement non autorise")
|
|
user.add_role(role, dept)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return user.to_dict()
|
|
|
|
|
|
@bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
|
|
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
|
|
@bp.route(
|
|
"/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
|
|
methods=["POST"],
|
|
)
|
|
@api_web_bp.route(
|
|
"/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
|
|
methods=["POST"],
|
|
)
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.ScoSuperAdmin)
|
|
@as_json
|
|
def user_role_remove(uid: int, role_name: str, dept: str = None):
|
|
"""Remove the role (in the given dept) from the user"""
|
|
user: User = User.query.get_or_404(uid)
|
|
role: Role = Role.query.filter_by(name=role_name).first_or_404()
|
|
if dept is not None: # check
|
|
_ = Departement.query.filter_by(acronym=dept).first_or_404()
|
|
allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
|
|
if (None not in allowed_depts) and (dept not in allowed_depts):
|
|
return json_error(403, "user_role_remove: departement non autorise")
|
|
|
|
query = UserRole.query.filter(UserRole.role == role, UserRole.user == user)
|
|
if dept is not None:
|
|
query = query.filter(UserRole.dept == dept)
|
|
user_role = query.first()
|
|
if user_role:
|
|
db.session.delete(user_role)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return user.to_dict()
|
|
|
|
|
|
@bp.route("/permissions")
|
|
@api_web_bp.route("/permissions")
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.UsersView)
|
|
@as_json
|
|
def list_permissions():
|
|
"""Liste des noms de permissions définies"""
|
|
return list(Permission.permission_by_name.keys())
|
|
|
|
|
|
@bp.route("/role/<string:role_name>")
|
|
@api_web_bp.route("/role/<string:role_name>")
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.UsersView)
|
|
@as_json
|
|
def list_role(role_name: str):
|
|
"""Un rôle"""
|
|
return Role.query.filter_by(name=role_name).first_or_404().to_dict()
|
|
|
|
|
|
@bp.route("/roles")
|
|
@api_web_bp.route("/roles")
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.UsersView)
|
|
@as_json
|
|
def list_roles():
|
|
"""Tous les rôles définis"""
|
|
return [role.to_dict() for role in Role.query]
|
|
|
|
|
|
@bp.route(
|
|
"/role/<string:role_name>/add_permission/<string:perm_name>",
|
|
methods=["POST"],
|
|
)
|
|
@api_web_bp.route(
|
|
"/role/<string:role_name>/add_permission/<string:perm_name>",
|
|
methods=["POST"],
|
|
)
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.ScoSuperAdmin)
|
|
@as_json
|
|
def role_permission_add(role_name: str, perm_name: str):
|
|
"""Add permission to role"""
|
|
role: Role = Role.query.filter_by(name=role_name).first_or_404()
|
|
permission = Permission.get_by_name(perm_name)
|
|
if permission is None:
|
|
return json_error(404, "role_permission_add: permission inconnue")
|
|
role.add_permission(permission)
|
|
db.session.add(role)
|
|
db.session.commit()
|
|
log(f"role_permission_add({role_name}, {perm_name})")
|
|
return role.to_dict()
|
|
|
|
|
|
@bp.route(
|
|
"/role/<string:role_name>/remove_permission/<string:perm_name>",
|
|
methods=["POST"],
|
|
)
|
|
@api_web_bp.route(
|
|
"/role/<string:role_name>/remove_permission/<string:perm_name>",
|
|
methods=["POST"],
|
|
)
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.ScoSuperAdmin)
|
|
@as_json
|
|
def role_permission_remove(role_name: str, perm_name: str):
|
|
"""Remove permission from role"""
|
|
role: Role = Role.query.filter_by(name=role_name).first_or_404()
|
|
permission = Permission.get_by_name(perm_name)
|
|
if permission is None:
|
|
return json_error(404, "role_permission_remove: permission inconnue")
|
|
role.remove_permission(permission)
|
|
db.session.add(role)
|
|
db.session.commit()
|
|
log(f"role_permission_remove({role_name}, {perm_name})")
|
|
return role.to_dict()
|
|
|
|
|
|
@bp.route("/role/create/<string:role_name>", methods=["POST"])
|
|
@api_web_bp.route("/role/create/<string:role_name>", methods=["POST"])
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.ScoSuperAdmin)
|
|
@as_json
|
|
def role_create(role_name: str):
|
|
"""Create a new role with permissions.
|
|
{
|
|
"permissions" : [ 'ScoView', ... ]
|
|
}
|
|
"""
|
|
role: Role = Role.query.filter_by(name=role_name).first()
|
|
if role:
|
|
return json_error(404, "role_create: role already exists")
|
|
role = Role(name=role_name)
|
|
data = request.get_json(force=True) # may raise 400 Bad Request
|
|
permissions = data.get("permissions")
|
|
if permissions:
|
|
try:
|
|
role.set_named_permissions(permissions)
|
|
except ScoValueError:
|
|
return json_error(404, "role_create: invalid permissions")
|
|
db.session.add(role)
|
|
db.session.commit()
|
|
return role.to_dict()
|
|
|
|
|
|
@bp.route("/role/<string:role_name>/edit", methods=["POST"])
|
|
@api_web_bp.route("/role/<string:role_name>/edit", methods=["POST"])
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.ScoSuperAdmin)
|
|
@as_json
|
|
def role_edit(role_name: str):
|
|
"""Edit a role. On peut spécifier un nom et/ou des permissions.
|
|
{
|
|
"name" : name
|
|
"permissions" : [ 'ScoView', ... ]
|
|
}
|
|
"""
|
|
role: Role = Role.query.filter_by(name=role_name).first_or_404()
|
|
data = request.get_json(force=True) # may raise 400 Bad Request
|
|
permissions = data.get("permissions", False)
|
|
if permissions is not False:
|
|
try:
|
|
role.set_named_permissions(permissions)
|
|
except ScoValueError:
|
|
return json_error(404, "role_create: invalid permissions")
|
|
role_name = data.get("role_name")
|
|
if role_name and role_name != role.name:
|
|
existing_role: Role = Role.query.filter_by(name=role_name).first()
|
|
if existing_role:
|
|
return json_error(404, "role_edit: role name already exists")
|
|
role.name = role_name
|
|
db.session.add(role)
|
|
db.session.commit()
|
|
return role.to_dict()
|
|
|
|
|
|
@bp.route("/role/<string:role_name>/delete", methods=["POST"])
|
|
@api_web_bp.route("/role/<string:role_name>/delete", methods=["POST"])
|
|
@login_required
|
|
@scodoc
|
|
@permission_required(Permission.ScoSuperAdmin)
|
|
@as_json
|
|
def role_delete(role_name: str):
|
|
"""Delete a role"""
|
|
role: Role = Role.query.filter_by(name=role_name).first_or_404()
|
|
db.session.delete(role)
|
|
db.session.commit()
|
|
return {"OK": True}
|