############################################################################## # ScoDoc # Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved. # See LICENSE ############################################################################## """ScoDoc 9 API : Assiduités """ from datetime import datetime from flask_json import as_json from flask import g, jsonify, request from flask_login import login_required, current_user import app.scodoc.sco_assiduites as scass import app.scodoc.sco_utils as scu from app import db from app.api import api_bp as bp from app.api import api_web_bp from app.api import get_model_api_object, tools from app.decorators import permission_required, scodoc from app.models import Identite, Justificatif, Departement from app.models.assiduites import ( compute_assiduites_justified, ) from app.scodoc.sco_archives_justificatifs import JustificatifArchiver from app.scodoc.sco_exceptions import ScoValueError from app.scodoc.sco_permissions import Permission from app.scodoc.sco_utils import json_error # Partie Modèle @bp.route("/justificatif/<int:justif_id>") @api_web_bp.route("/justificatif/<int:justif_id>") @scodoc @permission_required(Permission.ScoView) def justificatif(justif_id: int = None): """Retourne un objet justificatif à partir de son id Exemple de résultat: { "justif_id": 1, "etudid": 2, "date_debut": "2022-10-31T08:00+01:00", "date_fin": "2022-10-31T10:00+01:00", "etat": "valide", "fichier": "archive_id", "raison": "une raison", "entry_date": "2022-10-31T08:00+01:00", "user_id": 1 or null, } """ return get_model_api_object(Justificatif, justif_id, Identite) # etudid @bp.route("/justificatifs/<etudid>", defaults={"with_query": False}) @api_web_bp.route("/justificatifs/<etudid>", defaults={"with_query": False}) @bp.route("/justificatifs/<etudid>/query", defaults={"with_query": True}) @api_web_bp.route("/justificatifs/<etudid>/query", defaults={"with_query": True}) @bp.route("/justificatifs/etudid/<etudid>", defaults={"with_query": False}) @api_web_bp.route("/justificatifs/etudid/<etudid>", defaults={"with_query": False}) @bp.route("/justificatifs/etudid/<etudid>/query", defaults={"with_query": True}) @api_web_bp.route("/justificatifs/etudid/<etudid>/query", defaults={"with_query": True}) # nip @bp.route("/justificatifs/nip/<nip>", defaults={"with_query": False}) @api_web_bp.route("/justificatifs/nip/<nip>", defaults={"with_query": False}) @bp.route("/justificatifs/nip/<nip>/query", defaults={"with_query": True}) @api_web_bp.route("/justificatifs/nip/<nip>/query", defaults={"with_query": True}) # ine @bp.route("/justificatifs/ine/<ine>", defaults={"with_query": False}) @api_web_bp.route("/justificatifs/ine/<ine>", defaults={"with_query": False}) @bp.route("/justificatifs/ine/<ine>/query", defaults={"with_query": True}) @api_web_bp.route("/justificatifs/ine/<ine>/query", defaults={"with_query": True}) # @login_required @scodoc @as_json @permission_required(Permission.ScoView) def justificatifs(etudid: int = None, nip=None, ine=None, with_query: bool = False): """ Retourne toutes les assiduités d'un étudiant chemin : /justificatifs/<int:etudid> Un filtrage peut être donné avec une query chemin : /justificatifs/<int:etudid>/query? Les différents filtres : Etat (etat du justificatif -> validé, non validé, modifé, en attente): query?etat=[- liste des états séparé par une virgule -] ex: .../query?etat=validé,modifié Date debut (date de début du justificatif, sont affichés les justificatifs dont la date de début est supérieur ou égale à la valeur donnée): query?date_debut=[- date au format iso -] ex: query?date_debut=2022-11-03T08:00+01:00 Date fin (date de fin du justificatif, sont affichés les justificatifs dont la date de fin est inférieure ou égale à la valeur donnée): query?date_fin=[- date au format iso -] ex: query?date_fin=2022-11-03T10:00+01:00 user_id (l'id de l'auteur du justificatif) query?user_id=[int] ex query?user_id=3 """ etud: Identite = tools.get_etud(etudid, nip, ine) if etud is None: return json_error( 404, message="étudiant inconnu", ) justificatifs_query = etud.justificatifs if with_query: justificatifs_query = _filter_manager(request, justificatifs_query) data_set: list[dict] = [] for just in justificatifs_query.all(): data = just.to_dict(format_api=True) data_set.append(data) return data_set @api_web_bp.route("/justificatifs/dept/<int:dept_id>", defaults={"with_query": False}) @api_web_bp.route( "/justificatifs/dept/<int:dept_id>/query", defaults={"with_query": True} ) @login_required @scodoc @as_json @permission_required(Permission.ScoView) def justificatifs_dept(dept_id: int = None, with_query: bool = False): """ """ dept = Departement.query.get_or_404(dept_id) etuds = [etud.id for etud in dept.etudiants] justificatifs_query = Justificatif.query.filter(Justificatif.etudid.in_(etuds)) if with_query: justificatifs_query = _filter_manager(request, justificatifs_query) data_set: list[dict] = [] for just in justificatifs_query.all(): data = just.to_dict(format_api=True) data_set.append(data) return data_set @bp.route("/justificatif/<int:etudid>/create", methods=["POST"]) @api_web_bp.route("/justificatif/<int:etudid>/create", methods=["POST"]) @bp.route("/justificatif/etudid/<etudid>/create", methods=["POST"]) @api_web_bp.route("/justificatif/etudid/<etudid>/create", methods=["POST"]) # nip @bp.route("/justificatif/nip/<nip>/create", methods=["POST"]) @api_web_bp.route("/justificatif/nip/<nip>/create", methods=["POST"]) # ine @bp.route("/justificatif/ine/<ine>/create", methods=["POST"]) @api_web_bp.route("/justificatif/ine/<ine>/create", methods=["POST"]) @scodoc @login_required @as_json @permission_required(Permission.ScoAbsChange) def justif_create(etudid: int = None, nip=None, ine=None): """ Création d'un justificatif pour l'étudiant (etudid) La requête doit avoir un content type "application/json": [ { "date_debut": str, "date_fin": str, "etat": str, }, { "date_debut": str, "date_fin": str, "etat": str, "raison":str, } ... ] """ etud: Identite = tools.get_etud(etudid, nip, ine) if etud is None: return json_error( 404, message="étudiant inconnu", ) create_list: list[object] = request.get_json(force=True) if not isinstance(create_list, list): return json_error(404, "Le contenu envoyé n'est pas une liste") errors: list = [] success: list = [] justifs: list = [] for i, data in enumerate(create_list): code, obj, justi = _create_singular(data, etud) if code == 404: errors.append({"indice": i, "message": obj}) else: success.append({"indice": i, "message": obj}) justifs.append(justi) scass.simple_invalidate_cache(data, etud.id) compute_assiduites_justified(etud.etudid, justifs) return {"errors": errors, "success": success} def _create_singular( data: dict, etud: Identite, ) -> tuple[int, object]: errors: list[str] = [] # -- vérifications de l'objet json -- # cas 1 : ETAT etat = data.get("etat", None) if etat is None: errors.append("param 'etat': manquant") elif not scu.EtatJustificatif.contains(etat): errors.append("param 'etat': invalide") etat = scu.EtatJustificatif.get(etat) # cas 2 : date_debut date_debut = data.get("date_debut", None) if date_debut is None: errors.append("param 'date_debut': manquant") deb = scu.is_iso_formated(date_debut, convert=True) if deb is None: errors.append("param 'date_debut': format invalide") # cas 3 : date_fin date_fin = data.get("date_fin", None) if date_fin is None: errors.append("param 'date_fin': manquant") fin = scu.is_iso_formated(date_fin, convert=True) if fin is None: errors.append("param 'date_fin': format invalide") # cas 4 : raison raison: str = data.get("raison", None) external_data = data.get("external_data") if external_data is not None: if not isinstance(external_data, dict): errors.append("param 'external_data' : n'est pas un objet JSON") if errors: err: str = ", ".join(errors) return (404, err, None) # TOUT EST OK try: nouv_justificatif: Justificatif = Justificatif.create_justificatif( date_debut=deb, date_fin=fin, etat=etat, etud=etud, raison=raison, user_id=current_user.id, external_data=external_data, ) db.session.add(nouv_justificatif) db.session.commit() return ( 200, { "justif_id": nouv_justificatif.id, "couverture": scass.justifies(nouv_justificatif), }, nouv_justificatif, ) except ScoValueError as excp: return ( 404, excp.args[0], ) @bp.route("/justificatif/<int:justif_id>/edit", methods=["POST"]) @api_web_bp.route("/justificatif/<int:justif_id>/edit", methods=["POST"]) @login_required @scodoc @as_json @permission_required(Permission.ScoAbsChange) def justif_edit(justif_id: int): """ Edition d'un justificatif à partir de son id La requête doit avoir un content type "application/json": { "etat"?: str, "raison"?: str "date_debut"?: str "date_fin"?: str } """ justificatif_unique: Justificatif = Justificatif.query.filter_by( id=justif_id ).first_or_404() errors: list[str] = [] data = request.get_json(force=True) avant_ids: list[int] = scass.justifies(justificatif_unique) # Vérifications de data # Cas 1 : Etat if data.get("etat") is not None: etat = scu.EtatJustificatif.get(data.get("etat")) if etat is None: errors.append("param 'etat': invalide") else: justificatif_unique.etat = etat # Cas 2 : raison raison = data.get("raison", False) if raison is not False: justificatif_unique.raison = raison deb, fin = None, None # cas 3 : date_debut date_debut = data.get("date_debut", False) if date_debut is not False: if date_debut is None: errors.append("param 'date_debut': manquant") deb = scu.is_iso_formated(date_debut.replace(" ", "+"), convert=True) if deb is None: errors.append("param 'date_debut': format invalide") # cas 4 : date_fin date_fin = data.get("date_fin", False) if date_fin is not False: if date_fin is None: errors.append("param 'date_fin': manquant") fin = scu.is_iso_formated(date_fin.replace(" ", "+"), convert=True) if fin is None: errors.append("param 'date_fin': format invalide") # Mise à jour des dates deb = deb if deb is not None else justificatif_unique.date_debut fin = fin if fin is not None else justificatif_unique.date_fin external_data = data.get("external_data") if external_data is not None: if not isinstance(external_data, dict): errors.append("param 'external_data' : n'est pas un objet JSON") else: justificatif_unique.external_data = external_data if fin <= deb: errors.append("param 'dates' : Date de début après date de fin") justificatif_unique.date_debut = deb justificatif_unique.date_fin = fin if errors: err: str = ", ".join(errors) return json_error(404, err) db.session.add(justificatif_unique) db.session.commit() retour = { "couverture": { "avant": avant_ids, "après": compute_assiduites_justified( justificatif_unique.etudid, [justificatif_unique], False, ), } } scass.simple_invalidate_cache(justificatif_unique.to_dict()) return retour @bp.route("/justificatif/delete", methods=["POST"]) @api_web_bp.route("/justificatif/delete", methods=["POST"]) @login_required @scodoc @as_json @permission_required(Permission.ScoAbsChange) def justif_delete(): """ Suppression d'un justificatif à partir de son id Forme des données envoyées : [ <justif_id:int>, ... ] """ justificatifs_list: list[int] = request.get_json(force=True) if not isinstance(justificatifs_list, list): return json_error(404, "Le contenu envoyé n'est pas une liste") output = {"errors": [], "success": []} for i, ass in enumerate(justificatifs_list): code, msg = _delete_singular(ass, db) if code == 404: output["errors"].append({"indice": i, "message": msg}) else: output["success"].append({"indice": i, "message": "OK"}) db.session.commit() return output def _delete_singular(justif_id: int, database): justificatif_unique: Justificatif = Justificatif.query.filter_by( id=justif_id ).first() if justificatif_unique is None: return (404, "Justificatif non existant") archive_name: str = justificatif_unique.fichier if archive_name is not None: archiver: JustificatifArchiver = JustificatifArchiver() try: archiver.delete_justificatif(justificatif_unique.etudid, archive_name) except ValueError: pass scass.simple_invalidate_cache(justificatif_unique.to_dict()) database.session.delete(justificatif_unique) compute_assiduites_justified( justificatif_unique.etudid, Justificatif.query.filter_by(etudid=justificatif_unique.etudid).all(), True, ) return (200, "OK") # Partie archivage @bp.route("/justificatif/<int:justif_id>/import", methods=["POST"]) @api_web_bp.route("/justificatif/<int:justif_id>/import", methods=["POST"]) @scodoc @login_required @as_json @permission_required(Permission.ScoAbsChange) def justif_import(justif_id: int = None): """ Importation d'un fichier (création d'archive) """ if len(request.files) == 0: return json_error(404, "Il n'y a pas de fichier joint") file = list(request.files.values())[0] if file.filename == "": return json_error(404, "Il n'y a pas de fichier joint") query = Justificatif.query.filter_by(id=justif_id) if g.scodoc_dept: query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) justificatif_unique: Justificatif = query.first_or_404() archive_name: str = justificatif_unique.fichier archiver: JustificatifArchiver = JustificatifArchiver() try: fname: str archive_name, fname = archiver.save_justificatif( etudid=justificatif_unique.etudid, filename=file.filename, data=file.stream.read(), archive_name=archive_name, user_id=current_user.id, ) justificatif_unique.fichier = archive_name db.session.add(justificatif_unique) db.session.commit() return {"filename": fname} except ScoValueError as err: return json_error(404, err.args[0]) @bp.route("/justificatif/<int:justif_id>/export/<filename>", methods=["POST"]) @api_web_bp.route("/justificatif/<int:justif_id>/export/<filename>", methods=["POST"]) @scodoc @login_required @permission_required(Permission.ScoAbsChange) def justif_export(justif_id: int = None, filename: str = None): """ Retourne un fichier d'une archive d'un justificatif """ query = Justificatif.query.filter_by(id=justif_id) if g.scodoc_dept: query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) justificatif_unique: Justificatif = query.first_or_404() archive_name: str = justificatif_unique.fichier if archive_name is None: return json_error(404, "le justificatif ne possède pas de fichier") archiver: JustificatifArchiver = JustificatifArchiver() try: return archiver.get_justificatif_file( archive_name, justificatif_unique.etudid, filename ) except ScoValueError as err: return json_error(404, err.args[0]) @bp.route("/justificatif/<int:justif_id>/remove", methods=["POST"]) @api_web_bp.route("/justificatif/<int:justif_id>/remove", methods=["POST"]) @scodoc @login_required @as_json @permission_required(Permission.ScoAbsChange) def justif_remove(justif_id: int = None): """ Supression d'un fichier ou d'une archive # TOTALK: Doc, expliquer les noms coté server { "remove": <"all"/"list"> "filenames"?: [ <filename:str>, ... ] } """ data: dict = request.get_json(force=True) query = Justificatif.query.filter_by(id=justif_id) if g.scodoc_dept: query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) justificatif_unique: Justificatif = query.first_or_404() archive_name: str = justificatif_unique.fichier if archive_name is None: return json_error(404, "le justificatif ne possède pas de fichier") remove: str = data.get("remove") if remove is None or remove not in ("all", "list"): return json_error(404, "param 'remove': Valeur invalide") archiver: JustificatifArchiver = JustificatifArchiver() etudid: int = justificatif_unique.etudid try: if remove == "all": archiver.delete_justificatif(etudid=etudid, archive_name=archive_name) justificatif_unique.fichier = None db.session.add(justificatif_unique) db.session.commit() else: for fname in data.get("filenames", []): archiver.delete_justificatif( etudid=etudid, archive_name=archive_name, filename=fname, ) if len(archiver.list_justificatifs(archive_name, etudid)) == 0: archiver.delete_justificatif(etudid, archive_name) justificatif_unique.fichier = None db.session.add(justificatif_unique) db.session.commit() except ScoValueError as err: return json_error(404, err.args[0]) return {"response": "removed"} @bp.route("/justificatif/<int:justif_id>/list", methods=["GET"]) @api_web_bp.route("/justificatif/<int:justif_id>/list", methods=["GET"]) @scodoc @login_required @as_json @permission_required(Permission.ScoView) def justif_list(justif_id: int = None): """ Liste les fichiers du justificatif """ query = Justificatif.query.filter_by(id=justif_id) if g.scodoc_dept: query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) justificatif_unique: Justificatif = query.first_or_404() archive_name: str = justificatif_unique.fichier filenames: list[str] = [] archiver: JustificatifArchiver = JustificatifArchiver() if archive_name is not None: filenames = archiver.list_justificatifs( archive_name, justificatif_unique.etudid ) retour = {"total": len(filenames), "filenames": []} for fi in filenames: if int(fi[1]) == current_user.id or current_user.has_permission( Permission.ScoJustifView ): retour["filenames"].append(fi[0]) return retour # Partie justification @bp.route("/justificatif/<int:justif_id>/justifies", methods=["GET"]) @api_web_bp.route("/justificatif/<int:justif_id>/justifies", methods=["GET"]) @scodoc @login_required @as_json @permission_required(Permission.ScoAbsChange) def justif_justifies(justif_id: int = None): """ Liste assiduite_id justifiées par le justificatif """ query = Justificatif.query.filter_by(id=justif_id) if g.scodoc_dept: query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) justificatif_unique: Justificatif = query.first_or_404() assiduites_list: list[int] = scass.justifies(justificatif_unique) return assiduites_list # -- Utils -- def _filter_manager(requested, justificatifs_query): """ Retourne les justificatifs entrés filtrés en fonction de la request """ # cas 1 : etat justificatif etat = requested.args.get("etat") if etat is not None: justificatifs_query = scass.filter_justificatifs_by_etat( justificatifs_query, etat ) # cas 2 : date de début deb = requested.args.get("date_debut", "").replace(" ", "+") deb: datetime = scu.is_iso_formated(deb, True) # cas 3 : date de fin fin = requested.args.get("date_fin", "").replace(" ", "+") fin = scu.is_iso_formated(fin, True) if (deb, fin) != (None, None): justificatifs_query: Justificatif = scass.filter_by_date( justificatifs_query, Justificatif, deb, fin ) user_id = requested.args.get("user_id", False) if user_id is not False: justificatif_query: Justificatif = scass.filter_by_user_id( justificatif_query, user_id ) return justificatifs_query