From 9a0852917fdd33713192b83c36757cd512643ed3 Mon Sep 17 00:00:00 2001 From: iziram Date: Mon, 17 Apr 2023 15:35:42 +0200 Subject: [PATCH] Assiduites : Fonctionnement BackEnd + API --- app/api/__init__.py | 21 +- app/api/assiduites.py | 868 +++++++++++++++++++++++ app/api/etudiants.py | 37 + app/api/justificatifs.py | 591 +++++++++++++++ app/scodoc/sco_archives.py | 15 +- app/scodoc/sco_archives_justificatifs.py | 215 ++++++ app/scodoc/sco_assiduites.py | 355 +++++++++ 7 files changed, 2096 insertions(+), 6 deletions(-) create mode 100644 app/api/assiduites.py mode change 100644 => 100755 app/api/etudiants.py create mode 100644 app/api/justificatifs.py create mode 100644 app/scodoc/sco_archives_justificatifs.py create mode 100644 app/scodoc/sco_assiduites.py diff --git a/app/api/__init__.py b/app/api/__init__.py index d5b436881f..a3c77eb976 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -2,7 +2,8 @@ """ from flask import Blueprint -from flask import request +from flask import request, g, jsonify +from app import db from app.scodoc import sco_utils as scu from app.scodoc.sco_exceptions import ScoException @@ -34,9 +35,26 @@ def requested_format(default_format="json", allowed_formats=None): return None +def get_model_api_object(model_cls: db.Model, model_id: int, join_cls: db.Model = None): + """ + Retourne une réponse contenant la représentation api de l'objet "Model[model_id]" + + Filtrage du département en fonction d'une classe de jointure (eg: Identite, Formsemstre) -> join_cls + + exemple d'utilisation : fonction "justificatif()" -> app/api/justificatifs.py + """ + query = model_cls.query.filter_by(id=model_id) + if g.scodoc_dept and join_cls is not None: + query = query.join(join_cls).filter_by(dept_id=g.scodoc_dept_id) + unique: model_cls = query.first_or_404() + + return jsonify(unique.to_dict(format_api=True)) + + from app.api import tokens from app.api import ( absences, + assiduites, billets_absences, departements, etudiants, @@ -44,6 +62,7 @@ from app.api import ( formations, formsemestres, jury, + justificatifs, logos, partitions, semset, diff --git a/app/api/assiduites.py b/app/api/assiduites.py new file mode 100644 index 0000000000..5853e3ff0f --- /dev/null +++ b/app/api/assiduites.py @@ -0,0 +1,868 @@ +############################################################################## +# ScoDoc +# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved. +# See LICENSE +############################################################################## +"""ScoDoc 9 API : Assiduités +""" +from datetime import datetime +from flask import g, jsonify, request +from flask_login import login_required, current_user + +import app.scodoc.sco_assiduites as scass +import app.scodoc.sco_utils as scu +from app import db +from app.api import api_bp as bp +from app.api import api_web_bp +from app.api import get_model_api_object +from app.decorators import permission_required, scodoc +from app.models import Assiduite, FormSemestre, Identite, ModuleImpl +from app.scodoc.sco_exceptions import ScoValueError +from app.scodoc.sco_permissions import Permission +from app.scodoc.sco_utils import json_error + + +@bp.route("/assiduite/") +@api_web_bp.route("/assiduite/") +@scodoc +@permission_required(Permission.ScoView) +def assiduite(assiduite_id: int = None): + """Retourne un objet assiduité à partir de son id + + Exemple de résultat: + { + "assiduite_id": 1, + "etudid": 2, + "moduleimpl_id": 3, + "date_debut": "2022-10-31T08:00+01:00", + "date_fin": "2022-10-31T10:00+01:00", + "etat": "retard", + "desc": "une description", + "user_id: 1 or null, + "est_just": False or True, + } + """ + + return get_model_api_object(Assiduite, assiduite_id, Identite) + + +@bp.route("/assiduites//count", defaults={"with_query": False}) +@bp.route("/assiduites//count/query", defaults={"with_query": True}) +@api_web_bp.route("/assiduites//count", defaults={"with_query": False}) +@api_web_bp.route("/assiduites//count/query", defaults={"with_query": True}) +@login_required +@scodoc +@permission_required(Permission.ScoView) +def count_assiduites(etudid: int = None, with_query: bool = False): + """ + Retourne le nombre d'assiduités d'un étudiant + chemin : /assiduites//count + + Un filtrage peut être donné avec une query + chemin : /assiduites//count/query? + + Les différents filtres : + Type (type de comptage -> journee, demi, heure, nombre d'assiduite): + query?type=(journee, demi, heure) -> une seule valeur parmis les trois + ex: .../query?type=heure + Comportement par défaut : compte le nombre d'assiduité enregistrée + + Etat (etat de l'étudiant -> absent, present ou retard): + query?etat=[- liste des états séparé par une virgule -] + ex: .../query?etat=present,retard + Date debut + (date de début de l'assiduité, sont affichés les assiduités + dont la date de début est supérieur ou égale à la valeur donnée): + query?date_debut=[- date au format iso -] + ex: query?date_debut=2022-11-03T08:00+01:00 + Date fin + (date de fin de l'assiduité, sont affichés les assiduités + dont la date de fin est inférieure ou égale à la valeur donnée): + query?date_fin=[- date au format iso -] + ex: query?date_fin=2022-11-03T10:00+01:00 + Moduleimpl_id (l'id du module concerné par l'assiduité): + query?moduleimpl_id=[- int ou vide -] + ex: query?moduleimpl_id=1234 + query?moduleimpl_od= + Formsemstre_id (l'id du formsemestre concerné par l'assiduité) + query?formsemestre_id=[int] + ex query?formsemestre_id=3 + user_id (l'id de l'auteur de l'assiduité) + query?user_id=[int] + ex query?user_id=3 + est_just (si l'assiduité est justifié (fait aussi filtre par abs/retard)) + query?est_just=[bool] + query?est_just=f + query?est_just=t + + + + """ + query = Identite.query.filter_by(id=etudid) + if g.scodoc_dept: + query = query.filter_by(dept_id=g.scodoc_dept_id) + + etud: Identite = query.first_or_404(etudid) + filtered: dict[str, object] = {} + metric: str = "all" + + if with_query: + metric, filtered = _count_manager(request) + + return jsonify( + scass.get_assiduites_stats( + assiduites=etud.assiduites, metric=metric, filtered=filtered + ) + ) + + +@bp.route("/assiduites/", defaults={"with_query": False}) +@bp.route("/assiduites//query", defaults={"with_query": True}) +@api_web_bp.route("/assiduites/", defaults={"with_query": False}) +@api_web_bp.route("/assiduites//query", defaults={"with_query": True}) +@login_required +@scodoc +@permission_required(Permission.ScoView) +def assiduites(etudid: int = None, with_query: bool = False): + """ + Retourne toutes les assiduités d'un étudiant + chemin : /assiduites/ + + Un filtrage peut être donné avec une query + chemin : /assiduites//query? + + Les différents filtres : + Etat (etat de l'étudiant -> absent, present ou retard): + query?etat=[- liste des états séparé par une virgule -] + ex: .../query?etat=present,retard + Date debut + (date de début de l'assiduité, sont affichés les assiduités + dont la date de début est supérieur ou égale à la valeur donnée): + query?date_debut=[- date au format iso -] + ex: query?date_debut=2022-11-03T08:00+01:00 + Date fin + (date de fin de l'assiduité, sont affichés les assiduités + dont la date de fin est inférieure ou égale à la valeur donnée): + query?date_fin=[- date au format iso -] + ex: query?date_fin=2022-11-03T10:00+01:00 + Moduleimpl_id (l'id du module concerné par l'assiduité): + query?moduleimpl_id=[- int ou vide -] + ex: query?moduleimpl_id=1234 + query?moduleimpl_od= + Formsemstre_id (l'id du formsemestre concerné par l'assiduité) + query?formsemstre_id=[int] + ex query?formsemestre_id=3 + user_id (l'id de l'auteur de l'assiduité) + query?user_id=[int] + ex query?user_id=3 + est_just (si l'assiduité est justifié (fait aussi filtre par abs/retard)) + query?est_just=[bool] + query?est_just=f + query?est_just=t + + + """ + + query = Identite.query.filter_by(id=etudid) + if g.scodoc_dept: + query = query.filter_by(dept_id=g.scodoc_dept_id) + + etud: Identite = query.first_or_404(etudid) + assiduites_query = etud.assiduites + + if with_query: + assiduites_query = _filter_manager(request, assiduites_query) + + data_set: list[dict] = [] + for ass in assiduites_query.all(): + data = ass.to_dict(format_api=True) + data_set.append(data) + + return jsonify(data_set) + + +@bp.route("/assiduites/group/query", defaults={"with_query": True}) +@api_web_bp.route("/assiduites/group/query", defaults={"with_query": True}) +@login_required +@scodoc +@permission_required(Permission.ScoView) +def assiduites_group(with_query: bool = False): + """ + Retourne toutes les assiduités d'un groupe d'étudiants + chemin : /assiduites/group/query?etudids=1,2,3 + + Un filtrage peut être donné avec une query + chemin : /assiduites/group/query?etudids=1,2,3 + + Les différents filtres : + Etat (etat de l'étudiant -> absent, present ou retard): + query?etat=[- liste des états séparé par une virgule -] + ex: .../query?etat=present,retard + Date debut + (date de début de l'assiduité, sont affichés les assiduités + dont la date de début est supérieur ou égale à la valeur donnée): + query?date_debut=[- date au format iso -] + ex: query?date_debut=2022-11-03T08:00+01:00 + Date fin + (date de fin de l'assiduité, sont affichés les assiduités + dont la date de fin est inférieure ou égale à la valeur donnée): + query?date_fin=[- date au format iso -] + ex: query?date_fin=2022-11-03T10:00+01:00 + Moduleimpl_id (l'id du module concerné par l'assiduité): + query?moduleimpl_id=[- int ou vide -] + ex: query?moduleimpl_id=1234 + query?moduleimpl_od= + Formsemstre_id (l'id du formsemestre concerné par l'assiduité) + query?formsemstre_id=[int] + ex query?formsemestre_id=3 + user_id (l'id de l'auteur de l'assiduité) + query?user_id=[int] + ex query?user_id=3 + est_just (si l'assiduité est justifié (fait aussi filtre par abs/retard)) + query?est_just=[bool] + query?est_just=f + query?est_just=t + + + """ + + etuds = request.args.get("etudids", "") + etuds = etuds.split(",") + try: + etuds = [int(etu) for etu in etuds] + except ValueError: + return json_error(404, "Le champs etudids n'est pas correctement formé") + + query = Identite.query.filter(Identite.id.in_(etuds)) + if g.scodoc_dept: + query = query.filter_by(dept_id=g.scodoc_dept_id) + + if len(etuds) != query.count() or len(etuds) == 0: + return json_error( + 404, + "Tous les étudiants ne sont pas dans le même département et/ou n'existe pas.", + ) + assiduites_query = Assiduite.query.filter(Assiduite.etudid.in_(etuds)) + + if with_query: + assiduites_query = _filter_manager(request, assiduites_query) + + data_set: dict[list[dict]] = {key: [] for key in etuds} + for ass in assiduites_query.all(): + data = ass.to_dict(format_api=True) + data_set.get(data["etudid"]).append(data) + + return jsonify(data_set) + + +@bp.route( + "/assiduites/formsemestre/", defaults={"with_query": False} +) +@api_web_bp.route( + "/assiduites/formsemestre/", defaults={"with_query": False} +) +@bp.route( + "/assiduites/formsemestre//query", + defaults={"with_query": True}, +) +@api_web_bp.route( + "/assiduites/formsemestre//query", + defaults={"with_query": True}, +) +@login_required +@scodoc +@permission_required(Permission.ScoView) +def assiduites_formsemestre(formsemestre_id: int, with_query: bool = False): + """Retourne toutes les assiduités du formsemestre""" + formsemestre: FormSemestre = None + formsemestre_id = int(formsemestre_id) + formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first() + + if formsemestre is None: + return json_error(404, "le paramètre 'formsemestre_id' n'existe pas") + + assiduites_query = scass.filter_by_formsemestre(Assiduite.query, formsemestre) + + if with_query: + assiduites_query = _filter_manager(request, assiduites_query) + + data_set: list[dict] = [] + for ass in assiduites_query.all(): + data = ass.to_dict(format_api=True) + data_set.append(data) + + return jsonify(data_set) + + +@bp.route( + "/assiduites/formsemestre//count", + defaults={"with_query": False}, +) +@api_web_bp.route( + "/assiduites/formsemestre//count", + defaults={"with_query": False}, +) +@bp.route( + "/assiduites/formsemestre//count/query", + defaults={"with_query": True}, +) +@api_web_bp.route( + "/assiduites/formsemestre//count/query", + defaults={"with_query": True}, +) +@login_required +@scodoc +@permission_required(Permission.ScoView) +def count_assiduites_formsemestre( + formsemestre_id: int = None, with_query: bool = False +): + """Comptage des assiduités du formsemestre""" + formsemestre: FormSemestre = None + formsemestre_id = int(formsemestre_id) + formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first() + + if formsemestre is None: + return json_error(404, "le paramètre 'formsemestre_id' n'existe pas") + + etuds = formsemestre.etuds.all() + etuds_id = [etud.id for etud in etuds] + + assiduites_query = Assiduite.query.filter(Assiduite.etudid.in_(etuds_id)) + assiduites_query = scass.filter_by_formsemestre(assiduites_query, formsemestre) + metric: str = "all" + filtered: dict = {} + if with_query: + metric, filtered = _count_manager(request) + + return jsonify(scass.get_assiduites_stats(assiduites_query, metric, filtered)) + + +@bp.route("/assiduite//create", methods=["POST"]) +@api_web_bp.route("/assiduite//create", methods=["POST"]) +@scodoc +@login_required +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def assiduite_create(etudid: int = None): + """ + Création d'une assiduité pour l'étudiant (etudid) + La requête doit avoir un content type "application/json": + [ + { + "date_debut": str, + "date_fin": str, + "etat": str, + }, + { + "date_debut": str, + "date_fin": str, + "etat": str, + "moduleimpl_id": int, + "desc":str, + } + ... + ] + + """ + etud: Identite = Identite.query.filter_by(id=etudid).first_or_404() + + create_list: list[object] = request.get_json(force=True) + + if not isinstance(create_list, list): + return json_error(404, "Le contenu envoyé n'est pas une liste") + + errors: dict[int, str] = {} + success: dict[int, object] = {} + for i, data in enumerate(create_list): + code, obj = _create_singular(data, etud) + if code == 404: + errors[i] = obj + else: + success[i] = obj + + db.session.commit() + + return jsonify({"errors": errors, "success": success}) + + +@bp.route("/assiduites/create", methods=["POST"]) +@api_web_bp.route("/assiduites/create", methods=["POST"]) +@scodoc +@login_required +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def assiduites_create(): + """ + Création d'une assiduité ou plusieurs assiduites + La requête doit avoir un content type "application/json": + [ + { + "date_debut": str, + "date_fin": str, + "etat": str, + "etudid":int, + }, + { + "date_debut": str, + "date_fin": str, + "etat": str, + "etudid":int, + + "moduleimpl_id": int, + "desc":str, + } + ... + ] + + """ + + create_list: list[object] = request.get_json(force=True) + + if not isinstance(create_list, list): + return json_error(404, "Le contenu envoyé n'est pas une liste") + + errors: dict[int, str] = {} + success: dict[int, object] = {} + for i, data in enumerate(create_list): + etud: Identite = Identite.query.filter_by(id=data["etudid"]).first() + if etud is None: + errors[i] = "Cet étudiant n'existe pas." + continue + + code, obj = _create_singular(data, etud) + if code == 404: + errors[i] = obj + else: + success[i] = obj + + return jsonify({"errors": errors, "success": success}) + + +def _create_singular( + data: dict, + etud: Identite, +) -> tuple[int, object]: + errors: list[str] = [] + + # -- vérifications de l'objet json -- + # cas 1 : ETAT + etat = data.get("etat", None) + if etat is None: + errors.append("param 'etat': manquant") + elif not scu.EtatAssiduite.contains(etat): + errors.append("param 'etat': invalide") + + etat = scu.EtatAssiduite.get(etat) + + # cas 2 : date_debut + date_debut = data.get("date_debut", None) + if date_debut is None: + errors.append("param 'date_debut': manquant") + deb = scu.is_iso_formated(date_debut, convert=True) + if deb is None: + errors.append("param 'date_debut': format invalide") + + # cas 3 : date_fin + date_fin = data.get("date_fin", None) + if date_fin is None: + errors.append("param 'date_fin': manquant") + fin = scu.is_iso_formated(date_fin, convert=True) + if fin is None: + errors.append("param 'date_fin': format invalide") + + # cas 4 : moduleimpl_id + + moduleimpl_id = data.get("moduleimpl_id", False) + moduleimpl: ModuleImpl = None + + if moduleimpl_id is not False: + moduleimpl = ModuleImpl.query.filter_by(id=int(moduleimpl_id)).first() + if moduleimpl is None: + errors.append("param 'moduleimpl_id': invalide") + + # cas 5 : desc + + desc: str = data.get("desc", None) + + if errors: + err: str = ", ".join(errors) + return (404, err) + + # TOUT EST OK + try: + nouv_assiduite: Assiduite = Assiduite.create_assiduite( + date_debut=deb, + date_fin=fin, + etat=etat, + etud=etud, + moduleimpl=moduleimpl, + description=desc, + user_id=current_user.id, + ) + + db.session.add(nouv_assiduite) + db.session.commit() + + return (200, {"assiduite_id": nouv_assiduite.id}) + except ScoValueError as excp: + return ( + 404, + excp.args[0], + ) + + +@bp.route("/assiduite/delete", methods=["POST"]) +@api_web_bp.route("/assiduite/delete", methods=["POST"]) +@login_required +@scodoc +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def assiduite_delete(): + """ + Suppression d'une assiduité à partir de son id + + Forme des données envoyées : + + [ + , + ... + ] + + + """ + assiduites_list: list[int] = request.get_json(force=True) + if not isinstance(assiduites_list, list): + return json_error(404, "Le contenu envoyé n'est pas une liste") + + output = {"errors": {}, "success": {}} + + for i, ass in enumerate(assiduites_list): + code, msg = _delete_singular(ass, db) + if code == 404: + output["errors"][f"{i}"] = msg + else: + output["success"][f"{i}"] = {"OK": True} + db.session.commit() + return jsonify(output) + + +def _delete_singular(assiduite_id: int, database): + assiduite_unique: Assiduite = Assiduite.query.filter_by(id=assiduite_id).first() + if assiduite_unique is None: + return (404, "Assiduite non existante") + database.session.delete(assiduite_unique) + return (200, "OK") + + +@bp.route("/assiduite//edit", methods=["POST"]) +@api_web_bp.route("/assiduite//edit", methods=["POST"]) +@login_required +@scodoc +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def assiduite_edit(assiduite_id: int): + """ + Edition d'une assiduité à partir de son id + La requête doit avoir un content type "application/json": + { + "etat"?: str, + "moduleimpl_id"?: int + "desc"?: str + "est_just"?: bool + } + """ + assiduite_unique: Assiduite = Assiduite.query.filter_by( + id=assiduite_id + ).first_or_404() + errors: list[str] = [] + data = request.get_json(force=True) + + # Vérifications de data + + # Cas 1 : Etat + if data.get("etat") is not None: + etat = scu.EtatAssiduite.get(data.get("etat")) + if etat is None: + errors.append("param 'etat': invalide") + else: + assiduite_unique.etat = etat + + # Cas 2 : Moduleimpl_id + moduleimpl_id = data.get("moduleimpl_id", False) + moduleimpl: ModuleImpl = None + + if moduleimpl_id is not False: + if moduleimpl_id is not None: + moduleimpl = ModuleImpl.query.filter_by(id=int(moduleimpl_id)).first() + if moduleimpl is None: + errors.append("param 'moduleimpl_id': invalide") + else: + if not moduleimpl.est_inscrit( + Identite.query.filter_by(id=assiduite_unique.etudid).first() + ): + errors.append("param 'moduleimpl_id': etud non inscrit") + else: + assiduite_unique.moduleimpl_id = moduleimpl_id + else: + assiduite_unique.moduleimpl_id = moduleimpl_id + + # Cas 3 : desc + desc = data.get("desc", False) + if desc is not False: + assiduite_unique.desc = desc + + # Cas 4 : est_just + est_just = data.get("est_just") + if est_just is not None: + if not isinstance(est_just, bool): + errors.append("param 'est_just' : booléen non reconnu") + else: + assiduite_unique.est_just = est_just + + if errors: + err: str = ", ".join(errors) + return json_error(404, err) + + db.session.add(assiduite_unique) + db.session.commit() + return jsonify({"OK": True}) + + +@bp.route("/assiduites/edit", methods=["POST"]) +@api_web_bp.route("/assiduites/edit", methods=["POST"]) +@login_required +@scodoc +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def assiduites_edit(): + """ + Edition d'une assiduité à partir de son id + La requête doit avoir un content type "application/json": + { + "etat"?: str, + "moduleimpl_id"?: int + "desc"?: str + "est_just"?: bool + } + """ + edit_list: list[object] = request.get_json(force=True) + + if not isinstance(edit_list, list): + return json_error(404, "Le contenu envoyé n'est pas une liste") + + errors: dict[int, str] = {} + success: dict[int, object] = {} + for i, data in enumerate(edit_list): + assi: Identite = Assiduite.query.filter_by(id=data["assiduite_id"]).first() + if assi is None: + errors[i] = "Cet assiduité n'existe pas." + continue + + code, obj = _edit_singular(assi, data) + if code == 404: + errors[i] = obj + else: + success[i] = obj + + db.session.commit() + + return jsonify({"errors": errors, "success": success}) + + +def _edit_singular(assiduite_unique, data): + errors: list[str] = [] + + # Vérifications de data + + # Cas 1 : Etat + if data.get("etat") is not None: + etat = scu.EtatAssiduite.get(data.get("etat")) + if etat is None: + errors.append("param 'etat': invalide") + else: + assiduite_unique.etat = etat + + # Cas 2 : Moduleimpl_id + moduleimpl_id = data.get("moduleimpl_id", False) + moduleimpl: ModuleImpl = None + + if moduleimpl_id is not False: + if moduleimpl_id is not None: + moduleimpl = ModuleImpl.query.filter_by(id=int(moduleimpl_id)).first() + if moduleimpl is None: + errors.append("param 'moduleimpl_id': invalide") + else: + if not moduleimpl.est_inscrit( + Identite.query.filter_by(id=assiduite_unique.etudid).first() + ): + errors.append("param 'moduleimpl_id': etud non inscrit") + else: + assiduite_unique.moduleimpl_id = moduleimpl_id + else: + assiduite_unique.moduleimpl_id = moduleimpl_id + + # Cas 3 : desc + desc = data.get("desc", False) + if desc is not False: + assiduite_unique.desc = desc + + # Cas 4 : est_just + est_just = data.get("est_just") + if est_just is not None: + if not isinstance(est_just, bool): + errors.append("param 'est_just' : booléen non reconnu") + else: + assiduite_unique.est_just = est_just + + if errors: + err: str = ", ".join(errors) + return (404, err) + + db.session.add(assiduite_unique) + + return (200, "OK") + + +# -- Utils -- + + +def _count_manager(requested) -> tuple[str, dict]: + """ + Retourne la/les métriques à utiliser ainsi que le filtre donnés en query de la requête + """ + filtered: dict = {} + # cas 1 : etat assiduite + etat = requested.args.get("etat") + if etat is not None: + filtered["etat"] = etat + + # cas 2 : date de début + deb = requested.args.get("date_debut", "").replace(" ", "+") + deb: datetime = scu.is_iso_formated(deb, True) + if deb is not None: + filtered["date_debut"] = deb + + # cas 3 : date de fin + fin = requested.args.get("date_fin", "").replace(" ", "+") + fin = scu.is_iso_formated(fin, True) + + if fin is not None: + filtered["date_fin"] = fin + + # cas 4 : moduleimpl_id + module = requested.args.get("moduleimpl_id", False) + try: + if module is False: + raise ValueError + if module != "": + module = int(module) + else: + module = None + except ValueError: + module = False + + if module is not False: + filtered["moduleimpl_id"] = module + + # cas 5 : formsemestre_id + formsemestre_id = requested.args.get("formsemestre_id") + + if formsemestre_id is not None: + formsemestre: FormSemestre = None + formsemestre_id = int(formsemestre_id) + formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first() + filtered["formsemestre"] = formsemestre + + # cas 6 : type + metric = requested.args.get("metric", "all") + + # cas 7 : est_just + + est_just: str = requested.args.get("est_just") + if est_just is not None: + trues: tuple[str] = ("v", "t", "vrai", "true") + falses: tuple[str] = ("f", "faux", "false") + + if est_just.lower() in trues: + filtered["est_just"] = True + elif est_just.lower() in falses: + filtered["est_just"] = False + + # cas 8 : user_id + + user_id = requested.args.get("user_id", False) + if user_id is not False: + filtered["user_id"] = user_id + + return (metric, filtered) + + +def _filter_manager(requested, assiduites_query: Assiduite): + """ + Retourne les assiduites entrées filtrées en fonction de la request + """ + # cas 1 : etat assiduite + etat = requested.args.get("etat") + if etat is not None: + assiduites_query = scass.filter_assiduites_by_etat(assiduites_query, etat) + + # cas 2 : date de début + deb = requested.args.get("date_debut", "").replace(" ", "+") + deb: datetime = scu.is_iso_formated(deb, True) + + # cas 3 : date de fin + fin = requested.args.get("date_fin", "").replace(" ", "+") + fin = scu.is_iso_formated(fin, True) + + if (deb, fin) != (None, None): + assiduites_query: Assiduite = scass.filter_by_date( + assiduites_query, Assiduite, deb, fin + ) + + # cas 4 : moduleimpl_id + module = requested.args.get("moduleimpl_id", False) + try: + if module is False: + raise ValueError + if module != "": + module = int(module) + else: + module = None + except ValueError: + module = False + + if module is not False: + assiduites_query = scass.filter_by_module_impl(assiduites_query, module) + + # cas 5 : formsemestre_id + formsemestre_id = requested.args.get("formsemestre_id") + + if formsemestre_id is not None: + formsemestre: FormSemestre = None + formsemestre_id = int(formsemestre_id) + formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first() + assiduites_query = scass.filter_by_formsemestre(assiduites_query, formsemestre) + + # cas 6 : est_just + + est_just: str = requested.args.get("est_just") + if est_just is not None: + trues: tuple[str] = ("v", "t", "vrai", "true") + falses: tuple[str] = ("f", "faux", "false") + + if est_just.lower() in trues: + assiduites_query: Assiduite = scass.filter_assiduites_by_est_just( + assiduites_query, True + ) + elif est_just.lower() in falses: + assiduites_query: Assiduite = scass.filter_assiduites_by_est_just( + assiduites_query, False + ) + + # cas 8 : user_id + + user_id = requested.args.get("user_id", False) + if user_id is not False: + assiduites_query: Assiduite = scass.filter_by_user_id(assiduites_query, user_id) + + return assiduites_query diff --git a/app/api/etudiants.py b/app/api/etudiants.py old mode 100644 new mode 100755 index e8030b0199..a5d63e4dca --- a/app/api/etudiants.py +++ b/app/api/etudiants.py @@ -34,6 +34,7 @@ from app.scodoc.sco_bulletins import do_formsemestre_bulletinetud from app.scodoc.sco_permissions import Permission from app.scodoc.sco_utils import json_error, suppress_accents +import app.scodoc.sco_photos as sco_photos # Un exemple: # @bp.route("/api_function/") @@ -136,6 +137,42 @@ def etudiant(etudid: int = None, nip: str = None, ine: str = None): return etud.to_dict_api() +@api_web_bp.route("/etudiant/etudid//photo") +@api_web_bp.route("/etudiant/nip//photo") +@api_web_bp.route("/etudiant/ine//photo") +@login_required +@scodoc +@permission_required(Permission.ScoView) +def get_photo_image(etudid: int = None, nip: str = None, ine: str = None): + """ + Retourne la photo de l'étudiant + correspondant ou un placeholder si non existant. + + etudid : l'etudid de l'étudiant + nip : le code nip de l'étudiant + ine : le code ine de l'étudiant + + Attention : Ne peut être qu'utilisée en tant que route de département + """ + + etud = tools.get_etud(etudid, nip, ine) + + if etud is None: + return json_error( + 404, + message="étudiant inconnu", + ) + if not etudid: + filename = sco_photos.UNKNOWN_IMAGE_PATH + + size = request.args.get("size", "orig") + filename = sco_photos.photo_pathname(etud.photo_filename, size=size) + if not filename: + filename = sco_photos.UNKNOWN_IMAGE_PATH + res = sco_photos.build_image_response(filename) + return res + + @bp.route("/etudiants/etudid/", methods=["GET"]) @bp.route("/etudiants/nip/", methods=["GET"]) @bp.route("/etudiants/ine/", methods=["GET"]) diff --git a/app/api/justificatifs.py b/app/api/justificatifs.py new file mode 100644 index 0000000000..7017ca1b4b --- /dev/null +++ b/app/api/justificatifs.py @@ -0,0 +1,591 @@ +############################################################################## +# ScoDoc +# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved. +# See LICENSE +############################################################################## +"""ScoDoc 9 API : Assiduités +""" +from datetime import datetime + +from flask import g, jsonify, request +from flask_login import login_required, current_user + +import app.scodoc.sco_assiduites as scass +import app.scodoc.sco_utils as scu +from app import db +from app.api import api_bp as bp +from app.api import api_web_bp +from app.api import get_model_api_object +from app.decorators import permission_required, scodoc +from app.models import Identite, Justificatif +from app.models.assiduites import compute_assiduites_justified +from app.scodoc.sco_archives_justificatifs import JustificatifArchiver +from app.scodoc.sco_exceptions import ScoValueError +from app.scodoc.sco_permissions import Permission +from app.scodoc.sco_utils import json_error + + +# Partie Modèle +@bp.route("/justificatif/") +@api_web_bp.route("/justificatif/") +@scodoc +@permission_required(Permission.ScoView) +def justificatif(justif_id: int = None): + """Retourne un objet justificatif à partir de son id + + Exemple de résultat: + { + "justif_id": 1, + "etudid": 2, + "date_debut": "2022-10-31T08:00+01:00", + "date_fin": "2022-10-31T10:00+01:00", + "etat": "valide", + "fichier": "archive_id", + "raison": "une raison", + "entry_date": "2022-10-31T08:00+01:00", + "user_id": 1 or null, + } + + """ + + return get_model_api_object(Justificatif, justif_id, Identite) + + +@bp.route("/justificatifs/", defaults={"with_query": False}) +@bp.route("/justificatifs//query", defaults={"with_query": True}) +@api_web_bp.route("/justificatifs/", defaults={"with_query": False}) +@api_web_bp.route("/justificatifs//query", defaults={"with_query": True}) +@login_required +@scodoc +@permission_required(Permission.ScoView) +def justificatifs(etudid: int = None, with_query: bool = False): + """ + Retourne toutes les assiduités d'un étudiant + chemin : /justificatifs/ + + Un filtrage peut être donné avec une query + chemin : /justificatifs//query? + + Les différents filtres : + Etat (etat du justificatif -> validé, non validé, modifé, en attente): + query?etat=[- liste des états séparé par une virgule -] + ex: .../query?etat=validé,modifié + Date debut + (date de début du justificatif, sont affichés les justificatifs + dont la date de début est supérieur ou égale à la valeur donnée): + query?date_debut=[- date au format iso -] + ex: query?date_debut=2022-11-03T08:00+01:00 + Date fin + (date de fin du justificatif, sont affichés les justificatifs + dont la date de fin est inférieure ou égale à la valeur donnée): + query?date_fin=[- date au format iso -] + ex: query?date_fin=2022-11-03T10:00+01:00 + user_id (l'id de l'auteur du justificatif) + query?user_id=[int] + ex query?user_id=3 + """ + + query = Identite.query.filter_by(id=etudid) + if g.scodoc_dept: + query = query.filter_by(dept_id=g.scodoc_dept_id) + + etud: Identite = query.first_or_404(etudid) + justificatifs_query = etud.justificatifs + + if with_query: + justificatifs_query = _filter_manager(request, justificatifs_query) + + data_set: list[dict] = [] + for just in justificatifs_query.all(): + data = just.to_dict(format_api=True) + data_set.append(data) + + return jsonify(data_set) + + +@bp.route("/justificatif//create", methods=["POST"]) +@api_web_bp.route("/justificatif//create", methods=["POST"]) +@scodoc +@login_required +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def justif_create(etudid: int = None): + """ + Création d'un justificatif pour l'étudiant (etudid) + La requête doit avoir un content type "application/json": + [ + { + "date_debut": str, + "date_fin": str, + "etat": str, + }, + { + "date_debut": str, + "date_fin": str, + "etat": str, + "raison":str, + } + ... + ] + + """ + etud: Identite = Identite.query.filter_by(id=etudid).first_or_404() + + create_list: list[object] = request.get_json(force=True) + + if not isinstance(create_list, list): + return json_error(404, "Le contenu envoyé n'est pas une liste") + + errors: dict[int, str] = {} + success: dict[int, object] = {} + for i, data in enumerate(create_list): + code, obj = _create_singular(data, etud) + if code == 404: + errors[i] = obj + else: + success[i] = obj + compute_assiduites_justified(Justificatif.query.filter_by(etudid=etudid), True) + return jsonify({"errors": errors, "success": success}) + + +def _create_singular( + data: dict, + etud: Identite, +) -> tuple[int, object]: + errors: list[str] = [] + + # -- vérifications de l'objet json -- + # cas 1 : ETAT + etat = data.get("etat", None) + if etat is None: + errors.append("param 'etat': manquant") + elif not scu.EtatJustificatif.contains(etat): + errors.append("param 'etat': invalide") + + etat = scu.EtatJustificatif.get(etat) + + # cas 2 : date_debut + date_debut = data.get("date_debut", None) + if date_debut is None: + errors.append("param 'date_debut': manquant") + deb = scu.is_iso_formated(date_debut, convert=True) + if deb is None: + errors.append("param 'date_debut': format invalide") + + # cas 3 : date_fin + date_fin = data.get("date_fin", None) + if date_fin is None: + errors.append("param 'date_fin': manquant") + fin = scu.is_iso_formated(date_fin, convert=True) + if fin is None: + errors.append("param 'date_fin': format invalide") + + # cas 4 : raison + + raison: str = data.get("raison", None) + + if errors: + err: str = ", ".join(errors) + return (404, err) + + # TOUT EST OK + + try: + nouv_justificatif: Justificatif = Justificatif.create_justificatif( + date_debut=deb, + date_fin=fin, + etat=etat, + etud=etud, + raison=raison, + user_id=current_user.id, + ) + + db.session.add(nouv_justificatif) + db.session.commit() + + return ( + 200, + { + "justif_id": nouv_justificatif.id, + "couverture": scass.justifies(nouv_justificatif), + }, + ) + except ScoValueError as excp: + return ( + 404, + excp.args[0], + ) + + +@bp.route("/justificatif//edit", methods=["POST"]) +@api_web_bp.route("/justificatif//edit", methods=["POST"]) +@login_required +@scodoc +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def justif_edit(justif_id: int): + """ + Edition d'un justificatif à partir de son id + La requête doit avoir un content type "application/json": + + { + "etat"?: str, + "raison"?: str + "date_debut"?: str + "date_fin"?: str + } + """ + justificatif_unique: Justificatif = Justificatif.query.filter_by( + id=justif_id + ).first_or_404() + + errors: list[str] = [] + data = request.get_json(force=True) + avant_ids: list[int] = scass.justifies(justificatif_unique) + # Vérifications de data + + # Cas 1 : Etat + if data.get("etat") is not None: + etat = scu.EtatJustificatif.get(data.get("etat")) + if etat is None: + errors.append("param 'etat': invalide") + else: + justificatif_unique.etat = etat + + # Cas 2 : raison + raison = data.get("raison", False) + if raison is not False: + justificatif_unique.raison = raison + + deb, fin = None, None + + # cas 3 : date_debut + date_debut = data.get("date_debut", False) + if date_debut is not False: + if date_debut is None: + errors.append("param 'date_debut': manquant") + deb = scu.is_iso_formated(date_debut.replace(" ", "+"), convert=True) + if deb is None: + errors.append("param 'date_debut': format invalide") + + if justificatif_unique.date_fin >= deb: + errors.append("param 'date_debut': date de début située après date de fin ") + + # cas 4 : date_fin + date_fin = data.get("date_fin", False) + if date_fin is not False: + if date_fin is None: + errors.append("param 'date_fin': manquant") + fin = scu.is_iso_formated(date_fin.replace(" ", "+"), convert=True) + if fin is None: + errors.append("param 'date_fin': format invalide") + if justificatif_unique.date_debut <= fin: + errors.append("param 'date_fin': date de fin située avant date de début ") + + # Mise à jour des dates + deb = deb if deb is not None else justificatif_unique.date_debut + fin = fin if fin is not None else justificatif_unique.date_fin + + justificatif_unique.date_debut = deb + justificatif_unique.date_fin = fin + + if errors: + err: str = ", ".join(errors) + return json_error(404, err) + + db.session.add(justificatif_unique) + db.session.commit() + + return jsonify( + { + "couverture": { + "avant": avant_ids, + "après": compute_assiduites_justified( + Justificatif.query.filter_by(etudid=justificatif_unique.etudid), + True, + ), + } + } + ) + + +@bp.route("/justificatif/delete", methods=["POST"]) +@api_web_bp.route("/justificatif/delete", methods=["POST"]) +@login_required +@scodoc +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def justif_delete(): + """ + Suppression d'un justificatif à partir de son id + + Forme des données envoyées : + + [ + , + ... + ] + + + """ + justificatifs_list: list[int] = request.get_json(force=True) + if not isinstance(justificatifs_list, list): + return json_error(404, "Le contenu envoyé n'est pas une liste") + + output = {"errors": {}, "success": {}} + + for i, ass in enumerate(justificatifs_list): + code, msg = _delete_singular(ass, db) + if code == 404: + output["errors"][f"{i}"] = msg + else: + output["success"][f"{i}"] = {"OK": True} + db.session.commit() + return jsonify(output) + + +def _delete_singular(justif_id: int, database): + justificatif_unique: Justificatif = Justificatif.query.filter_by( + id=justif_id + ).first() + if justificatif_unique is None: + return (404, "Justificatif non existant") + + archive_name: str = justificatif_unique.fichier + + if archive_name is not None: + archiver: JustificatifArchiver = JustificatifArchiver() + archiver.delete_justificatif(justificatif_unique.etudid, archive_name) + + database.session.delete(justificatif_unique) + return (200, "OK") + + +# Partie archivage +@bp.route("/justificatif//import", methods=["POST"]) +@api_web_bp.route("/justificatif//import", methods=["POST"]) +@scodoc +@login_required +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def justif_import(justif_id: int = None): + """ + Importation d'un fichier (création d'archive) + """ + if len(request.files) == 0: + return json_error(404, "Il n'y a pas de fichier joint") + + file = list(request.files.values())[0] + if file.filename == "": + return json_error(404, "Il n'y a pas de fichier joint") + + query = Justificatif.query.filter_by(id=justif_id) + if g.scodoc_dept: + query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) + + justificatif_unique: Justificatif = query.first_or_404() + + archive_name: str = justificatif_unique.fichier + + archiver: JustificatifArchiver = JustificatifArchiver() + try: + fname: str + archive_name, fname = archiver.save_justificatif( + etudid=justificatif_unique.etudid, + filename=file.filename, + data=file.stream.read(), + archive_name=archive_name, + ) + + justificatif_unique.fichier = archive_name + + db.session.add(justificatif_unique) + db.session.commit() + + return jsonify({"filename": fname}) + except ScoValueError as err: + return json_error(404, err.args[0]) + + +@bp.route("/justificatif//export/", methods=["POST"]) +@api_web_bp.route("/justificatif//export/", methods=["POST"]) +@scodoc +@login_required +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def justif_export(justif_id: int = None, filename: str = None): + """ + Retourne un fichier d'une archive d'un justificatif + """ + + query = Justificatif.query.filter_by(id=justif_id) + if g.scodoc_dept: + query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) + + justificatif_unique: Justificatif = query.first_or_404() + + archive_name: str = justificatif_unique.fichier + if archive_name is None: + return json_error(404, "le justificatif ne possède pas de fichier") + + archiver: JustificatifArchiver = JustificatifArchiver() + + try: + return archiver.get_justificatif_file( + archive_name, justificatif_unique.etudid, filename + ) + except ScoValueError as err: + return json_error(404, err.args[0]) + + +@bp.route("/justificatif//remove", methods=["POST"]) +@api_web_bp.route("/justificatif//remove", methods=["POST"]) +@scodoc +@login_required +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def justif_remove(justif_id: int = None): + """ + Supression d'un fichier ou d'une archive + # TOTALK: Doc, expliquer les noms coté server + { + "remove": <"all"/"list"> + + "filenames"?: [ + , + ... + ] + } + """ + + data: dict = request.get_json(force=True) + + query = Justificatif.query.filter_by(id=justif_id) + if g.scodoc_dept: + query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) + + justificatif_unique: Justificatif = query.first_or_404() + + archive_name: str = justificatif_unique.fichier + if archive_name is None: + return json_error(404, "le justificatif ne possède pas de fichier") + + remove: str = data.get("remove") + if remove is None or remove not in ("all", "list"): + return json_error(404, "param 'remove': Valeur invalide") + archiver: JustificatifArchiver = JustificatifArchiver() + etudid: int = justificatif_unique.etudid + try: + if remove == "all": + archiver.delete_justificatif(etudid=etudid, archive_name=archive_name) + justificatif_unique.fichier = None + db.session.add(justificatif_unique) + db.session.commit() + + else: + for fname in data.get("filenames", []): + archiver.delete_justificatif( + etudid=etudid, + archive_name=archive_name, + filename=fname, + ) + + if len(archiver.list_justificatifs(archive_name, etudid)) == 0: + archiver.delete_justificatif(etudid, archive_name) + justificatif_unique.fichier = None + db.session.add(justificatif_unique) + db.session.commit() + + except ScoValueError as err: + return json_error(404, err.args[0]) + + return jsonify({"response": "removed"}) + + +@bp.route("/justificatif//list", methods=["GET"]) +@api_web_bp.route("/justificatif//list", methods=["GET"]) +@scodoc +@login_required +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def justif_list(justif_id: int = None): + """ + Liste les fichiers du justificatif + """ + + query = Justificatif.query.filter_by(id=justif_id) + if g.scodoc_dept: + query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) + + justificatif_unique: Justificatif = query.first_or_404() + + archive_name: str = justificatif_unique.fichier + + filenames: list[str] = [] + + archiver: JustificatifArchiver = JustificatifArchiver() + if archive_name is not None: + filenames = archiver.list_justificatifs( + archive_name, justificatif_unique.etudid + ) + + return jsonify(filenames) + + +# Partie justification +@bp.route("/justificatif//justifies", methods=["GET"]) +@api_web_bp.route("/justificatif//justifies", methods=["GET"]) +@scodoc +@login_required +@permission_required(Permission.ScoView) +# @permission_required(Permission.ScoAssiduiteChange) +def justif_justifies(justif_id: int = None): + """ + Liste assiduite_id justifiées par le justificatif + """ + + query = Justificatif.query.filter_by(id=justif_id) + if g.scodoc_dept: + query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) + + justificatif_unique: Justificatif = query.first_or_404() + + assiduites_list: list[int] = scass.justifies(justificatif_unique) + + return jsonify(assiduites_list) + + +# -- Utils -- + + +def _filter_manager(requested, justificatifs_query): + """ + Retourne les justificatifs entrés filtrés en fonction de la request + """ + # cas 1 : etat justificatif + etat = requested.args.get("etat") + if etat is not None: + justificatifs_query = scass.filter_justificatifs_by_etat( + justificatifs_query, etat + ) + + # cas 2 : date de début + deb = requested.args.get("date_debut", "").replace(" ", "+") + deb: datetime = scu.is_iso_formated(deb, True) + + # cas 3 : date de fin + fin = requested.args.get("date_fin", "").replace(" ", "+") + fin = scu.is_iso_formated(fin, True) + + if (deb, fin) != (None, None): + justificatifs_query: Justificatif = scass.filter_by_date( + justificatifs_query, Justificatif, deb, fin + ) + + user_id = requested.args.get("user_id", False) + if user_id is not False: + justificatif_query: Justificatif = scass.filter_by_user_id( + justificatif_query, user_id + ) + + return justificatifs_query diff --git a/app/scodoc/sco_archives.py b/app/scodoc/sco_archives.py index 5e09bf96c0..17a0b86372 100644 --- a/app/scodoc/sco_archives.py +++ b/app/scodoc/sco_archives.py @@ -68,7 +68,7 @@ from app import log, ScoDocJSONEncoder from app.but import jury_but_pv from app.comp import res_sem from app.comp.res_compat import NotesTableCompat -from app.models import Departement, FormSemestre +from app.models import FormSemestre from app.scodoc.TrivialFormulator import TrivialFormulator from app.scodoc.sco_exceptions import ScoException, ScoPermissionDenied from app.scodoc import html_sco_header @@ -86,6 +86,11 @@ class BaseArchiver(object): self.archive_type = archive_type self.initialized = False self.root = None + self.dept_id = None + + def set_dept_id(self, dept_id: int): + "set dept" + self.dept_id = dept_id def initialize(self): if self.initialized: @@ -107,6 +112,8 @@ class BaseArchiver(object): finally: scu.GSL.release() self.initialized = True + if self.dept_id is None: + self.dept_id = getattr(g, "scodoc_dept_id") def get_obj_dir(self, oid: int): """ @@ -114,8 +121,7 @@ class BaseArchiver(object): If directory does not yet exist, create it. """ self.initialize() - dept = Departement.query.filter_by(acronym=g.scodoc_dept).first() - dept_dir = os.path.join(self.root, str(dept.id)) + dept_dir = os.path.join(self.root, str(self.dept_id)) try: scu.GSL.acquire() if not os.path.isdir(dept_dir): @@ -140,8 +146,7 @@ class BaseArchiver(object): :return: list of archive oids """ self.initialize() - dept = Departement.query.filter_by(acronym=g.scodoc_dept).first() - base = os.path.join(self.root, str(dept.id)) + os.path.sep + base = os.path.join(self.root, str(self.dept_id)) + os.path.sep dirs = glob.glob(base + "*") return [os.path.split(x)[1] for x in dirs] diff --git a/app/scodoc/sco_archives_justificatifs.py b/app/scodoc/sco_archives_justificatifs.py new file mode 100644 index 0000000000..3cad18e96e --- /dev/null +++ b/app/scodoc/sco_archives_justificatifs.py @@ -0,0 +1,215 @@ +""" +Gestion de l'archivage des justificatifs + +Ecrit par Matthias HARTMANN +""" +import os +from datetime import datetime +from shutil import rmtree + +from app.models import Identite +from app.scodoc.sco_archives import BaseArchiver +from app.scodoc.sco_exceptions import ScoValueError +from app.scodoc.sco_utils import is_iso_formated + + +class Trace: + """gestionnaire de la trace des fichiers justificatifs""" + + def __init__(self, path: str) -> None: + self.path: str = path + "/_trace.csv" + self.content: dict[str, list[datetime, datetime]] = {} + self.import_from_file() + + def import_from_file(self): + """import trace from file""" + if os.path.isfile(self.path): + with open(self.path, "r", encoding="utf-8") as file: + for line in file.readlines(): + csv = line.split(",") + fname: str = csv[0] + entry_date: datetime = is_iso_formated(csv[1], True) + delete_date: datetime = is_iso_formated(csv[2], True) + + self.content[fname] = [entry_date, delete_date] + + def set_trace(self, *fnames: str, mode: str = "entry"): + """Ajoute une trace du fichier donné + mode : entry / delete + """ + modes: list[str] = ["entry", "delete"] + for fname in fnames: + if fname in modes: + continue + traced: list[datetime, datetime] = self.content.get(fname, False) + if not traced: + self.content[fname] = [None, None] + traced = self.content[fname] + + traced[modes.index(mode)] = datetime.now() + self.save_trace() + + def save_trace(self): + """Enregistre la trace dans le fichier _trace.csv""" + lines: list[str] = [] + for fname, traced in self.content.items(): + date_fin: datetime or None = traced[1].isoformat() if traced[1] else "None" + + lines.append(f"{fname},{traced[0].isoformat()},{date_fin}") + with open(self.path, "w", encoding="utf-8") as file: + file.write("\n".join(lines)) + + def get_trace(self, fnames: list[str] = ()) -> dict[str, list[datetime, datetime]]: + """Récupère la trace pour les noms de fichiers. + si aucun nom n'est donné, récupère tous les fichiers""" + + if fnames is None or len(fnames) == 0: + return self.content + + traced: dict = {} + for fname in fnames: + traced[fname] = self.content.get(fname, None) + + return traced + + +class JustificatifArchiver(BaseArchiver): + """ + + TOTALK: + - oid -> etudid + - archive_id -> date de création de l'archive (une archive par dépot de document) + + justificatif + └── + └── + ├── [_trace.csv] + └── + ├── [_description.txt] + └── [] + + """ + + def __init__(self): + BaseArchiver.__init__(self, archive_type="justificatifs") + + def save_justificatif( + self, + etudid: int, + filename: str, + data: bytes or str, + archive_name: str = None, + description: str = "", + ) -> str: + """ + Ajoute un fichier dans une archive "justificatif" pour l'etudid donné + Retourne l'archive_name utilisé + """ + self._set_dept(etudid) + if archive_name is None: + archive_id: str = self.create_obj_archive( + oid=etudid, description=description + ) + else: + archive_id: str = self.get_id_from_name(etudid, archive_name) + + fname: str = self.store(archive_id, filename, data) + + trace = Trace(self.get_obj_dir(etudid)) + trace.set_trace(fname, "entry") + + return self.get_archive_name(archive_id), fname + + def delete_justificatif( + self, + etudid: int, + archive_name: str, + filename: str = None, + has_trace: bool = True, + ): + """ + Supprime une archive ou un fichier particulier de l'archivage de l'étudiant donné + + Si trace == True : sauvegarde le nom du/des fichier(s) supprimé(s) dans la trace de l'étudiant + """ + self._set_dept(etudid) + if str(etudid) not in self.list_oids(): + raise ValueError(f"Aucune archive pour etudid[{etudid}]") + + archive_id = self.get_id_from_name(etudid, archive_name) + + if filename is not None: + if filename not in self.list_archive(archive_id): + raise ValueError( + f"filename {filename} inconnu dans l'archive archive_id[{archive_id}] -> etudid[{etudid}]" + ) + + path: str = os.path.join(self.get_obj_dir(etudid), archive_id, filename) + + if os.path.isfile(path): + if has_trace: + trace = Trace(self.get_obj_dir(etudid)) + trace.set_trace(filename, "delete") + os.remove(path) + + else: + if has_trace: + trace = Trace(self.get_obj_dir(etudid)) + trace.set_trace(*self.list_archive(archive_id), mode="delete") + + self.delete_archive( + os.path.join( + self.get_obj_dir(etudid), + archive_id, + ) + ) + + def list_justificatifs(self, archive_name: str, etudid: int) -> list[str]: + """ + Retourne la liste des noms de fichiers dans l'archive donnée + """ + self._set_dept(etudid) + filenames: list[str] = [] + archive_id = self.get_id_from_name(etudid, archive_name) + + filenames = self.list_archive(archive_id) + return filenames + + def get_justificatif_file(self, archive_name: str, etudid: int, filename: str): + """ + Retourne une réponse de téléchargement de fichier si le fichier existe + """ + self._set_dept(etudid) + archive_id: str = self.get_id_from_name(etudid, archive_name) + if filename in self.list_archive(archive_id): + return self.get_archived_file(etudid, archive_name, filename) + raise ScoValueError( + f"Fichier {filename} introuvable dans l'archive {archive_name}" + ) + + def _set_dept(self, etudid: int): + """ + Mets à jour le dept_id de l'archiver en fonction du département de l'étudiant + """ + etud: Identite = Identite.query.filter_by(id=etudid).first() + self.set_dept_id(etud.dept_id) + + def remove_dept_archive(self, dept_id: int = None): + """ + Supprime toutes les archives d'un département (ou de tous les départements) + ⚠ Supprime aussi les fichiers de trace ⚠ + """ + self.set_dept_id(1) + self.initialize() + + if dept_id is None: + rmtree(self.root, ignore_errors=True) + else: + rmtree(os.path.join(self.root, str(dept_id)), ignore_errors=True) + + def get_trace( + self, etudid: int, *fnames: str + ) -> dict[str, list[datetime, datetime]]: + """Récupère la trace des justificatifs de l'étudiant""" + trace = Trace(self.get_obj_dir(etudid)) + return trace.get_trace(fnames) diff --git a/app/scodoc/sco_assiduites.py b/app/scodoc/sco_assiduites.py new file mode 100644 index 0000000000..3ba865a224 --- /dev/null +++ b/app/scodoc/sco_assiduites.py @@ -0,0 +1,355 @@ +""" +Ecrit par Matthias Hartmann. +""" +from datetime import date, datetime, time, timedelta + +import app.scodoc.sco_utils as scu +from app.models.assiduites import Assiduite, Justificatif +from app.models.etudiants import Identite +from app.models.formsemestre import FormSemestre, FormSemestreInscription + + +class CountCalculator: + """Classe qui gére le comptage des assiduités""" + + def __init__( + self, + morning: time = time(8, 0), + noon: time = time(12, 0), + after_noon: time = time(14, 00), + evening: time = time(18, 0), + skip_saturday: bool = True, + ) -> None: + + self.morning: time = morning + self.noon: time = noon + self.after_noon: time = after_noon + self.evening: time = evening + self.skip_saturday: bool = skip_saturday + + delta_total: timedelta = datetime.combine(date.min, evening) - datetime.combine( + date.min, morning + ) + delta_lunch: timedelta = datetime.combine( + date.min, after_noon + ) - datetime.combine(date.min, noon) + + self.hour_per_day: float = (delta_total - delta_lunch).total_seconds() / 3600 + + self.days: list[date] = [] + self.half_days: list[tuple[date, bool]] = [] # tuple -> (date, morning:bool) + self.hours: float = 0.0 + + self.count: int = 0 + + def reset(self): + """Remet à zero le compteur""" + self.days = [] + self.half_days = [] + self.hours = 0.0 + self.count = 0 + + def add_half_day(self, day: date, is_morning: bool = True): + """Ajoute une demi journée dans le comptage""" + key: tuple[date, bool] = (day, is_morning) + if key not in self.half_days: + self.half_days.append(key) + + def add_day(self, day: date): + """Ajoute un jour dans le comptage""" + if day not in self.days: + self.days.append(day) + + def check_in_morning(self, period: tuple[datetime, datetime]) -> bool: + """Vérifiée si la période donnée fait partie du matin + (Test sur la date de début) + """ + + interval_morning: tuple[datetime, datetime] = ( + scu.localize_datetime(datetime.combine(period[0].date(), self.morning)), + scu.localize_datetime(datetime.combine(period[0].date(), self.noon)), + ) + + in_morning: bool = scu.is_period_overlapping( + period, interval_morning, bornes=False + ) + return in_morning + + def check_in_evening(self, period: tuple[datetime, datetime]) -> bool: + """Vérifie si la période fait partie de l'aprèm + (test sur la date de début) + """ + + interval_evening: tuple[datetime, datetime] = ( + scu.localize_datetime(datetime.combine(period[0].date(), self.after_noon)), + scu.localize_datetime(datetime.combine(period[0].date(), self.evening)), + ) + + in_evening: bool = scu.is_period_overlapping(period, interval_evening) + + return in_evening + + def compute_long_assiduite(self, assi: Assiduite): + """Calcule les métriques sur une assiduité longue (plus d'un jour)""" + + pointer_date: date = assi.date_debut.date() + timedelta(days=1) + start_hours: timedelta = assi.date_debut - scu.localize_datetime( + datetime.combine(assi.date_debut, self.morning) + ) + finish_hours: timedelta = assi.date_fin - scu.localize_datetime( + datetime.combine(assi.date_fin, self.morning) + ) + + self.add_day(assi.date_debut.date()) + self.add_day(assi.date_fin.date()) + + start_period: tuple[datetime, datetime] = ( + assi.date_debut, + scu.localize_datetime( + datetime.combine(assi.date_debut.date(), self.evening) + ), + ) + + finish_period: tuple[datetime, datetime] = ( + scu.localize_datetime(datetime.combine(assi.date_fin.date(), self.morning)), + assi.date_fin, + ) + hours = 0.0 + for period in (start_period, finish_period): + if self.check_in_evening(period): + self.add_half_day(period[0].date(), False) + if self.check_in_morning(period): + self.add_half_day(period[0].date()) + + while pointer_date < assi.date_fin.date(): + if pointer_date.weekday() < (6 - self.skip_saturday): + self.add_day(pointer_date) + self.add_half_day(pointer_date) + self.add_half_day(pointer_date, False) + self.hours += self.hour_per_day + hours += self.hour_per_day + + pointer_date += timedelta(days=1) + + self.hours += finish_hours.total_seconds() / 3600 + self.hours += self.hour_per_day - (start_hours.total_seconds() / 3600) + + def compute_assiduites(self, assiduites: Assiduite): + """Calcule les métriques pour la collection d'assiduité donnée""" + assi: Assiduite + assiduites: list[Assiduite] = ( + assiduites.all() if isinstance(assiduites, Assiduite) else assiduites + ) + for assi in assiduites: + self.count += 1 + delta: timedelta = assi.date_fin - assi.date_debut + + if delta.days > 0: + # raise Exception(self.hours) + self.compute_long_assiduite(assi) + + continue + + period: tuple[datetime, datetime] = (assi.date_debut, assi.date_fin) + deb_date: date = assi.date_debut.date() + if self.check_in_morning(period): + self.add_half_day(deb_date) + if self.check_in_evening(period): + self.add_half_day(deb_date, False) + + self.add_day(deb_date) + + self.hours += delta.total_seconds() / 3600 + + def to_dict(self) -> dict[str, object]: + """Retourne les métriques sous la forme d'un dictionnaire""" + return { + "compte": self.count, + "journee": len(self.days), + "demi": len(self.half_days), + "heure": round(self.hours, 2), + } + + +def get_assiduites_stats( + assiduites: Assiduite, metric: str = "all", filtered: dict[str, object] = None +) -> Assiduite: + """Compte les assiduités en fonction des filtres""" + + if filtered is not None: + deb, fin = None, None + for key in filtered: + if key == "etat": + assiduites = filter_assiduites_by_etat(assiduites, filtered[key]) + elif key == "date_fin": + fin = filtered[key] + elif key == "date_debut": + deb = filtered[key] + elif key == "moduleimpl_id": + assiduites = filter_by_module_impl(assiduites, filtered[key]) + elif key == "formsemestre": + assiduites = filter_by_formsemestre(assiduites, filtered[key]) + elif key == "est_just": + assiduites = filter_assiduites_by_est_just(assiduites, filtered[key]) + elif key == "user_id": + assiduites = filter_by_user_id(assiduites, filtered[key]) + if (deb, fin) != (None, None): + assiduites = filter_by_date(assiduites, Assiduite, deb, fin) + + calculator: CountCalculator = CountCalculator() + calculator.compute_assiduites(assiduites) + count: dict = calculator.to_dict() + + metrics: list[str] = metric.split(",") + + output: dict = {} + + for key, val in count.items(): + if key in metrics: + output[key] = val + return output if output else count + + +def filter_assiduites_by_etat(assiduites: Assiduite, etat: str) -> Assiduite: + """ + Filtrage d'une collection d'assiduites en fonction de leur état + """ + etats: list[str] = list(etat.split(",")) + etats = [scu.EtatAssiduite.get(e, -1) for e in etats] + return assiduites.filter(Assiduite.etat.in_(etats)) + + +def filter_assiduites_by_est_just( + assiduites: Assiduite, est_just: bool +) -> Justificatif: + """ + Filtrage d'une collection d'assiduites en fonction de s'ils sont justifiés + """ + return assiduites.filter_by(est_just=est_just) + + +def filter_by_user_id( + collection: Assiduite or Justificatif, + user_id: int, +) -> Justificatif: + """ + Filtrage d'une collection en fonction de l'user_id + """ + return collection.filter_by(user_id=user_id) + + +def filter_by_date( + collection: Assiduite or Justificatif, + collection_cls: Assiduite or Justificatif, + date_deb: datetime = None, + date_fin: datetime = None, + strict: bool = False, +): + """ + Filtrage d'une collection d'assiduites en fonction d'une date + """ + if date_deb is None: + date_deb = datetime.min + if date_fin is None: + date_fin = datetime.max + + date_deb = scu.localize_datetime(date_deb) + date_fin = scu.localize_datetime(date_fin) + if not strict: + return collection.filter( + collection_cls.date_debut <= date_fin, collection_cls.date_fin >= date_deb + ) + return collection.filter( + collection_cls.date_debut < date_fin, collection_cls.date_fin > date_deb + ) + + +def filter_justificatifs_by_etat( + justificatifs: Justificatif, etat: str +) -> Justificatif: + """ + Filtrage d'une collection de justificatifs en fonction de leur état + """ + etats: list[str] = list(etat.split(",")) + etats = [scu.EtatJustificatif.get(e, -1) for e in etats] + return justificatifs.filter(Justificatif.etat.in_(etats)) + + +def filter_by_module_impl( + assiduites: Assiduite, module_impl_id: int or None +) -> Assiduite: + """ + Filtrage d'une collection d'assiduites en fonction de l'ID du module_impl + """ + return assiduites.filter(Assiduite.moduleimpl_id == module_impl_id) + + +def filter_by_formsemestre(assiduites_query: Assiduite, formsemestre: FormSemestre): + """ + Filtrage d'une collection d'assiduites en fonction d'un formsemestre + """ + + if formsemestre is None: + return assiduites_query.filter(False) + + assiduites_query = ( + assiduites_query.join(Identite, Assiduite.etudid == Identite.id) + .join( + FormSemestreInscription, + Identite.id == FormSemestreInscription.etudid, + ) + .filter(FormSemestreInscription.formsemestre_id == formsemestre.id) + ) + + assiduites_query = assiduites_query.filter( + Assiduite.date_debut >= formsemestre.date_debut + ) + return assiduites_query.filter(Assiduite.date_fin <= formsemestre.date_fin) + + +def justifies(justi: Justificatif, obj: bool = False) -> list[int]: + """ + Retourne la liste des assiduite_id qui sont justifié par la justification + Une assiduité est justifiée si elle est COMPLETEMENT ou PARTIELLEMENT comprise dans la plage du justificatif + et que l'état du justificatif est "valide" + renvoie des id si obj == False, sinon les Assiduités + """ + + if justi.etat != scu.EtatJustificatif.VALIDE: + return [] + + assiduites_query: Assiduite = ( + Assiduite.query.join(Justificatif, Assiduite.etudid == Justificatif.etudid) + .filter(Assiduite.etat != scu.EtatAssiduite.PRESENT) + .filter( + Assiduite.date_debut <= justi.date_fin, + Assiduite.date_fin >= justi.date_debut, + ) + ) + + if not obj: + return [assi.id for assi in assiduites_query.all()] + + return assiduites_query + + +def get_all_justified( + etudid: int, date_deb: datetime = None, date_fin: datetime = None +) -> list[Assiduite]: + """Retourne toutes les assiduités justifiées sur une période""" + + if date_deb is None: + date_deb = datetime.min + if date_fin is None: + date_fin = datetime.max + + date_deb = scu.localize_datetime(date_deb) + date_fin = scu.localize_datetime(date_fin) + justified = Assiduite.query.filter_by(est_just=True, etudid=etudid) + after = filter_by_date( + justified, + Assiduite, + date_deb, + date_fin, + ) + return after